From 1ddc1dcf1851333d0e763c6c923156daccf80af1 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 21 Apr 2026 13:54:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96@=E6=89=B9=E5=A4=84=E7=90=86?= =?UTF-8?q?=E7=AA=97=E5=8F=A3=EF=BC=9A=E6=AF=8F10=E5=88=86=E9=92=9F?= =?UTF-8?q?=E4=BB=85=E5=A4=84=E7=90=8610-20=E5=88=86=E9=92=9F=E5=89=8D?= =?UTF-8?q?=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 调整待抽取@查询逻辑:默认仅扫描 timestamp 在 [NOW-20m, NOW-10m) 的消息 - 保留 mentioned_user_ids 为空才处理的条件,处理过自动跳过 - 更新 MessageStorage 与系统任务调用参数,统一使用窗口化批处理配置 - 增加窗口参数兜底修正,避免错误配置导致全量扫描 --- db/message_storage.py | 33 +++++++++++++++++++++++++++------ utils/system_jobs.py | 6 +++++- utils/wechat/message_to_db.py | 11 +++++++++-- 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/db/message_storage.py b/db/message_storage.py index 7d173f7..e597fec 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -133,15 +133,26 @@ class MessageStorageDB(BaseDBOperator): return result - def get_pending_mention_extract_messages(self, limit: int = 200, max_age_days: int = 7) -> List[Dict]: + def get_pending_mention_extract_messages( + self, + limit: int = 200, + window_start_minutes: int = 20, + window_end_minutes: int = 10, + ) -> List[Dict]: """获取待处理 @ 抽取的消息批次。 筛选规则: 1. 群消息; 2. mentioned_user_ids 为空(表示还未处理); 3. message_xml 非空; - 4. 时间在 max_age_days 窗口内,避免扫描无限历史。 + 4. 仅处理固定时间窗口(默认:10~20分钟前),降低扫描压力与热数据竞争。 """ + # 兜底修正窗口参数,确保窗口有效:start > end >= 0 + start_m = max(int(window_start_minutes), 1) + end_m = max(int(window_end_minutes), 0) + if start_m <= end_m: + start_m = end_m + 10 + sql = """ SELECT message_id, group_id, sender, message_xml, timestamp FROM messages @@ -150,13 +161,19 @@ class MessageStorageDB(BaseDBOperator): AND (mentioned_user_ids IS NULL OR mentioned_user_ids = '') AND message_xml IS NOT NULL AND message_xml <> '' - AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) + AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE) + AND timestamp < DATE_SUB(NOW(), INTERVAL %s MINUTE) ORDER BY timestamp ASC LIMIT %s """ - return self.execute_query(sql, (max_age_days, limit)) or [] + return self.execute_query(sql, (start_m, end_m, limit)) or [] - def process_pending_mentions(self, batch_size: int = 200, max_age_days: int = 7) -> Dict[str, int]: + def process_pending_mentions( + self, + batch_size: int = 200, + window_start_minutes: int = 20, + window_end_minutes: int = 10, + ) -> Dict[str, int]: """批量处理待抽取 @ 的消息,并同步社交图数据。 返回统计: @@ -165,7 +182,11 @@ class MessageStorageDB(BaseDBOperator): - with_mentions: 提取到 @ 的消息条数 - failed: 失败条数 """ - rows = self.get_pending_mention_extract_messages(limit=batch_size, max_age_days=max_age_days) + rows = self.get_pending_mention_extract_messages( + limit=batch_size, + window_start_minutes=window_start_minutes, + window_end_minutes=window_end_minutes, + ) if not rows: return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0} diff --git a/utils/system_jobs.py b/utils/system_jobs.py index fe34b6c..7f04aa6 100644 --- a/utils/system_jobs.py +++ b/utils/system_jobs.py @@ -64,7 +64,11 @@ def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None] def _build_process_pending_mentions_handler(robot) -> Callable[[], Awaitable[None]]: async def _handler(): if hasattr(robot, "message_storage") and robot.message_storage: - await robot.message_storage.process_pending_mentions(batch_size=200, max_age_days=7) + await robot.message_storage.process_pending_mentions( + batch_size=200, + window_start_minutes=20, + window_end_minutes=10, + ) return _handler diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index 92abfdd..f207fdc 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -358,18 +358,25 @@ class MessageStorage: except Exception as e: logger.exception(f"定时处理媒体任务出错: {e}") - async def process_pending_mentions(self, batch_size: int = 200, max_age_days: int = 7): + async def process_pending_mentions( + self, + batch_size: int = 200, + window_start_minutes: int = 20, + window_end_minutes: int = 10, + ): """定时任务:批量处理待抽取 @ 的消息并写入社交图。 说明: 1. 该任务与主消息归档链路解耦,不阻塞实时收发; 2. 每次只处理有限批次,避免长事务和数据库抖动; 3. 重复执行安全:底层按 message_id + sender + mentioned_user_id 做幂等控制。 + 4. 默认只处理 10~20 分钟前的数据,减少对热数据区间的扫描压力。 """ try: stats = self.message_db.process_pending_mentions( batch_size=batch_size, - max_age_days=max_age_days, + window_start_minutes=window_start_minutes, + window_end_minutes=window_end_minutes, ) total = int(stats.get("total", 0)) if total == 0: