增强@批处理执行日志,提升可观测性
- 增加定时@抽取任务开始/结束日志,输出窗口参数、处理条数与耗时 - 在DB批处理层记录查询命中、前3条样本明细、失败样本ID,便于快速排障 - 增加窗口参数异常修正日志,避免误配置导致行为不可见 - 在社交图增量写入处增加跳过/完成日志,明确是否产生新增@关系
This commit is contained in:
@@ -152,6 +152,10 @@ class MessageStorageDB(BaseDBOperator):
|
||||
end_m = max(int(window_end_minutes), 0)
|
||||
if start_m <= end_m:
|
||||
start_m = end_m + 10
|
||||
self.LOG.warning(
|
||||
f"@抽取窗口参数异常,已自动修正: window_start_minutes={window_start_minutes}, "
|
||||
f"window_end_minutes={window_end_minutes}, 修正后=[NOW-{start_m}m, NOW-{end_m}m)"
|
||||
)
|
||||
|
||||
sql = """
|
||||
SELECT message_id, group_id, sender, message_xml, timestamp
|
||||
@@ -166,7 +170,11 @@ class MessageStorageDB(BaseDBOperator):
|
||||
ORDER BY timestamp ASC
|
||||
LIMIT %s
|
||||
"""
|
||||
return self.execute_query(sql, (start_m, end_m, limit)) or []
|
||||
rows = self.execute_query(sql, (start_m, end_m, limit)) or []
|
||||
self.LOG.debug(
|
||||
f"查询待抽取@消息: window=[NOW-{start_m}m, NOW-{end_m}m), limit={limit}, 命中={len(rows)}"
|
||||
)
|
||||
return rows
|
||||
|
||||
def process_pending_mentions(
|
||||
self,
|
||||
@@ -182,19 +190,30 @@ class MessageStorageDB(BaseDBOperator):
|
||||
- with_mentions: 提取到 @ 的消息条数
|
||||
- failed: 失败条数
|
||||
"""
|
||||
started_at = datetime.now()
|
||||
self.LOG.info(
|
||||
"开始执行@批处理: "
|
||||
f"batch_size={batch_size}, window_start_minutes={window_start_minutes}, "
|
||||
f"window_end_minutes={window_end_minutes}"
|
||||
)
|
||||
|
||||
rows = self.get_pending_mention_extract_messages(
|
||||
limit=batch_size,
|
||||
window_start_minutes=window_start_minutes,
|
||||
window_end_minutes=window_end_minutes,
|
||||
)
|
||||
if not rows:
|
||||
elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)
|
||||
self.LOG.info(f"@批处理结束: 命中0条, 耗时={elapsed_ms}ms")
|
||||
return {"total": 0, "processed": 0, "with_mentions": 0, "failed": 0}
|
||||
|
||||
processed = 0
|
||||
with_mentions = 0
|
||||
failed = 0
|
||||
# 记录少量失败样本,便于快速定位问题,不刷屏。
|
||||
fail_samples: List[str] = []
|
||||
|
||||
for row in rows:
|
||||
for idx, row in enumerate(rows, start=1):
|
||||
try:
|
||||
message_id = str(row.get("message_id") or "").strip()
|
||||
group_id = str(row.get("group_id") or "").strip()
|
||||
@@ -223,16 +242,31 @@ class MessageStorageDB(BaseDBOperator):
|
||||
processed += 1
|
||||
if mentioned_user_ids:
|
||||
with_mentions += 1
|
||||
if idx <= 3:
|
||||
# 前3条打 debug 明细,便于确认当前批处理真实在工作。
|
||||
self.LOG.debug(
|
||||
f"@批处理样本[{idx}]: message_id={message_id}, group_id={group_id}, "
|
||||
f"sender={sender_id}, mentioned_count={len(mentioned_user_ids)}"
|
||||
)
|
||||
except Exception as e:
|
||||
failed += 1
|
||||
self.LOG.error(f"处理待抽取@消息失败: message_id={row.get('message_id')}, error={e}")
|
||||
if len(fail_samples) < 5:
|
||||
fail_samples.append(str(row.get("message_id") or ""))
|
||||
|
||||
return {
|
||||
elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)
|
||||
stats = {
|
||||
"total": len(rows),
|
||||
"processed": processed,
|
||||
"with_mentions": with_mentions,
|
||||
"failed": failed,
|
||||
}
|
||||
self.LOG.info(
|
||||
f"@批处理结束: total={stats['total']}, processed={stats['processed']}, "
|
||||
f"with_mentions={stats['with_mentions']}, failed={stats['failed']}, 耗时={elapsed_ms}ms, "
|
||||
f"fail_samples={fail_samples}"
|
||||
)
|
||||
return stats
|
||||
|
||||
@staticmethod
|
||||
def _safe_parse_message_time(value) -> datetime:
|
||||
@@ -318,6 +352,9 @@ class MessageStorageDB(BaseDBOperator):
|
||||
existed_ids = {str(r.get("mentioned_user_id") or "").strip() for r in existed_rows}
|
||||
newly_mentioned_ids = [uid for uid in clean_mentioned_ids if uid not in existed_ids]
|
||||
if not newly_mentioned_ids:
|
||||
self.LOG.debug(
|
||||
f"社交图写入跳过(无新增@关系): message_id={message_id}, group_id={group_id}, sender={sender_id}"
|
||||
)
|
||||
return
|
||||
|
||||
# 1) 写 @ 明细表:用于追溯“哪条消息 @ 了谁”。
|
||||
@@ -391,6 +428,10 @@ class MessageStorageDB(BaseDBOperator):
|
||||
# 4) 回填 unique_interactors:针对本条消息受影响的用户实时重算“去重互动人数”。
|
||||
affected_user_ids = [sender_id, *newly_mentioned_ids]
|
||||
self._refresh_unique_interactors(stat_date, group_id, affected_user_ids)
|
||||
self.LOG.debug(
|
||||
f"社交图写入完成: message_id={message_id}, group_id={group_id}, sender={sender_id}, "
|
||||
f"new_mentions={len(newly_mentioned_ids)}, affected_users={len(affected_user_ids)}"
|
||||
)
|
||||
except Exception as e:
|
||||
# 社交图统计属于增强链路,不能反向影响主消息入库稳定性。
|
||||
self.LOG.error(f"写入社交图增量数据失败: {e}")
|
||||
|
||||
@@ -373,6 +373,11 @@ class MessageStorage:
|
||||
4. 默认只处理 10~20 分钟前的数据,减少对热数据区间的扫描压力。
|
||||
"""
|
||||
try:
|
||||
started_at = datetime.now()
|
||||
logger.info(
|
||||
"触发定时@抽取任务: "
|
||||
f"batch_size={batch_size}, window=[NOW-{window_start_minutes}m, NOW-{window_end_minutes}m)"
|
||||
)
|
||||
stats = self.message_db.process_pending_mentions(
|
||||
batch_size=batch_size,
|
||||
window_start_minutes=window_start_minutes,
|
||||
@@ -380,15 +385,18 @@ class MessageStorage:
|
||||
)
|
||||
total = int(stats.get("total", 0))
|
||||
if total == 0:
|
||||
logger.debug("待处理@抽取队列为空,本轮跳过")
|
||||
elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)
|
||||
logger.info(f"定时@抽取任务结束: 无待处理数据, 耗时={elapsed_ms}ms")
|
||||
return
|
||||
|
||||
elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)
|
||||
logger.info(
|
||||
"批量@抽取完成: "
|
||||
"定时@抽取任务结束: "
|
||||
f"读取={stats.get('total', 0)}, "
|
||||
f"处理={stats.get('processed', 0)}, "
|
||||
f"含@={stats.get('with_mentions', 0)}, "
|
||||
f"失败={stats.get('failed', 0)}"
|
||||
f"失败={stats.get('failed', 0)}, "
|
||||
f"耗时={elapsed_ms}ms"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(f"定时处理@抽取任务出错: {e}")
|
||||
|
||||
Reference in New Issue
Block a user