import pymysql from datetime import datetime, timedelta # 配置数据库连接 db_config = { 'host': '192.168.2.32', # 替换为你的MariaDB服务器地址 'user': 'root', # 替换为你的MariaDB用户名 'password': 'lw123456', # 替换为你的MariaDB密码 'database': 'message_archive' } def archive_message(group_id, timestamp_str, sender, content, message_type, attachment_url=None): # 连接到数据库 connection = pymysql.connect(**db_config) try: with connection.cursor() as cursor: # 插入消息信息 sql = """ INSERT INTO messages (group_id,timestamp, sender, content, message_type, attachment_url) VALUES (%s, %s, %s, %s, %s, %s) """ cursor.execute(sql, (group_id, timestamp_str, sender, content, message_type, attachment_url)) # 提交事务 connection.commit() print("Message archived successfully.") except Exception as e: print(f"Error archiving message: {e}") connection.rollback() finally: # 关闭连接 connection.close() def get_messages(all_contacts: dict): # 连接到数据库 connection = pymysql.connect(**db_config) try: with connection.cursor() as cursor: # 获取当前时间并计算8小时前的时间 current_time = datetime.now() eight_hours_ago = current_time - timedelta(hours=8) # 转换为数据库中存储的时间格式 (假设timestamp是DATETIME类型) eight_hours_ago_str = eight_hours_ago.strftime('%Y-%m-%d %H:%M:%S') # 执行查询,获取最近8小时的消息 query = """ SELECT group_id, timestamp, sender, content FROM messages WHERE timestamp >= %s AND message_type =1 """ cursor.execute(query, (eight_hours_ago_str,)) # 提取结果并组成带逗号的字符串 result = [] for row in cursor.fetchall(): group_id = row[0] timestamp = row[1] sender = row[2] content = row[3] result.append(f"{group_id},{timestamp},{all_contacts[sender]},{content}") # 将列表中的字符串连接成一个最终的结果 result_str = "\n".join(result) print(result_str) # 输出带逗号的字符串 return result_str finally: connection.close() # 关闭数据库连接 # 示例用法 if __name__ == "__main__": group_id = 'XXX@123123' timestamp_str = "2025-02-06 11:15:28" sender = "XXX" content = "This is a test message with a string timestamp." message_type = "text" attachment_url = "http://example.com/attachment.pdf" # 可以为None如果没有附件 archive_message(group_id, timestamp_str, sender, content, message_type, attachment_url)