将@抽取与社交图写入改为定时批处理
- 精简 archive_message 主链路:仅做消息归档,不再同步执行@解析与社交统计 - 新增 MessageStorageDB.process_pending_mentions 批处理能力,按批次回填 mentioned_user_ids 并写入社交图 - 新增系统任务 process_pending_mentions,每10分钟执行一次(every_seconds=600) - 增加幂等保护:基于 t_message_mentions 已有记录过滤新增@关系,避免重复累加社交边和热度 - 保留详细中文注释,说明性能优化目标与批处理策略
This commit is contained in:
@@ -41,9 +41,6 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
|
|
||||||
# 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。
|
# 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。
|
||||||
raw_payload = self._serialize_raw_payload(msg)
|
raw_payload = self._serialize_raw_payload(msg)
|
||||||
# 在入库阶段结构化提取被@清单,避免后续统计每次都回扫原始包。
|
|
||||||
mentioned_user_ids = self._extract_mentioned_user_ids(msg)
|
|
||||||
mentioned_user_ids_json = json.dumps(mentioned_user_ids, ensure_ascii=False)
|
|
||||||
|
|
||||||
sql_with_raw_payload = """
|
sql_with_raw_payload = """
|
||||||
INSERT INTO messages (
|
INSERT INTO messages (
|
||||||
@@ -52,17 +49,10 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
)
|
)
|
||||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
"""
|
"""
|
||||||
params_with_raw_payload = (*base_params[:8], raw_payload, mentioned_user_ids_json, base_params[8])
|
# 为了降低主链路延迟,这里不做@解析,mentioned_user_ids 先置空,后续由定时任务批处理回填。
|
||||||
|
params_with_raw_payload = (*base_params[:8], raw_payload, None, base_params[8])
|
||||||
archived = self.execute_update(sql_with_raw_payload, params_with_raw_payload)
|
archived = self.execute_update(sql_with_raw_payload, params_with_raw_payload)
|
||||||
if archived:
|
if archived:
|
||||||
# 归档成功后增量写社交关系数据。该步骤为统计增强逻辑,失败不应影响主流程。
|
|
||||||
self._persist_mention_graph_data(
|
|
||||||
group_id=str(getattr(msg, "roomid", "") or ""),
|
|
||||||
sender_id=str(getattr(msg, "sender", "") or ""),
|
|
||||||
message_id=str(getattr(msg, "msg_id", "") or ""),
|
|
||||||
mentioned_user_ids=mentioned_user_ids,
|
|
||||||
msg_time=datetime.now(),
|
|
||||||
)
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# 兼容旧表结构:如果线上还没执行 ALTER TABLE,加列前仍可继续正常归档。
|
# 兼容旧表结构:如果线上还没执行 ALTER TABLE,加列前仍可继续正常归档。
|
||||||
@@ -73,17 +63,7 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
)
|
)
|
||||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
"""
|
"""
|
||||||
archived = self.execute_update(sql_legacy, base_params)
|
return self.execute_update(sql_legacy, base_params)
|
||||||
if archived:
|
|
||||||
# 老表结构下(未加 mentioned_user_ids 字段)也继续补写社交图数据,避免统计断层。
|
|
||||||
self._persist_mention_graph_data(
|
|
||||||
group_id=str(getattr(msg, "roomid", "") or ""),
|
|
||||||
sender_id=str(getattr(msg, "sender", "") or ""),
|
|
||||||
message_id=str(getattr(msg, "msg_id", "") or ""),
|
|
||||||
mentioned_user_ids=mentioned_user_ids,
|
|
||||||
msg_time=datetime.now(),
|
|
||||||
)
|
|
||||||
return archived
|
|
||||||
|
|
||||||
def _serialize_raw_payload(self, msg: WxMessage) -> str:
|
def _serialize_raw_payload(self, msg: WxMessage) -> str:
|
||||||
"""将消息对象尽量完整地序列化为 JSON 字符串。
|
"""将消息对象尽量完整地序列化为 JSON 字符串。
|
||||||
@@ -111,8 +91,8 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
# 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。
|
# 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。
|
||||||
return str(msg)
|
return str(msg)
|
||||||
|
|
||||||
def _extract_mentioned_user_ids(self, msg: WxMessage) -> List[str]:
|
def _extract_mentioned_user_ids(self, raw_xml: str) -> List[str]:
|
||||||
"""从消息中提取被@用户ID列表,并返回去重后的列表。
|
"""从消息 XML 中提取被@用户ID列表,并返回去重后的列表。
|
||||||
|
|
||||||
解析策略:
|
解析策略:
|
||||||
1. 优先从 `msg.msg_source` 的 XML 里读取 `atuserlist` 节点;
|
1. 优先从 `msg.msg_source` 的 XML 里读取 `atuserlist` 节点;
|
||||||
@@ -121,7 +101,7 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
|
|
||||||
返回值示例:`["wxid_a", "wxid_b"]`
|
返回值示例:`["wxid_a", "wxid_b"]`
|
||||||
"""
|
"""
|
||||||
raw_xml = str(getattr(msg, "msg_source", "") or "")
|
raw_xml = str(raw_xml or "")
|
||||||
if not raw_xml:
|
if not raw_xml:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
@@ -153,6 +133,118 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
def get_pending_mention_extract_messages(self, limit: int = 200, max_age_days: int = 7) -> List[Dict]:
|
||||||
|
"""获取待处理 @ 抽取的消息批次。
|
||||||
|
|
||||||
|
筛选规则:
|
||||||
|
1. 群消息;
|
||||||
|
2. mentioned_user_ids 为空(表示还未处理);
|
||||||
|
3. message_xml 非空;
|
||||||
|
4. 时间在 max_age_days 窗口内,避免扫描无限历史。
|
||||||
|
"""
|
||||||
|
sql = """
|
||||||
|
SELECT message_id, group_id, sender, message_xml, timestamp
|
||||||
|
FROM messages
|
||||||
|
WHERE group_id IS NOT NULL
|
||||||
|
AND group_id <> ''
|
||||||
|
AND (mentioned_user_ids IS NULL OR mentioned_user_ids = '')
|
||||||
|
AND message_xml IS NOT NULL
|
||||||
|
AND message_xml <> ''
|
||||||
|
AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
|
||||||
|
ORDER BY timestamp ASC
|
||||||
|
LIMIT %s
|
||||||
|
"""
|
||||||
|
return self.execute_query(sql, (max_age_days, limit)) or []
|
||||||
|
|
||||||
|
def process_pending_mentions(self, batch_size: int = 200, max_age_days: int = 7) -> Dict[str, int]:
|
||||||
|
"""批量处理待抽取 @ 的消息,并同步社交图数据。
|
||||||
|
|
||||||
|
返回统计:
|
||||||
|
- total: 本批读取条数
|
||||||
|
- processed: 成功处理条数
|
||||||
|
- with_mentions: 提取到 @ 的消息条数
|
||||||
|
- failed: 失败条数
|
||||||
|
"""
|
||||||
|
rows = self.get_pending_mention_extract_messages(limit=batch_size, max_age_days=max_age_days)
|
||||||
|
if not rows:
|
||||||
|
return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0}
|
||||||
|
|
||||||
|
processed = 0
|
||||||
|
with_mentions = 0
|
||||||
|
failed = 0
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
try:
|
||||||
|
message_id = str(row.get("message_id") or "").strip()
|
||||||
|
group_id = str(row.get("group_id") or "").strip()
|
||||||
|
sender_id = str(row.get("sender") or "").strip()
|
||||||
|
raw_xml = str(row.get("message_xml") or "")
|
||||||
|
ts_raw = row.get("timestamp")
|
||||||
|
msg_time = self._safe_parse_message_time(ts_raw)
|
||||||
|
|
||||||
|
mentioned_user_ids = self._extract_mentioned_user_ids(raw_xml)
|
||||||
|
mentioned_user_ids_json = json.dumps(mentioned_user_ids, ensure_ascii=False)
|
||||||
|
|
||||||
|
self._update_message_mentioned_user_ids(
|
||||||
|
message_id=message_id,
|
||||||
|
group_id=group_id,
|
||||||
|
sender_id=sender_id,
|
||||||
|
mentioned_user_ids_json=mentioned_user_ids_json,
|
||||||
|
)
|
||||||
|
self._persist_mention_graph_data(
|
||||||
|
group_id=group_id,
|
||||||
|
sender_id=sender_id,
|
||||||
|
message_id=message_id,
|
||||||
|
mentioned_user_ids=mentioned_user_ids,
|
||||||
|
msg_time=msg_time,
|
||||||
|
)
|
||||||
|
|
||||||
|
processed += 1
|
||||||
|
if mentioned_user_ids:
|
||||||
|
with_mentions += 1
|
||||||
|
except Exception as e:
|
||||||
|
failed += 1
|
||||||
|
self.LOG.error(f"处理待抽取@消息失败: message_id={row.get('message_id')}, error={e}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"total": len(rows),
|
||||||
|
"processed": processed,
|
||||||
|
"with_mentions": with_mentions,
|
||||||
|
"failed": failed,
|
||||||
|
}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _safe_parse_message_time(value) -> datetime:
|
||||||
|
"""安全解析消息时间,失败时回退到当前时间。"""
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
return value
|
||||||
|
text = str(value or "").strip()
|
||||||
|
if not text:
|
||||||
|
return datetime.now()
|
||||||
|
try:
|
||||||
|
return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
|
||||||
|
except Exception:
|
||||||
|
return datetime.now()
|
||||||
|
|
||||||
|
def _update_message_mentioned_user_ids(
|
||||||
|
self,
|
||||||
|
message_id: str,
|
||||||
|
group_id: str,
|
||||||
|
sender_id: str,
|
||||||
|
mentioned_user_ids_json: str,
|
||||||
|
) -> None:
|
||||||
|
"""回填消息表的 mentioned_user_ids 字段。"""
|
||||||
|
self.execute_update(
|
||||||
|
"""
|
||||||
|
UPDATE messages
|
||||||
|
SET mentioned_user_ids = %s
|
||||||
|
WHERE message_id = %s
|
||||||
|
AND group_id = %s
|
||||||
|
AND sender = %s
|
||||||
|
""",
|
||||||
|
(mentioned_user_ids_json, message_id, group_id, sender_id),
|
||||||
|
)
|
||||||
|
|
||||||
def _persist_mention_graph_data(
|
def _persist_mention_graph_data(
|
||||||
self,
|
self,
|
||||||
group_id: str,
|
group_id: str,
|
||||||
@@ -191,10 +283,26 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
stat_date = msg_time.strftime("%Y-%m-%d")
|
stat_date = msg_time.strftime("%Y-%m-%d")
|
||||||
msg_time_str = msg_time.strftime("%Y-%m-%d %H:%M:%S")
|
msg_time_str = msg_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
|
# 幂等控制:只处理“该消息中尚未写入明细表”的新增 @ 目标。
|
||||||
|
existed_rows = self.execute_query(
|
||||||
|
"""
|
||||||
|
SELECT mentioned_user_id
|
||||||
|
FROM t_message_mentions
|
||||||
|
WHERE message_id = %s
|
||||||
|
AND group_id = %s
|
||||||
|
AND sender_id = %s
|
||||||
|
""",
|
||||||
|
(message_id, group_id, sender_id),
|
||||||
|
) or []
|
||||||
|
existed_ids = {str(r.get("mentioned_user_id") or "").strip() for r in existed_rows}
|
||||||
|
newly_mentioned_ids = [uid for uid in clean_mentioned_ids if uid not in existed_ids]
|
||||||
|
if not newly_mentioned_ids:
|
||||||
|
return
|
||||||
|
|
||||||
# 1) 写 @ 明细表:用于追溯“哪条消息 @ 了谁”。
|
# 1) 写 @ 明细表:用于追溯“哪条消息 @ 了谁”。
|
||||||
mention_rows = [
|
mention_rows = [
|
||||||
(message_id, group_id, sender_id, mentioned_uid, stat_date, msg_time_str)
|
(message_id, group_id, sender_id, mentioned_uid, stat_date, msg_time_str)
|
||||||
for mentioned_uid in clean_mentioned_ids
|
for mentioned_uid in newly_mentioned_ids
|
||||||
]
|
]
|
||||||
self.execute_batch(
|
self.execute_batch(
|
||||||
"""
|
"""
|
||||||
@@ -208,7 +316,7 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
# 2) 写社交日边表:一条 @ 关系视为 sender -> mentioned_uid 的一条有向边增量。
|
# 2) 写社交日边表:一条 @ 关系视为 sender -> mentioned_uid 的一条有向边增量。
|
||||||
edge_rows = [
|
edge_rows = [
|
||||||
(stat_date, group_id, sender_id, mentioned_uid, 1, 1.0)
|
(stat_date, group_id, sender_id, mentioned_uid, 1, 1.0)
|
||||||
for mentioned_uid in clean_mentioned_ids
|
for mentioned_uid in newly_mentioned_ids
|
||||||
]
|
]
|
||||||
self.execute_batch(
|
self.execute_batch(
|
||||||
"""
|
"""
|
||||||
@@ -226,8 +334,8 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
# 3) 写个人日汇总:更新被@次数/主动@次数,供 value_rank 直接读取。
|
# 3) 写个人日汇总:更新被@次数/主动@次数,供 value_rank 直接读取。
|
||||||
sender_social_row = (
|
sender_social_row = (
|
||||||
stat_date, group_id, sender_id,
|
stat_date, group_id, sender_id,
|
||||||
0, len(clean_mentioned_ids), 0,
|
0, len(newly_mentioned_ids), 0,
|
||||||
float(len(clean_mentioned_ids)),
|
float(len(newly_mentioned_ids)),
|
||||||
)
|
)
|
||||||
self.execute_update(
|
self.execute_update(
|
||||||
"""
|
"""
|
||||||
@@ -244,7 +352,7 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
|
|
||||||
receiver_social_rows = [
|
receiver_social_rows = [
|
||||||
(stat_date, group_id, mentioned_uid, 1, 0, 0, 1.0)
|
(stat_date, group_id, mentioned_uid, 1, 0, 0, 1.0)
|
||||||
for mentioned_uid in clean_mentioned_ids
|
for mentioned_uid in newly_mentioned_ids
|
||||||
]
|
]
|
||||||
self.execute_batch(
|
self.execute_batch(
|
||||||
"""
|
"""
|
||||||
@@ -260,7 +368,7 @@ class MessageStorageDB(BaseDBOperator):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# 4) 回填 unique_interactors:针对本条消息受影响的用户实时重算“去重互动人数”。
|
# 4) 回填 unique_interactors:针对本条消息受影响的用户实时重算“去重互动人数”。
|
||||||
affected_user_ids = [sender_id, *clean_mentioned_ids]
|
affected_user_ids = [sender_id, *newly_mentioned_ids]
|
||||||
self._refresh_unique_interactors(stat_date, group_id, affected_user_ids)
|
self._refresh_unique_interactors(stat_date, group_id, affected_user_ids)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# 社交图统计属于增强链路,不能反向影响主消息入库稳定性。
|
# 社交图统计属于增强链路,不能反向影响主消息入库稳定性。
|
||||||
|
|||||||
@@ -43,6 +43,14 @@ def get_system_job_definitions(robot) -> List[Dict[str, Any]]:
|
|||||||
"trigger_config": {"seconds": 300},
|
"trigger_config": {"seconds": 300},
|
||||||
"handler": _build_process_pending_images_handler(robot),
|
"handler": _build_process_pending_images_handler(robot),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"job_key": "process_pending_mentions",
|
||||||
|
"name": "待抽取@关系处理",
|
||||||
|
"description": "每 10 分钟处理一次待抽取@消息并更新社交图",
|
||||||
|
"trigger_type": "every_seconds",
|
||||||
|
"trigger_config": {"seconds": 600},
|
||||||
|
"handler": _build_process_pending_mentions_handler(robot),
|
||||||
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]]:
|
def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]]:
|
||||||
@@ -53,6 +61,14 @@ def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]
|
|||||||
return _handler
|
return _handler
|
||||||
|
|
||||||
|
|
||||||
|
def _build_process_pending_mentions_handler(robot) -> Callable[[], Awaitable[None]]:
|
||||||
|
async def _handler():
|
||||||
|
if hasattr(robot, "message_storage") and robot.message_storage:
|
||||||
|
await robot.message_storage.process_pending_mentions(batch_size=200, max_age_days=7)
|
||||||
|
|
||||||
|
return _handler
|
||||||
|
|
||||||
|
|
||||||
class SystemJobLoader:
|
class SystemJobLoader:
|
||||||
"""系统任务加载器:从数据库读取调度配置并注册到 async_job。"""
|
"""系统任务加载器:从数据库读取调度配置并注册到 async_job。"""
|
||||||
|
|
||||||
|
|||||||
@@ -358,6 +358,34 @@ class MessageStorage:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(f"定时处理媒体任务出错: {e}")
|
logger.exception(f"定时处理媒体任务出错: {e}")
|
||||||
|
|
||||||
|
async def process_pending_mentions(self, batch_size: int = 200, max_age_days: int = 7):
|
||||||
|
"""定时任务:批量处理待抽取 @ 的消息并写入社交图。
|
||||||
|
|
||||||
|
说明:
|
||||||
|
1. 该任务与主消息归档链路解耦,不阻塞实时收发;
|
||||||
|
2. 每次只处理有限批次,避免长事务和数据库抖动;
|
||||||
|
3. 重复执行安全:底层按 message_id + sender + mentioned_user_id 做幂等控制。
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
stats = self.message_db.process_pending_mentions(
|
||||||
|
batch_size=batch_size,
|
||||||
|
max_age_days=max_age_days,
|
||||||
|
)
|
||||||
|
total = int(stats.get("total", 0))
|
||||||
|
if total == 0:
|
||||||
|
logger.debug("待处理@抽取队列为空,本轮跳过")
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"批量@抽取完成: "
|
||||||
|
f"读取={stats.get('total', 0)}, "
|
||||||
|
f"处理={stats.get('processed', 0)}, "
|
||||||
|
f"含@={stats.get('with_mentions', 0)}, "
|
||||||
|
f"失败={stats.get('failed', 0)}"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(f"定时处理@抽取任务出错: {e}")
|
||||||
|
|
||||||
def _process_image_done(self, future):
|
def _process_image_done(self, future):
|
||||||
"""任务完成统一回调(极轻量)"""
|
"""任务完成统一回调(极轻量)"""
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user