diff --git a/db/message_storage.py b/db/message_storage.py index e597fec..6aa0931 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -152,6 +152,10 @@ class MessageStorageDB(BaseDBOperator): end_m = max(int(window_end_minutes), 0) if start_m <= end_m: start_m = end_m + 10 + self.LOG.warning( + f"@抽取窗口参数异常,已自动修正: window_start_minutes={window_start_minutes}, " + f"window_end_minutes={window_end_minutes}, 修正后=[NOW-{start_m}m, NOW-{end_m}m)" + ) sql = """ SELECT message_id, group_id, sender, message_xml, timestamp @@ -166,7 +170,11 @@ class MessageStorageDB(BaseDBOperator): ORDER BY timestamp ASC LIMIT %s """ - return self.execute_query(sql, (start_m, end_m, limit)) or [] + rows = self.execute_query(sql, (start_m, end_m, limit)) or [] + self.LOG.debug( + f"查询待抽取@消息: window=[NOW-{start_m}m, NOW-{end_m}m), limit={limit}, 命中={len(rows)}" + ) + return rows def process_pending_mentions( self, @@ -182,19 +190,30 @@ class MessageStorageDB(BaseDBOperator): - with_mentions: 提取到 @ 的消息条数 - failed: 失败条数 """ + started_at = datetime.now() + self.LOG.info( + "开始执行@批处理: " + f"batch_size={batch_size}, window_start_minutes={window_start_minutes}, " + f"window_end_minutes={window_end_minutes}" + ) + rows = self.get_pending_mention_extract_messages( limit=batch_size, window_start_minutes=window_start_minutes, window_end_minutes=window_end_minutes, ) if not rows: + elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) + self.LOG.info(f"@批处理结束: 命中0条, 耗时={elapsed_ms}ms") return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0} processed = 0 with_mentions = 0 failed = 0 + # 记录少量失败样本,便于快速定位问题,不刷屏。 + fail_samples: List[str] = [] - for row in rows: + for idx, row in enumerate(rows, start=1): try: message_id = str(row.get("message_id") or "").strip() group_id = str(row.get("group_id") or "").strip() @@ -223,16 +242,31 @@ class MessageStorageDB(BaseDBOperator): processed += 1 if mentioned_user_ids: with_mentions += 1 + if idx <= 3: + # 前3条打 debug 明细,便于确认当前批处理真实在工作。 + self.LOG.debug( + f"@批处理样本[{idx}]: message_id={message_id}, group_id={group_id}, " + f"sender={sender_id}, mentioned_count={len(mentioned_user_ids)}" + ) except Exception as e: failed += 1 self.LOG.error(f"处理待抽取@消息失败: message_id={row.get('message_id')}, error={e}") + if len(fail_samples) < 5: + fail_samples.append(str(row.get("message_id") or "")) - return { + elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) + stats = { "total": len(rows), "processed": processed, "with_mentions": with_mentions, "failed": failed, } + self.LOG.info( + f"@批处理结束: total={stats['total']}, processed={stats['processed']}, " + f"with_mentions={stats['with_mentions']}, failed={stats['failed']}, 耗时={elapsed_ms}ms, " + f"fail_samples={fail_samples}" + ) + return stats @staticmethod def _safe_parse_message_time(value) -> datetime: @@ -318,6 +352,9 @@ class MessageStorageDB(BaseDBOperator): existed_ids = {str(r.get("mentioned_user_id") or "").strip() for r in existed_rows} newly_mentioned_ids = [uid for uid in clean_mentioned_ids if uid not in existed_ids] if not newly_mentioned_ids: + self.LOG.debug( + f"社交图写入跳过(无新增@关系): message_id={message_id}, group_id={group_id}, sender={sender_id}" + ) return # 1) 写 @ 明细表:用于追溯“哪条消息 @ 了谁”。 @@ -391,6 +428,10 @@ class MessageStorageDB(BaseDBOperator): # 4) 回填 unique_interactors:针对本条消息受影响的用户实时重算“去重互动人数”。 affected_user_ids = [sender_id, *newly_mentioned_ids] self._refresh_unique_interactors(stat_date, group_id, affected_user_ids) + self.LOG.debug( + f"社交图写入完成: message_id={message_id}, group_id={group_id}, sender={sender_id}, " + f"new_mentions={len(newly_mentioned_ids)}, affected_users={len(affected_user_ids)}" + ) except Exception as e: # 社交图统计属于增强链路,不能反向影响主消息入库稳定性。 self.LOG.error(f"写入社交图增量数据失败: {e}") diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index f207fdc..0ec7723 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -373,6 +373,11 @@ class MessageStorage: 4. 默认只处理 10~20 分钟前的数据,减少对热数据区间的扫描压力。 """ try: + started_at = datetime.now() + logger.info( + "触发定时@抽取任务: " + f"batch_size={batch_size}, window=[NOW-{window_start_minutes}m, NOW-{window_end_minutes}m)" + ) stats = self.message_db.process_pending_mentions( batch_size=batch_size, window_start_minutes=window_start_minutes, @@ -380,15 +385,18 @@ class MessageStorage: ) total = int(stats.get("total", 0)) if total == 0: - logger.debug("待处理@抽取队列为空,本轮跳过") + elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) + logger.info(f"定时@抽取任务结束: 无待处理数据, 耗时={elapsed_ms}ms") return + elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000) logger.info( - "批量@抽取完成: " + "定时@抽取任务结束: " f"读取={stats.get('total', 0)}, " f"处理={stats.get('processed', 0)}, " f"含@={stats.get('with_mentions', 0)}, " - f"失败={stats.get('failed', 0)}" + f"失败={stats.get('failed', 0)}, " + f"耗时={elapsed_ms}ms" ) except Exception as e: logger.exception(f"定时处理@抽取任务出错: {e}")