import schedule import time from datetime import datetime, timedelta import redis import pymysql # 连接到Redis r = redis.Redis(host='192.168.2.32', port=6379, db=0) # 配置数据库连接 db_config = { 'host': '192.168.2.32', # 替换为你的MariaDB服务器地址 'user': 'root', # 替换为你的MariaDB用户名 'password': 'lw123456', # 替换为你的MariaDB密码 'database': 'message_archive' } def write_to_db(): # 连接到数据库 connection = pymysql.connect(**db_config) # 获取当前日期的前一天 yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') # 遍历Redis中所有与昨天日期相关的key,并写入数据库 for key in r.keys(f"*:*:{yesterday}:count"): parts = key.decode('utf-8').split(':') group_id, wx_id, _date = parts[0], parts[1], parts[2] # _date应该是yesterday,但这里为了完整性还是保留 count = int(r.hget(key, 'count')) if isinstance(r.hget(key, 'count'), bytes) else 0 # 处理可能的None或空值情况 # 插入消息信息 try: with connection.cursor() as cursor: # 插入消息信息 sql = """ INSERT INTO speech_counts (group_id, wx_id, date, count) VALUES (%s, %s, %s, %s) """ cursor.execute(sql, (group_id, wx_id, yesterday, count)) # 提交事务 connection.commit() print("write_to_db successfully.") except Exception as e: print(f"write_to_db message: {e}") connection.rollback() def generate_and_send_ranking(groupId, allContacts: dict): # 连接到SQLite数据库(假设数据库文件名为database.db) connection = pymysql.connect(**db_config) cursor = connection.cursor() yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') # 编写SQL查询来获取发言数量前20的用户 query = """ SELECT wx_id, count AS speech_count FROM speech_counts WHERE date = %s AND group_id = %s GROUP BY wx_id ORDER BY count DESC LIMIT 20 """ # 执行查询并获取结果 cursor.execute(query, (yesterday,groupId,)) results = cursor.fetchall() # 格式化输出字符串 ranking_str = yesterday + "发言数量前20的用户排名:\n" for rank, (username, speech_count) in enumerate(results, start=1): ranking_str += f"{rank}. {allContacts[username]}: {speech_count} 次发言\n" # 关闭数据库连接 connection.close() print(ranking_str) # 这里我们没有实际“发送”排名,只是返回字符串 # 如果需要发送,可以在此添加发送逻辑 return ranking_str if __name__ == '__main__': write_to_db()