From 079f36338281d0fda99be6286b2e2b2218349263 Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 2 Apr 2026 17:55:21 +0800 Subject: [PATCH] =?UTF-8?q?Reapply=20"=E5=AE=8C=E5=96=84=E8=A1=A8=E6=83=85?= =?UTF-8?q?=E8=B5=84=E4=BA=A7=E5=90=8E=E5=8F=B0=E8=83=BD=E5=8A=9B=E5=B9=B6?= =?UTF-8?q?=E8=A1=A5=E5=85=85=E7=BE=A4=E6=80=BB=E7=BB=93=E8=90=BD=E5=BA=93?= =?UTF-8?q?"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 57bb46bb21888a439d4428dea67142a10105d1df. --- admin/dashboard/blueprints/contacts.py | 37 +++++ admin/dashboard/server.py | 2 + .../templates/contacts_management.html | 128 +++++++++++++++- admin/dashboard/templates/message_list.html | 17 ++- db/emoji_asset_db.py | 141 ++++++++++++++++++ db/message_storage.py | 16 ++ db/message_summary_db.py | 113 ++++++++++++++ main.py | 5 + plugins/message_summary/main.py | 57 +++++++ utils/wechat/message_to_db.py | 119 ++++++++++++++- wechat_ipad/client/tools.py | 53 ++++++- 11 files changed, 671 insertions(+), 17 deletions(-) create mode 100644 db/emoji_asset_db.py create mode 100644 db/message_summary_db.py diff --git a/admin/dashboard/blueprints/contacts.py b/admin/dashboard/blueprints/contacts.py index 4a513d9..98de0bb 100644 --- a/admin/dashboard/blueprints/contacts.py +++ b/admin/dashboard/blueprints/contacts.py @@ -403,9 +403,46 @@ def api_send_message(): 'message': '消息发送中' }) + elif msg_type == 'emoji': + md5 = content.get('md5') if isinstance(content, dict) else None + total_length = int((content or {}).get('total_length') or 0) if isinstance(content, dict) else 0 + if not md5 or total_length <= 0: + return jsonify({'success': False, 'message': '缺少表情参数'}) + send_message_in_thread(server.client.send_emoji_message, wxid, md5, total_length) + server.emoji_asset_db.mark_sent(md5) + return jsonify({ + 'success': True, + 'message': '表情发送中' + }) + else: return jsonify({'success': False, 'message': '不支持的消息类型'}) except Exception as e: logger.exception(f"发送消息失败: {e}") return jsonify({'success': False, 'message': str(e)}), 500 + + +@contacts_bp.route('/api/emoji_assets', methods=['GET']) +@login_required +def api_emoji_assets(): + """获取表情资产列表API""" + try: + server = current_app.dashboard_server + limit = min(max(int(request.args.get("limit", 60) or 60), 1), 200) + roomid = request.args.get("roomid", "").strip() + assets = server.emoji_asset_db.list_assets(limit=limit, chatroom_id=roomid) + for asset in assets: + source_wxid = asset.get("source_wxid", "") + asset["source_name"] = server.contact_manager.get_nickname(source_wxid) or source_wxid + source_chatroom_id = asset.get("source_chatroom_id", "") + asset["source_chatroom_name"] = server.contact_manager.get_nickname(source_chatroom_id) or source_chatroom_id + return jsonify({ + "success": True, + "data": { + "assets": assets + } + }) + except Exception as e: + logger.error(f"获取表情资产列表失败: {e}") + return jsonify({"success": False, "error": str(e)}), 500 diff --git a/admin/dashboard/server.py b/admin/dashboard/server.py index f2ea481..11f41a5 100644 --- a/admin/dashboard/server.py +++ b/admin/dashboard/server.py @@ -11,6 +11,7 @@ from flask import Flask, send_from_directory from loguru import logger from db.contacts_db import ContactsDBOperator +from db.emoji_asset_db import EmojiAssetDBOperator from db.member_context_db import MemberContextDBOperator from db.message_storage import MessageStorageDB from db.stats_db import StatsDBOperator @@ -43,6 +44,7 @@ class DashboardServer: self.db_manager = robot_instance.db_manager self.stats_db = StatsDBOperator(self.db_manager) self.message_storage = MessageStorageDB(self.db_manager) + self.emoji_asset_db = EmojiAssetDBOperator(self.db_manager) self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager) self.member_context_db = MemberContextDBOperator(self.db_manager) self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager) diff --git a/admin/dashboard/templates/contacts_management.html b/admin/dashboard/templates/contacts_management.html index 834a1a1..572ad4d 100644 --- a/admin/dashboard/templates/contacts_management.html +++ b/admin/dashboard/templates/contacts_management.html @@ -190,9 +190,9 @@ 刷新本群摘要 - + - + - + - + - + - + @@ -390,6 +390,7 @@
+

@@ -399,12 +400,31 @@ {% endraw %}
+
+
+ 后台表情资产 + 刷新 +
+ +
+ {% raw %} +
+ +
+
{{ asset.source_name || asset.source_wxid || '未知来源' }}
+
{{ asset.source_chatroom_name || asset.source_chatroom_id || '全局资产' }}
+
+
+ {% endraw %} +
+
图片 语音 视频 链接 + 表情 发送
@@ -451,6 +471,7 @@ memberContextDialogVisible: false, memberContextLoading: false, memberContext: null, currentContextMember: {}, memberContextEnabled: false, chatDialogVisible: false, currentChatUser: null, messageInput: '', chatMessages: [], + emojiPanelVisible: false, emojiAssetsLoading: false, emojiAssets: [], linkDialogVisible: false, linkForm: { url: '', title: '', description: '' } }; @@ -587,7 +608,13 @@ this.$message.error('刷新成员交互摘要失败'); }).finally(() => { this.memberContextLoading = false; }); }, - openChatDialog(user) { this.currentChatUser = user; this.chatDialogVisible = true; this.chatMessages = []; }, + openChatDialog(user) { + this.currentChatUser = user; + this.chatDialogVisible = true; + this.chatMessages = []; + this.emojiPanelVisible = false; + this.loadEmojiAssets(); + }, async sendTextMessage() { if (!this.messageInput.trim()) return; try { @@ -607,6 +634,56 @@ const formData = new FormData(); formData.append('file', options.file); formData.append('wxid', this.currentChatUser.wxid); formData.append('type', 'video'); try { const response = await axios.post('/contacts/api/send_message', formData); if (response.data.success) { this.chatMessages.push({ type: 'video', content: response.data.url, isSelf: true, time: new Date().toLocaleTimeString() }); this.$nextTick(() => { this.scrollToBottom(); }); } } catch (error) { this.$message.error('发送视频失败'); } }, + toggleEmojiPanel() { + this.emojiPanelVisible = !this.emojiPanelVisible; + if (this.emojiPanelVisible && !this.emojiAssets.length) { + this.loadEmojiAssets(); + } + }, + async loadEmojiAssets() { + this.emojiAssetsLoading = true; + const roomid = this.currentChatUser && this.currentChatUser.wxid && this.currentChatUser.wxid.endsWith('@chatroom') + ? this.currentChatUser.wxid + : ''; + try { + const response = await axios.get('/contacts/api/emoji_assets', { params: { limit: 80, roomid } }); + if (response.data.success) { + this.emojiAssets = response.data.data.assets || []; + } else { + this.$message.error('加载表情资产失败'); + } + } catch (error) { + console.error('加载表情资产失败:', error); + this.$message.error('加载表情资产失败'); + } finally { + this.emojiAssetsLoading = false; + } + }, + async sendEmojiMessage(asset) { + if (!asset || !asset.md5 || !asset.total_length) { + this.$message.warning('表情资产参数不完整'); + return; + } + try { + const response = await axios.post('/contacts/api/send_message', { + wxid: this.currentChatUser.wxid, + type: 'emoji', + content: { + md5: asset.md5, + total_length: asset.total_length + } + }); + if (response.data.success) { + this.chatMessages.push({ type: 'emoji', content: asset.file_path, isSelf: true, time: new Date().toLocaleTimeString() }); + this.$nextTick(() => { this.scrollToBottom(); }); + } else { + this.$message.error(response.data.message || '发送表情失败'); + } + } catch (error) { + console.error('发送表情失败:', error); + this.$message.error('发送表情失败'); + } + }, showLinkDialog() { this.linkForm = { url: '', title: '', description: '' }; this.linkDialogVisible = true; }, async sendLinkMessage() { if (!this.linkForm.url) { this.$message.warning('请输入链接'); return; } @@ -653,6 +730,14 @@ .action-row { display: flex; align-items: center; gap: 8px; flex-wrap: wrap; } .pagination-container { margin-top: 20px; text-align: right; } .group-members-section { margin-top: 20px; } + .group-members-table .entity-cell { min-width: 0; } + .group-members-table .entity-copy { min-width: 0; overflow: hidden; } + .group-members-table .entity-title, + .group-members-table .entity-subtitle { + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + } .section-title { margin: 20px 0 15px 0; border-bottom: 1px solid rgba(148,163,184,0.12); padding-bottom: 10px; display: flex; justify-content: space-between; align-items: center; @@ -682,5 +767,34 @@ .message-time { font-size: 12px; color: #94a3b8; margin-top: 5px; } .input-area { padding: 20px 0 0; } .toolbar { margin-top: 10px; display: flex; gap: 10px; flex-wrap: wrap; } + .chat-emoji-preview { width: 96px; height: 96px; object-fit: contain; display: block; } + .emoji-panel { + margin-bottom: 12px; padding: 12px; border: 1px solid rgba(148,163,184,0.14); + border-radius: 16px; background: rgba(248,250,252,0.86); + } + .emoji-panel-header { + display: flex; align-items: center; justify-content: space-between; margin-bottom: 10px; + color: #334155; font-size: 13px; font-weight: 600; + } + .emoji-grid { + display: grid; grid-template-columns: repeat(auto-fill, minmax(110px, 1fr)); gap: 10px; + max-height: 240px; overflow-y: auto; + } + .emoji-card { + padding: 10px; border-radius: 14px; background: #ffffff; border: 1px solid rgba(148,163,184,0.12); + cursor: pointer; transition: all .2s ease; + } + .emoji-card:hover { + transform: translateY(-1px); + box-shadow: 0 10px 24px rgba(15, 23, 42, 0.08); + border-color: rgba(79,70,229,0.24); + } + .emoji-thumb { width: 72px; height: 72px; display: block; margin: 0 auto 8px; object-fit: contain; } + .emoji-meta { font-size: 11px; color: #64748b; line-height: 1.4; } + .emoji-meta-line { + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + } {% endblock %} diff --git a/admin/dashboard/templates/message_list.html b/admin/dashboard/templates/message_list.html index c207f4a..edae990 100644 --- a/admin/dashboard/templates/message_list.html +++ b/admin/dashboard/templates/message_list.html @@ -92,6 +92,12 @@ +
+
【表情消息】
+ +
表情下载中或暂无预览
+
+
{% raw %}{{ scope.row.content || `【消息类型: ${scope.row.message_type}】` }}{% endraw %}
@@ -127,9 +133,10 @@ {% raw %}{{ getMessageTypeName(selectedMessage.message_type) }}{% endraw %} {% raw %}{{ selectedMessage.content }}{% endraw %} - + + @@ -313,12 +320,18 @@ this.selectedMessage = message; this.detailDialogVisible = true; }, + isEmojiMessage(message) { + if (!message) return false; + return String(message.message_type) === '47' || String(message.message_type) === '1090519089'; + }, getMessageTypeName(type) { const typeMap = { 1: '文本消息', 3: '图片消息', 43: '视频消息', - 49: '链接消息' + 47: '动画表情', + 49: '链接消息', + 1090519089: '大表情' }; return typeMap[type] || `未知类型(${type})`; }, diff --git a/db/emoji_asset_db.py b/db/emoji_asset_db.py new file mode 100644 index 0000000..ef697c0 --- /dev/null +++ b/db/emoji_asset_db.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +from datetime import datetime +from typing import Dict, List, Optional + +from db.base import BaseDBOperator +from db.connection import DBConnectionManager + + +class EmojiAssetDBOperator(BaseDBOperator): + """表情资产数据库操作""" + + def __init__(self, db_manager: DBConnectionManager): + super().__init__(db_manager) + self._create_tables() + + def _create_tables(self): + try: + self.execute_update(""" + CREATE TABLE IF NOT EXISTS t_emoji_asset ( + id INT AUTO_INCREMENT PRIMARY KEY, + md5 VARCHAR(64) NOT NULL COMMENT '表情MD5', + total_length INT NOT NULL DEFAULT 0 COMMENT '表情长度', + file_path VARCHAR(255) NOT NULL COMMENT '本地访问路径', + file_ext VARCHAR(16) DEFAULT '' COMMENT '文件扩展名', + source_message_id BIGINT DEFAULT NULL COMMENT '来源消息ID', + source_chatroom_id VARCHAR(64) DEFAULT '' COMMENT '来源群ID', + source_wxid VARCHAR(64) DEFAULT '' COMMENT '来源发送人', + usage_count INT NOT NULL DEFAULT 0 COMMENT '使用次数', + last_used_at DATETIME DEFAULT NULL COMMENT '最近采集时间', + last_sent_at DATETIME DEFAULT NULL COMMENT '最近发送时间', + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + UNIQUE KEY idx_emoji_asset_md5 (md5), + KEY idx_emoji_asset_recent (update_time), + KEY idx_emoji_asset_group (source_chatroom_id, update_time) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='表情资产表'; + """) + except Exception as e: + self.LOG.error(f"创建表情资产表失败: {e}") + + def save_asset(self, asset: Dict) -> bool: + try: + sql = """ + INSERT INTO t_emoji_asset ( + md5, total_length, file_path, file_ext, + source_message_id, source_chatroom_id, source_wxid, + usage_count, last_used_at, last_sent_at + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + total_length = VALUES(total_length), + file_path = VALUES(file_path), + file_ext = VALUES(file_ext), + source_message_id = COALESCE(VALUES(source_message_id), source_message_id), + source_chatroom_id = CASE + WHEN VALUES(source_chatroom_id) IS NULL OR VALUES(source_chatroom_id) = '' THEN source_chatroom_id + ELSE VALUES(source_chatroom_id) + END, + source_wxid = CASE + WHEN VALUES(source_wxid) IS NULL OR VALUES(source_wxid) = '' THEN source_wxid + ELSE VALUES(source_wxid) + END, + usage_count = usage_count + 1, + last_used_at = VALUES(last_used_at) + """ + now = asset.get("last_used_at") or datetime.now().strftime("%Y-%m-%d %H:%M:%S") + params = ( + asset.get("md5", ""), + int(asset.get("total_length", 0) or 0), + asset.get("file_path", ""), + asset.get("file_ext", ""), + asset.get("source_message_id"), + asset.get("source_chatroom_id", ""), + asset.get("source_wxid", ""), + int(asset.get("usage_count", 1) or 1), + now, + asset.get("last_sent_at"), + ) + return self.execute_update(sql, params) + except Exception as e: + self.LOG.error(f"保存表情资产失败: {e}") + return False + + def list_assets(self, limit: int = 60, chatroom_id: str = "") -> List[Dict]: + try: + sql = """ + SELECT id, md5, total_length, file_path, file_ext, source_message_id, + source_chatroom_id, source_wxid, usage_count, last_used_at, + last_sent_at, create_time, update_time + FROM t_emoji_asset + WHERE file_path IS NOT NULL AND file_path <> '' + """ + params = [] + if chatroom_id: + sql += " AND source_chatroom_id = %s " + params.append(chatroom_id) + sql += " ORDER BY COALESCE(last_sent_at, last_used_at, update_time) DESC LIMIT %s " + params.append(limit) + rows = self.execute_query(sql, tuple(params)) or [] + return [self._serialize_row(row) for row in rows] + except Exception as e: + self.LOG.error(f"查询表情资产失败: {e}") + return [] + + def get_asset_by_md5(self, md5: str) -> Optional[Dict]: + try: + sql = """ + SELECT id, md5, total_length, file_path, file_ext, source_message_id, + source_chatroom_id, source_wxid, usage_count, last_used_at, + last_sent_at, create_time, update_time + FROM t_emoji_asset + WHERE md5 = %s + LIMIT 1 + """ + row = self.execute_query(sql, (md5,), fetch_one=True) + return self._serialize_row(row) if row else None + except Exception as e: + self.LOG.error(f"查询表情资产详情失败: {e}") + return None + + def mark_sent(self, md5: str) -> bool: + try: + sql = """ + UPDATE t_emoji_asset + SET last_sent_at = NOW() + WHERE md5 = %s + """ + return self.execute_update(sql, (md5,)) + except Exception as e: + self.LOG.error(f"更新表情发送时间失败: {e}") + return False + + @staticmethod + def _serialize_row(row: Dict) -> Dict: + if not row: + return row + for key in ("last_used_at", "last_sent_at", "create_time", "update_time"): + value = row.get(key) + if isinstance(value, datetime): + row[key] = value.strftime("%Y-%m-%d %H:%M:%S") + return row diff --git a/db/message_storage.py b/db/message_storage.py index 56d20ab..e4a24b0 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -338,6 +338,22 @@ class MessageStorageDB(BaseDBOperator): print(f"更新消息图片文件路径出错: {e}") return False + def get_pending_emoji_messages(self, minutes_ago: int = 1440, limit: int = 50) -> List[Dict]: + """获取最近N分钟内未处理表情的消息""" + sql = """ + SELECT message_id, group_id, sender, message_xml, timestamp, attachment_url, message_type + FROM messages + WHERE message_type IN ('47', '1090519089') + AND image_path IS NULL + AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE) + AND attachment_url IS NOT NULL + AND attachment_url != '' + ORDER BY timestamp ASC + LIMIT %s + """ + params = (minutes_ago, limit) + return self.execute_query(sql, params) or [] + def get_hourly_message_trend(self, group_id: str = None, days: int = 1) -> List[Dict]: """获取指定群组的按小时消息趋势数据 diff --git a/db/message_summary_db.py b/db/message_summary_db.py new file mode 100644 index 0000000..f7b92d7 --- /dev/null +++ b/db/message_summary_db.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +import json +from datetime import datetime +from typing import Dict, Optional + +from db.base import BaseDBOperator +from db.connection import DBConnectionManager + + +class MessageSummaryDBOperator(BaseDBOperator): + """群消息总结数据库操作""" + + def __init__(self, db_manager: DBConnectionManager): + super().__init__(db_manager) + self._create_tables() + + def _create_tables(self): + try: + self.execute_update(""" + CREATE TABLE IF NOT EXISTS t_message_summary ( + id INT AUTO_INCREMENT PRIMARY KEY, + chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID', + group_name VARCHAR(128) DEFAULT '' COMMENT '群名称', + summary_type VARCHAR(16) NOT NULL COMMENT '总结类型 daily|manual', + period_key VARCHAR(32) NOT NULL COMMENT '周期主键,如 2026-04-01', + period_start DATETIME NULL COMMENT '总结周期开始时间', + period_end DATETIME NULL COMMENT '总结周期结束时间', + source_message_count INT NOT NULL DEFAULT 0 COMMENT '源消息数量', + summary_text LONGTEXT COMMENT '总结文本', + image_path VARCHAR(255) DEFAULT NULL COMMENT '总结图片路径', + meta_json LONGTEXT COMMENT '附加元数据JSON', + last_generated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '最后生成时间', + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + UNIQUE KEY idx_message_summary (chatroom_id, summary_type, period_key), + KEY idx_message_summary_lookup (chatroom_id, period_end) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='群消息总结表'; + """) + except Exception as e: + self.LOG.error(f"创建群消息总结表失败: {e}") + + def save_summary(self, summary: Dict) -> bool: + try: + data = { + "chatroom_id": summary.get("chatroom_id", ""), + "group_name": summary.get("group_name", ""), + "summary_type": summary.get("summary_type", "daily"), + "period_key": summary.get("period_key", ""), + "period_start": summary.get("period_start"), + "period_end": summary.get("period_end"), + "source_message_count": int(summary.get("source_message_count", 0) or 0), + "summary_text": summary.get("summary_text", ""), + "image_path": summary.get("image_path"), + "meta_json": json.dumps(summary.get("meta", {}), ensure_ascii=False), + "last_generated_at": summary.get( + "last_generated_at", + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ), + } + fields = ", ".join(data.keys()) + placeholders = ", ".join(["%s"] * len(data)) + update_clause = ", ".join( + [ + f"{key}=VALUES({key})" + for key in data.keys() + if key not in ("chatroom_id", "summary_type", "period_key") + ] + ) + sql = f""" + INSERT INTO t_message_summary ({fields}) + VALUES ({placeholders}) + ON DUPLICATE KEY UPDATE {update_clause} + """ + return self.execute_update(sql, tuple(data.values())) + except Exception as e: + self.LOG.error(f"保存群消息总结失败: {e}") + return False + + def get_summary(self, chatroom_id: str, summary_type: str, period_key: str) -> Optional[Dict]: + try: + sql = """ + SELECT * + FROM t_message_summary + WHERE chatroom_id = %s AND summary_type = %s AND period_key = %s + LIMIT 1 + """ + row = self.execute_query(sql, (chatroom_id, summary_type, period_key), fetch_one=True) + return self._deserialize_row(row) + except Exception as e: + self.LOG.error(f"获取群消息总结失败: {e}") + return None + + @staticmethod + def _deserialize_row(row: Optional[Dict]) -> Optional[Dict]: + if not row: + return row + + meta_json = row.get("meta_json") + if meta_json: + try: + row["meta_json"] = json.loads(meta_json) + except Exception: + row["meta_json"] = {} + else: + row["meta_json"] = {} + + for key in ("period_start", "period_end", "last_generated_at", "create_time", "update_time"): + value = row.get(key) + if isinstance(value, datetime): + row[key] = value.strftime("%Y-%m-%d %H:%M:%S") + + row["meta"] = row.get("meta_json", {}) + return row diff --git a/main.py b/main.py index a3a89bf..8cd2f65 100644 --- a/main.py +++ b/main.py @@ -150,6 +150,11 @@ def jobs(robot: Robot): if hasattr(robot, 'message_storage') and robot.message_storage: await robot.message_storage.process_pending_images(minutes_ago=10, batch_size=20) + @async_job.every_minutes(5) + async def process_pending_emojis_job(): + if hasattr(robot, 'message_storage') and robot.message_storage: + await robot.message_storage.process_pending_emojis(minutes_ago=60 * 24 * 7, batch_size=30) + if __name__ == "__main__": diff --git a/plugins/message_summary/main.py b/plugins/message_summary/main.py index df3384f..80bf401 100644 --- a/plugins/message_summary/main.py +++ b/plugins/message_summary/main.py @@ -24,6 +24,7 @@ from utils.string_utils import remove_trailing_content from utils.wechat.contact_manager import ContactManager from utils.wechat.message_to_db import MessageStorage from wechat_ipad import WechatAPIClient +from db.message_summary_db import MessageSummaryDBOperator class MessageSummaryPlugin(MessagePluginInterface): @@ -68,6 +69,7 @@ class MessageSummaryPlugin(MessagePluginInterface): def __init__(self): super().__init__() self.message_storage = None + self.summary_db = None self.revoke = None # 注册功能权限 self.feature = self.register_feature() @@ -82,6 +84,7 @@ class MessageSummaryPlugin(MessagePluginInterface): self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo") self._api_url = api_config.get("api_url", "http://192.168.2.240/v1/chat-messages") self.message_storage = MessageStorage() + self.summary_db = MessageSummaryDBOperator(context["db_manager"]) self.LOG.debug(f"初始化 {self.name} 插件成功") return True @@ -196,6 +199,7 @@ class MessageSummaryPlugin(MessagePluginInterface): def _sanitize_group_name(self, group_name: str) -> str: """处理群名,去除特殊字符并限制长度""" + group_name = group_name or "" # 去除特殊字符,只保留字母、数字、中文和基本标点 sanitized_name = re.sub(r'[^\w\s\u4e00-\u9fff,.,。]', '', group_name) # 限制长度为15个字符 @@ -206,6 +210,47 @@ class MessageSummaryPlugin(MessagePluginInterface): sanitized_name = "群聊" return sanitized_name + def _save_daily_summary( + self, + group_id: str, + group_name: str, + summary_text: str, + image_path: Optional[str], + yesterday_start: datetime, + yesterday_end: datetime, + message_count: int, + chat_content_length: int + ) -> bool: + """保存每日总结结果,便于后续知识库提取。""" + if not self.summary_db: + self.LOG.warning("消息总结数据库未初始化,跳过保存每日总结") + return False + + period_key = yesterday_start.strftime("%Y-%m-%d") + meta = { + "source": "message_summary_plugin", + "api_url": self._api_url, + "has_image": bool(image_path), + "chat_content_length": int(chat_content_length or 0), + } + saved = self.summary_db.save_summary({ + "chatroom_id": group_id, + "group_name": group_name, + "summary_type": "daily", + "period_key": period_key, + "period_start": yesterday_start.strftime("%Y-%m-%d %H:%M:%S"), + "period_end": yesterday_end.strftime("%Y-%m-%d %H:%M:%S"), + "source_message_count": message_count, + "summary_text": summary_text, + "image_path": image_path, + "meta": meta, + }) + if saved: + self.LOG.info(f"已保存群 {group_id} 的每日总结到数据库: {period_key}") + else: + self.LOG.error(f"保存群 {group_id} 的每日总结失败: {period_key}") + return saved + async def _generate_summary(self, chat_content: str, group_name: str) -> Tuple[str, Optional[str]]: """生成总结""" # Dify API配置 @@ -377,6 +422,18 @@ class MessageSummaryPlugin(MessagePluginInterface): # 生成总结 summary, image_path = await self._generate_summary(chat_content, group_name) + if summary and len(summary.strip()) > 0: + self._save_daily_summary( + group_id=group_id, + group_name=group_name, + summary_text=summary, + image_path=image_path, + yesterday_start=yesterday_start, + yesterday_end=yesterday_end, + message_count=message_count, + chat_content_length=len(chat_content), + ) + if image_path: # 图片生成成功,发送图片 await self.bot.send_image_message(group_id, Path(image_path)) diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index 46096fe..c8a38c8 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -5,14 +5,13 @@ import xml.etree.ElementTree as ET import concurrent.futures # 添加线程池支持 import os import base64 -import imghdr - import re from threading import Lock from typing import Dict from db.connection import DBConnectionManager from db.contacts_db import ContactsDBOperator +from db.emoji_asset_db import EmojiAssetDBOperator from db.levels_db import LevelsDBOperator from db.message_storage import MessageStorageDB # 导入积分系统 @@ -33,6 +32,7 @@ class MessageStorage: self.db_manager = DBConnectionManager.get_instance() self.message_db = MessageStorageDB(self.db_manager) self.contacts_db = ContactsDBOperator(self.db_manager) + self.emoji_asset_db = EmojiAssetDBOperator(self.db_manager) self.points_db = PointsDBOperator(self.db_manager) # 初始化本地缓存字典,使用 group_id 作为键 @@ -60,6 +60,9 @@ class MessageStorage: # 正则(替代 XML 解析) self._aeskey_re = re.compile(r'aeskey="(.*?)"') self._cdn_re = re.compile(r'cdnthumburl="(.*?)"') + self._emoji_cdn_re = re.compile(r'cdnurl="(.*?)"') + self._emoji_encrypt_re = re.compile(r'encrypturl="(.*?)"') + self._emoji_thumb_re = re.compile(r'thumburl="(.*?)"') # 修改为项目根目录下的 static/images self.image_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static", "images") @@ -119,6 +122,118 @@ class MessageStorage: 'error': str(e) } + def _extract_emoji_download_info(self, xml_content: str) -> Dict: + if not xml_content: + return {} + + aeskey_match = self._aeskey_re.search(xml_content) + if not aeskey_match: + return {} + + url_match = ( + self._emoji_cdn_re.search(xml_content) + or self._emoji_encrypt_re.search(xml_content) + or self._emoji_thumb_re.search(xml_content) + ) + if not url_match: + return {} + + md5_match = re.search(r'md5="(.*?)"', xml_content) + length_match = re.search(r'len="(\d+)"', xml_content) + + return { + "aeskey": aeskey_match.group(1), + "url": url_match.group(1), + "md5": md5_match.group(1) if md5_match else "", + "length": int(length_match.group(1)) if length_match else 0, + } + + async def _process_emoji_record(self, msg_record: Dict) -> bool: + if not self.client: + logger.warning("表情消息未处理,微信客户端未初始化") + return False + + message_id = int(msg_record.get("message_id") or 0) + room_id = msg_record.get("group_id") or "unknown" + sender = msg_record.get("sender", "") + xml_content = msg_record.get("attachment_url") or msg_record.get("message_xml") or "" + emoji_info = self._extract_emoji_download_info(xml_content) + if not emoji_info: + logger.warning(f"表情消息解析失败,未提取到下载参数: msg_id={message_id}") + return False + + try: + base64_str = await self.client.download_cdn_file(emoji_info["aeskey"], emoji_info["url"]) + if not base64_str: + logger.warning(f"表情下载返回为空: msg_id={message_id}") + return False + + group_dir = os.path.join(self.image_dir, room_id) + file_stem = f"{message_id}_emoji" + file_path = await self.client.base64_to_file_autoext( + base64_str, + file_stem=file_stem, + file_path=group_dir, + default_ext=".bin", + ) + ext = os.path.splitext(file_path)[1] or ".bin" + file_name = os.path.basename(file_path) + + web_path = f"/static/images/{room_id}/{file_name}" + updated = self.message_db.update_message_image_file_path(message_id, web_path) + if updated: + if emoji_info.get("md5"): + self.emoji_asset_db.save_asset({ + "md5": emoji_info.get("md5", ""), + "total_length": emoji_info.get("length", 0), + "file_path": web_path, + "file_ext": ext, + "source_message_id": message_id, + "source_chatroom_id": room_id, + "source_wxid": sender, + }) + else: + logger.warning(f"表情已落盘但缺少md5,跳过资产入库: msg_id={message_id}") + logger.info( + f"表情处理成功: msg_id={message_id}, roomid={room_id}, ext={ext}, path={web_path}" + ) + else: + logger.warning( + f"表情文件已落盘但数据库未更新: msg_id={message_id}, roomid={room_id}, path={web_path}" + ) + return updated + except Exception as e: + logger.exception(f"处理表情消息出错: msg_id={message_id}, error={e}") + return False + + async def process_pending_emojis(self, minutes_ago: int = 1440, batch_size: int = 20): + """定时处理最近未落盘的表情消息""" + if not self.client: + logger.warning("微信客户端未初始化,跳过表情处理") + return + + try: + pending_messages = self.message_db.get_pending_emoji_messages(minutes_ago, batch_size) + if not pending_messages: + logger.debug(f"未发现待处理的表情消息(最近{minutes_ago}分钟)") + return + + logger.info(f"开始处理 {len(pending_messages)} 条待处理表情消息") + success_count = 0 + fail_count = 0 + + for msg_record in pending_messages: + if await self._process_emoji_record(msg_record): + success_count += 1 + else: + fail_count += 1 + + logger.info( + f"表情处理完成: 成功={success_count}, 失败={fail_count}, 总计={len(pending_messages)}" + ) + except Exception as e: + logger.exception(f"定时处理表情任务出错: {e}") + def process_image(self, msg: WxMessage): """图片消息已通过 archive_message 存入数据库,不再实时处理 改为定时任务批量处理,减少对主流程的影响和数据库锁竞争 diff --git a/wechat_ipad/client/tools.py b/wechat_ipad/client/tools.py index 4b9b5bd..804cacf 100644 --- a/wechat_ipad/client/tools.py +++ b/wechat_ipad/client/tools.py @@ -1,6 +1,7 @@ import base64 import io import os +import imghdr import aiofiles import aiohttp @@ -12,19 +13,19 @@ from wechat_ipad.client.base import WechatAPIClientBase, Proxy class ToolMixin(WechatAPIClientBase): - async def download_image(self, aeskey: str, cdnmidimgurl: str) -> str: - """CDN下载高清图片。 + async def download_cdn_file(self, aeskey: str, file_url: str) -> str: + """通用 CDN 文件下载。 { "Wxid": "string", "FileNo": "string", "FileAesKey": "string" } Args: - aeskey (str): 图片的AES密钥 - cdnmidimgurl (str): 图片的CDN URL + aeskey (str): 文件的AES密钥 + file_url (str): 文件的CDN URL Returns: - str: 图片的base64编码字符串 + str: 文件的base64编码字符串 Raises: UserLoggedOut: 未登录时调用 @@ -34,7 +35,7 @@ class ToolMixin(WechatAPIClientBase): raise UserLoggedOut("请先登录") async with aiohttp.ClientSession() as session: - json_param = {"Wxid": self.wxid, "FileAesKey": aeskey, "FileNo": cdnmidimgurl} + json_param = {"Wxid": self.wxid, "FileAesKey": aeskey, "FileNo": file_url} response = await session.post(f'http://{self.ip}:{self.port}/api/Tools/CdnDownloadImage', json=json_param) json_resp = await response.json() @@ -43,6 +44,10 @@ class ToolMixin(WechatAPIClientBase): else: self.error_handler(json_resp) + async def download_image(self, aeskey: str, cdnmidimgurl: str) -> str: + """CDN下载高清图片。""" + return await self.download_cdn_file(aeskey, cdnmidimgurl) + async def download_voice(self, msg_id: str, voiceurl: str, length: int) -> str: """下载语音文件。 @@ -265,6 +270,42 @@ class ToolMixin(WechatAPIClientBase): except Exception as e: return False + @staticmethod + def guess_file_extension(file_bytes: bytes, default_ext: str = ".bin") -> str: + """根据文件头猜测扩展名。""" + if not file_bytes: + return default_ext + if file_bytes.startswith(b"GIF87a") or file_bytes.startswith(b"GIF89a"): + return ".gif" + if file_bytes.startswith(b"\x89PNG\r\n\x1a\n"): + return ".png" + if file_bytes.startswith(b"RIFF") and file_bytes[8:12] == b"WEBP": + return ".webp" + if file_bytes.startswith(b"\xff\xd8\xff"): + return ".jpg" + detected = imghdr.what(None, h=file_bytes) + if detected: + return f".{detected}" + return default_ext + + @staticmethod + async def base64_to_file_autoext(base64_str: str, file_stem: str, file_path: str, + default_ext: str = ".bin") -> str: + """将base64写入文件,并自动识别扩展名。""" + os.makedirs(file_path, exist_ok=True) + + if ',' in base64_str: + base64_str = base64_str.split(',')[1] + + file_bytes = base64.b64decode(base64_str) + ext = ToolMixin.guess_file_extension(file_bytes, default_ext=default_ext) + full_path = os.path.join(file_path, f"{file_stem}{ext}") + + async with aiofiles.open(full_path, 'wb') as f: + await f.write(file_bytes) + + return full_path + @staticmethod async def file_to_base64(file_path: str) -> str: """将文件转换为base64字符串。