diff --git a/db/message_storage.py b/db/message_storage.py
index 6aa0931..0e071e3 100644
--- a/db/message_storage.py
+++ b/db/message_storage.py
@@ -2,8 +2,6 @@
from datetime import datetime
import json
-import re
-import xml.etree.ElementTree as ET
from typing import Dict, List, Optional
from db.base import BaseDBOperator
@@ -91,400 +89,6 @@ class MessageStorageDB(BaseDBOperator):
# 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。
return str(msg)
- def _extract_mentioned_user_ids(self, raw_xml: str) -> List[str]:
- """从消息 XML 中提取被@用户ID列表,并返回去重后的列表。
-
- 解析策略:
- 1. 优先从 `msg.msg_source` 的 XML 里读取 `atuserlist` 节点;
- 2. 若 XML 解析失败,则退化为正则提取 `atuserlist` 文本;
- 3. 去重并过滤空值,保证输出稳定。
-
- 返回值示例:`["wxid_a", "wxid_b"]`
- """
- raw_xml = str(raw_xml or "")
- if not raw_xml:
- return []
-
- at_user_list_text = ""
- try:
- root = ET.fromstring(raw_xml)
- node = root.find(".//atuserlist")
- if node is not None and node.text:
- at_user_list_text = str(node.text).strip()
- except Exception:
- # 兼容异常格式 XML,采用正则兜底,确保尽量不丢数据。
- match = re.search(r"", raw_xml, flags=re.IGNORECASE | re.DOTALL)
- if match:
- at_user_list_text = str(match.group(1) or "").strip()
-
- if not at_user_list_text:
- return []
-
- # 微信 atuserlist 常见分隔符为 ',',但实际环境可能混入 ';' 或空白,这里统一兼容。
- raw_ids = re.split(r"[,\s;]+", at_user_list_text)
- seen = set()
- result = []
- for uid in raw_ids:
- normalized_uid = str(uid or "").strip()
- if not normalized_uid or normalized_uid in seen:
- continue
- seen.add(normalized_uid)
- result.append(normalized_uid)
-
- return result
-
- def get_pending_mention_extract_messages(
- self,
- limit: int = 200,
- window_start_minutes: int = 20,
- window_end_minutes: int = 10,
- ) -> List[Dict]:
- """获取待处理 @ 抽取的消息批次。
-
- 筛选规则:
- 1. 群消息;
- 2. mentioned_user_ids 为空(表示还未处理);
- 3. message_xml 非空;
- 4. 仅处理固定时间窗口(默认:10~20分钟前),降低扫描压力与热数据竞争。
- """
- # 兜底修正窗口参数,确保窗口有效:start > end >= 0
- start_m = max(int(window_start_minutes), 1)
- end_m = max(int(window_end_minutes), 0)
- if start_m <= end_m:
- start_m = end_m + 10
- self.LOG.warning(
- f"@抽取窗口参数异常,已自动修正: window_start_minutes={window_start_minutes}, "
- f"window_end_minutes={window_end_minutes}, 修正后=[NOW-{start_m}m, NOW-{end_m}m)"
- )
-
- sql = """
- SELECT message_id, group_id, sender, message_xml, timestamp
- FROM messages
- WHERE group_id IS NOT NULL
- AND group_id <> ''
- AND (mentioned_user_ids IS NULL OR mentioned_user_ids = '')
- AND message_xml IS NOT NULL
- AND message_xml <> ''
- AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE)
- AND timestamp < DATE_SUB(NOW(), INTERVAL %s MINUTE)
- ORDER BY timestamp ASC
- LIMIT %s
- """
- rows = self.execute_query(sql, (start_m, end_m, limit)) or []
- self.LOG.debug(
- f"查询待抽取@消息: window=[NOW-{start_m}m, NOW-{end_m}m), limit={limit}, 命中={len(rows)}"
- )
- return rows
-
- def process_pending_mentions(
- self,
- batch_size: int = 200,
- window_start_minutes: int = 20,
- window_end_minutes: int = 10,
- ) -> Dict[str, int]:
- """批量处理待抽取 @ 的消息,并同步社交图数据。
-
- 返回统计:
- - total: 本批读取条数
- - processed: 成功处理条数
- - with_mentions: 提取到 @ 的消息条数
- - failed: 失败条数
- """
- started_at = datetime.now()
- self.LOG.info(
- "开始执行@批处理: "
- f"batch_size={batch_size}, window_start_minutes={window_start_minutes}, "
- f"window_end_minutes={window_end_minutes}"
- )
-
- rows = self.get_pending_mention_extract_messages(
- limit=batch_size,
- window_start_minutes=window_start_minutes,
- window_end_minutes=window_end_minutes,
- )
- if not rows:
- elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)
- self.LOG.info(f"@批处理结束: 命中0条, 耗时={elapsed_ms}ms")
- return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0}
-
- processed = 0
- with_mentions = 0
- failed = 0
- # 记录少量失败样本,便于快速定位问题,不刷屏。
- fail_samples: List[str] = []
-
- for idx, row in enumerate(rows, start=1):
- try:
- message_id = str(row.get("message_id") or "").strip()
- group_id = str(row.get("group_id") or "").strip()
- sender_id = str(row.get("sender") or "").strip()
- raw_xml = str(row.get("message_xml") or "")
- ts_raw = row.get("timestamp")
- msg_time = self._safe_parse_message_time(ts_raw)
-
- mentioned_user_ids = self._extract_mentioned_user_ids(raw_xml)
- mentioned_user_ids_json = json.dumps(mentioned_user_ids, ensure_ascii=False)
-
- self._update_message_mentioned_user_ids(
- message_id=message_id,
- group_id=group_id,
- sender_id=sender_id,
- mentioned_user_ids_json=mentioned_user_ids_json,
- )
- self._persist_mention_graph_data(
- group_id=group_id,
- sender_id=sender_id,
- message_id=message_id,
- mentioned_user_ids=mentioned_user_ids,
- msg_time=msg_time,
- )
-
- processed += 1
- if mentioned_user_ids:
- with_mentions += 1
- if idx <= 3:
- # 前3条打 debug 明细,便于确认当前批处理真实在工作。
- self.LOG.debug(
- f"@批处理样本[{idx}]: message_id={message_id}, group_id={group_id}, "
- f"sender={sender_id}, mentioned_count={len(mentioned_user_ids)}"
- )
- except Exception as e:
- failed += 1
- self.LOG.error(f"处理待抽取@消息失败: message_id={row.get('message_id')}, error={e}")
- if len(fail_samples) < 5:
- fail_samples.append(str(row.get("message_id") or ""))
-
- elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)
- stats = {
- "total": len(rows),
- "processed": processed,
- "with_mentions": with_mentions,
- "failed": failed,
- }
- self.LOG.info(
- f"@批处理结束: total={stats['total']}, processed={stats['processed']}, "
- f"with_mentions={stats['with_mentions']}, failed={stats['failed']}, 耗时={elapsed_ms}ms, "
- f"fail_samples={fail_samples}"
- )
- return stats
-
- @staticmethod
- def _safe_parse_message_time(value) -> datetime:
- """安全解析消息时间,失败时回退到当前时间。"""
- if isinstance(value, datetime):
- return value
- text = str(value or "").strip()
- if not text:
- return datetime.now()
- try:
- return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
- except Exception:
- return datetime.now()
-
- def _update_message_mentioned_user_ids(
- self,
- message_id: str,
- group_id: str,
- sender_id: str,
- mentioned_user_ids_json: str,
- ) -> None:
- """回填消息表的 mentioned_user_ids 字段。"""
- self.execute_update(
- """
- UPDATE messages
- SET mentioned_user_ids = %s
- WHERE message_id = %s
- AND group_id = %s
- AND sender = %s
- """,
- (mentioned_user_ids_json, message_id, group_id, sender_id),
- )
-
- def _persist_mention_graph_data(
- self,
- group_id: str,
- sender_id: str,
- message_id: str,
- mentioned_user_ids: List[str],
- msg_time: datetime,
- ) -> None:
- """落盘社交图增量数据(明细 + 边 + 个人日汇总)。
-
- 设计原则:
- 1. 只在群消息中处理(group_id 为空直接忽略);
- 2. 过滤无效 @ 目标(空值、@所有人、自己@自己);
- 3. 统计写入失败不抛异常,不影响主消息归档。
- """
- # 非群消息或缺少关键字段时直接跳过,避免写入脏数据。
- if not group_id or not sender_id or not message_id or not mentioned_user_ids:
- return
-
- # 统一清洗被@列表,避免重复统计。
- invalid_mentions = {"notify@all", "all", "@all"}
- clean_mentioned_ids: List[str] = []
- seen = set()
- for uid in mentioned_user_ids:
- normalized_uid = str(uid or "").strip()
- if (not normalized_uid or normalized_uid in invalid_mentions or
- normalized_uid == sender_id or normalized_uid in seen):
- continue
- seen.add(normalized_uid)
- clean_mentioned_ids.append(normalized_uid)
-
- if not clean_mentioned_ids:
- return
-
- try:
- stat_date = msg_time.strftime("%Y-%m-%d")
- msg_time_str = msg_time.strftime("%Y-%m-%d %H:%M:%S")
-
- # 幂等控制:只处理“该消息中尚未写入明细表”的新增 @ 目标。
- existed_rows = self.execute_query(
- """
- SELECT mentioned_user_id
- FROM t_message_mentions
- WHERE message_id = %s
- AND group_id = %s
- AND sender_id = %s
- """,
- (message_id, group_id, sender_id),
- ) or []
- existed_ids = {str(r.get("mentioned_user_id") or "").strip() for r in existed_rows}
- newly_mentioned_ids = [uid for uid in clean_mentioned_ids if uid not in existed_ids]
- if not newly_mentioned_ids:
- self.LOG.debug(
- f"社交图写入跳过(无新增@关系): message_id={message_id}, group_id={group_id}, sender={sender_id}"
- )
- return
-
- # 1) 写 @ 明细表:用于追溯“哪条消息 @ 了谁”。
- mention_rows = [
- (message_id, group_id, sender_id, mentioned_uid, stat_date, msg_time_str)
- for mentioned_uid in newly_mentioned_ids
- ]
- self.execute_batch(
- """
- INSERT IGNORE INTO t_message_mentions
- (message_id, group_id, sender_id, mentioned_user_id, stat_date, msg_time)
- VALUES (%s, %s, %s, %s, %s, %s)
- """,
- mention_rows,
- )
-
- # 2) 写社交日边表:一条 @ 关系视为 sender -> mentioned_uid 的一条有向边增量。
- edge_rows = [
- (stat_date, group_id, sender_id, mentioned_uid, 1, 1.0)
- for mentioned_uid in newly_mentioned_ids
- ]
- self.execute_batch(
- """
- INSERT INTO t_social_edges_daily
- (stat_date, group_id, from_user_id, to_user_id, mention_count, interaction_score)
- VALUES (%s, %s, %s, %s, %s, %s)
- ON DUPLICATE KEY UPDATE
- mention_count = mention_count + VALUES(mention_count),
- interaction_score = interaction_score + VALUES(interaction_score),
- update_time = CURRENT_TIMESTAMP
- """,
- edge_rows,
- )
-
- # 3) 写个人日汇总:更新被@次数/主动@次数,供 value_rank 直接读取。
- sender_social_row = (
- stat_date, group_id, sender_id,
- 0, len(newly_mentioned_ids), 0,
- float(len(newly_mentioned_ids)),
- )
- self.execute_update(
- """
- INSERT INTO t_value_rank_social_daily
- (stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score)
- VALUES (%s, %s, %s, %s, %s, %s, %s)
- ON DUPLICATE KEY UPDATE
- mention_others_count = mention_others_count + VALUES(mention_others_count),
- interaction_score = interaction_score + VALUES(interaction_score),
- update_time = CURRENT_TIMESTAMP
- """,
- sender_social_row,
- )
-
- receiver_social_rows = [
- (stat_date, group_id, mentioned_uid, 1, 0, 0, 1.0)
- for mentioned_uid in newly_mentioned_ids
- ]
- self.execute_batch(
- """
- INSERT INTO t_value_rank_social_daily
- (stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score)
- VALUES (%s, %s, %s, %s, %s, %s, %s)
- ON DUPLICATE KEY UPDATE
- mentioned_count = mentioned_count + VALUES(mentioned_count),
- interaction_score = interaction_score + VALUES(interaction_score),
- update_time = CURRENT_TIMESTAMP
- """,
- receiver_social_rows,
- )
-
- # 4) 回填 unique_interactors:针对本条消息受影响的用户实时重算“去重互动人数”。
- affected_user_ids = [sender_id, *newly_mentioned_ids]
- self._refresh_unique_interactors(stat_date, group_id, affected_user_ids)
- self.LOG.debug(
- f"社交图写入完成: message_id={message_id}, group_id={group_id}, sender={sender_id}, "
- f"new_mentions={len(newly_mentioned_ids)}, affected_users={len(affected_user_ids)}"
- )
- except Exception as e:
- # 社交图统计属于增强链路,不能反向影响主消息入库稳定性。
- self.LOG.error(f"写入社交图增量数据失败: {e}")
-
- def _refresh_unique_interactors(self, stat_date: str, group_id: str, user_ids: List[str]) -> None:
- """重算并回填用户在指定日期内的去重互动人数。
-
- 定义:
- - 某用户当天主动@过的人 + 被谁@过(去重并集)
- """
- if not user_ids:
- return
-
- deduped_user_ids = []
- seen = set()
- for uid in user_ids:
- normalized_uid = str(uid or "").strip()
- if not normalized_uid or normalized_uid in seen:
- continue
- seen.add(normalized_uid)
- deduped_user_ids.append(normalized_uid)
-
- for uid in deduped_user_ids:
- try:
- row = self.execute_query(
- """
- SELECT COUNT(DISTINCT partner_id) AS partner_count
- FROM (
- SELECT mentioned_user_id AS partner_id
- FROM t_message_mentions
- WHERE stat_date = %s AND group_id = %s AND sender_id = %s
- UNION
- SELECT sender_id AS partner_id
- FROM t_message_mentions
- WHERE stat_date = %s AND group_id = %s AND mentioned_user_id = %s
- ) t
- """,
- (stat_date, group_id, uid, stat_date, group_id, uid),
- fetch_one=True,
- ) or {}
- partner_count = int(row.get("partner_count") or 0)
-
- self.execute_update(
- """
- UPDATE t_value_rank_social_daily
- SET unique_interactors = %s, update_time = CURRENT_TIMESTAMP
- WHERE stat_date = %s AND group_id = %s AND user_id = %s
- """,
- (partner_count, stat_date, group_id, uid),
- )
- except Exception as e:
- self.LOG.error(f"回填 unique_interactors 失败: group={group_id}, user={uid}, err={e}")
-
def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]:
"""获取最近的消息"""
sql = """
diff --git a/plugins/value_rank/config.toml b/plugins/value_rank/config.toml
index 497a56f..997d9ef 100644
--- a/plugins/value_rank/config.toml
+++ b/plugins/value_rank/config.toml
@@ -28,3 +28,8 @@ base_score_scale = 1000
# 排行默认展示数量
default_rank_limit = 10
max_rank_limit = 50
+
+# @关系批处理(插件定时任务)参数
+mention_batch_size = 200
+mention_window_start_minutes = 20
+mention_window_end_minutes = 10
diff --git a/plugins/value_rank/main.py b/plugins/value_rank/main.py
index 79ce737..e399b0c 100644
--- a/plugins/value_rank/main.py
+++ b/plugins/value_rank/main.py
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import json
import math
+import re
+import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
@@ -268,6 +270,145 @@ class ValueRankDB(BaseDBOperator):
result[user_id] = float(row.get("score") or 0.0)
return result
+ def get_pending_mention_extract_messages_for_group(
+ self,
+ group_id: str,
+ limit: int,
+ window_start_minutes: int,
+ window_end_minutes: int,
+ ) -> List[Dict[str, Any]]:
+ """Return up to `limit` messages of one group that have no mentioned_user_ids yet, carry non-empty message_xml and fall in [NOW - window_start_minutes, NOW - window_end_minutes), oldest first; returns [] when the query yields nothing."""
+ sql = """
+ SELECT message_id, group_id, sender, message_xml, timestamp
+ FROM messages
+ WHERE group_id = %s
+ AND (mentioned_user_ids IS NULL OR mentioned_user_ids = '')
+ AND message_xml IS NOT NULL
+ AND message_xml <> ''
+ AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE)
+ AND timestamp < DATE_SUB(NOW(), INTERVAL %s MINUTE)
+ ORDER BY timestamp ASC
+ LIMIT %s
+ """
+ return self.execute_query(sql, (group_id, window_start_minutes, window_end_minutes, limit)) or []
+
+ def update_message_mentioned_user_ids(
+ self,
+ message_id: str,
+ group_id: str,
+ sender_id: str,
+ mentioned_user_ids_json: str,
+ ) -> bool:
+ """Store the given JSON text in messages.mentioned_user_ids for the row matching message_id, group_id and sender; returns whatever execute_update returns."""
+ return self.execute_update(
+ """
+ UPDATE messages
+ SET mentioned_user_ids = %s
+ WHERE message_id = %s
+ AND group_id = %s
+ AND sender = %s
+ """,
+ (mentioned_user_ids_json, message_id, group_id, sender_id),
+ )
+
+ def get_existing_mentions(self, message_id: str, group_id: str, sender_id: str) -> List[str]:
+ """Return the non-empty mentioned_user_id values already stored in t_message_mentions for this message, so callers can avoid accumulating the same mention twice."""
+ rows = self.execute_query(
+ """
+ SELECT mentioned_user_id
+ FROM t_message_mentions
+ WHERE message_id = %s
+ AND group_id = %s
+ AND sender_id = %s
+ """,
+ (message_id, group_id, sender_id),
+ ) or []
+ return [str(r.get("mentioned_user_id") or "").strip() for r in rows if str(r.get("mentioned_user_id") or "").strip()]
+
+ def insert_message_mentions(self, rows: List[Tuple[Any, ...]]) -> bool:
+ """Batch-insert mention detail rows into t_message_mentions with INSERT IGNORE; an empty list is a no-op that returns True."""
+ if not rows:
+ return True
+ return self.execute_batch(
+ """
+ INSERT IGNORE INTO t_message_mentions
+ (message_id, group_id, sender_id, mentioned_user_id, stat_date, msg_time)
+ VALUES (%s, %s, %s, %s, %s, %s)
+ """,
+ rows,
+ )
+
+ def upsert_social_edges_daily(self, rows: List[Tuple[Any, ...]]) -> bool:
+ """Batch-upsert rows into t_social_edges_daily; on a duplicate key the given mention_count and interaction_score are added to the stored values. An empty list returns True."""
+ if not rows:
+ return True
+ return self.execute_batch(
+ """
+ INSERT INTO t_social_edges_daily
+ (stat_date, group_id, from_user_id, to_user_id, mention_count, interaction_score)
+ VALUES (%s, %s, %s, %s, %s, %s)
+ ON DUPLICATE KEY UPDATE
+ mention_count = mention_count + VALUES(mention_count),
+ interaction_score = interaction_score + VALUES(interaction_score),
+ update_time = CURRENT_TIMESTAMP
+ """,
+ rows,
+ )
+
+ def upsert_social_daily_row(self, row: Tuple[Any, ...]) -> bool:
+ """Insert one user's row into t_value_rank_social_daily, or on a duplicate key add the given mentioned_count, mention_others_count and interaction_score to it (unique_interactors is not touched on update)."""
+ return self.execute_update(
+ """
+ INSERT INTO t_value_rank_social_daily
+ (stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score)
+ VALUES (%s, %s, %s, %s, %s, %s, %s)
+ ON DUPLICATE KEY UPDATE
+ mentioned_count = mentioned_count + VALUES(mentioned_count),
+ mention_others_count = mention_others_count + VALUES(mention_others_count),
+ interaction_score = interaction_score + VALUES(interaction_score),
+ update_time = CURRENT_TIMESTAMP
+ """,
+ row,
+ )
+
+ def refresh_unique_interactors(self, stat_date: str, group_id: str, user_ids: List[str]) -> None:
+ """For each distinct non-empty user id, recount the distinct users they mentioned or were mentioned by on stat_date in this group (from t_message_mentions) and store it as unique_interactors."""
+ deduped = []
+ seen = set()
+ for uid in user_ids:
+ normalized = str(uid or "").strip()
+ if not normalized or normalized in seen:
+ continue
+ seen.add(normalized)
+ deduped.append(normalized)
+
+ for uid in deduped:
+ row = self.execute_query(
+ """
+ SELECT COUNT(DISTINCT partner_id) AS partner_count
+ FROM (
+ SELECT mentioned_user_id AS partner_id
+ FROM t_message_mentions
+ WHERE stat_date = %s AND group_id = %s AND sender_id = %s
+ UNION
+ SELECT sender_id AS partner_id
+ FROM t_message_mentions
+ WHERE stat_date = %s AND group_id = %s AND mentioned_user_id = %s
+ ) t
+ """,
+ (stat_date, group_id, uid, stat_date, group_id, uid),
+ fetch_one=True,
+ ) or {}
+ partner_count = int(row.get("partner_count") or 0)
+ self.execute_update(
+ """
+ UPDATE t_value_rank_social_daily
+ SET unique_interactors = %s, update_time = CURRENT_TIMESTAMP
+ WHERE stat_date = %s AND group_id = %s AND user_id = %s
+ """,
+ (partner_count, stat_date, group_id, uid),
+ )
+
class ValueRankPlugin(MessagePluginInterface):
"""群成员身价排行插件。
@@ -337,6 +478,9 @@ class ValueRankPlugin(MessagePluginInterface):
self.default_rank_limit = 10
self.max_rank_limit = 50
+ self.mention_batch_size = 200
+ self.mention_window_start_minutes = 20
+ self.mention_window_end_minutes = 10
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件与配置。"""
@@ -361,6 +505,9 @@ class ValueRankPlugin(MessagePluginInterface):
self.default_rank_limit = int(cfg.get("default_rank_limit", self.default_rank_limit))
self.max_rank_limit = int(cfg.get("max_rank_limit", self.max_rank_limit))
+ self.mention_batch_size = int(cfg.get("mention_batch_size", self.mention_batch_size))
+ self.mention_window_start_minutes = int(cfg.get("mention_window_start_minutes", self.mention_window_start_minutes))
+ self.mention_window_end_minutes = int(cfg.get("mention_window_end_minutes", self.mention_window_end_minutes))
# 权重归一化:避免配置误差导致总权重不为 1。
weight_sum = self.points_weight + self.message_weight + self.active_days_weight + self.social_weight
@@ -470,6 +617,17 @@ class ValueRankPlugin(MessagePluginInterface):
"payload": {},
"default_enabled": True,
},
+ {
+ "action_key": "value_rank_mentions_extract",
+ "name": "@关系批处理",
+ "description": "每10分钟批量抽取10-20分钟前的@关系并更新社交图",
+ "trigger_type": "every_seconds",
+ "trigger_config": {"seconds": 600},
+ "target_scope": "all_enabled_groups",
+ "target_config": {},
+ "payload": {},
+ "default_enabled": True,
+ },
{
"action_key": "value_rank_weekly_report_push",
"name": "身价周报推送",
@@ -486,7 +644,7 @@ class ValueRankPlugin(MessagePluginInterface):
async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""执行调度动作。"""
- if action_key not in {"value_rank_daily_recompute", "value_rank_weekly_report_push"}:
+ if action_key not in {"value_rank_daily_recompute", "value_rank_weekly_report_push", "value_rank_mentions_extract"}:
return {"success": False, "summary": f"不支持动作: {action_key}", "detail": {}}
target_groups = [str(g).strip() for g in (context.get("target_groups") or []) if str(g).strip()]
@@ -504,6 +662,35 @@ class ValueRankPlugin(MessagePluginInterface):
stat_date = datetime.now().strftime("%Y-%m-%d")
bot = context.get("bot") or getattr(self, "bot", None)
+ # @抽取任务不依赖 bot。
+ if action_key == "value_rank_mentions_extract":
+ total_stats = {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0}
+ for gid in target_groups:
+ try:
+ stats = self._process_pending_mentions_for_group(gid)
+ total_stats["total"] += int(stats.get("total", 0))
+ total_stats["processed"] += int(stats.get("processed", 0))
+ total_stats["with_mentions"] += int(stats.get("with_mentions", 0))
+ total_stats["failed"] += int(stats.get("failed", 0))
+ success_groups.append(gid)
+ except Exception as e:
+ failed_groups[gid] = str(e)
+
+ return {
+ "success": len(failed_groups) == 0,
+ "summary": (
+ f"@关系批处理完成: 读取{total_stats['total']}条, "
+ f"处理{total_stats['processed']}条, 含@{total_stats['with_mentions']}条, "
+ f"失败{total_stats['failed']}条, 异常群{len(failed_groups)}个"
+ ),
+ "detail": {
+ "window": f"[NOW-{self.mention_window_start_minutes}m, NOW-{self.mention_window_end_minutes}m)",
+ "batch_size": self.mention_batch_size,
+ "stats": total_stats,
+ "failed_groups": failed_groups,
+ },
+ }
+
# 周报任务先确保当日快照存在,再执行推送,避免“有报表命令但无数据”。
if action_key == "value_rank_weekly_report_push" and not bot:
return {"success": False, "summary": "周报推送失败:bot 未注入", "detail": {}}
@@ -864,6 +1051,172 @@ class ValueRankPlugin(MessagePluginInterface):
lines.append("提示:分数由积分/发言/活跃/社交影响力综合计算。")
return "\n".join(lines)
+    def _process_pending_mentions_for_group(self, group_id: str) -> Dict[str, int]:
+        """Extract @-mentions from one group's pending messages and update the social graph.
+
+        Graph rows are written before a message is marked as handled, so a message whose graph
+        write raised is retried by a later run. Returns total/processed/with_mentions/failed counts."""
+        empty_stats = {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0}
+        if not self.db:
+            return empty_stats
+
+        started_at = datetime.now()
+        window_start = max(int(self.mention_window_start_minutes), 1)
+        window_end = max(int(self.mention_window_end_minutes), 0)
+        if window_start <= window_end:  # keep the window well-formed: start > end >= 0
+            window_start = window_end + 10
+            self.LOG.warning(
+                f"[{self.name}] @窗口参数异常已修正: group={group_id}, "
+                f"window_start={self.mention_window_start_minutes}, "
+                f"window_end={self.mention_window_end_minutes}, fixed=[{window_start},{window_end}]"
+            )
+
+        rows = self.db.get_pending_mention_extract_messages_for_group(
+            group_id=group_id,
+            limit=max(int(self.mention_batch_size), 1),  # a LIMIT below 1 would fetch nothing or fail
+            window_start_minutes=window_start,
+            window_end_minutes=window_end,
+        )
+        if not rows:
+            return empty_stats
+
+        processed, with_mentions, failed = 0, 0, 0
+        fail_samples: List[str] = []  # first few failing message ids, for the summary log
+        for idx, row in enumerate(rows, start=1):
+            try:
+                message_id = str(row.get("message_id") or "").strip()
+                sender_id = str(row.get("sender") or "").strip()
+                raw_xml = str(row.get("message_xml") or "")
+                msg_time = self._safe_parse_message_time(row.get("timestamp"))
+                mentioned_ids = self._extract_mentioned_user_ids(raw_xml)
+                # Graph rows first: marking the message earlier would hide it from later runs on failure.
+                self._persist_mention_graph_data(
+                    group_id=group_id,
+                    sender_id=sender_id,
+                    message_id=message_id,
+                    mentioned_user_ids=mentioned_ids,
+                    msg_time=msg_time,
+                )
+                self.db.update_message_mentioned_user_ids(
+                    message_id=message_id,
+                    group_id=group_id,
+                    sender_id=sender_id,
+                    mentioned_user_ids_json=json.dumps(mentioned_ids, ensure_ascii=False),
+                )
+                processed += 1
+                if mentioned_ids:
+                    with_mentions += 1
+                if idx <= 2:
+                    self.LOG.debug(
+                        f"[{self.name}] @抽取样本: group={group_id}, msg={message_id}, "
+                        f"sender={sender_id}, mentioned_count={len(mentioned_ids)}"
+                    )
+            except Exception as e:
+                failed += 1
+                if len(fail_samples) < 5:
+                    fail_samples.append(str(row.get("message_id") or ""))
+                self.LOG.error(f"[{self.name}] @抽取失败: group={group_id}, message_id={row.get('message_id')}, error={e}")
+
+        elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)
+        stats = {"total": len(rows), "processed": processed, "with_mentions": with_mentions, "failed": failed}
+        self.LOG.info(
+            f"[{self.name}] @批处理完成: group={group_id}, total={stats['total']}, processed={processed}, "
+            f"with_mentions={with_mentions}, failed={failed}, cost={elapsed_ms}ms, fail_samples={fail_samples}"
+        )
+        return stats
+
+    @staticmethod
+    def _safe_parse_message_time(value: Any) -> datetime:
+        """Return value as a datetime; text may use "%Y-%m-%d %H:%M:%S", fractional seconds or a "T" separator; anything else yields now()."""
+        if isinstance(value, datetime):
+            return value
+        text = str(value or "").strip()
+        for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"):
+            try:
+                return datetime.strptime(text, fmt)
+            except ValueError:
+                continue  # not this layout; try the next one
+        return datetime.now()  # unparseable or empty: fall back to the current time
+
+    @staticmethod
+    def _extract_mentioned_user_ids(raw_xml: str) -> List[str]:
+        """Return the de-duplicated user ids listed in the message XML's <atuserlist>.
+
+        The XML is parsed first; if parsing fails, a regex over the raw text is used
+        instead. Ids may be separated by commas, semicolons or whitespace; first-seen
+        order is kept. Returns [] when nothing is found.
+        """
+        raw_xml = str(raw_xml or "")
+        if not raw_xml:
+            return []
+
+        at_user_list_text = ""
+        try:
+            node = ET.fromstring(raw_xml).find(".//atuserlist")
+            if node is not None and node.text:
+                at_user_list_text = node.text.strip()
+        except Exception:
+            # Malformed XML: best-effort fallback. The pattern must define the capture
+            # group read below, and it tolerates an optional CDATA wrapper.
+            match = re.search(
+                r"<atuserlist>\s*(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?\s*</atuserlist>",
+                raw_xml,
+                flags=re.IGNORECASE | re.DOTALL,
+            )
+            if match:
+                at_user_list_text = match.group(1).strip()
+
+        ids = (part.strip() for part in re.split(r"[,\s;]+", at_user_list_text))
+        # dict.fromkeys drops duplicates while keeping first-seen order.
+        return list(dict.fromkeys(uid for uid in ids if uid))
+
+ def _persist_mention_graph_data(
+ self,
+ group_id: str,
+ sender_id: str,
+ message_id: str,
+ mentioned_user_ids: List[str],
+ msg_time: datetime,
+ ) -> None:
+ """Write mention detail rows, daily edges and per-user daily summaries for one message, skipping empty ids, the "all" placeholders, self-mentions and mentions already stored for this message."""
+ if not self.db or not group_id or not sender_id or not message_id:
+ return
+
+ invalid_mentions = {"notify@all", "all", "@all"}
+ clean_ids: List[str] = []
+ seen = set()
+ for uid in mentioned_user_ids:
+ normalized = str(uid or "").strip()
+ if (not normalized or normalized in invalid_mentions or normalized == sender_id or normalized in seen):
+ continue
+ seen.add(normalized)
+ clean_ids.append(normalized)
+
+ if not clean_ids:
+ return
+
+ existing = set(self.db.get_existing_mentions(message_id, group_id, sender_id))
+ new_ids = [uid for uid in clean_ids if uid not in existing]
+ if not new_ids:
+ return
+
+ stat_date = msg_time.strftime("%Y-%m-%d")
+ msg_time_text = msg_time.strftime("%Y-%m-%d %H:%M:%S")
+
+ mention_rows = [(message_id, group_id, sender_id, uid, stat_date, msg_time_text) for uid in new_ids]
+ self.db.insert_message_mentions(mention_rows)
+
+ edge_rows = [(stat_date, group_id, sender_id, uid, 1, 1.0) for uid in new_ids]
+ self.db.upsert_social_edges_daily(edge_rows)
+
+ # Sender: add the number of new mentions to mention_others_count and to interaction_score.
+ self.db.upsert_social_daily_row((stat_date, group_id, sender_id, 0, len(new_ids), 0, float(len(new_ids))))
+ # Each receiver: add 1 to mentioned_count and 1.0 to interaction_score.
+ for uid in new_ids:
+ self.db.upsert_social_daily_row((stat_date, group_id, uid, 1, 0, 0, 1.0))
+
+ self.db.refresh_unique_interactors(stat_date, group_id, [sender_id, *new_ids])
+
def _build_explain_text(self) -> str:
"""输出算法说明文本。"""
return (
diff --git a/utils/system_jobs.py b/utils/system_jobs.py
index 7f04aa6..a6141cd 100644
--- a/utils/system_jobs.py
+++ b/utils/system_jobs.py
@@ -43,14 +43,6 @@ def get_system_job_definitions(robot) -> List[Dict[str, Any]]:
"trigger_config": {"seconds": 300},
"handler": _build_process_pending_images_handler(robot),
},
- {
- "job_key": "process_pending_mentions",
- "name": "待抽取@关系处理",
- "description": "每 10 分钟处理一次待抽取@消息并更新社交图",
- "trigger_type": "every_seconds",
- "trigger_config": {"seconds": 600},
- "handler": _build_process_pending_mentions_handler(robot),
- },
]
def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]]:
@@ -61,18 +53,6 @@ def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]
return _handler
-def _build_process_pending_mentions_handler(robot) -> Callable[[], Awaitable[None]]:
- async def _handler():
- if hasattr(robot, "message_storage") and robot.message_storage:
- await robot.message_storage.process_pending_mentions(
- batch_size=200,
- window_start_minutes=20,
- window_end_minutes=10,
- )
-
- return _handler
-
-
class SystemJobLoader:
"""系统任务加载器:从数据库读取调度配置并注册到 async_job。"""
diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py
index 0ec7723..9f8029d 100644
--- a/utils/wechat/message_to_db.py
+++ b/utils/wechat/message_to_db.py
@@ -358,49 +358,6 @@ class MessageStorage:
except Exception as e:
logger.exception(f"定时处理媒体任务出错: {e}")
- async def process_pending_mentions(
- self,
- batch_size: int = 200,
- window_start_minutes: int = 20,
- window_end_minutes: int = 10,
- ):
- """定时任务:批量处理待抽取 @ 的消息并写入社交图。
-
- 说明:
- 1. 该任务与主消息归档链路解耦,不阻塞实时收发;
- 2. 每次只处理有限批次,避免长事务和数据库抖动;
- 3. 重复执行安全:底层按 message_id + sender + mentioned_user_id 做幂等控制。
- 4. 默认只处理 10~20 分钟前的数据,减少对热数据区间的扫描压力。
- """
- try:
- started_at = datetime.now()
- logger.info(
- "触发定时@抽取任务: "
- f"batch_size={batch_size}, window=[NOW-{window_start_minutes}m, NOW-{window_end_minutes}m)"
- )
- stats = self.message_db.process_pending_mentions(
- batch_size=batch_size,
- window_start_minutes=window_start_minutes,
- window_end_minutes=window_end_minutes,
- )
- total = int(stats.get("total", 0))
- if total == 0:
- elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)
- logger.info(f"定时@抽取任务结束: 无待处理数据, 耗时={elapsed_ms}ms")
- return
-
- elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)
- logger.info(
- "定时@抽取任务结束: "
- f"读取={stats.get('total', 0)}, "
- f"处理={stats.get('processed', 0)}, "
- f"含@={stats.get('with_mentions', 0)}, "
- f"失败={stats.get('failed', 0)}, "
- f"耗时={elapsed_ms}ms"
- )
- except Exception as e:
- logger.exception(f"定时处理@抽取任务出错: {e}")
-
def _process_image_done(self, future):
"""任务完成统一回调(极轻量)"""
try: