import asyncio import html import json import re import time from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Any, Tuple, Optional, List from loguru import logger from markupsafe import Markup try: # 优先使用 markdown 库做完整渲染(支持表格、代码块等)。 import markdown as markdown_lib except Exception: markdown_lib = None from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from db.message_summary_db import MessageSummaryDBOperator from utils.compress_chat_data import compress_chat_data from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.decorator.rate_limit_decorator import group_feature_rate_limit from utils.html_template_renderer import HtmlTemplateRenderer from utils.markdown_to_image import convert_md_str_to_image, html_to_image from utils.revoke.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from utils.string_utils import remove_reasoning_content, remove_trailing_content from utils.ai.unified_llm import UnifiedLLMClient from utils.wechat.contact_manager import ContactManager from utils.wechat.message_to_db import MessageStorage from wechat_ipad import WechatAPIClient class MessageSummaryPlugin(MessagePluginInterface): """消息总结插件,用于生成群聊消息总结""" # 功能权限常量 FEATURE_KEY = "SUMMARY_CAPABILITY" FEATURE_DESCRIPTION = "📝 群总结能力 [#总结]" @property def name(self) -> str: return "群聊总结" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "使用AI生成群聊消息总结" @property def author(self) -> str: return "ABOT Team" @property def command_prefix(self) -> Optional[str]: return "#" @property def commands(self) -> List[str]: return ["总结", "summary"] @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.message_storage = None self.message_summary_db = None self.revoke = None self._auto_revoke = None # 注册功能权限 self.feature = self.register_feature() def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" try: # 从插件配置中获取API密钥和URL api_config = self._config.get("api", {}) self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo") self._api_base_url = str(api_config.get("api_base_url", "http://192.168.2.240/v1")).rstrip("/") self._api_mode = str(api_config.get("mode", "workflow")).strip().lower() default_endpoint = "workflows/run" if self._api_mode == "workflow" else "chat-messages" self._api_endpoint = str(api_config.get("endpoint", default_endpoint)).lstrip("/") self._api_url = api_config.get("api_url", f"{self._api_base_url}/{self._api_endpoint}") self._workflow_output_key = str(api_config.get("workflow_output_key", "text")).strip() self._response_mode = api_config.get("response_mode", "blocking") self._connect_timeout_seconds = int(api_config.get("connect_timeout_seconds", 10)) self._request_timeout_seconds = int(api_config.get("request_timeout_seconds", 180)) self._retry_delays_seconds = api_config.get("retry_delays_seconds", [10, 20]) # 输出阶段超时与体积保护:防止 Markdown 转图在异常环境下长时间卡死。 output_config = self._config.get("output", {}) self._image_render_timeout_seconds = int(output_config.get("image_render_timeout_seconds", 90)) # 默认只尝试 1 次,优先保证任务快速返回;需要更高成功率可在配置里提高。 self._image_render_retries = int(output_config.get("image_render_retries", 1)) # 输出模板配置: # 1. 默认使用 markdown,优先保证线上样式稳定; # 2. 若配置为 template 则走 HTML 模板生图; # 3. template 失败会自动回退到 markdown 模式,保证可用性。 self._summary_image_mode = str(output_config.get("summary_image_mode", "markdown")).strip().lower() self._summary_image_template_path = str( output_config.get("summary_image_template_path", "plugins/message_summary/templates/summary_card.html") ).strip() self.llm_client = UnifiedLLMClient(api_config) self._api_mode = self.llm_client.mode or self._api_mode self._response_mode = self.llm_client.response_mode or self._response_mode self._workflow_output_key = self.llm_client.workflow_output_key or self._workflow_output_key self.message_storage = MessageStorage() db_manager = context.get("db_manager") if db_manager: self.message_summary_db = MessageSummaryDBOperator(db_manager) self.LOG.debug(f"初始化 {self.name} 插件成功") return True except Exception as e: if hasattr(self, 'LOG'): self.LOG.error(f"初始化 {self.name} 插件失败: {e}") else: print(f"初始化 {self.name} 插件失败: {e}") return False def start(self) -> bool: """启动插件""" self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True @plugin_stats_decorator(plugin_name="群聊总结") @plugin_points_cost(10, "群聊总结消耗积分", FEATURE_KEY) @group_feature_rate_limit(max_per_minute=30, feature_key=FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" try: # 检查是否是总结命令 content = str(message.get("content", "") or "").strip() self.bot: WechatAPIClient = message.get("bot") if not content.startswith(self.command_prefix): return False, None command_parts = content[len(self.command_prefix):].split() if not command_parts: return False, None command = command_parts[0] if command not in self.commands: return False, None # 获取需要总结的内容 group_id = message.get("roomid") self.revoke: MessageAutoRevoke = message.get("revoke") if not group_id: await self.bot.send_text_message(group_id, "只支持群聊消息总结", message.get("sender")) return False, None # 权限判断 gbm: GroupBotManager = message.get("gbm") if gbm and gbm.get_group_permission(group_id, self.feature) == PermissionStatus.DISABLED: return False, None # 从消息历史中获取群聊记录 all_contacts: dict = message.get("all_contacts") group_members: dict = ContactManager.get_instance().get_group_members(group_id) custom_date = self._parse_summary_date(command_parts[1] if len(command_parts) > 1 else "") if len(command_parts) > 1 and custom_date is None: await self.bot.send_text_message(group_id, "日期格式不对,用 `#总结 2026-04-08` 这种。", message.get("sender")) return False, None period_start, period_end = self._resolve_manual_summary_range(custom_date) message_count = self.message_storage.count_messages_by_date_range(group_id, period_start, period_end) message_stats = self.message_storage.get_message_stats_by_date_range(group_id, period_start, period_end) if message_count < 100: await self.bot.send_text_message( group_id, f"这段时间消息太少了,只有 {message_count} 条,不总结了。", message.get("sender"), ) return False, None chat_content = self.message_storage.get_messages_by_date_range(group_id, group_members, period_start, period_end) if not chat_content: await self.bot.send_text_message(group_id, "这段时间没捞到可总结的聊天记录。", message.get("sender")) return False, None # 获取群名并处理 group_name = all_contacts.get(group_id, group_id) group_name = self._sanitize_group_name(group_name) period_label = self._format_summary_period_label(period_start, period_end, custom_date is not None) client_msg_id, create_time, new_msg_id = await self.bot.send_text_message( group_id, f"⏳ 正在总结 [{period_label}] 的群消息… 😊", ) self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) # 创建线程异步处理总结生成和发送 res = await self._async_generate_and_send_summary( chat_content, group_name, group_id, message, period_start=period_start, period_end=period_end, message_count=message_count, message_stats=message_stats, ) if res: return True, "异步总结已启动" else: return False, "总结失败" except Exception as e: self.LOG.error(f"处理消息总结命令失败: {e}") return False, None def get_schedule_actions(self) -> List[Dict[str, Any]]: """声明群总结插件支持的后台可配置定时动作。""" return [ { "action_key": "daily_summary", "name": "昨日群聊总结推送", "description": "每天自动总结昨天群聊内容并发送到目标群", "trigger_type": "at_times", "trigger_config": {"time_list": ["09:00"]}, "target_scope": "all_enabled_groups", "target_config": {}, "payload": {"min_messages": 100}, # 保持与原先硬编码任务一致:默认启用。 "default_enabled": True, } ] async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]: """执行群总结定时动作,返回结构化执行结果。""" if action_key != "daily_summary": return { "success": False, "summary": f"不支持的动作: {action_key}", "detail": {"action_key": action_key}, } payload = context.get("payload") or {} # 使用后台可配置参数控制最低消息阈值,避免写死在代码里。 min_messages = int(payload.get("min_messages", 100)) target_groups = context.get("target_groups") or [] result = await self.daily_summary_job( target_groups=[str(g).strip() for g in target_groups if str(g).strip()], min_message_count=min_messages, ) return { "success": bool(result.get("failed_groups", 0) == 0), "summary": ( f"群总结完成: 目标{result.get('total_groups', 0)}群," f"发送成功{result.get('sent_groups', 0)}群,失败{result.get('failed_groups', 0)}群" ), "detail": result, } async def _async_generate_and_send_summary( self, chat_content: str, group_name: str, group_id: str, message: Dict[str, Any], period_start: Optional[datetime] = None, period_end: Optional[datetime] = None, message_count: Optional[int] = None, message_stats: Optional[Dict[str, int]] = None, ): """异步生成并发送总结""" try: # 生成总结 summary, image_path = await self._generate_summary(chat_content, group_name, message_stats=message_stats) if image_path: # 图片生成成功,发送图片 await self.bot.send_image_message(group_id, Path(image_path)) if period_start and period_end and message_count is not None: self._save_daily_summary_record( group_id, group_name, period_start, period_end, message_count, summary, str(image_path), ) self.LOG.info(f"成功发送图片总结到群 {group_id}") return True else: # 图片生成失败,发送文本消息 if summary and len(summary.strip()) > 0: # 截断过长的文本 max_length = 2000 if len(summary) > max_length: summary = summary[:max_length] + "\n\n... (内容过长,已截断)" client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, summary) self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 30) if period_start and period_end and message_count is not None: self._save_daily_summary_record( group_id, group_name, period_start, period_end, message_count, summary, None, ) self.LOG.info(f"图片生成失败,已发送文本总结到群 {group_id}") return True else: # 连文本内容都没有 client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, "❌ 生成总结失败,请稍后再试!") self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) return False except Exception as e: self.LOG.error(f"异步生成总结失败: {e}") client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, f"❌ 生成总结失败,请稍后再试") self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) return False @staticmethod def _parse_summary_date(raw: str) -> Optional[datetime]: text = str(raw or "").strip() if not text: return None try: return datetime.strptime(text, "%Y-%m-%d") except ValueError: return None @staticmethod def _resolve_manual_summary_range(custom_date: Optional[datetime]) -> Tuple[datetime, datetime]: now = datetime.now() if custom_date is not None: day_start = custom_date.replace(hour=0, minute=0, second=0, microsecond=0) day_end = custom_date.replace(hour=23, minute=59, second=59, microsecond=999999) return day_start, day_end yesterday = now - timedelta(days=1) start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0) return start, now @staticmethod def _format_summary_period_label(start_time: datetime, end_time: datetime, is_custom_day: bool) -> str: if is_custom_day: return start_time.strftime("%Y-%m-%d") return f"{start_time.strftime('%Y-%m-%d 00:00')} 到现在" def _sanitize_group_name(self, group_name: str) -> str: """处理群名,去除特殊字符并限制长度""" # 去除特殊字符,只保留字母、数字、中文和基本标点 sanitized_name = re.sub(r'[^\w\s\u4e00-\u9fff,.,。]', '', group_name) # 限制长度为15个字符 if len(sanitized_name) > 15: sanitized_name = sanitized_name[:15] # 如果处理后为空,则使用默认名称 if not sanitized_name: sanitized_name = "群聊" return sanitized_name def _append_usage_info(self, answer: str, metadata: Dict[str, Any]) -> str: """把 token 统计追加到总结文本末尾""" if not answer or not answer.strip(): return answer usage = metadata.get("usage", {}) if metadata else {} if not usage: return answer prompt_tokens = usage.get("prompt_tokens") completion_tokens = usage.get("completion_tokens") total_tokens = usage.get("total_tokens") latency = usage.get("latency") info_parts = [] # chat-messages 风格的明细 if prompt_tokens is not None and completion_tokens is not None: info_parts.append(f"输入: {prompt_tokens}") info_parts.append(f"生成: {completion_tokens}") # workflow 常见只有 total_tokens if total_tokens is not None: info_parts.append(f"总: {total_tokens}") if latency is not None: info_parts.append(f"耗时: {latency}s") if not info_parts: return answer tokens_info = "\n\n【tokens】" + " ".join(info_parts) return answer + tokens_info def _clean_summary_output(self, answer: str) -> str: """清理总结输出中的思考内容和无关尾部内容""" if not answer: return answer cleaned = remove_reasoning_content(answer) cleaned = remove_trailing_content(cleaned) cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip() return cleaned @staticmethod def _sanitize_rendered_html(rendered_html: str) -> str: """对渲染后的 HTML 做最小安全过滤。 安全策略: 1. 移除 script/style/iframe 等高风险标签,避免模板渲染执行脚本; 2. 清除行内事件属性(onload/onerror/onclick...); 3. 禁止 javascript: 协议链接。 说明: - 这里是“轻量过滤”,目标是平衡安全与展示完整度; - 若后续需要更严格过滤,可接入专门的 HTML Sanitizer。 """ safe_html = str(rendered_html or "") # 删除高风险标签及其内容。 safe_html = re.sub( r"<\s*(script|style|iframe|object|embed|form|link|meta)\b[^>]*>.*?<\s*/\s*\1\s*>", "", safe_html, flags=re.IGNORECASE | re.DOTALL, ) # 删除自闭合高风险标签。 safe_html = re.sub( r"<\s*(script|style|iframe|object|embed|form|link|meta)\b[^>]*/\s*>", "", safe_html, flags=re.IGNORECASE | re.DOTALL, ) # 删除行内事件处理器属性。 safe_html = re.sub(r"\son[a-zA-Z]+\s*=\s*(['\"]).*?\1", "", safe_html, flags=re.IGNORECASE | re.DOTALL) # 阻断 javascript: 链接。 safe_html = re.sub( r"""(href|src)\s*=\s*(['"])\s*javascript:[^'"]*\2""", r'\1=\2#\2', safe_html, flags=re.IGNORECASE, ) return safe_html @classmethod def _summary_markdown_to_html(cls, summary_text: str) -> str: """把总结 Markdown 转为 HTML 片段(模板内部展示用)。 升级点: 1. 使用 markdown 库完整支持标题、列表、粗斜体、引用、代码块、表格等结构; 2. 对 LLM 输出里常见的富标签 markdown(例如 ```、|表格|、> 引用)效果更好; 3. 渲染后做一次轻量安全过滤,避免模板内注入脚本。 """ text = str(summary_text or "").strip() if not text: return "
暂无总结内容。
" # 兼容处理: # 1. 环境安装了 markdown 库时,走完整渲染; # 2. 未安装时自动降级到内置轻量转换,避免插件启动失败。 if markdown_lib is not None: rendered = markdown_lib.markdown( text, extensions=[ "extra", # 综合扩展:支持表格、定义列表等 "fenced_code", # 支持 ``` 代码块 "tables", # 支持 Markdown 表格 "sane_lists", # 更稳定的列表解析 "nl2br", # 保留换行,提升聊天总结可读性 ], output_format="html5", ) return cls._sanitize_rendered_html(rendered) # 轻量回退实现(兼容无 markdown 三方包的运行环境)。 lines = text.splitlines() html_parts: List[str] = [] list_items: List[str] = [] def flush_list() -> None: if not list_items: return html_parts.append("{html.escape(line)}
") flush_list() rendered = "".join(html_parts) return cls._sanitize_rendered_html(rendered) @staticmethod def _strip_markdown_inline(text: str) -> str: """清理行内 Markdown 标记,输出可直接展示的纯文本。 设计说明: 1. 仅处理常见行内语法(粗体/斜体/代码/链接),避免把模板渲染复杂度继续堆高; 2. 不在这里做 HTML 生成,保证“结构化渲染层”始终输出纯文本; 3. 处理后再交给模板做自动转义,避免注入风险。 """ cleaned = str(text or "").strip() if not cleaned: return "" # 图片语法: -> alt cleaned = re.sub(r"!\[([^\]]*)\]\([^)]+\)", r"\1", cleaned) # 链接语法:[text](url) -> text cleaned = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", cleaned) # 行内代码:`code` -> code cleaned = re.sub(r"`([^`]+)`", r"\1", cleaned) # 粗体/斜体标记清理 cleaned = re.sub(r"(\*\*|__)(.*?)\1", r"\2", cleaned) cleaned = re.sub(r"(\*|_)(.*?)\1", r"\2", cleaned) # 删除零宽字符,避免昵称或内容出现布局异常。 cleaned = re.sub(r"[\u200b-\u200f\u2060\ufeff]", "", cleaned) return cleaned.strip() @classmethod def _extract_llm_payload_text(cls, summary_text: str) -> str: """从 LLM 返回文本中提取真正的总结正文。 兼容场景: 1. 直接返回 Markdown 正文; 2. 返回 JSON 字符串(如 {"text":"..."}); 3. 返回被双引号包裹且含转义换行的字符串。 """ text = str(summary_text or "").strip() if not text: return "" # 第一层:尝试按 JSON 解析外层包装。 try: if (text.startswith("{") and text.endswith("}")) or (text.startswith("[") and text.endswith("]")): payload = json.loads(text) if isinstance(payload, dict): for key in ("text", "summary", "answer", "content", "result"): value = payload.get(key) if isinstance(value, str) and value.strip(): return value.strip() if isinstance(payload, str) and payload.strip(): return payload.strip() except Exception: pass # 第二层:处理被引号包裹的 JSON 字符串(例如 "\"# 标题\\n内容\"")。 try: if text.startswith('"') and text.endswith('"'): decoded = json.loads(text) if isinstance(decoded, str) and decoded.strip(): return decoded.strip() except Exception: pass return text @classmethod def _extract_json_object_from_text(cls, raw_text: str) -> Optional[Dict[str, Any]]: """从文本中提取 JSON 对象(优先服务 template 模式)。""" # 设计说明: # 1. 优先支持三类常见返回:纯 JSON、```json 代码块、被前后说明文字包裹的 JSON; # 2. 仅返回 dict,避免数组/字符串误入模板渲染链路; # 3. 解析失败时返回 None,不中断主流程,后续自动回退 Markdown 结构提取。 text = str(raw_text or "").strip() if not text: return None # 场景一:整段就是 JSON 对象。 try: if text.startswith("{") and text.endswith("}"): parsed = json.loads(text) if isinstance(parsed, dict): return parsed except Exception: pass # 场景二:```json ... ``` 包裹。 fenced_match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", text, flags=re.IGNORECASE) if fenced_match: candidate = str(fenced_match.group(1) or "").strip() try: parsed = json.loads(candidate) if isinstance(parsed, dict): return parsed except Exception: pass # 场景三:文本中夹杂 JSON。采用大括号包围段做兜底提取。 left = text.find("{") right = text.rfind("}") if left >= 0 and right > left: candidate = text[left:right + 1].strip() try: parsed = json.loads(candidate) if isinstance(parsed, dict): return parsed except Exception: return None return None @classmethod def _normalize_json_text_list(cls, value: Any, limit: int = 6, item_max_len: int = 120) -> List[str]: """把 JSON 字段标准化为字符串列表。""" # 设计说明: # 1. 兼容字符串、数组、混合对象等脏数据输入; # 2. 统一做 Markdown 行内清理,避免样式噪音进入模板; # 3. 强制长度与条数上限,防止单条过长撑爆卡片布局。 texts: List[str] = [] if isinstance(value, str): candidate = cls._strip_markdown_inline(value).strip() if candidate: texts.append(candidate[:item_max_len]) return texts[:limit] if not isinstance(value, list): return texts for item in value: if isinstance(item, str): candidate = cls._strip_markdown_inline(item).strip() elif isinstance(item, dict): candidate = cls._strip_markdown_inline( str(item.get("text") or item.get("title") or item.get("value") or "") ).strip() else: candidate = cls._strip_markdown_inline(str(item or "")).strip() if not candidate: continue texts.append(candidate[:item_max_len]) if len(texts) >= limit: break return texts @classmethod def _extract_template_json_data(cls, summary_text: str) -> Optional[Dict[str, Any]]: """提取 template 模式专用 JSON 结构。""" # 说明: # 1. 允许 LLM 按固定 schema 输出 JSON,渲染稳定性显著高于 Markdown 再解析; # 2. 这里做“宽松字段兼容”,便于后续提示词小幅调整也不影响线上; # 3. 只有检测到有效 JSON 且关键字段存在时才返回,避免误判。 payload = cls._extract_json_object_from_text(summary_text) if not payload: return None title = cls._strip_markdown_inline( str(payload.get("title") or payload.get("document_title") or payload.get("doc_title") or "") ).strip() lead = cls._strip_markdown_inline( str(payload.get("lead") or payload.get("summary_lead") or payload.get("overview") or "") ).strip() fallback_text = cls._strip_markdown_inline( str(payload.get("fallback_text") or payload.get("raw_summary") or "") ).strip() # 解析话题卡片。 topic_cards: List[Dict[str, Any]] = [] topics = payload.get("topics") if isinstance(topics, list): for raw_topic in topics: if not isinstance(raw_topic, dict): continue topic_title = cls._clean_topic_title(str(raw_topic.get("title") or raw_topic.get("name") or "")) if not topic_title: topic_title = "未命名话题" overview_points = cls._normalize_json_text_list( raw_topic.get("overview_points") or raw_topic.get("key_points") or raw_topic.get("highlights"), limit=3, item_max_len=120, ) analysis_points = cls._normalize_json_text_list( raw_topic.get("analysis_points") or raw_topic.get("analysis"), limit=2, item_max_len=120, ) quote_text = cls._strip_markdown_inline(str(raw_topic.get("quote_text") or raw_topic.get("quote") or "")).strip() time_range = cls._strip_markdown_inline(str(raw_topic.get("time_range") or raw_topic.get("time") or "")).strip() participants = cls._strip_markdown_inline( str(raw_topic.get("participants") or raw_topic.get("participant_count") or "") ).strip() topic_cards.append( { "title": topic_title[:42], "time_range": time_range[:58], "participants": participants[:42], "overview_points": overview_points, "analysis_points": analysis_points, "quote_text": quote_text[:120], } ) if len(topic_cards) >= 5: break # 解析命名模块。 named_modules = { "shared_resources": cls._normalize_json_text_list(payload.get("shared_resources"), limit=6, item_max_len=110), "marketplace": cls._normalize_json_text_list(payload.get("marketplace"), limit=6, item_max_len=110), "unresolved_pool": cls._normalize_json_text_list(payload.get("unresolved_pool"), limit=4, item_max_len=110), "core_points": cls._normalize_json_text_list(payload.get("core_knowledge_points") or payload.get("core_points"), limit=4, item_max_len=110), "top_contributors": cls._normalize_json_text_list(payload.get("top_contributors"), limit=3, item_max_len=18), } # 构造 sections 给现有统计提取逻辑复用。 sections: List[Dict[str, Any]] = [] for topic in topic_cards: items: List[Dict[str, str]] = [] for line in topic.get("overview_points", []): items.append({"kind": "bullet", "text": line}) for line in topic.get("analysis_points", []): items.append({"kind": "paragraph", "text": line}) if topic.get("quote_text"): items.append({"kind": "quote", "text": topic["quote_text"]}) sections.append({"title": topic.get("title", "未命名话题"), "items": items}) if not topic_cards and not any(named_modules.values()) and not lead and not title: return None if not lead and topic_cards: lead = (topic_cards[0].get("overview_points") or [""])[0] if not fallback_text: fallback_text = lead or "暂无总结内容。" return { "document_title": title, "lead": lead or "暂无总结内容。", "fallback_text": fallback_text, "sections": sections, "topic_cards": topic_cards, "named_modules": named_modules, } @classmethod def _build_summary_layout_data(cls, summary_text: str) -> Dict[str, Any]: """把 LLM 总结文本重排为模板可直接消费的结构化数据。 输出结构: - lead: 导语文本(通常来自一级标题后的首段) - sections: 分节内容,每节包含 title + items - fallback_text: 兜底纯文本(解析失败时仍可展示) """ raw_text = cls._extract_llm_payload_text(summary_text) lines = [line.rstrip() for line in str(raw_text or "").splitlines()] # 标准化:去除分隔线与空白噪音,减少模板渲染无效块。 normalized_lines: List[str] = [] for raw_line in lines: line = str(raw_line or "").strip() if not line: normalized_lines.append("") continue if re.fullmatch(r"[-*_]{3,}", line): # Markdown 横线仅用于语义分隔,结构化渲染里直接跳过。 continue normalized_lines.append(line) document_title = "" lead_lines: List[str] = [] sections: List[Dict[str, Any]] = [] current_section: Optional[Dict[str, Any]] = None paragraph_buffer: List[str] = [] in_code_block = False code_lines: List[str] = [] def flush_paragraph() -> None: """把段落缓存写入当前分节。""" nonlocal paragraph_buffer, current_section if not paragraph_buffer: return paragraph_text = cls._strip_markdown_inline(" ".join(paragraph_buffer).strip()) paragraph_buffer = [] if not paragraph_text: return if current_section is None: lead_lines.append(paragraph_text) return current_section["items"].append({"kind": "paragraph", "text": paragraph_text}) def ensure_section(title: str) -> None: """确保当前分节存在,并切换到新分节。""" nonlocal current_section clean_title = cls._strip_markdown_inline(title) or "未命名章节" current_section = {"title": clean_title, "items": []} sections.append(current_section) for line in normalized_lines: # 代码块开始/结束处理。 if line.startswith("```"): flush_paragraph() if in_code_block: if current_section is None: ensure_section("代码片段") code_text = "\n".join(code_lines).strip() if code_text: current_section["items"].append({"kind": "code", "text": code_text}) code_lines = [] in_code_block = False else: in_code_block = True code_lines = [] continue if in_code_block: code_lines.append(line) continue if not line: flush_paragraph() continue # 一级标题:用于页面主标题文案,不作为分节。 h1_match = re.match(r"^#\s+(.+)$", line) if h1_match: flush_paragraph() title_text = cls._strip_markdown_inline(h1_match.group(1)) if title_text: document_title = title_text continue # 二级及以下标题统一作为分节标题。 section_match = re.match(r"^#{2,6}\s+(.+)$", line) if section_match: flush_paragraph() ensure_section(section_match.group(1)) continue # 有些模型会输出“1. 标题”这种无井号标题,识别后转成分节。 numeric_heading_match = re.match(r"^\d+[.)、]\s+(.+)$", line) if numeric_heading_match and len(cls._strip_markdown_inline(numeric_heading_match.group(1))) <= 40: flush_paragraph() ensure_section(numeric_heading_match.group(1)) continue # 引用块。 quote_match = re.match(r"^>\s*(.+)$", line) if quote_match: flush_paragraph() if current_section is None: ensure_section("精选引用") quote_text = cls._strip_markdown_inline(quote_match.group(1)) if quote_text: current_section["items"].append({"kind": "quote", "text": quote_text}) continue # 无序列表。 bullet_match = re.match(r"^[-*+]\s+(.+)$", line) if bullet_match: flush_paragraph() if current_section is None: ensure_section("重点清单") bullet_text = cls._strip_markdown_inline(bullet_match.group(1)) if bullet_text: current_section["items"].append({"kind": "bullet", "text": bullet_text}) continue # 有序列表作为 bullet 统一渲染,避免复杂列表嵌套导致样式凌乱。 ordered_match = re.match(r"^\d+[.)]\s+(.+)$", line) if ordered_match: flush_paragraph() if current_section is None: ensure_section("重点清单") ordered_text = cls._strip_markdown_inline(ordered_match.group(1)) if ordered_text: current_section["items"].append({"kind": "bullet", "text": ordered_text}) continue # 其余内容按段落累计,遇到空行或结构标记时再落盘。 paragraph_buffer.append(line) flush_paragraph() if in_code_block and code_lines: if current_section is None: ensure_section("代码片段") code_text = "\n".join(code_lines).strip() if code_text: current_section["items"].append({"kind": "code", "text": code_text}) # 清理空章节,并过滤“统计注入段”等不需要重复展示的标题。 skip_titles = {"群概览", "tokens", "统计信息", "消息统计"} cleaned_sections: List[Dict[str, Any]] = [] for section in sections: if not section.get("items"): continue title_text = str(section.get("title") or "").strip().lower() if title_text in skip_titles: continue cleaned_sections.append(section) # 导语优先使用 lead_lines,否则取首个章节首条,保证顶部一定有可读摘要。 lead = " ".join(lead_lines).strip() if not lead and cleaned_sections: first_items = cleaned_sections[0].get("items") or [] if first_items: lead = str(first_items[0].get("text", "")).strip() if not lead: lead = "暂无总结内容。" # 模板兜底纯文本:用于极端解析失败场景。 fallback_text = cls._strip_markdown_inline( "\n".join([line for line in normalized_lines if line]).strip() ) or "暂无总结内容。" return { "document_title": document_title, "lead": lead, "sections": cleaned_sections, "fallback_text": fallback_text, } @staticmethod def _to_int(value: Any) -> int: """安全转换为整数,避免模板渲染阶段因脏数据抛错。""" try: return int(value or 0) except Exception: return 0 @classmethod def _build_summary_template_metrics( cls, message_stats: Optional[Dict[str, Any]], layout_data: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """构建 Gemini 模板高信息密度展示所需的统计视图数据。""" # 说明: # 1. 该函数专注“模板展示层”数据准备,不改动业务存储; # 2. 尽量从 message_stats 与结构化 sections 中提取可读指标; # 3. 数据不全时采用稳妥兜底,避免模板渲染失败。 stats = message_stats or {} usage = (metadata or {}).get("usage", {}) or {} sections = layout_data.get("sections") or [] total_count = cls._to_int(stats.get("total_count")) participant_count = cls._to_int(stats.get("participant_count")) text_count = cls._to_int(stats.get("text_count")) image_count = cls._to_int(stats.get("image_count")) video_count = cls._to_int(stats.get("video_count")) link_count = cls._to_int(stats.get("link_count")) emoji_count = cls._to_int(stats.get("emoji_count")) # 口径兜底: # 1. 在极端统计异常场景下,文本数/活跃人数不应超过总消息数; # 2. 这里做展示层修正,避免页面出现“Text > Msgs”这类反直觉数据。 if total_count > 0: text_count = min(text_count, total_count) participant_count = min(participant_count, total_count) # 媒体量:把图片+视频合并,符合卡片化概览阅读习惯。 media_count = image_count + video_count section_count = len(sections) bullet_count = 0 quote_count = 0 code_count = 0 for section in sections: for item in section.get("items", []): kind = str(item.get("kind") or "") if kind == "bullet": bullet_count += 1 elif kind == "quote": quote_count += 1 elif kind == "code": code_count += 1 # 活跃度等级:仅用于 UI 文案提示,不参与核心业务计算。 if total_count >= 1600: activity_badge = "爆炸活跃" elif total_count >= 1000: activity_badge = "高活跃" elif total_count >= 400: activity_badge = "中活跃" else: activity_badge = "常规活跃" # 话题雷达:优先取前几节标题,避免依赖额外 LLM 输出字段。 topic_tags: List[str] = [] for section in sections: title_text = str(section.get("title") or "").strip() if not title_text: continue normalized = re.sub(r"^[\d\W_]+", "", title_text) normalized = re.sub(r"\s+", " ", normalized).strip() if not normalized: continue topic_tags.append(normalized[:24]) if len(topic_tags) >= 6: break # 核心看点:优先抓 bullet,没有则回退 paragraph。 highlights: List[str] = [] for section in sections: for item in section.get("items", []): item_text = str(item.get("text") or "").strip() if not item_text: continue if item.get("kind") == "bullet": highlights.append(item_text[:120]) if len(highlights) >= 4: break if len(highlights) >= 4: break if not highlights: for section in sections: for item in section.get("items", []): item_text = str(item.get("text") or "").strip() if not item_text: continue highlights.append(item_text[:120]) if len(highlights) >= 4: break if len(highlights) >= 4: break # 使用统计:读取模型调用元数据,辅助观测总结成本。 total_tokens = cls._to_int(usage.get("total_tokens")) prompt_tokens = cls._to_int(usage.get("prompt_tokens")) completion_tokens = cls._to_int(usage.get("completion_tokens")) latency = usage.get("latency") latency_text = f"{latency}s" if latency not in (None, "") else "-" return { "kpi_cards": [ {"label": "Msgs", "value": total_count, "tone": "slate"}, {"label": "Active", "value": participant_count, "tone": "blue"}, {"label": "Text", "value": text_count, "tone": "violet"}, {"label": "Media", "value": media_count, "tone": "emerald"}, ], "mini_stats": [ {"label": "Links", "value": link_count}, {"label": "Emoji", "value": emoji_count}, {"label": "Video", "value": video_count}, {"label": "Sections", "value": section_count}, {"label": "Bullets", "value": bullet_count}, {"label": "Quotes", "value": quote_count}, {"label": "Code", "value": code_count}, ], "topic_tags": topic_tags, "highlights": highlights, "activity_badge": activity_badge, "token_total": total_tokens, "token_prompt": prompt_tokens, "token_completion": completion_tokens, "latency_text": latency_text, } @staticmethod def _clean_topic_title(title: str) -> str: """清理话题标题噪音,保持模板展示紧凑可读。""" text = str(title or "").strip() if not text: return "未命名话题" # 去掉 markdown/序号/emoji 前缀噪音。 text = re.sub(r"^[#\s\d\W_]+", "", text) text = text.replace("【", "").replace("】", "") text = re.sub(r"\s+", " ", text).strip() return text[:42] if text else "未命名话题" @classmethod def _build_topic_cards_from_sections(cls, sections: List[Dict[str, Any]], limit: int = 5) -> List[Dict[str, Any]]: """把结构化 sections 聚合为“话题卡片”列表(最多 5 个)。""" if not sections: return [] # 说明: # 1. LLM 常见输出是“话题标题 + 核心观点/客观分析/亮点瞬间”; # 2. 这里通过标题关键字与邻近分节聚合,避免渲染成冗长平铺列表。 topic_start_indices: List[int] = [] for idx, section in enumerate(sections): title_text = str(section.get("title") or "").strip() if not title_text: continue if "【" in title_text and "】" in title_text: topic_start_indices.append(idx) continue if re.match(r"^\d+[.)、]\s*", title_text): topic_start_indices.append(idx) continue if any(key in title_text for key in ["话题", "讨论", "专题"]): topic_start_indices.append(idx) if not topic_start_indices: # 兜底:没有显式话题标题时,按前 N 个分节强制抽取。 topic_start_indices = list(range(min(limit, len(sections)))) # 去重并排序,保证聚合窗口有序。 topic_start_indices = sorted(set(topic_start_indices))[:limit] cards: List[Dict[str, Any]] = [] for pos, start_idx in enumerate(topic_start_indices): end_idx = topic_start_indices[pos + 1] if pos + 1 < len(topic_start_indices) else len(sections) group_sections = sections[start_idx:end_idx] if not group_sections: continue main_title = cls._clean_topic_title(str(group_sections[0].get("title") or "")) topic = { "title": main_title, "time_range": "", "participants": "", "overview_points": [], "analysis_points": [], "quote_text": "", } for sec in group_sections: sec_title = str(sec.get("title") or "").strip() lower_title = sec_title.lower() items = sec.get("items") or [] item_texts = [str(item.get("text") or "").strip() for item in items if str(item.get("text") or "").strip()] # 从任意段提取时段/参与人数,兼容“放在 bullet 里”的输出。 for text in item_texts: if not topic["time_range"] and ("时段" in text or "时间" in text): topic["time_range"] = text[:58] if not topic["participants"] and ("参与人数" in text or "人参与" in text): topic["participants"] = text[:42] # 依据子标题语义归类内容。 if any(key in lower_title for key in ["核心观点", "观点回顾", "要点"]): for text in item_texts: topic["overview_points"].append(text[:120]) elif any(key in lower_title for key in ["客观分析", "深度分析", "分析"]): for text in item_texts: topic["analysis_points"].append(text[:120]) elif any(key in lower_title for key in ["亮点瞬间", "金句", "高光"]): for text in item_texts: if not topic["quote_text"]: topic["quote_text"] = text[:120] else: # 未命中的子分节,优先补进 overview,保持信息不丢。 for text in item_texts: topic["overview_points"].append(text[:120]) # 控制单卡体积,避免一张卡过长压垮整图布局。 topic["overview_points"] = topic["overview_points"][:3] topic["analysis_points"] = topic["analysis_points"][:2] if not topic["quote_text"] and topic["analysis_points"]: topic["quote_text"] = topic["analysis_points"][0][:110] cards.append(topic) if len(cards) >= limit: break return cards @classmethod def _build_auxiliary_sections(cls, sections: List[Dict[str, Any]], topic_titles: List[str]) -> List[Dict[str, Any]]: """抽取非话题区块(如交易/荣誉榜),用于页面底部辅助展示。""" if not sections: return [] title_set = {str(t).strip() for t in topic_titles if str(t).strip()} aux_sections: List[Dict[str, Any]] = [] for section in sections: section_title = cls._clean_topic_title(str(section.get("title") or "")) if not section_title or section_title in title_set: continue # 仅保留明显“功能区”类型标题,防止重复渲染话题细节块。 if not any(key in section_title for key in ["交易", "资源", "荣誉", "MVP", "排行榜", "总结", "快报"]): continue texts: List[str] = [] for item in section.get("items", []): text = str(item.get("text") or "").strip() if text: texts.append(text[:110]) if len(texts) >= 4: break if not texts: continue aux_sections.append({"title": section_title, "items": texts}) if len(aux_sections) >= 3: break return aux_sections @classmethod def _collect_section_texts(cls, section: Dict[str, Any], limit: int = 6) -> List[str]: """从分节中提取可展示文本,统一做长度控制。""" texts: List[str] = [] for item in section.get("items", []): text = str(item.get("text") or "").strip() if not text: continue # 说明:模板展示强调“短信息块”,避免单条过长撑爆布局。 texts.append(text[:120]) if len(texts) >= limit: break return texts @classmethod def _extract_contributor_names_from_texts(cls, texts: List[str], limit: int = 3) -> List[str]: """从文本中抽取贡献者昵称(优先提取 @昵称)。""" names: List[str] = [] for text in texts: # 说明:昵称不做翻译和重写,尽量保留原始 @ 片段,兼容中文/英文/符号昵称。 for match in re.findall(r"@([^\s::,,。]{1,24})", text): name = str(match).strip() if not name or name in names: continue names.append(name) if len(names) >= limit: return names # 若没有 @昵称,则退化为文本前缀,保证至少有可展示头像缩写数据。 for text in texts: candidate = cls._strip_markdown_inline(text).strip() if not candidate: continue candidate = candidate[:12] if candidate in names: continue names.append(candidate) if len(names) >= limit: break return names @classmethod def _build_template_named_modules(cls, sections: List[Dict[str, Any]]) -> Dict[str, Any]: """按 gemini-code 模板模块语义抽取结构化数据。""" modules: Dict[str, Any] = { "shared_resources": [], "marketplace": [], "unresolved_pool": [], "core_points": [], "top_contributors": [], } if not sections: return modules for section in sections: title_raw = str(section.get("title") or "").strip() title = title_raw.lower() if not title: continue texts = cls._collect_section_texts(section, limit=8) if not texts: continue # 资源区:仓库、文档、工具链接等。 if any(key in title for key in ["shared resources", "资源", "工具", "链接"]): for text in texts: if text not in modules["shared_resources"]: modules["shared_resources"].append(text) continue # 交易区:出/求/报价/服务等。 if any(key in title for key in ["marketplace", "交易", "卖货", "快报"]): for text in texts: if text not in modules["marketplace"]: modules["marketplace"].append(text) continue # 待解问题池。 if any(key in title for key in ["unresolved", "待解", "未解决", "问题池"]): for text in texts: if text not in modules["unresolved_pool"]: modules["unresolved_pool"].append(text) continue # 核心知识点。 if any(key in title for key in ["core knowledge", "知识", "经验", "配置"]): for text in texts: if text not in modules["core_points"]: modules["core_points"].append(text) continue # 贡献者/荣誉榜。 if any(key in title for key in ["top contributors", "荣誉榜", "mvp", "贡献者"]): names = cls._extract_contributor_names_from_texts(texts, limit=3) for name in names: if name not in modules["top_contributors"]: modules["top_contributors"].append(name) continue # 兜底:若未抽到贡献者,则尝试从全部分节文本粗提取。 if not modules["top_contributors"]: all_texts: List[str] = [] for section in sections: all_texts.extend(cls._collect_section_texts(section, limit=2)) modules["top_contributors"] = cls._extract_contributor_names_from_texts(all_texts, limit=3) # 统一上限,控制页面高度与信息密度。 modules["shared_resources"] = modules["shared_resources"][:6] modules["marketplace"] = modules["marketplace"][:6] modules["unresolved_pool"] = modules["unresolved_pool"][:4] modules["core_points"] = modules["core_points"][:4] modules["top_contributors"] = modules["top_contributors"][:3] return modules @classmethod def _build_resource_hub_items(cls, resources: List[str]) -> List[Dict[str, str]]: """把资源文本转换为“资源库列表”展示数据。 设计说明: 1. 该方法只做轻量规则分类,不依赖额外模型推理,保证稳定与速度; 2. 输出包含 icon/type/title,模板可直接渲染“左图标-中标题-右箭头”结构; 3. 资源条目数量限制在 6 条,避免长图膨胀。 """ items: List[Dict[str, str]] = [] for raw in resources or []: text = cls._strip_markdown_inline(str(raw or "").strip()) if not text: continue lower = text.lower() # 说明:以关键词做资源类型判断,优先覆盖仓库、文档、链接、附件四类。 if "github.com" in lower or "gitlab" in lower or "仓库" in text: icon = "⌘" kind = "Repo" elif "http://" in lower or "https://" in lower or "链接" in text: icon = "↗" kind = "Link" elif any(k in lower for k in [".pdf", ".doc", ".ppt", "文档", "手册", "白皮书"]): icon = "▣" kind = "Doc" else: icon = "•" kind = "Item" items.append( { "icon": icon, "type": kind, "title": text[:72], } ) if len(items) >= 6: break return items def _render_summary_template_html( self, group_name: str, summary_text: str, message_stats: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, Any]] = None, ) -> str: """根据模板路径渲染总结图片 HTML。""" # 约束: # 1. 不再把 LLM 原文直接转 HTML 内嵌到模板; # 2. 先结构化解析文本,再由模板按组件渲染,稳定控制最终排版。 renderer = HtmlTemplateRenderer() # 解析策略: # 1. template 模式优先吃 JSON(稳定、可控、低漂移); # 2. JSON 不可用时再回退 Markdown 结构解析,保持兼容。 json_layout_data = self._extract_template_json_data(summary_text) layout_data = json_layout_data or self._build_summary_layout_data(summary_text) metrics_data = self._build_summary_template_metrics( message_stats=message_stats, layout_data=layout_data, metadata=metadata, ) sections = layout_data.get("sections", []) or [] if json_layout_data and json_layout_data.get("topic_cards"): topic_cards = json_layout_data.get("topic_cards", [])[:5] else: topic_cards = self._build_topic_cards_from_sections(sections, limit=5) topic_titles = [card.get("title", "") for card in topic_cards] auxiliary_sections = self._build_auxiliary_sections(sections, topic_titles) if json_layout_data and isinstance(json_layout_data.get("named_modules"), dict): named_modules = json_layout_data.get("named_modules", {}) else: named_modules = self._build_template_named_modules(sections) resource_hub_items = self._build_resource_hub_items(named_modules.get("shared_resources", [])) # 说明: # 1. 这里注入“本地字体 CSS”到模板,避免依赖 Google Fonts 等外网资源; # 2. 字体文件统一从仓库根目录 fonts/ 下读取,便于部署时统一管理; # 3. 若字体文件缺失,自动回退系统字体,不影响功能可用性。 local_font_css = self._build_local_font_css() return renderer.render( self._summary_image_template_path, { "title": f"{group_name} 群聊总结", "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 兼容字段:保留给旧模板使用,新版 Gemini 模板已不再依赖该字段。 "summary_html": Markup(self._summary_markdown_to_html(summary_text)), "summary_doc_title": layout_data.get("document_title", ""), "summary_lead": layout_data.get("lead", ""), "summary_sections": layout_data.get("sections", []), "summary_topics": topic_cards, "summary_aux_sections": auxiliary_sections, "summary_shared_resources": named_modules.get("shared_resources", []), "summary_marketplace": named_modules.get("marketplace", []), "summary_unresolved_pool": named_modules.get("unresolved_pool", []), "summary_core_points": named_modules.get("core_points", []), "summary_top_contributors": named_modules.get("top_contributors", []), "summary_resource_hub": resource_hub_items, "summary_fallback_text": layout_data.get("fallback_text", ""), "summary_metrics": metrics_data, "local_font_css": Markup(local_font_css), }, ) def _build_local_font_css(self) -> str: """构建模板可注入的本地字体 CSS。""" # 说明: # 1. 使用绝对 file:/// URI,确保 Playwright set_content 场景下也能正确定位字体文件; # 2. 按存在性逐个注册字体,避免硬编码导致不存在文件时报错; # 3. 仅返回 CSS 字符串,不在此处抛异常,保证模板渲染链路稳定。 try: project_root = Path(__file__).resolve().parents[2] font_dir = project_root / "fonts" # 说明: # 1. 当前策略切换为“线上字体优先”,Inter / JetBrains Mono 交给模板的 Google Fonts 加载; # 2. 本地仅保留中文兜底字体,避免外网不可达时中文显示异常。 simhei_path = font_dir / "simhei.ttf" simsun_path = font_dir / "simsun.ttf" css_parts: List[str] = [] # 中文兜底字体,保证 CJK 展示稳定。 if simhei_path.exists(): css_parts.append( "@font-face {" "font-family: 'ABotSimHei'; " f"src: url('{simhei_path.resolve().as_uri()}') format('truetype'); " "font-weight: 400 900; " "font-style: normal; " "font-display: swap; " "}" ) if simsun_path.exists(): css_parts.append( "@font-face {" "font-family: 'ABotSimSun'; " f"src: url('{simsun_path.resolve().as_uri()}') format('truetype'); " "font-weight: 400 700; " "font-style: normal; " "font-display: swap; " "}" ) # 说明: # 1. 通过 CSS 变量统一字体栈,模板端只需引用变量即可; # 2. 先使用线上字体名,再回退本地中文与系统字体,兼顾一致性和容错性。 css_parts.append( ":root { " "--abot-font-sans: 'Inter', 'ABotSimHei', 'ABotSimSun', 'PingFang SC', " "'Microsoft YaHei', -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; " "--abot-font-code: 'JetBrains Mono', 'Cascadia Mono', 'Consolas', 'SFMono-Regular', Menlo, monospace; " "}" ) return "\n".join(css_parts) except Exception as e: self.LOG.warning(f"构建本地字体 CSS 失败,回退系统字体: {e}") return "" async def _render_summary_image( self, answer: str, group_name: str, message_stats: Optional[Dict[str, Any]], metadata: Optional[Dict[str, Any]], output_filename: str, total_timeout: int, ) -> str: """生成总结图片:模板优先,失败自动回退 Markdown。""" image_root = Path("temp") / "md2image" image_root.mkdir(parents=True, exist_ok=True) output_path = image_root / output_filename # 优先模板渲染,便于后续仅改 HTML 文件完成样式迭代。 if self._summary_image_mode != "markdown": try: html_content = self._render_summary_template_html( group_name=group_name, summary_text=answer, message_stats=message_stats, metadata=metadata, ) await asyncio.wait_for(html_to_image(html_content, str(output_path)), timeout=total_timeout) if not output_path.exists() or output_path.stat().st_size < 1024: raise RuntimeError("模板截图输出文件异常") return str(output_path.resolve()) except Exception as template_error: self.LOG.warning(f"模板渲染失败,回退 Markdown 转图: {template_error}") # 回退逻辑:保持旧链路,确保模板异常时仍能发送图片。 return await asyncio.wait_for( convert_md_str_to_image( answer, output_filename, max_retries=self._image_render_retries, render_timeout_seconds=self._image_render_timeout_seconds, html_timeout_seconds=min(30, self._image_render_timeout_seconds), ), timeout=total_timeout, ) def _stringify_output(self, value: Any) -> str: """把 workflow 输出统一转成文本""" if value is None: return "" if isinstance(value, str): return value.strip() if isinstance(value, (dict, list)): return json.dumps(value, ensure_ascii=False) return str(value).strip() def _parse_workflow_response(self, response_data: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]: """解析 Dify workflow 响应""" payload = (response_data or {}).get("data", {}) or {} status = str(payload.get("status", "")).strip().lower() if status and status not in {"succeeded", "success", "partial-succeeded"}: error_message = payload.get("error") or response_data.get("message") or f"workflow status={status}" raise RuntimeError(str(error_message)) outputs = payload.get("outputs", {}) or {} answer = "" if self._workflow_output_key and outputs.get(self._workflow_output_key) is not None: answer = self._stringify_output(outputs.get(self._workflow_output_key)) elif outputs.get("text") is not None: answer = self._stringify_output(outputs.get("text")) elif outputs.get("answer") is not None: answer = self._stringify_output(outputs.get("answer")) elif outputs.get("result_json") is not None: answer = self._stringify_output(outputs.get("result_json")) elif outputs.get("result") is not None: answer = self._stringify_output(outputs.get("result")) else: for value in outputs.values(): answer = self._stringify_output(value) if answer: break metadata = { "usage": { "total_tokens": payload.get("total_tokens", 0), "latency": payload.get("elapsed_time", 0), } } return answer, metadata def _get_revoke_manager(self) -> Optional[MessageAutoRevoke]: """优先使用消息上下文中的撤回器,定时任务场景则懒初始化一个""" if self.revoke: return self.revoke if self._auto_revoke is None and self.bot: self._auto_revoke = MessageAutoRevoke(self.bot) return self._auto_revoke async def _send_text_with_revoke(self, target: str, content: str, delay: int) -> Tuple[int, int, int]: """发送文本并按需登记自动撤回""" client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target, content) revoke_manager = self._get_revoke_manager() if revoke_manager: revoke_manager.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, delay) return client_msg_id, create_time, new_msg_id def _save_daily_summary_record( self, group_id: str, group_name: str, period_start: datetime, period_end: datetime, message_count: int, summary_text: str, image_path: Optional[str], ) -> bool: """保存每日总结到数据库""" if not self.message_summary_db: self.LOG.warning("群总结数据库未初始化,跳过总结入库") return False summary_record = { "chatroom_id": group_id, "group_name": group_name, "summary_type": "daily", "period_key": period_start.strftime("%Y-%m-%d"), "period_start": period_start.strftime("%Y-%m-%d %H:%M:%S"), "period_end": period_end.strftime("%Y-%m-%d %H:%M:%S"), "source_message_count": message_count, "summary_text": summary_text or "", "image_path": image_path, "meta": { "source": "message_summary_plugin", "summary_date": period_start.strftime("%Y-%m-%d"), "has_image": bool(image_path), }, } saved = self.message_summary_db.save_summary(summary_record) if saved: self.LOG.info(f"群总结入库成功: group_id={group_id}, period_key={summary_record['period_key']}") else: self.LOG.error(f"群总结入库失败: group_id={group_id}, period_key={summary_record['period_key']}") return saved async def _generate_summary( self, chat_content: str, group_name: str, message_stats: Optional[Dict[str, int]] = None, ) -> Tuple[str, Optional[str]]: """生成总结""" content_compress = chat_content try: content_compress = compress_chat_data(chat_content) self.LOG.info(f"压缩内容成功:{len(content_compress)}--{len(chat_content)}") except Exception as e: self.LOG.error(f"压缩内容失败:{e}") prompt = f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}" inputs = { "query": prompt, "group_name": group_name, "chat_content": content_compress, # 传给 Dify 工作流做提示词分流: # - markdown: 走历史 Markdown 风格提示词; # - template: 走模板友好的紧凑结构提示词。 "summary_mode": self._summary_image_mode, } self.LOG.info(f"群聊总结请求准备: group={group_name}, mode={self._api_mode}, response_mode={self._response_mode}") max_retries = len(self._retry_delays_seconds) + 1 for attempt in range(1, max_retries + 1): try: response = await asyncio.to_thread( self.llm_client.run, prompt, group_name if group_name is not None else "message_summary_bot", inputs, f"message_summary:{group_name}", ) if not response or not response.get("text"): raise RuntimeError(self.llm_client.last_error or "LLM 未返回有效总结内容") answer = self._clean_summary_output(response.get("text", "")) metadata = {"usage": response.get("usage", {}) or {}} spath = "" # 说明: # 1. 模板模式下统计信息由卡片呈现,不再额外拼接“群概览”文本块; # 2. markdown 模式仍保留旧逻辑,保证兼容历史输出。 if self._summary_image_mode == "markdown": answer = self._prepend_stats_section(answer, message_stats or {}) answer = self._append_usage_info(answer, metadata) if answer and len(answer.strip()) > 0: try: timestamp = int(time.time()) output_path = f"summary_{timestamp}.png" self.LOG.info(f"开始生成图片: {output_path}") # 额外包一层总超时,确保就算底层依赖异常也不会把整个任务拖住。 total_timeout = max(30, self._image_render_timeout_seconds * self._image_render_retries + 10) # 统一走图片渲染入口:优先模板,失败自动降级到旧的 Markdown 转图链路。 spath = await self._render_summary_image( answer=answer, group_name=group_name, message_stats=message_stats or {}, metadata=metadata, output_filename=output_path, total_timeout=total_timeout, ) self.LOG.info(f"成功生成图片: {spath}") except Exception as e: self.LOG.error(f"生成图片失败: {e}", exc_info=True) max_length = 2000 if len(answer) > max_length: answer = answer[:max_length] + "\n\n... (内容过长,已截断)" self.LOG.info("图片生成失败,将发送文本消息作为备选方案") spath = None else: spath = None return answer, spath except Exception as e: self.LOG.error(f"处理总结时出现未知错误: attempt={attempt}/{max_retries}, error={e}") if attempt < max_retries: delay = self._retry_delays_seconds[attempt - 1] self.LOG.warning(f"群总结生成失败,准备重试: attempt={attempt}/{max_retries}, delay={delay}s") await asyncio.sleep(delay) return "生成总结时出错", None @staticmethod def _prepend_stats_section(summary: str, message_stats: Dict[str, int]) -> str: stats = message_stats or {} if not summary or not summary.strip(): return summary pairs = [ ("总", int(stats.get("total_count") or 0)), ("人数", int(stats.get("participant_count") or 0)), ("文本", int(stats.get("text_count") or 0)), ("图片", int(stats.get("image_count") or 0)), ("视频", int(stats.get("video_count") or 0)), ("链接", int(stats.get("link_count") or 0)), ("表情", int(stats.get("emoji_count") or 0)), ] stats_line = " · ".join([f"**{label}** {value}" for label, value in pairs]) section_lines = [ "## 群概览", stats_line, ] section = "\n".join(section_lines) return f"{section}\n\n{summary.strip()}" async def daily_summary_job( self, target_groups: Optional[List[str]] = None, min_message_count: int = 100, ) -> Dict[str, Any]: """定时任务:总结指定群在昨日的聊天信息并推送。 Args: target_groups: 目标群列表;为空时自动取所有已开启权限的群。 min_message_count: 触发总结的最低消息条数阈值。 Returns: dict: 执行统计信息,便于后台调度日志展示。 """ try: if not self.bot: self.LOG.warning("每日聊天总结任务跳过:bot 尚未注入") return {"total_groups": 0, "sent_groups": 0, "failed_groups": 0, "skipped_groups": 0} self.LOG.info("开始执行每日聊天总结任务") # 计算昨天的时间范围 yesterday = datetime.now() - timedelta(days=1) yesterday_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0) yesterday_end = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999) self.LOG.info( f"总结时间范围: {yesterday_start.strftime('%Y-%m-%d %H:%M:%S')} 至 {yesterday_end.strftime('%Y-%m-%d %H:%M:%S')}") # 调度动作支持“全部启用群”与“指定群”两种模式;指定群也会再做权限校验,防止误推送。 if target_groups: enabled_groups = [] for group_id in target_groups: if GroupBotManager.get_group_permission(group_id, self.feature) == PermissionStatus.ENABLED: enabled_groups.append(group_id) self.LOG.info(f"按指定目标群执行总结,输入={len(target_groups)},权限通过={len(enabled_groups)}") else: all_groups = GroupBotManager.get_group_list() if not all_groups: self.LOG.info("没有群聊启用群机器人,跳过定时总结") return {"total_groups": 0, "sent_groups": 0, "failed_groups": 0, "skipped_groups": 0} enabled_groups = [] for group_id in all_groups: if GroupBotManager.get_group_permission(group_id, self.feature) == PermissionStatus.ENABLED: enabled_groups.append(group_id) if not enabled_groups: self.LOG.info("没有群聊开启定时总结功能,跳过") return {"total_groups": 0, "sent_groups": 0, "failed_groups": 0, "skipped_groups": 0} self.LOG.info(f"找到 {len(enabled_groups)} 个开启定时总结的群聊") sent_groups = 0 failed_groups = 0 skipped_groups = 0 # 为每个群生成总结 for group_id in enabled_groups: try: # 先统计昨天的消息数量 message_count = self.message_storage.count_messages_by_date_range( group_id, yesterday_start, yesterday_end ) # 消息低于阈值时跳过,阈值可由后台 payload 配置。 if message_count < min_message_count: self.LOG.info(f"群 {group_id} 昨天只有 {message_count} 条消息,不足{min_message_count}条,跳过总结") skipped_groups += 1 continue self.LOG.info(f"群 {group_id} 昨天有 {message_count} 条消息,开始获取内容") message_stats = self.message_storage.get_message_stats_by_date_range( group_id, yesterday_start, yesterday_end, ) # 获取群成员信息 group_members = ContactManager.get_instance().get_group_members(group_id) # 获取群名 group_name = ContactManager.get_instance().get_nickname(group_id) group_name = self._sanitize_group_name(group_name) # 获取昨天的聊天记录 chat_content = self.message_storage.get_messages_by_date_range( group_id, group_members, yesterday_start, yesterday_end ) if not chat_content: self.LOG.info(f"群 {group_id} 昨天聊天记录为空,跳过总结") continue self.LOG.info( f"开始为群 {group_name} 生成总结,消息数量: {message_count},内容长度: {len(chat_content)}") # 发送提示消息 await self._send_text_with_revoke( group_id, f"⏳ 正在生成 [{yesterday.strftime('%Y-%m-%d')}]聊天总结… 😊", 5 ) # 生成总结 summary, image_path = await self._generate_summary( chat_content, group_name, message_stats=message_stats, ) if image_path: # 图片生成成功,发送图片 await self.bot.send_image_message(group_id, Path(image_path)) self._save_daily_summary_record( group_id, group_name, yesterday_start, yesterday_end, message_count, summary, str(image_path), ) self.LOG.info(f"成功发送群 {group_name} 的昨日总结图片") sent_groups += 1 else: # 图片生成失败,发送文本消息 if summary and len(summary.strip()) > 0: max_length = 2000 if len(summary) > max_length: summary = summary[:max_length] + "\n\n... (内容过长,已截断)" if summary.strip() == "生成总结时出错": await self._send_text_with_revoke(group_id, f"❌ [{yesterday.strftime('%Y-%m-%d')}] 聊天总结生成失败,请稍后再试", 5) self.LOG.warning(f"群 {group_name} 的昨日总结生成失败,已发送可撤回失败提醒") failed_groups += 1 else: await self.bot.send_text_message(group_id, summary) self._save_daily_summary_record( group_id, group_name, yesterday_start, yesterday_end, message_count, summary, None, ) self.LOG.info(f"成功发送群 {group_name} 的昨日总结文本") sent_groups += 1 else: await self._send_text_with_revoke(group_id, f"❌ [{yesterday.strftime('%Y-%m-%d')}] 聊天总结生成失败,请稍后再试", 5) self.LOG.warning(f"群 {group_name} 的昨日总结无有效内容,已发送可撤回失败提醒") failed_groups += 1 # 避免请求过快 await asyncio.sleep(2) except Exception as e: self.LOG.error(f"为群 {group_id} 生成昨日总结失败: {e}", exc_info=True) failed_groups += 1 continue self.LOG.info("每日聊天总结任务执行完成") return { "total_groups": len(enabled_groups), "sent_groups": sent_groups, "failed_groups": failed_groups, "skipped_groups": skipped_groups, "min_message_count": min_message_count, } except Exception as e: self.LOG.error(f"每日聊天总结任务执行失败: {e}", exc_info=True) return {"total_groups": 0, "sent_groups": 0, "failed_groups": 1, "skipped_groups": 0}