diff --git a/db/message_storage.py b/db/message_storage.py index 3c6380b..ec397cf 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -51,7 +51,6 @@ class MessageStorageDB(BaseDBOperator): """ return self.execute_query(sql, (date,)) or [] - # 在 MessageStorageDB 类中添加以下方法 def get_speech_ranking(self, date: str, group_id: str, limit: int = 20) -> List[Dict]: """获取指定日期和群组的发言排名""" @@ -66,4 +65,24 @@ class MessageStorageDB(BaseDBOperator): """ params = (date, group_id, limit) results = self.execute_query(sql, params) - return results or [] \ No newline at end of file + return results or [] + + def insert_speech_count(self, group_id: str, wx_id: str, date: str, count: int) -> bool: + """插入发言统计数据 + + Args: + group_id: 群组ID + wx_id: 微信ID + date: 日期,格式为 YYYY-MM-DD + count: 发言计数 + + Returns: + 是否成功插入 + """ + sql = """ + INSERT INTO speech_counts (group_id, wx_id, date, count) + VALUES (%s, %s, %s, %s) + ON DUPLICATE KEY UPDATE count = VALUES(count) + """ + params = (group_id, wx_id, date, count) + return self.execute_update(sql, params) \ No newline at end of file diff --git a/message_report/write_db.py b/message_report/write_db.py index 0b17816..63a93dd 100644 --- a/message_report/write_db.py +++ b/message_report/write_db.py @@ -1,58 +1,74 @@ import logging from datetime import datetime, timedelta -import pymysql from db.connection import DBConnectionManager from db.message_storage import MessageStorageDB +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + # 获取数据库连接管理器的单例 db_manager = DBConnectionManager() message_db = MessageStorageDB(db_manager) + def write_to_db(): - """将消息统计写入数据库""" - try: - # 获取昨天的日期 - yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") - - # 获取昨天的消息统计 - message_counts = message_db.get_message_count_by_date(yesterday) - - if not message_counts: - logging.info(f"没有找到 {yesterday} 的消息记录") - return - - logging.info(f"成功统计 {yesterday} 的消息记录: {len(message_counts)} 条") - - # 这里可以添加其他处理逻辑,如发送统计报告等 - - except Exception as e: - logging.error(f"写入数据库出错: {e}") + """从Redis读取发言统计数据并写入数据库""" + # 获取Redis连接 + redis_conn = db_manager.get_redis_connection() + + # 获取当前日期的前一天 + yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') + + # 遍历Redis中所有与昨天日期相关的key,并写入数据库 + for key_bytes in redis_conn.keys(f"*:*:{yesterday}:count"): + key = key_bytes.decode('utf-8') + parts = key.split(':') + group_id, wx_id, _date = parts[0], parts[1], parts[2] # _date应该是yesterday + + # 获取计数值 + count_bytes = redis_conn.hget(key, 'count') + count = int(count_bytes) if isinstance(count_bytes, bytes) else 0 + + # 使用MessageStorageDB插入数据 + try: + result = message_db.insert_speech_count(group_id, wx_id, yesterday, count) + if result: + logging.info(f"成功写入发言统计: {group_id}, {wx_id}, {yesterday}, {count}") + else: + logging.error(f"写入发言统计失败: {group_id}, {wx_id}, {yesterday}, {count}") + except Exception as e: + logging.error(f"写入发言统计出错: {e}") + def generate_and_send_ranking(groupId, allContacts: dict): """生成并发送群聊发言排名""" try: yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') - + # 使用数据库操作类获取排名数据 results = message_db.get_speech_ranking(yesterday, groupId, limit=20) - + if not results: logging.info(f"没有找到 {yesterday} 的群聊 {groupId} 发言记录") return f"{yesterday} 没有发言记录" - + # 格式化输出字符串 ranking_str = yesterday + "发言数量前20的用户排名:\n" for rank, result in enumerate(results, start=1): username = result['wx_id'] - speech_count = result['count'] + speech_count = result['speech_count'] ranking_str += f"{rank}. {allContacts.get(username, username)}: {speech_count} 次发言\n" - + logging.info(f"成功生成 {yesterday} 的群聊 {groupId} 发言排名") return ranking_str - + except Exception as e: logging.error(f"生成发言排名出错: {e}") return f"生成发言排名出错: {e}" + if __name__ == '__main__': write_to_db()