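Optimize the @-mention batch window: every 10 minutes, process only messages from 10-20 minutes ago

- Adjust the pending @-mention extraction query: by default, scan only messages whose timestamp falls in [NOW-20m, NOW-10m)

- Keep the condition that only rows with an empty mentioned_user_ids are processed, so already-processed messages are skipped automatically

- Update MessageStorage and the system task call parameters to use the windowed batch configuration consistently

- Add a fallback correction for the window parameters so that a misconfiguration cannot trigger a full scan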
liuwei
2026-04-21 13:54:41 +08:00
parent 906f0905df
commit 1ddc1dcf18
3 changed files with 41 additions and 9 deletions


@@ -133,15 +133,26 @@ class MessageStorageDB(BaseDBOperator):
         return result
-    def get_pending_mention_extract_messages(self, limit: int = 200, max_age_days: int = 7) -> List[Dict]:
+    def get_pending_mention_extract_messages(
+        self,
+        limit: int = 200,
+        window_start_minutes: int = 20,
+        window_end_minutes: int = 10,
+    ) -> List[Dict]:
         """Fetch a batch of messages pending @-mention extraction.
         Filter rules:
         1. group messages;
         2. mentioned_user_ids is empty (meaning not yet processed);
         3. message_xml is non-empty;
-        4. timestamp within the max_age_days window, to avoid scanning unbounded history
+        4. only a fixed time window is processed (default: 10-20 minutes ago), reducing scan pressure and contention on hot data
         """
+        # Fallback correction of the window parameters so the window is valid: start > end >= 0
+        start_m = max(int(window_start_minutes), 1)
+        end_m = max(int(window_end_minutes), 0)
+        if start_m <= end_m:
+            start_m = end_m + 10
         sql = """
             SELECT message_id, group_id, sender, message_xml, timestamp
             FROM messages
@@ -150,13 +161,19 @@ class MessageStorageDB(BaseDBOperator):
               AND (mentioned_user_ids IS NULL OR mentioned_user_ids = '')
               AND message_xml IS NOT NULL
               AND message_xml <> ''
-              AND timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
+              AND timestamp >= DATE_SUB(NOW(), INTERVAL %s MINUTE)
+              AND timestamp < DATE_SUB(NOW(), INTERVAL %s MINUTE)
             ORDER BY timestamp ASC
             LIMIT %s
         """
-        return self.execute_query(sql, (max_age_days, limit)) or []
+        return self.execute_query(sql, (start_m, end_m, limit)) or []
-    def process_pending_mentions(self, batch_size: int = 200, max_age_days: int = 7) -> Dict[str, int]:
+    def process_pending_mentions(
+        self,
+        batch_size: int = 200,
+        window_start_minutes: int = 20,
+        window_end_minutes: int = 10,
+    ) -> Dict[str, int]:
         """Process messages pending @-mention extraction in batches and sync the social graph data.
         Returned statistics:
@@ -165,7 +182,11 @@ class MessageStorageDB(BaseDBOperator):
         - with_mentions: number of messages in which an @-mention was found
         - failed: number of failures
         """
-        rows = self.get_pending_mention_extract_messages(limit=batch_size, max_age_days=max_age_days)
+        rows = self.get_pending_mention_extract_messages(
+            limit=batch_size,
+            window_start_minutes=window_start_minutes,
+            window_end_minutes=window_end_minutes,
+        )
         if not rows:
             return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0}
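A minimal sketch of the windowing logic in this file, for review purposes only. `normalize_window` mirrors the fallback correction from the hunk above; `select_in_window` is a hypothetical in-memory stand-in for the SQL filter, so the half-open bounds can be checked without a database. The group-message condition is left out because its SQL is not visible in the diff, and the sample rows are invented for illustration.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Tuple


def normalize_window(start_minutes: int, end_minutes: int) -> Tuple[int, int]:
    """Same fallback as the diff: guarantee start > end >= 0."""
    start_m = max(int(start_minutes), 1)
    end_m = max(int(end_minutes), 0)
    if start_m <= end_m:
        # Invalid or inverted window: fall back to a 10-minute span.
        start_m = end_m + 10
    return start_m, end_m


def select_in_window(
    messages: List[Dict],
    now: datetime,
    start_minutes: int = 20,
    end_minutes: int = 10,
    limit: int = 200,
) -> List[Dict]:
    """Hypothetical in-memory equivalent of the WHERE / ORDER BY / LIMIT clauses."""
    start_m, end_m = normalize_window(start_minutes, end_minutes)
    lower = now - timedelta(minutes=start_m)  # inclusive bound (>=)
    upper = now - timedelta(minutes=end_m)    # exclusive bound (<)
    pending = [
        m for m in messages
        if not m.get("mentioned_user_ids")    # NULL or '' means not yet processed
        and m.get("message_xml")              # message_xml must be non-empty
        and lower <= m["timestamp"] < upper
    ]
    pending.sort(key=lambda m: m["timestamp"])
    return pending[:limit]


# Invented sample data; "now" is fixed so the result is deterministic.
now = datetime(2026, 4, 21, 14, 0, 0)
sample = [
    {"message_id": 1, "timestamp": now - timedelta(minutes=25), "message_xml": "<msg/>", "mentioned_user_ids": None},  # too old
    {"message_id": 2, "timestamp": now - timedelta(minutes=20), "message_xml": "<msg/>", "mentioned_user_ids": None},  # on the inclusive lower bound
    {"message_id": 3, "timestamp": now - timedelta(minutes=15), "message_xml": "<msg/>", "mentioned_user_ids": "u1"},  # already processed
    {"message_id": 4, "timestamp": now - timedelta(minutes=10), "message_xml": "<msg/>", "mentioned_user_ids": None},  # on the exclusive upper bound
    {"message_id": 5, "timestamp": now - timedelta(minutes=12), "message_xml": "", "mentioned_user_ids": None},        # empty message_xml
    {"message_id": 6, "timestamp": now - timedelta(minutes=11), "message_xml": "<msg/>", "mentioned_user_ids": ""},    # eligible
]
selected_ids = [m["message_id"] for m in select_in_window(sample, now)]
```

With the defaults, only messages 2 and 6 are selected. An inverted pair such as `(5, 10)` is corrected to `(20, 10)`, so it cannot widen the scan, though the fallback does not cap an overly large `window_start_minutes`.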


@@ -64,7 +64,11 @@ def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]
 def _build_process_pending_mentions_handler(robot) -> Callable[[], Awaitable[None]]:
     async def _handler():
         if hasattr(robot, "message_storage") and robot.message_storage:
-            await robot.message_storage.process_pending_mentions(batch_size=200, max_age_days=7)
+            await robot.message_storage.process_pending_mentions(
+                batch_size=200,
+                window_start_minutes=20,
+                window_end_minutes=10,
+            )
     return _handler
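The handler above passes a 20/10-minute window, and the commit title says the task runs every 10 minutes. The following sketch, under that assumption, checks that consecutive runs cover the timeline with no gaps or overlap. `window_bounds` and `is_contiguous` are hypothetical helpers, not part of the codebase, and the scheduler registration itself is not shown in this diff.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Window = Tuple[datetime, datetime]


def window_bounds(run_at: datetime, start_minutes: int = 20, end_minutes: int = 10) -> Window:
    """Half-open window [run_at - start, run_at - end) scanned by a single run."""
    return (
        run_at - timedelta(minutes=start_minutes),
        run_at - timedelta(minutes=end_minutes),
    )


def is_contiguous(windows: List[Window]) -> bool:
    """True if each window starts exactly where the previous one ended."""
    return all(
        prev_upper == next_lower
        for (_, prev_upper), (next_lower, _) in zip(windows, windows[1:])
    )


# Assumed schedule: four runs exactly 10 minutes apart.
first_run = datetime(2026, 4, 21, 14, 0)
on_time_runs = [first_run + timedelta(minutes=10 * i) for i in range(4)]
on_time_windows = [window_bounds(r) for r in on_time_runs]

# A run delayed by 5 minutes leaves a 5-minute gap that no later window covers.
delayed_windows = [window_bounds(first_run), window_bounds(first_run + timedelta(minutes=15))]

tiles_cleanly = is_contiguous(on_time_windows)
has_gap = not is_contiguous(delayed_windows)
```

The tiling holds only when runs fire on schedule and each window contains no more pending rows than `batch_size`. A late or skipped run, or a window holding more than 200 pending messages, would leave rows outside every later window, so a periodic catch-up pass with a wider window may be worth considering.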


@@ -358,18 +358,25 @@ class MessageStorage:
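         except Exception as e:
             logger.exception(f"Scheduled media processing task failed: {e}")
-    async def process_pending_mentions(self, batch_size: int = 200, max_age_days: int = 7):
+    async def process_pending_mentions(
+        self,
+        batch_size: int = 200,
+        window_start_minutes: int = 20,
+        window_end_minutes: int = 10,
+    ):
         """Scheduled task: process messages pending @-mention extraction in batches and write them to the social graph.
         Notes:
         1. This task is decoupled from the main message-archiving path and does not block real-time sending and receiving;
         2. Each run handles only a bounded batch, avoiding long transactions and database jitter;
         3. Safe to re-run: the underlying layer enforces idempotency on message_id + sender + mentioned_user_id.
+        4. By default only data from 10-20 minutes ago is processed, reducing scan pressure on the hot data range.
         """
         try:
             stats = self.message_db.process_pending_mentions(
                 batch_size=batch_size,
-                max_age_days=max_age_days,
+                window_start_minutes=window_start_minutes,
+                window_end_minutes=window_end_minutes,
             )
             total = int(stats.get("total", 0))
             if total == 0: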