将@关系批处理业务迁移到 value_rank 插件

- 从 MessageStorageDB 移除@抽取与社交图写入逻辑,消息层仅保留归档职责

- 从系统级任务移除 process_pending_mentions,取消 message_to_db 中对应入口

- 在 value_rank 插件新增定时动作 value_rank_mentions_extract(每10分钟)

- 在插件内实现窗口化批处理(默认10~20分钟前)、@提取、幂等写入明细/边表/日汇总及 unique_interactors 回填

- 新增插件侧可配置参数 mention_batch_size / mention_window_start_minutes / mention_window_end_minutes
This commit is contained in:
liuwei
2026-04-21 14:10:25 +08:00
parent d60d496bc3
commit d64d11a384
5 changed files with 359 additions and 460 deletions

View File

@@ -2,8 +2,6 @@
from datetime import datetime
import json
import re
import xml.etree.ElementTree as ET
from typing import Dict, List, Optional
from db.base import BaseDBOperator
@@ -91,400 +89,6 @@ class MessageStorageDB(BaseDBOperator):
# 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。
return str(msg)
def _extract_mentioned_user_ids(self, raw_xml: str) -> List[str]:
"""从消息 XML 中提取被@用户ID列表并返回去重后的列表。
解析策略:
1. 优先从 `msg.msg_source` 的 XML 里读取 `atuserlist` 节点;
2. 若 XML 解析失败,则退化为正则提取 `atuserlist` 文本;
3. 去重并过滤空值,保证输出稳定。
返回值示例:`["wxid_a", "wxid_b"]`
"""
raw_xml = str(raw_xml or "")
if not raw_xml:
return []
at_user_list_text = ""
try:
root = ET.fromstring(raw_xml)
node = root.find(".//atuserlist")
if node is not None and node.text:
at_user_list_text = str(node.text).strip()
except Exception:
# 兼容异常格式 XML采用正则兜底确保尽量不丢数据。
match = re.search(r"<atuserlist><!\[CDATA\[(.*?)\]\]></atuserlist>", raw_xml, flags=re.IGNORECASE | re.DOTALL)
if match:
at_user_list_text = str(match.group(1) or "").strip()
if not at_user_list_text:
return []
# 微信 atuserlist 常见分隔符为 ',',但实际环境可能混入 ';' 或空白,这里统一兼容。
raw_ids = re.split(r"[,\s;]+", at_user_list_text)
seen = set()
result = []
for uid in raw_ids:
normalized_uid = str(uid or "").strip()
if not normalized_uid or normalized_uid in seen:
continue
seen.add(normalized_uid)
result.append(normalized_uid)
return result
def get_pending_mention_extract_messages(
self,
limit: int = 200,
window_start_minutes: int = 20,
window_end_minutes: int = 10,
) -> List[Dict]:
"""获取待处理 @ 抽取的消息批次。
筛选规则:
1. 群消息;
2. mentioned_user_ids 为空(表示还未处理);
3. message_xml 非空;
4. 仅处理固定时间窗口默认10~20分钟前降低扫描压力与热数据竞争。
"""
# 兜底修正窗口参数确保窗口有效start > end >= 0
start_m = max(int(window_start_minutes), 1)
end_m = max(int(window_end_minutes), 0)
if start_m <= end_m:
start_m = end_m + 10
self.LOG.warning(
f"@抽取窗口参数异常,已自动修正: window_start_minutes={window_start_minutes}, "
f"window_end_minutes={window_end_minutes}, 修正后=[NOW-{start_m}m, NOW-{end_m}m)"
)
sql = """
SELECT message_id, group_id, sender, message_xml, timestamp
FROM messages
WHERE group_id IS NOT NULL
AND group_id <> ''
AND (mentioned_user_ids IS NULL OR mentioned_user_ids = '')
AND message_xml IS NOT NULL
AND message_xml <> ''
AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE)
AND timestamp < DATE_SUB(NOW(), INTERVAL %s MINUTE)
ORDER BY timestamp ASC
LIMIT %s
"""
rows = self.execute_query(sql, (start_m, end_m, limit)) or []
self.LOG.debug(
f"查询待抽取@消息: window=[NOW-{start_m}m, NOW-{end_m}m), limit={limit}, 命中={len(rows)}"
)
return rows
def process_pending_mentions(
self,
batch_size: int = 200,
window_start_minutes: int = 20,
window_end_minutes: int = 10,
) -> Dict[str, int]:
"""批量处理待抽取 @ 的消息,并同步社交图数据。
返回统计:
- total: 本批读取条数
- processed: 成功处理条数
- with_mentions: 提取到 @ 的消息条数
- failed: 失败条数
"""
started_at = datetime.now()
self.LOG.info(
"开始执行@批处理: "
f"batch_size={batch_size}, window_start_minutes={window_start_minutes}, "
f"window_end_minutes={window_end_minutes}"
)
rows = self.get_pending_mention_extract_messages(
limit=batch_size,
window_start_minutes=window_start_minutes,
window_end_minutes=window_end_minutes,
)
if not rows:
elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)
self.LOG.info(f"@批处理结束: 命中0条, 耗时={elapsed_ms}ms")
return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0}
processed = 0
with_mentions = 0
failed = 0
# 记录少量失败样本,便于快速定位问题,不刷屏。
fail_samples: List[str] = []
for idx, row in enumerate(rows, start=1):
try:
message_id = str(row.get("message_id") or "").strip()
group_id = str(row.get("group_id") or "").strip()
sender_id = str(row.get("sender") or "").strip()
raw_xml = str(row.get("message_xml") or "")
ts_raw = row.get("timestamp")
msg_time = self._safe_parse_message_time(ts_raw)
mentioned_user_ids = self._extract_mentioned_user_ids(raw_xml)
mentioned_user_ids_json = json.dumps(mentioned_user_ids, ensure_ascii=False)
self._update_message_mentioned_user_ids(
message_id=message_id,
group_id=group_id,
sender_id=sender_id,
mentioned_user_ids_json=mentioned_user_ids_json,
)
self._persist_mention_graph_data(
group_id=group_id,
sender_id=sender_id,
message_id=message_id,
mentioned_user_ids=mentioned_user_ids,
msg_time=msg_time,
)
processed += 1
if mentioned_user_ids:
with_mentions += 1
if idx <= 3:
# 前3条打 debug 明细,便于确认当前批处理真实在工作。
self.LOG.debug(
f"@批处理样本[{idx}]: message_id={message_id}, group_id={group_id}, "
f"sender={sender_id}, mentioned_count={len(mentioned_user_ids)}"
)
except Exception as e:
failed += 1
self.LOG.error(f"处理待抽取@消息失败: message_id={row.get('message_id')}, error={e}")
if len(fail_samples) < 5:
fail_samples.append(str(row.get("message_id") or ""))
elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)
stats = {
"total": len(rows),
"processed": processed,
"with_mentions": with_mentions,
"failed": failed,
}
self.LOG.info(
f"@批处理结束: total={stats['total']}, processed={stats['processed']}, "
f"with_mentions={stats['with_mentions']}, failed={stats['failed']}, 耗时={elapsed_ms}ms, "
f"fail_samples={fail_samples}"
)
return stats
@staticmethod
def _safe_parse_message_time(value) -> datetime:
"""安全解析消息时间,失败时回退到当前时间。"""
if isinstance(value, datetime):
return value
text = str(value or "").strip()
if not text:
return datetime.now()
try:
return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
except Exception:
return datetime.now()
def _update_message_mentioned_user_ids(
self,
message_id: str,
group_id: str,
sender_id: str,
mentioned_user_ids_json: str,
) -> None:
"""回填消息表的 mentioned_user_ids 字段。"""
self.execute_update(
"""
UPDATE messages
SET mentioned_user_ids = %s
WHERE message_id = %s
AND group_id = %s
AND sender = %s
""",
(mentioned_user_ids_json, message_id, group_id, sender_id),
)
def _persist_mention_graph_data(
self,
group_id: str,
sender_id: str,
message_id: str,
mentioned_user_ids: List[str],
msg_time: datetime,
) -> None:
"""落盘社交图增量数据(明细 + 边 + 个人日汇总)。
设计原则:
1. 只在群消息中处理group_id 为空直接忽略);
2. 过滤无效 @ 目标(空值、@所有人、自己@自己);
3. 统计写入失败不抛异常,不影响主消息归档。
"""
# 非群消息或缺少关键字段时直接跳过,避免写入脏数据。
if not group_id or not sender_id or not message_id or not mentioned_user_ids:
return
# 统一清洗被@列表,避免重复统计。
invalid_mentions = {"notify@all", "all", "@all"}
clean_mentioned_ids: List[str] = []
seen = set()
for uid in mentioned_user_ids:
normalized_uid = str(uid or "").strip()
if (not normalized_uid or normalized_uid in invalid_mentions or
normalized_uid == sender_id or normalized_uid in seen):
continue
seen.add(normalized_uid)
clean_mentioned_ids.append(normalized_uid)
if not clean_mentioned_ids:
return
try:
stat_date = msg_time.strftime("%Y-%m-%d")
msg_time_str = msg_time.strftime("%Y-%m-%d %H:%M:%S")
# 幂等控制:只处理“该消息中尚未写入明细表”的新增 @ 目标。
existed_rows = self.execute_query(
"""
SELECT mentioned_user_id
FROM t_message_mentions
WHERE message_id = %s
AND group_id = %s
AND sender_id = %s
""",
(message_id, group_id, sender_id),
) or []
existed_ids = {str(r.get("mentioned_user_id") or "").strip() for r in existed_rows}
newly_mentioned_ids = [uid for uid in clean_mentioned_ids if uid not in existed_ids]
if not newly_mentioned_ids:
self.LOG.debug(
f"社交图写入跳过(无新增@关系): message_id={message_id}, group_id={group_id}, sender={sender_id}"
)
return
# 1) 写 @ 明细表:用于追溯“哪条消息 @ 了谁”。
mention_rows = [
(message_id, group_id, sender_id, mentioned_uid, stat_date, msg_time_str)
for mentioned_uid in newly_mentioned_ids
]
self.execute_batch(
"""
INSERT IGNORE INTO t_message_mentions
(message_id, group_id, sender_id, mentioned_user_id, stat_date, msg_time)
VALUES (%s, %s, %s, %s, %s, %s)
""",
mention_rows,
)
# 2) 写社交日边表:一条 @ 关系视为 sender -> mentioned_uid 的一条有向边增量。
edge_rows = [
(stat_date, group_id, sender_id, mentioned_uid, 1, 1.0)
for mentioned_uid in newly_mentioned_ids
]
self.execute_batch(
"""
INSERT INTO t_social_edges_daily
(stat_date, group_id, from_user_id, to_user_id, mention_count, interaction_score)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
mention_count = mention_count + VALUES(mention_count),
interaction_score = interaction_score + VALUES(interaction_score),
update_time = CURRENT_TIMESTAMP
""",
edge_rows,
)
# 3) 写个人日汇总:更新被@次数/主动@次数,供 value_rank 直接读取。
sender_social_row = (
stat_date, group_id, sender_id,
0, len(newly_mentioned_ids), 0,
float(len(newly_mentioned_ids)),
)
self.execute_update(
"""
INSERT INTO t_value_rank_social_daily
(stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
mention_others_count = mention_others_count + VALUES(mention_others_count),
interaction_score = interaction_score + VALUES(interaction_score),
update_time = CURRENT_TIMESTAMP
""",
sender_social_row,
)
receiver_social_rows = [
(stat_date, group_id, mentioned_uid, 1, 0, 0, 1.0)
for mentioned_uid in newly_mentioned_ids
]
self.execute_batch(
"""
INSERT INTO t_value_rank_social_daily
(stat_date, group_id, user_id, mentioned_count, mention_others_count, unique_interactors, interaction_score)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
mentioned_count = mentioned_count + VALUES(mentioned_count),
interaction_score = interaction_score + VALUES(interaction_score),
update_time = CURRENT_TIMESTAMP
""",
receiver_social_rows,
)
# 4) 回填 unique_interactors针对本条消息受影响的用户实时重算“去重互动人数”。
affected_user_ids = [sender_id, *newly_mentioned_ids]
self._refresh_unique_interactors(stat_date, group_id, affected_user_ids)
self.LOG.debug(
f"社交图写入完成: message_id={message_id}, group_id={group_id}, sender={sender_id}, "
f"new_mentions={len(newly_mentioned_ids)}, affected_users={len(affected_user_ids)}"
)
except Exception as e:
# 社交图统计属于增强链路,不能反向影响主消息入库稳定性。
self.LOG.error(f"写入社交图增量数据失败: {e}")
def _refresh_unique_interactors(self, stat_date: str, group_id: str, user_ids: List[str]) -> None:
"""重算并回填用户在指定日期内的去重互动人数。
定义:
- 某用户当天主动@过的人 + 被谁@过(去重并集)
"""
if not user_ids:
return
deduped_user_ids = []
seen = set()
for uid in user_ids:
normalized_uid = str(uid or "").strip()
if not normalized_uid or normalized_uid in seen:
continue
seen.add(normalized_uid)
deduped_user_ids.append(normalized_uid)
for uid in deduped_user_ids:
try:
row = self.execute_query(
"""
SELECT COUNT(DISTINCT partner_id) AS partner_count
FROM (
SELECT mentioned_user_id AS partner_id
FROM t_message_mentions
WHERE stat_date = %s AND group_id = %s AND sender_id = %s
UNION
SELECT sender_id AS partner_id
FROM t_message_mentions
WHERE stat_date = %s AND group_id = %s AND mentioned_user_id = %s
) t
""",
(stat_date, group_id, uid, stat_date, group_id, uid),
fetch_one=True,
) or {}
partner_count = int(row.get("partner_count") or 0)
self.execute_update(
"""
UPDATE t_value_rank_social_daily
SET unique_interactors = %s, update_time = CURRENT_TIMESTAMP
WHERE stat_date = %s AND group_id = %s AND user_id = %s
""",
(partner_count, stat_date, group_id, uid),
)
except Exception as e:
self.LOG.error(f"回填 unique_interactors 失败: group={group_id}, user={uid}, err={e}")
def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]:
"""获取最近的消息"""
sql = """