import logging from datetime import datetime, timedelta import pymysql from db.connection import DBConnectionManager from db.message_storage import MessageStorageDB # 获取数据库连接管理器的单例 db_manager = DBConnectionManager() message_db = MessageStorageDB(db_manager) def write_to_db(): """将消息统计写入数据库""" try: # 获取昨天的日期 yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") # 获取昨天的消息统计 message_counts = message_db.get_message_count_by_date(yesterday) if not message_counts: logging.info(f"没有找到 {yesterday} 的消息记录") return logging.info(f"成功统计 {yesterday} 的消息记录: {len(message_counts)} 条") # 这里可以添加其他处理逻辑,如发送统计报告等 except Exception as e: logging.error(f"写入数据库出错: {e}") def generate_and_send_ranking(groupId, allContacts: dict): """生成并发送群聊发言排名""" try: yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') # 使用数据库操作类获取排名数据 results = message_db.get_speech_ranking(yesterday, groupId, limit=20) if not results: logging.info(f"没有找到 {yesterday} 的群聊 {groupId} 发言记录") return f"{yesterday} 没有发言记录" # 格式化输出字符串 ranking_str = yesterday + "发言数量前20的用户排名:\n" for rank, result in enumerate(results, start=1): username = result['wx_id'] speech_count = result['count'] ranking_str += f"{rank}. {allContacts.get(username, username)}: {speech_count} 次发言\n" logging.info(f"成功生成 {yesterday} 的群聊 {groupId} 发言排名") return ranking_str except Exception as e: logging.error(f"生成发言排名出错: {e}") return f"生成发言排名出错: {e}" if __name__ == '__main__': write_to_db()