From d64d11a384f34b97c89d81c69e431f5b71415b71 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 21 Apr 2026 14:10:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86@=E5=85=B3=E7=B3=BB=E6=89=B9=E5=A4=84?= =?UTF-8?q?=E7=90=86=E4=B8=9A=E5=8A=A1=E8=BF=81=E7=A7=BB=E5=88=B0=20value?= =?UTF-8?q?=5Frank=20=E6=8F=92=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 从 MessageStorageDB 移除@抽取与社交图写入逻辑,消息层仅保留归档职责 - 从系统级任务移除 process_pending_mentions,取消 message_to_db 中对应入口 - 在 value_rank 插件新增定时动作 value_rank_mentions_extract(每10分钟) - 在插件内实现窗口化批处理(默认10~20分钟前)、@提取、幂等写入明细/边表/日汇总及 unique_interactors 回填 - 新增插件侧可配置参数 mention_batch_size / mention_window_start_minutes / mention_window_end_minutes --- db/message_storage.py | 396 --------------------------------- plugins/value_rank/config.toml | 5 + plugins/value_rank/main.py | 355 ++++++++++++++++++++++++++++- utils/system_jobs.py | 20 -- utils/wechat/message_to_db.py | 43 ---- 5 files changed, 359 insertions(+), 460 deletions(-) diff --git a/db/message_storage.py b/db/message_storage.py index 6aa0931..0e071e3 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -2,8 +2,6 @@ from datetime import datetime import json -import re -import xml.etree.ElementTree as ET from typing import Dict, List, Optional from db.base import BaseDBOperator @@ -91,400 +89,6 @@ class MessageStorageDB(BaseDBOperator): # 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。 return str(msg) - def _extract_mentioned_user_ids(self, raw_xml: str) -> List[str]: - """从消息 XML 中提取被@用户ID列表,并返回去重后的列表。 - - 解析策略: - 1. 优先从 `msg.msg_source` 的 XML 里读取 `atuserlist` 节点; - 2. 若 XML 解析失败,则退化为正则提取 `atuserlist` 文本; - 3. 去重并过滤空值,保证输出稳定。 - - 返回值示例:`["wxid_a", "wxid_b"]` - """ - raw_xml = str(raw_xml or "") - if not raw_xml: - return [] - - at_user_list_text = "" - try: - root = ET.fromstring(raw_xml) - node = root.find(".//atuserlist") - if node is not None and node.text: - at_user_list_text = str(node.text).strip() - except Exception: - # 兼容异常格式 XML,采用正则兜底,确保尽量不丢数据。 - match = re.search(r"", raw_xml, flags=re.IGNORECASE | re.DOTALL) - if match: - at_user_list_text = str(match.group(1) or "").strip() - - if not at_user_list_text: - return [] - - # 微信 atuserlist 常见分隔符为 ',',但实际环境可能混入 ';' 或空白,这里统一兼容。 - raw_ids = re.split(r"[,\s;]+", at_user_list_text) - seen = set() - result = [] - for uid in raw_ids: - normalized_uid = str(uid or "").strip() - if not normalized_uid or normalized_uid in seen: - continue - seen.add(normalized_uid) - result.append(normalized_uid) - - return result - - def get_pending_mention_extract_messages( - self, - limit: int = 200, - window_start_minutes: int = 20, - window_end_minutes: int = 10, - ) -> List[Dict]: - """获取待处理 @ 抽取的消息批次。 - - 筛选规则: - 1. 群消息; - 2. mentioned_user_ids 为空(表示还未处理); - 3. message_xml 非空; - 4. 仅处理固定时间窗口(默认:10~20分钟前),降低扫描压力与热数据竞争。 - """ - # 兜底修正窗口参数,确保窗口有效:start > end >= 0 - start_m = max(int(window_start_minutes), 1) - end_m = max(int(window_end_minutes), 0) - if start_m <= end_m: - start_m = end_m + 10 - self.LOG.warning( - f"@抽取窗口参数异常,已自动修正: window_start_minutes={window_start_minutes}, " - f"window_end_minutes={window_end_minutes}, 修正后=[NOW-{start_m}m, NOW-{end_m}m)" - ) - - sql = """ - SELECT message_id, group_id, sender, message_xml, timestamp - FROM messages - WHERE group_id IS NOT NULL - AND group_id <> '' - AND (mentioned_user_ids IS NULL OR mentioned_user_ids = '') - AND message_xml IS NOT NULL - AND message_xml <> '' - AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE) - AND timestamp < DATE_SUB(NOW(), INTERVAL %s MINUTE) - ORDER BY timestamp ASC - LIMIT %s - """ - rows = self.execute_query(sql, (start_m, end_m, limit)) or [] - self.LOG.debug( - f"查询待抽取@消息: window=[NOW-{start_m}m, NOW-{end_m}m), limit={limit}, 命中={len(rows)}" - ) - return rows - - def process_pending_mentions( - self, - batch_size: int = 200, - window_start_minutes: int = 20, - window_end_minutes: int = 10, - ) -> Dict[str, int]: - """批量处理待抽取 @ 的消息,并同步社交图数据。 - - 返回统计: - - total: 本批读取条数 - - processed: 成功处理条数 - - with_mentions: 提取到 @ 的消息条数 - - failed: 失败条数 - """ - started_at = datetime.now() - self.LOG.info( - "开始执行@批处理: " - f"batch_size={batch_size}, window_start_minutes={window_start_minutes}, " - f"window_end_minutes={window_end_minutes}" - ) - - rows = self.get_pending_mention_extract_messages( - limit=batch_size, - window_start_minutes=window_start_minutes, - window_end_minutes=window_end_minutes, - ) - if not rows: - elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) - self.LOG.info(f"@批处理结束: 命中0条, 耗时={elapsed_ms}ms") - return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0} - - processed = 0 - with_mentions = 0 - failed = 0 - # 记录少量失败样本,便于快速定位问题,不刷屏。 - fail_samples: List[str] = [] - - for idx, row in enumerate(rows, start=1): - try: - message_id = str(row.get("message_id") or "").strip() - group_id = str(row.get("group_id") or "").strip() - sender_id = str(row.get("sender") or "").strip() - raw_xml = str(row.get("message_xml") or "") - ts_raw = row.get("timestamp") - msg_time = self._safe_parse_message_time(ts_raw) - - mentioned_user_ids = self._extract_mentioned_user_ids(raw_xml) - mentioned_user_ids_json = json.dumps(mentioned_user_ids, ensure_ascii=False) - - self._update_message_mentioned_user_ids( - message_id=message_id, - group_id=group_id, - sender_id=sender_id, - mentioned_user_ids_json=mentioned_user_ids_json, - ) - self._persist_mention_graph_data( - group_id=group_id, - sender_id=sender_id, - message_id=message_id, - mentioned_user_ids=mentioned_user_ids, - msg_time=msg_time, - ) - - processed += 1 - if mentioned_user_ids: - with_mentions += 1 - if idx <= 3: - # 前3条打 debug 明细,便于确认当前批处理真实在工作。 - self.LOG.debug( - f"@批处理样本[{idx}]: message_id={message_id}, group_id={group_id}, " - f"sender={sender_id}, mentioned_count={len(mentioned_user_ids)}" - ) - except Exception as e: - failed += 1 - self.LOG.error(f"处理待抽取@消息失败: message_id={row.get('message_id')}, error={e}") - if len(fail_samples) < 5: - fail_samples.append(str(row.get("message_id") or "")) - - elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) - stats = { - "total": len(rows), - "processed": processed, - "with_mentions": with_mentions, - "failed": failed, - } - self.LOG.info( - f"@批处理结束: total={stats['total']}, processed={stats['processed']}, " - f"with_mentions={stats['with_mentions']}, failed={stats['failed']}, 耗时={elapsed_ms}ms, " - f"fail_samples={fail_samples}" - ) - return stats - - @staticmethod - def _safe_parse_message_time(value) -> datetime: - """安全解析消息时间,失败时回退到当前时间。""" - if isinstance(value, datetime): - return value - text = str(value or "").strip() - if not text: - return datetime.now() - try: - return datetime.strptime(text, "%Y-%m-%d %H:%M:%S") - except Exception: - return datetime.now() - - def _update_message_mentioned_user_ids( - self, - message_id: str, - group_id: str, - sender_id: str, - mentioned_user_ids_json: str, - ) -> None: - """回填消息表的 mentioned_user_ids 字段。""" - self.execute_update( - """ - UPDATE messages - SET mentioned_user_ids = %s - WHERE message_id = %s - AND group_id = %s - AND sender = %s - """, - (mentioned_user_ids_json, message_id, group_id, sender_id), - ) - - def _persist_mention_graph_data( - self, - group_id: str, - sender_id: str, - message_id: str, - mentioned_user_ids: List[str], - msg_time: datetime, - ) -> None: - """落盘社交图增量数据(明细 + 边 + 个人日汇总)。 - - 设计原则: - 1. 只在群消息中处理(group_id 为空直接忽略); - 2. 过滤无效 @ 目标(空值、@所有人、自己@自己); - 3. 统计写入失败不抛异常,不影响主消息归档。 - """ - # 非群消息或缺少关键字段时直接跳过,避免写入脏数据。 - if not group_id or not sender_id or not message_id or not mentioned_user_ids: - return - - # 统一清洗被@列表,避免重复统计。 - invalid_mentions = {"notify@all", "all", "@all"} - clean_mentioned_ids: List[str] = [] - seen = set() - for uid in mentioned_user_ids: - normalized_uid = str(uid or "").strip() - if (not normalized_uid or normalized_uid in invalid_mentions or - normalized_uid == sender_id or normalized_uid in seen): - continue - seen.add(normalized_uid) - clean_mentioned_ids.append(normalized_uid) - - if not clean_mentioned_ids: - return - - try: - stat_date = msg_time.strftime("%Y-%m-%d") - msg_time_str = msg_time.strftime("%Y-%m-%d %H:%M:%S") - - # 幂等控制:只处理“该消息中尚未写入明细表”的新增 @ 目标。 - existed_rows = self.execute_query( - """ - SELECT mentioned_user_id - FROM t_message_mentions - WHERE message_id = %s - AND group_id = %s - AND sender_id = %s - """, - (message_id, group_id, sender_id), - ) or [] - existed_ids = {str(r.get("mentioned_user_id") or "").strip() for r in existed_rows} - newly_mentioned_ids = [uid for uid in clean_mentioned_ids if uid not in existed_ids] - if not newly_mentioned_ids: - self.LOG.debug( - f"社交图写入跳过(无新增@关系): message_id={message_id}, group_id={group_id}, sender={sender_id}" - ) - return - - # 1) 写 @ 明细表:用于追溯“哪条消息 @ 了谁”。 - mention_rows = [ - (message_id, group_id, sender_id, mentioned_uid, stat_date, msg_time_str) - for mentioned_uid in newly_mentioned_ids - ] - self.execute_batch( - """ - INSERT IGNORE INTO t_message_mentions - (message_id, group_id, sender_id, mentioned_user_id, stat_date, msg_time) - VALUES (%s, %s, %s, %s, %s, %s) - """, - mention_rows, - ) - - # 2) 写社交日边表:一条 @ 关系视为 sender -> mentioned_uid 的一条有向边增量。 - edge_rows = [ - (stat_date, group_id, sender_id, mentioned_uid, 1, 1.0) - for mentioned_uid in newly_mentioned_ids - ] - self.execute_batch( - """ - INSERT INTO t_social_edges_daily - (stat_date, group_id, from_user_id, to_user_id, mention_count, interaction_score) - VALUES (%s, %s, %s, %s, %s, %s) - ON DUPLICATE KEY UPDATE - mention_count = mention_count + VALUES(mention_count), - interaction_score = interaction_score + VALUES(interaction_score), - update_time = CURRENT_TIMESTAMP - """, - edge_rows, - ) - - # 3) 写个人日汇总:更新被@次数/主动@次数,供 value_rank 直接读取。 - sender_social_row = ( - stat_date, group_id, sender_id, - 0, len(newly_mentioned_ids), 0, - float(len(newly_mentioned_ids)), - ) - self.execute_update( - """ - INSERT INTO t_value_rank_social_daily - (stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score) - VALUES (%s, %s, %s, %s, %s, %s, %s) - ON DUPLICATE KEY UPDATE - mention_others_count = mention_others_count + VALUES(mention_others_count), - interaction_score = interaction_score + VALUES(interaction_score), - update_time = CURRENT_TIMESTAMP - """, - sender_social_row, - ) - - receiver_social_rows = [ - (stat_date, group_id, mentioned_uid, 1, 0, 0, 1.0) - for mentioned_uid in newly_mentioned_ids - ] - self.execute_batch( - """ - INSERT INTO t_value_rank_social_daily - (stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score) - VALUES (%s, %s, %s, %s, %s, %s, %s) - ON DUPLICATE KEY UPDATE - mentioned_count = mentioned_count + VALUES(mentioned_count), - interaction_score = interaction_score + VALUES(interaction_score), - update_time = CURRENT_TIMESTAMP - """, - receiver_social_rows, - ) - - # 4) 回填 unique_interactors:针对本条消息受影响的用户实时重算“去重互动人数”。 - affected_user_ids = [sender_id, *newly_mentioned_ids] - self._refresh_unique_interactors(stat_date, group_id, affected_user_ids) - self.LOG.debug( - f"社交图写入完成: message_id={message_id}, group_id={group_id}, sender={sender_id}, " - f"new_mentions={len(newly_mentioned_ids)}, affected_users={len(affected_user_ids)}" - ) - except Exception as e: - # 社交图统计属于增强链路,不能反向影响主消息入库稳定性。 - self.LOG.error(f"写入社交图增量数据失败: {e}") - - def _refresh_unique_interactors(self, stat_date: str, group_id: str, user_ids: List[str]) -> None: - """重算并回填用户在指定日期内的去重互动人数。 - - 定义: - - 某用户当天主动@过的人 + 被谁@过(去重并集) - """ - if not user_ids: - return - - deduped_user_ids = [] - seen = set() - for uid in user_ids: - normalized_uid = str(uid or "").strip() - if not normalized_uid or normalized_uid in seen: - continue - seen.add(normalized_uid) - deduped_user_ids.append(normalized_uid) - - for uid in deduped_user_ids: - try: - row = self.execute_query( - """ - SELECT COUNT(DISTINCT partner_id) AS partner_count - FROM ( - SELECT mentioned_user_id AS partner_id - FROM t_message_mentions - WHERE stat_date = %s AND group_id = %s AND sender_id = %s - UNION - SELECT sender_id AS partner_id - FROM t_message_mentions - WHERE stat_date = %s AND group_id = %s AND mentioned_user_id = %s - ) t - """, - (stat_date, group_id, uid, stat_date, group_id, uid), - fetch_one=True, - ) or {} - partner_count = int(row.get("partner_count") or 0) - - self.execute_update( - """ - UPDATE t_value_rank_social_daily - SET unique_interactors = %s, update_time = CURRENT_TIMESTAMP - WHERE stat_date = %s AND group_id = %s AND user_id = %s - """, - (partner_count, stat_date, group_id, uid), - ) - except Exception as e: - self.LOG.error(f"回填 unique_interactors 失败: group={group_id}, user={uid}, err={e}") - def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]: """获取最近的消息""" sql = """ diff --git a/plugins/value_rank/config.toml b/plugins/value_rank/config.toml index 497a56f..997d9ef 100644 --- a/plugins/value_rank/config.toml +++ b/plugins/value_rank/config.toml @@ -28,3 +28,8 @@ base_score_scale = 1000 # 排行默认展示数量 default_rank_limit = 10 max_rank_limit = 50 + +# @关系批处理(插件定时任务)参数 +mention_batch_size = 200 +mention_window_start_minutes = 20 +mention_window_end_minutes = 10 diff --git a/plugins/value_rank/main.py b/plugins/value_rank/main.py index 79ce737..e399b0c 100644 --- a/plugins/value_rank/main.py +++ b/plugins/value_rank/main.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- import json import math +import re +import xml.etree.ElementTree as ET from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple @@ -268,6 +270,145 @@ class ValueRankDB(BaseDBOperator): result[user_id] = float(row.get("score") or 0.0) return result + def get_pending_mention_extract_messages_for_group( + self, + group_id: str, + limit: int, + window_start_minutes: int, + window_end_minutes: int, + ) -> List[Dict[str, Any]]: + """按群读取待处理@抽取消息。""" + sql = """ + SELECT message_id, group_id, sender, message_xml, timestamp + FROM messages + WHERE group_id = %s + AND (mentioned_user_ids IS NULL OR mentioned_user_ids = '') + AND message_xml IS NOT NULL + AND message_xml <> '' + AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE) + AND timestamp < DATE_SUB(NOW(), INTERVAL %s MINUTE) + ORDER BY timestamp ASC + LIMIT %s + """ + return self.execute_query(sql, (group_id, window_start_minutes, window_end_minutes, limit)) or [] + + def update_message_mentioned_user_ids( + self, + message_id: str, + group_id: str, + sender_id: str, + mentioned_user_ids_json: str, + ) -> bool: + """回填消息表的 mentioned_user_ids 字段。""" + return self.execute_update( + """ + UPDATE messages + SET mentioned_user_ids = %s + WHERE message_id = %s + AND group_id = %s + AND sender = %s + """, + (mentioned_user_ids_json, message_id, group_id, sender_id), + ) + + def get_existing_mentions(self, message_id: str, group_id: str, sender_id: str) -> List[str]: + """查询某条消息已经入库的@关系,避免重复累加。""" + rows = self.execute_query( + """ + SELECT mentioned_user_id + FROM t_message_mentions + WHERE message_id = %s + AND group_id = %s + AND sender_id = %s + """, + (message_id, group_id, sender_id), + ) or [] + return [str(r.get("mentioned_user_id") or "").strip() for r in rows if str(r.get("mentioned_user_id") or "").strip()] + + def insert_message_mentions(self, rows: List[Tuple[Any, ...]]) -> bool: + """批量写入@明细。""" + if not rows: + return True + return self.execute_batch( + """ + INSERT IGNORE INTO t_message_mentions + (message_id, group_id, sender_id, mentioned_user_id, stat_date, msg_time) + VALUES (%s, %s, %s, %s, %s, %s) + """, + rows, + ) + + def upsert_social_edges_daily(self, rows: List[Tuple[Any, ...]]) -> bool: + """批量累加社交边。""" + if not rows: + return True + return self.execute_batch( + """ + INSERT INTO t_social_edges_daily + (stat_date, group_id, from_user_id, to_user_id, mention_count, interaction_score) + VALUES (%s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + mention_count = mention_count + VALUES(mention_count), + interaction_score = interaction_score + VALUES(interaction_score), + update_time = CURRENT_TIMESTAMP + """, + rows, + ) + + def upsert_social_daily_row(self, row: Tuple[Any, ...]) -> bool: + """写入或更新单个用户的社交日汇总。""" + return self.execute_update( + """ + INSERT INTO t_value_rank_social_daily + (stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score) + VALUES (%s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + mentioned_count = mentioned_count + VALUES(mentioned_count), + mention_others_count = mention_others_count + VALUES(mention_others_count), + interaction_score = interaction_score + VALUES(interaction_score), + update_time = CURRENT_TIMESTAMP + """, + row, + ) + + def refresh_unique_interactors(self, stat_date: str, group_id: str, user_ids: List[str]) -> None: + """回填去重互动人数。""" + deduped = [] + seen = set() + for uid in user_ids: + normalized = str(uid or "").strip() + if not normalized or normalized in seen: + continue + seen.add(normalized) + deduped.append(normalized) + + for uid in deduped: + row = self.execute_query( + """ + SELECT COUNT(DISTINCT partner_id) AS partner_count + FROM ( + SELECT mentioned_user_id AS partner_id + FROM t_message_mentions + WHERE stat_date = %s AND group_id = %s AND sender_id = %s + UNION + SELECT sender_id AS partner_id + FROM t_message_mentions + WHERE stat_date = %s AND group_id = %s AND mentioned_user_id = %s + ) t + """, + (stat_date, group_id, uid, stat_date, group_id, uid), + fetch_one=True, + ) or {} + partner_count = int(row.get("partner_count") or 0) + self.execute_update( + """ + UPDATE t_value_rank_social_daily + SET unique_interactors = %s, update_time = CURRENT_TIMESTAMP + WHERE stat_date = %s AND group_id = %s AND user_id = %s + """, + (partner_count, stat_date, group_id, uid), + ) + class ValueRankPlugin(MessagePluginInterface): """群成员身价排行插件。 @@ -337,6 +478,9 @@ class ValueRankPlugin(MessagePluginInterface): self.default_rank_limit = 10 self.max_rank_limit = 50 + self.mention_batch_size = 200 + self.mention_window_start_minutes = 20 + self.mention_window_end_minutes = 10 def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件与配置。""" @@ -361,6 +505,9 @@ class ValueRankPlugin(MessagePluginInterface): self.default_rank_limit = int(cfg.get("default_rank_limit", self.default_rank_limit)) self.max_rank_limit = int(cfg.get("max_rank_limit", self.max_rank_limit)) + self.mention_batch_size = int(cfg.get("mention_batch_size", self.mention_batch_size)) + self.mention_window_start_minutes = int(cfg.get("mention_window_start_minutes", self.mention_window_start_minutes)) + self.mention_window_end_minutes = int(cfg.get("mention_window_end_minutes", self.mention_window_end_minutes)) # 权重归一化:避免配置误差导致总权重不为 1。 weight_sum = self.points_weight + self.message_weight + self.active_days_weight + self.social_weight @@ -470,6 +617,17 @@ class ValueRankPlugin(MessagePluginInterface): "payload": {}, "default_enabled": True, }, + { + "action_key": "value_rank_mentions_extract", + "name": "@关系批处理", + "description": "每10分钟批量抽取10-20分钟前的@关系并更新社交图", + "trigger_type": "every_seconds", + "trigger_config": {"seconds": 600}, + "target_scope": "all_enabled_groups", + "target_config": {}, + "payload": {}, + "default_enabled": True, + }, { "action_key": "value_rank_weekly_report_push", "name": "身价周报推送", @@ -486,7 +644,7 @@ class ValueRankPlugin(MessagePluginInterface): async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]: """执行调度动作。""" - if action_key not in {"value_rank_daily_recompute", "value_rank_weekly_report_push"}: + if action_key not in {"value_rank_daily_recompute", "value_rank_weekly_report_push", "value_rank_mentions_extract"}: return {"success": False, "summary": f"不支持动作: {action_key}", "detail": {}} target_groups = [str(g).strip() for g in (context.get("target_groups") or []) if str(g).strip()] @@ -504,6 +662,35 @@ class ValueRankPlugin(MessagePluginInterface): stat_date = datetime.now().strftime("%Y-%m-%d") bot = context.get("bot") or getattr(self, "bot", None) + # @抽取任务不依赖 bot。 + if action_key == "value_rank_mentions_extract": + total_stats = {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0} + for gid in target_groups: + try: + stats = self._process_pending_mentions_for_group(gid) + total_stats["total"] += int(stats.get("total", 0)) + total_stats["processed"] += int(stats.get("processed", 0)) + total_stats["with_mentions"] += int(stats.get("with_mentions", 0)) + total_stats["failed"] += int(stats.get("failed", 0)) + success_groups.append(gid) + except Exception as e: + failed_groups[gid] = str(e) + + return { + "success": len(failed_groups) == 0, + "summary": ( + f"@关系批处理完成: 读取{total_stats['total']}条, " + f"处理{total_stats['processed']}条, 含@{total_stats['with_mentions']}条, " + f"失败{total_stats['failed']}条, 异常群{len(failed_groups)}个" + ), + "detail": { + "window": f"[NOW-{self.mention_window_start_minutes}m, NOW-{self.mention_window_end_minutes}m)", + "batch_size": self.mention_batch_size, + "stats": total_stats, + "failed_groups": failed_groups, + }, + } + # 周报任务先确保当日快照存在,再执行推送,避免“有报表命令但无数据”。 if action_key == "value_rank_weekly_report_push" and not bot: return {"success": False, "summary": "周报推送失败:bot 未注入", "detail": {}} @@ -864,6 +1051,172 @@ class ValueRankPlugin(MessagePluginInterface): lines.append("提示:分数由积分/发言/活跃/社交影响力综合计算。") return "\n".join(lines) + def _process_pending_mentions_for_group(self, group_id: str) -> Dict[str, int]: + """处理单群待抽取@消息(插件内定时业务)。""" + if not self.db: + return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0} + + started_at = datetime.now() + window_start = max(int(self.mention_window_start_minutes), 1) + window_end = max(int(self.mention_window_end_minutes), 0) + if window_start <= window_end: + window_start = window_end + 10 + self.LOG.warning( + f"[{self.name}] @窗口参数异常已修正: group={group_id}, " + f"window_start={self.mention_window_start_minutes}, " + f"window_end={self.mention_window_end_minutes}, fixed=[{window_start},{window_end}]" + ) + + rows = self.db.get_pending_mention_extract_messages_for_group( + group_id=group_id, + limit=self.mention_batch_size, + window_start_minutes=window_start, + window_end_minutes=window_end, + ) + if not rows: + return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0} + + processed, with_mentions, failed = 0, 0, 0 + fail_samples: List[str] = [] + + for idx, row in enumerate(rows, start=1): + try: + message_id = str(row.get("message_id") or "").strip() + sender_id = str(row.get("sender") or "").strip() + raw_xml = str(row.get("message_xml") or "") + msg_time = self._safe_parse_message_time(row.get("timestamp")) + + mentioned_ids = self._extract_mentioned_user_ids(raw_xml) + mentioned_ids_json = json.dumps(mentioned_ids, ensure_ascii=False) + self.db.update_message_mentioned_user_ids( + message_id=message_id, + group_id=group_id, + sender_id=sender_id, + mentioned_user_ids_json=mentioned_ids_json, + ) + + self._persist_mention_graph_data( + group_id=group_id, + sender_id=sender_id, + message_id=message_id, + mentioned_user_ids=mentioned_ids, + msg_time=msg_time, + ) + + processed += 1 + if mentioned_ids: + with_mentions += 1 + if idx <= 2: + self.LOG.debug( + f"[{self.name}] @抽取样本: group={group_id}, msg={message_id}, " + f"sender={sender_id}, mentioned_count={len(mentioned_ids)}" + ) + except Exception as e: + failed += 1 + if len(fail_samples) < 5: + fail_samples.append(str(row.get("message_id") or "")) + self.LOG.error(f"[{self.name}] @抽取失败: group={group_id}, message_id={row.get('message_id')}, error={e}") + + elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) + stats = {"total": len(rows), "processed": processed, "with_mentions": with_mentions, "failed": failed} + self.LOG.info( + f"[{self.name}] @批处理完成: group={group_id}, total={stats['total']}, processed={processed}, " + f"with_mentions={with_mentions}, failed={failed}, cost={elapsed_ms}ms, fail_samples={fail_samples}" + ) + return stats + + @staticmethod + def _safe_parse_message_time(value: Any) -> datetime: + """安全解析消息时间,失败时回退到当前时间。""" + if isinstance(value, datetime): + return value + text = str(value or "").strip() + if not text: + return datetime.now() + try: + return datetime.strptime(text, "%Y-%m-%d %H:%M:%S") + except Exception: + return datetime.now() + + @staticmethod + def _extract_mentioned_user_ids(raw_xml: str) -> List[str]: + """从消息 XML 提取@用户ID清单。""" + raw_xml = str(raw_xml or "") + if not raw_xml: + return [] + + at_user_list_text = "" + try: + root = ET.fromstring(raw_xml) + node = root.find(".//atuserlist") + if node is not None and node.text: + at_user_list_text = str(node.text).strip() + except Exception: + match = re.search(r"", raw_xml, flags=re.IGNORECASE | re.DOTALL) + if match: + at_user_list_text = str(match.group(1) or "").strip() + + if not at_user_list_text: + return [] + + raw_ids = re.split(r"[,\s;]+", at_user_list_text) + seen = set() + result: List[str] = [] + for uid in raw_ids: + normalized = str(uid or "").strip() + if not normalized or normalized in seen: + continue + seen.add(normalized) + result.append(normalized) + return result + + def _persist_mention_graph_data( + self, + group_id: str, + sender_id: str, + message_id: str, + mentioned_user_ids: List[str], + msg_time: datetime, + ) -> None: + """落盘社交图增量数据(明细 + 边 + 个人日汇总)。""" + if not self.db or not group_id or not sender_id or not message_id: + return + + invalid_mentions = {"notify@all", "all", "@all"} + clean_ids: List[str] = [] + seen = set() + for uid in mentioned_user_ids: + normalized = str(uid or "").strip() + if (not normalized or normalized in invalid_mentions or normalized == sender_id or normalized in seen): + continue + seen.add(normalized) + clean_ids.append(normalized) + + if not clean_ids: + return + + existing = set(self.db.get_existing_mentions(message_id, group_id, sender_id)) + new_ids = [uid for uid in clean_ids if uid not in existing] + if not new_ids: + return + + stat_date = msg_time.strftime("%Y-%m-%d") + msg_time_text = msg_time.strftime("%Y-%m-%d %H:%M:%S") + + mention_rows = [(message_id, group_id, sender_id, uid, stat_date, msg_time_text) for uid in new_ids] + self.db.insert_message_mentions(mention_rows) + + edge_rows = [(stat_date, group_id, sender_id, uid, 1, 1.0) for uid in new_ids] + self.db.upsert_social_edges_daily(edge_rows) + + # 发起方:主动@次数 + 互动分 + self.db.upsert_social_daily_row((stat_date, group_id, sender_id, 0, len(new_ids), 0, float(len(new_ids)))) + # 接收方:被@次数 + 互动分 + for uid in new_ids: + self.db.upsert_social_daily_row((stat_date, group_id, uid, 1, 0, 0, 1.0)) + + self.db.refresh_unique_interactors(stat_date, group_id, [sender_id, *new_ids]) + def _build_explain_text(self) -> str: """输出算法说明文本。""" return ( diff --git a/utils/system_jobs.py b/utils/system_jobs.py index 7f04aa6..a6141cd 100644 --- a/utils/system_jobs.py +++ b/utils/system_jobs.py @@ -43,14 +43,6 @@ def get_system_job_definitions(robot) -> List[Dict[str, Any]]: "trigger_config": {"seconds": 300}, "handler": _build_process_pending_images_handler(robot), }, - { - "job_key": "process_pending_mentions", - "name": "待抽取@关系处理", - "description": "每 10 分钟处理一次待抽取@消息并更新社交图", - "trigger_type": "every_seconds", - "trigger_config": {"seconds": 600}, - "handler": _build_process_pending_mentions_handler(robot), - }, ] def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]]: @@ -61,18 +53,6 @@ def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None] return _handler -def _build_process_pending_mentions_handler(robot) -> Callable[[], Awaitable[None]]: - async def _handler(): - if hasattr(robot, "message_storage") and robot.message_storage: - await robot.message_storage.process_pending_mentions( - batch_size=200, - window_start_minutes=20, - window_end_minutes=10, - ) - - return _handler - - class SystemJobLoader: """系统任务加载器:从数据库读取调度配置并注册到 async_job。""" diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index 0ec7723..9f8029d 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -358,49 +358,6 @@ class MessageStorage: except Exception as e: logger.exception(f"定时处理媒体任务出错: {e}") - async def process_pending_mentions( - self, - batch_size: int = 200, - window_start_minutes: int = 20, - window_end_minutes: int = 10, - ): - """定时任务:批量处理待抽取 @ 的消息并写入社交图。 - - 说明: - 1. 该任务与主消息归档链路解耦,不阻塞实时收发; - 2. 每次只处理有限批次,避免长事务和数据库抖动; - 3. 重复执行安全:底层按 message_id + sender + mentioned_user_id 做幂等控制。 - 4. 默认只处理 10~20 分钟前的数据,减少对热数据区间的扫描压力。 - """ - try: - started_at = datetime.now() - logger.info( - "触发定时@抽取任务: " - f"batch_size={batch_size}, window=[NOW-{window_start_minutes}m, NOW-{window_end_minutes}m)" - ) - stats = self.message_db.process_pending_mentions( - batch_size=batch_size, - window_start_minutes=window_start_minutes, - window_end_minutes=window_end_minutes, - ) - total = int(stats.get("total", 0)) - if total == 0: - elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) - logger.info(f"定时@抽取任务结束: 无待处理数据, 耗时={elapsed_ms}ms") - return - - elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) - logger.info( - "定时@抽取任务结束: " - f"读取={stats.get('total', 0)}, " - f"处理={stats.get('processed', 0)}, " - f"含@={stats.get('with_mentions', 0)}, " - f"失败={stats.get('failed', 0)}, " - f"耗时={elapsed_ms}ms" - ) - except Exception as e: - logger.exception(f"定时处理@抽取任务出错: {e}") - def _process_image_done(self, future): """任务完成统一回调(极轻量)""" try: