From 78adab65b27eacfebfffea009fab437f269d193a Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 21 Apr 2026 13:58:16 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=BC=BA@=E6=89=B9=E5=A4=84=E7=90=86?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E6=97=A5=E5=BF=97=EF=BC=8C=E6=8F=90=E5=8D=87?= =?UTF-8?q?=E5=8F=AF=E8=A7=82=E6=B5=8B=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 增加定时@抽取任务开始/结束日志,输出窗口参数、处理条数与耗时 - 在DB批处理层记录查询命中、前3条样本明细、失败样本ID,便于快速排障 - 增加窗口参数异常修正日志,避免误配置导致行为不可见 - 在社交图增量写入处增加跳过/完成日志,明确是否产生新增@关系 --- db/message_storage.py | 47 ++++++++++++++++++++++++++++++++--- utils/wechat/message_to_db.py | 14 ++++++++--- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/db/message_storage.py b/db/message_storage.py index e597fec..6aa0931 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -152,6 +152,10 @@ class MessageStorageDB(BaseDBOperator): end_m = max(int(window_end_minutes), 0) if start_m <= end_m: start_m = end_m + 10 + self.LOG.warning( + f"@抽取窗口参数异常,已自动修正: window_start_minutes={window_start_minutes}, " + f"window_end_minutes={window_end_minutes}, 修正后=[NOW-{start_m}m, NOW-{end_m}m)" + ) sql = """ SELECT message_id, group_id, sender, message_xml, timestamp @@ -166,7 +170,11 @@ class MessageStorageDB(BaseDBOperator): ORDER BY timestamp ASC LIMIT %s """ - return self.execute_query(sql, (start_m, end_m, limit)) or [] + rows = self.execute_query(sql, (start_m, end_m, limit)) or [] + self.LOG.debug( + f"查询待抽取@消息: window=[NOW-{start_m}m, NOW-{end_m}m), limit={limit}, 命中={len(rows)}" + ) + return rows def process_pending_mentions( self, @@ -182,19 +190,30 @@ class MessageStorageDB(BaseDBOperator): - with_mentions: 提取到 @ 的消息条数 - failed: 失败条数 """ + started_at = datetime.now() + self.LOG.info( + "开始执行@批处理: " + f"batch_size={batch_size}, window_start_minutes={window_start_minutes}, " + f"window_end_minutes={window_end_minutes}" + ) + rows = self.get_pending_mention_extract_messages( limit=batch_size, window_start_minutes=window_start_minutes, window_end_minutes=window_end_minutes, ) if not rows: + elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) + self.LOG.info(f"@批处理结束: 命中0条, 耗时={elapsed_ms}ms") return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0} processed = 0 with_mentions = 0 failed = 0 + # 记录少量失败样本,便于快速定位问题,不刷屏。 + fail_samples: List[str] = [] - for row in rows: + for idx, row in enumerate(rows, start=1): try: message_id = str(row.get("message_id") or "").strip() group_id = str(row.get("group_id") or "").strip() @@ -223,16 +242,31 @@ class MessageStorageDB(BaseDBOperator): processed += 1 if mentioned_user_ids: with_mentions += 1 + if idx <= 3: + # 前3条打 debug 明细,便于确认当前批处理真实在工作。 + self.LOG.debug( + f"@批处理样本[{idx}]: message_id={message_id}, group_id={group_id}, " + f"sender={sender_id}, mentioned_count={len(mentioned_user_ids)}" + ) except Exception as e: failed += 1 self.LOG.error(f"处理待抽取@消息失败: message_id={row.get('message_id')}, error={e}") + if len(fail_samples) < 5: + fail_samples.append(str(row.get("message_id") or "")) - return { + elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) + stats = { "total": len(rows), "processed": processed, "with_mentions": with_mentions, "failed": failed, } + self.LOG.info( + f"@批处理结束: total={stats['total']}, processed={stats['processed']}, " + f"with_mentions={stats['with_mentions']}, failed={stats['failed']}, 耗时={elapsed_ms}ms, " + f"fail_samples={fail_samples}" + ) + return stats @staticmethod def _safe_parse_message_time(value) -> datetime: @@ -318,6 +352,9 @@ class MessageStorageDB(BaseDBOperator): existed_ids = {str(r.get("mentioned_user_id") or "").strip() for r in existed_rows} newly_mentioned_ids = [uid for uid in clean_mentioned_ids if uid not in existed_ids] if not newly_mentioned_ids: + self.LOG.debug( + f"社交图写入跳过(无新增@关系): message_id={message_id}, group_id={group_id}, sender={sender_id}" + ) return # 1) 写 @ 明细表:用于追溯“哪条消息 @ 了谁”。 @@ -391,6 +428,10 @@ class MessageStorageDB(BaseDBOperator): # 4) 回填 unique_interactors:针对本条消息受影响的用户实时重算“去重互动人数”。 affected_user_ids = [sender_id, *newly_mentioned_ids] self._refresh_unique_interactors(stat_date, group_id, affected_user_ids) + self.LOG.debug( + f"社交图写入完成: message_id={message_id}, group_id={group_id}, sender={sender_id}, " + f"new_mentions={len(newly_mentioned_ids)}, affected_users={len(affected_user_ids)}" + ) except Exception as e: # 社交图统计属于增强链路,不能反向影响主消息入库稳定性。 self.LOG.error(f"写入社交图增量数据失败: {e}") diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index f207fdc..0ec7723 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -373,6 +373,11 @@ class MessageStorage: 4. 默认只处理 10~20 分钟前的数据,减少对热数据区间的扫描压力。 """ try: + started_at = datetime.now() + logger.info( + "触发定时@抽取任务: " + f"batch_size={batch_size}, window=[NOW-{window_start_minutes}m, NOW-{window_end_minutes}m)" + ) stats = self.message_db.process_pending_mentions( batch_size=batch_size, window_start_minutes=window_start_minutes, @@ -380,15 +385,18 @@ class MessageStorage: ) total = int(stats.get("total", 0)) if total == 0: - logger.debug("待处理@抽取队列为空,本轮跳过") + elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) + logger.info(f"定时@抽取任务结束: 无待处理数据, 耗时={elapsed_ms}ms") return + elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) logger.info( - "批量@抽取完成: " + "定时@抽取任务结束: " f"读取={stats.get('total', 0)}, " f"处理={stats.get('processed', 0)}, " f"含@={stats.get('with_mentions', 0)}, " - f"失败={stats.get('failed', 0)}" + f"失败={stats.get('failed', 0)}, " + f"耗时={elapsed_ms}ms" ) except Exception as e: logger.exception(f"定时处理@抽取任务出错: {e}")