import schedule import time import sqlite3 from datetime import datetime, timedelta import redis # 连接到Redis r = redis.Redis(host='192.168.2.32', port=6379, db=0) def write_to_db(): # 连接到SQLite数据库 conn = sqlite3.connect('message_stats.db') c = conn.cursor() # 创建表(如果不存在) c.execute('''CREATE TABLE IF NOT EXISTS speech_counts ( group_id TEXT, wx_id TEXT, date TEXT, count INTEGER, PRIMARY KEY (group_id, wx_id, date) )''') # 获取当前日期的前一天 yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') # 遍历Redis中所有与昨天日期相关的key,并写入数据库 for key in r.keys(f"*:*:{yesterday}:count"): parts = key.decode('utf-8').split(':') group_id, wx_id, _date = parts[0], parts[1], parts[2] # _date应该是yesterday,但这里为了完整性还是保留 count = int(r.hget(key, 'count')) if isinstance(r.hget(key, 'count'), bytes) else 0 # 处理可能的None或空值情况 # 插入或更新数据库记录 c.execute("INSERT OR REPLACE INTO speech_counts (group_id, wx_id, date, count) VALUES (?, ?, ?, ?)", (group_id, wx_id, yesterday, count)) conn.commit() conn.close() def generate_and_send_ranking(groupId, allContacts: dict): # 连接到SQLite数据库(假设数据库文件名为database.db) conn = sqlite3.connect('message_stats.db') cursor = conn.cursor() yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') # 编写SQL查询来获取发言数量前20的用户 query = """ SELECT wx_id, count AS speech_count FROM speech_counts WHERE DATE(date) = DATE('now', '-1 day') AND group_id = ? GROUP BY wx_id ORDER BY count DESC LIMIT 20 """ # 执行查询并获取结果 cursor.execute(query, (groupId,)) results = cursor.fetchall() # 格式化输出字符串 ranking_str = yesterday + "发言数量前20的用户排名:\n" for rank, (username, speech_count) in enumerate(results, start=1): ranking_str += f"{rank}. {allContacts[username]}: {speech_count} 次发言\n" # 关闭数据库连接 conn.close() print(ranking_str) # 这里我们没有实际“发送”排名,只是返回字符串 # 如果需要发送,可以在此添加发送逻辑 return ranking_str if __name__ == '__main__': write_to_db()