Files
abot/message_report/write_db.py

83 lines
2.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import schedule
import time
from datetime import datetime, timedelta
import redis
import pymysql
# 连接到Redis
r = redis.Redis(host='192.168.2.32', port=6379, db=0)
# 配置数据库连接
db_config = {
'host': '192.168.2.32', # 替换为你的MariaDB服务器地址
'user': 'root', # 替换为你的MariaDB用户名
'password': 'lw123456', # 替换为你的MariaDB密码
'database': 'message_archive'
}
def write_to_db():
# 连接到数据库
connection = pymysql.connect(**db_config)
# 获取当前日期的前一天
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
# 遍历Redis中所有与昨天日期相关的key并写入数据库
for key in r.keys(f"*:*:{yesterday}:count"):
parts = key.decode('utf-8').split(':')
group_id, wx_id, _date = parts[0], parts[1], parts[2] # _date应该是yesterday但这里为了完整性还是保留
count = int(r.hget(key, 'count')) if isinstance(r.hget(key, 'count'), bytes) else 0 # 处理可能的None或空值情况
# 插入消息信息
try:
with connection.cursor() as cursor:
# 插入消息信息
sql = """
INSERT INTO speech_counts (group_id, wx_id, date, count) VALUES (%s, %s, %s, %s)
"""
cursor.execute(sql, (group_id, wx_id, yesterday, count))
# 提交事务
connection.commit()
print("write_to_db successfully.")
except Exception as e:
print(f"write_to_db message: {e}")
connection.rollback()
def generate_and_send_ranking(groupId, allContacts: dict):
# 连接到SQLite数据库假设数据库文件名为database.db
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
# 编写SQL查询来获取发言数量前20的用户
query = """
SELECT wx_id, count AS speech_count
FROM speech_counts
WHERE date = %s
AND group_id = %s
GROUP BY wx_id
ORDER BY count DESC
LIMIT 20
"""
# 执行查询并获取结果
cursor.execute(query, (yesterday,groupId,))
results = cursor.fetchall()
# 格式化输出字符串
ranking_str = yesterday + "发言数量前20的用户排名:\n"
for rank, (username, speech_count) in enumerate(results, start=1):
ranking_str += f"{rank}. {allContacts[username]}: {speech_count} 次发言\n"
# 关闭数据库连接
connection.close()
print(ranking_str)
# 这里我们没有实际“发送”排名,只是返回字符串
# 如果需要发送,可以在此添加发送逻辑
return ranking_str
if __name__ == '__main__':
write_to_db()