import asyncio
import html
import json
import re
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, Tuple, Optional, List

from loguru import logger
from markupsafe import Markup

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.message_summary_db import MessageSummaryDBOperator
from utils.compress_chat_data import compress_chat_data
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.decorator.rate_limit_decorator import group_feature_rate_limit
from utils.html_template_renderer import HtmlTemplateRenderer
from utils.markdown_to_image import convert_md_str_to_image, html_to_image
from utils.revoke.message_auto_revoke import MessageAutoRevoke
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from utils.string_utils import remove_reasoning_content, remove_trailing_content
from utils.ai.unified_llm import UnifiedLLMClient
from utils.wechat.contact_manager import ContactManager
from utils.wechat.message_to_db import MessageStorage
from wechat_ipad import WechatAPIClient


class MessageSummaryPlugin(MessagePluginInterface):
    """Plugin that produces AI-generated summaries of group chat messages.

    Two entry points exist: the ``#总结`` / ``#summary`` chat command
    (``process_message``) and the scheduled ``daily_summary`` action
    (``run_scheduled_action`` -> ``daily_summary_job``).
    """

    # Feature-permission constants.
    FEATURE_KEY = "SUMMARY_CAPABILITY"
    FEATURE_DESCRIPTION = "📝 群总结能力 [#总结]"

    # Sentinel text returned by _generate_summary once every attempt has failed.
    _SUMMARY_ERROR_TEXT = "生成总结时出错"
    # Plain-text fallback messages are capped at this many characters.
    _MAX_TEXT_LENGTH = 2000
    _TRUNCATION_SUFFIX = "\n\n... (内容过长,已截断)"

    @property
    def name(self) -> str:
        return "群聊总结"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def description(self) -> str:
        return "使用AI生成群聊消息总结"

    @property
    def author(self) -> str:
        return "ABOT Team"

    @property
    def command_prefix(self) -> Optional[str]:
        return "#"

    @property
    def commands(self) -> List[str]:
        return ["总结", "summary"]

    @property
    def feature_key(self) -> Optional[str]:
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        super().__init__()
        self.message_storage = None
        self.message_summary_db = None
        self.revoke = None
        self._auto_revoke = None
        # Register the feature permission.
        self.feature = self.register_feature()

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Read plugin configuration and build the LLM client and storage helpers.

        Args:
            context: Initialization context; only ``db_manager`` is read here.

        Returns:
            True on success, False if any step raised.
        """
        try:
            # API key / URL come from the plugin configuration.
            api_config = self._config.get("api", {})
            # NOTE(review): the fallback below looks like a real credential committed
            # to source; consider requiring it from configuration instead.
            self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo")
            self._api_base_url = str(api_config.get("api_base_url", "http://192.168.2.240/v1")).rstrip("/")
            self._api_mode = str(api_config.get("mode", "workflow")).strip().lower()
            default_endpoint = "workflows/run" if self._api_mode == "workflow" else "chat-messages"
            self._api_endpoint = str(api_config.get("endpoint", default_endpoint)).lstrip("/")
            self._api_url = api_config.get("api_url", f"{self._api_base_url}/{self._api_endpoint}")
            self._workflow_output_key = str(api_config.get("workflow_output_key", "text")).strip()
            self._response_mode = api_config.get("response_mode", "blocking")
            self._connect_timeout_seconds = int(api_config.get("connect_timeout_seconds", 10))
            self._request_timeout_seconds = int(api_config.get("request_timeout_seconds", 180))
            self._retry_delays_seconds = api_config.get("retry_delays_seconds", [10, 20])

            # Output-stage timeout protection: keeps Markdown-to-image rendering
            # from hanging for a long time in a broken environment.
            output_config = self._config.get("output", {})
            self._image_render_timeout_seconds = int(output_config.get("image_render_timeout_seconds", 90))
            # One attempt by default so the task returns quickly; raise it in the
            # configuration when a higher success rate matters more.
            self._image_render_retries = int(output_config.get("image_render_retries", 1))
            # Output template configuration:
            # 1. "markdown" is the default, to keep the production style stable;
            # 2. any other value renders the image from the HTML template;
            # 3. a template failure falls back to markdown automatically.
            self._summary_image_mode = str(output_config.get("summary_image_mode", "markdown")).strip().lower()
            self._summary_image_template_path = str(
                output_config.get("summary_image_template_path", "plugins/message_summary/templates/summary_card.html")
            ).strip()

            self.llm_client = UnifiedLLMClient(api_config)
            # Values resolved by the client take precedence over the raw config.
            self._api_mode = self.llm_client.mode or self._api_mode
            self._response_mode = self.llm_client.response_mode or self._response_mode
            self._workflow_output_key = self.llm_client.workflow_output_key or self._workflow_output_key

            self.message_storage = MessageStorage()
            db_manager = context.get("db_manager")
            if db_manager:
                self.message_summary_db = MessageSummaryDBOperator(db_manager)
            self.LOG.debug(f"初始化 {self.name} 插件成功")
            return True
        except Exception as e:
            if hasattr(self, 'LOG'):
                self.LOG.error(f"初始化 {self.name} 插件失败: {e}")
            else:
                print(f"初始化 {self.name} 插件失败: {e}")
            return False

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.LOG.debug(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    @plugin_stats_decorator(plugin_name="群聊总结")
    @plugin_points_cost(10, "群聊总结消耗积分", FEATURE_KEY)
    @group_feature_rate_limit(max_per_minute=30, feature_key=FEATURE_KEY)
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle the ``#总结 [YYYY-MM-DD]`` / ``#summary [YYYY-MM-DD]`` command.

        Args:
            message: Incoming message dict. Keys read here: ``content``, ``bot``,
                ``roomid``, ``revoke``, ``sender``, ``gbm``, ``all_contacts``.

        Returns:
            ``(True, "异步总结已启动")`` when a summary was sent,
            ``(False, "总结失败")`` when generation failed, and ``(False, None)``
            when the message was not handled or an exception was caught.
        """
        try:
            # Is this a summary command at all?
            content = str(message.get("content", "") or "").strip()
            self.bot: WechatAPIClient = message.get("bot")
            if not content.startswith(self.command_prefix):
                return False, None
            command_parts = content[len(self.command_prefix):].split()
            if not command_parts:
                return False, None
            command = command_parts[0]
            if command not in self.commands:
                return False, None

            group_id = message.get("roomid")
            sender = message.get("sender")
            self.revoke: MessageAutoRevoke = message.get("revoke")
            if not group_id:
                # There is no group to reply to here, so the notice goes to the
                # sender; the original targeted the empty group id.
                # NOTE(review): assumes `sender` is the private-chat peer id - confirm.
                if sender:
                    await self.bot.send_text_message(sender, "只支持群聊消息总结")
                return False, None

            # Permission check.
            gbm: GroupBotManager = message.get("gbm")
            if gbm and gbm.get_group_permission(group_id, self.feature) == PermissionStatus.DISABLED:
                return False, None

            # `all_contacts` may be missing from the message; fall back to an
            # empty mapping so the group-name lookup below cannot fail.
            all_contacts: dict = message.get("all_contacts") or {}
            group_members: dict = ContactManager.get_instance().get_group_members(group_id)

            date_argument = command_parts[1] if len(command_parts) > 1 else ""
            custom_date = self._parse_summary_date(date_argument)
            if len(command_parts) > 1 and custom_date is None:
                await self.bot.send_text_message(group_id, "日期格式不对,用 `#总结 2026-04-08` 这种。", sender)
                return False, None

            period_start, period_end = self._resolve_manual_summary_range(custom_date)
            message_count = self.message_storage.count_messages_by_date_range(group_id, period_start, period_end)
            if message_count < 100:
                await self.bot.send_text_message(
                    group_id,
                    f"这段时间消息太少了,只有 {message_count} 条,不总结了。",
                    sender,
                )
                return False, None

            # Stats are only needed once the threshold has been passed.
            message_stats = self.message_storage.get_message_stats_by_date_range(group_id, period_start, period_end)
            chat_content = self.message_storage.get_messages_by_date_range(
                group_id, group_members, period_start, period_end
            )
            if not chat_content:
                await self.bot.send_text_message(group_id, "这段时间没捞到可总结的聊天记录。", sender)
                return False, None

            # Resolve and sanitize the group name.
            group_name = self._sanitize_group_name(all_contacts.get(group_id, group_id))
            period_label = self._format_summary_period_label(period_start, period_end, custom_date is not None)

            # The helper tolerates a missing revoke manager in the message.
            await self._send_text_with_revoke(
                group_id,
                f"⏳ 正在总结 [{period_label}] 的群消息… 😊",
                5,
            )

            # Generation and sending are awaited inline despite the method name.
            res = await self._async_generate_and_send_summary(
                chat_content,
                group_name,
                group_id,
                message,
                period_start=period_start,
                period_end=period_end,
                message_count=message_count,
                message_stats=message_stats,
            )
            if res:
                return True, "异步总结已启动"
            return False, "总结失败"
        except Exception as e:
            self.LOG.error(f"处理消息总结命令失败: {e}")
            return False, None

    def get_schedule_actions(self) -> List[Dict[str, Any]]:
        """Declare the schedulable background actions this plugin supports."""
        return [
            {
                "action_key": "daily_summary",
                "name": "昨日群聊总结推送",
                "description": "每天自动总结昨天群聊内容并发送到目标群",
                "trigger_type": "at_times",
                "trigger_config": {"time_list": ["09:00"]},
                "target_scope": "all_enabled_groups",
                "target_config": {},
                "payload": {"min_messages": 100},
                # Enabled by default, matching the previous hard-coded job.
                "default_enabled": True,
            }
        ]

    async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Run a scheduled action and return a structured result.

        Args:
            action_key: Only ``"daily_summary"`` is supported.
            context: Scheduler context; ``payload`` and ``target_groups`` are read.

        Returns:
            Dict with ``success``, ``summary`` and ``detail`` keys.
        """
        if action_key != "daily_summary":
            return {
                "success": False,
                "summary": f"不支持的动作: {action_key}",
                "detail": {"action_key": action_key},
            }

        payload = context.get("payload") or {}
        # The minimum-message threshold is configurable from the backend; a
        # malformed value falls back to the default instead of aborting the run.
        raw_min_messages = payload.get("min_messages", 100)
        try:
            min_messages = int(raw_min_messages)
        except (TypeError, ValueError):
            self.LOG.warning(f"min_messages 配置无效({raw_min_messages!r}),使用默认值 100")
            min_messages = 100

        target_groups = context.get("target_groups") or []
        result = await self.daily_summary_job(
            target_groups=[str(g).strip() for g in target_groups if str(g).strip()],
            min_message_count=min_messages,
        )
        return {
            "success": bool(result.get("failed_groups", 0) == 0),
            "summary": (
                f"群总结完成: 目标{result.get('total_groups', 0)}群,"
                f"发送成功{result.get('sent_groups', 0)}群,失败{result.get('failed_groups', 0)}群"
            ),
            "detail": result,
        }

    @classmethod
    def _truncate_text(cls, text: str) -> str:
        """Cap ``text`` at the text-message limit, appending the truncation notice."""
        if len(text) > cls._MAX_TEXT_LENGTH:
            return text[:cls._MAX_TEXT_LENGTH] + cls._TRUNCATION_SUFFIX
        return text

    async def _async_generate_and_send_summary(
            self,
            chat_content: str,
            group_name: str,
            group_id: str,
            message: Dict[str, Any],
            period_start: Optional[datetime] = None,
            period_end: Optional[datetime] = None,
            message_count: Optional[int] = None,
            message_stats: Optional[Dict[str, int]] = None,
    ) -> bool:
        """Generate the summary and send it as an image, or as text if rendering failed.

        The summary is saved to the database only when ``period_start``,
        ``period_end`` and ``message_count`` are all provided.

        Returns:
            True when an image or text summary was sent, False otherwise.
        """
        try:
            summary, image_path = await self._generate_summary(
                chat_content, group_name, message_stats=message_stats
            )
            can_persist = bool(period_start and period_end and message_count is not None)

            if image_path:
                # Image rendered successfully: send it.
                await self.bot.send_image_message(group_id, Path(image_path))
                if can_persist:
                    self._save_daily_summary_record(
                        group_id,
                        group_name,
                        period_start,
                        period_end,
                        message_count,
                        summary,
                        str(image_path),
                    )
                self.LOG.info(f"成功发送图片总结到群 {group_id}")
                return True

            stripped = (summary or "").strip()
            # The sentinel means every LLM attempt failed; it must be reported as
            # a failure, not sent and stored as if it were a summary.
            if not stripped or stripped == self._SUMMARY_ERROR_TEXT:
                await self._send_text_with_revoke(group_id, "❌ 生成总结失败,请稍后再试!", 5)
                return False

            # No image: fall back to a length-capped text message.
            summary = self._truncate_text(summary)
            await self._send_text_with_revoke(group_id, summary, 30)
            if can_persist:
                self._save_daily_summary_record(
                    group_id,
                    group_name,
                    period_start,
                    period_end,
                    message_count,
                    summary,
                    None,
                )
            self.LOG.info(f"图片生成失败,已发送文本总结到群 {group_id}")
            return True
        except Exception as e:
            self.LOG.error(f"异步生成总结失败: {e}")
            await self._send_text_with_revoke(group_id, "❌ 生成总结失败,请稍后再试", 5)
            return False

    @staticmethod
    def _parse_summary_date(raw: str) -> Optional[datetime]:
        """Parse a ``YYYY-MM-DD`` string; return None for empty or invalid input."""
        text = str(raw or "").strip()
        if not text:
            return None
        try:
            return datetime.strptime(text, "%Y-%m-%d")
        except ValueError:
            return None

    @staticmethod
    def _resolve_manual_summary_range(custom_date: Optional[datetime]) -> Tuple[datetime, datetime]:
        """Return the (start, end) window for a manually requested summary.

        With a date, the window is that whole day. Without one, it runs from
        yesterday 00:00 up to the current moment.
        """
        now = datetime.now()
        if custom_date is not None:
            day_start = custom_date.replace(hour=0, minute=0, second=0, microsecond=0)
            day_end = custom_date.replace(hour=23, minute=59, second=59, microsecond=999999)
            return day_start, day_end
        yesterday = now - timedelta(days=1)
        start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
        return start, now

    @staticmethod
    def _format_summary_period_label(start_time: datetime, end_time: datetime, is_custom_day: bool) -> str:
        """Build the human-readable period label used in the progress message."""
        if is_custom_day:
            return start_time.strftime("%Y-%m-%d")
        return f"{start_time.strftime('%Y-%m-%d 00:00')} 到现在"

    def _sanitize_group_name(self, group_name: str) -> str:
        """Strip special characters from a group name and cap it at 15 characters.

        A missing, empty or whitespace-only name becomes the default ``"群聊"``.
        """
        # Keep word characters, whitespace, CJK and basic punctuation only.
        sanitized_name = re.sub(r'[^\w\s\u4e00-\u9fff,.,。]', '', str(group_name or "")).strip()
        if len(sanitized_name) > 15:
            sanitized_name = sanitized_name[:15]
        if not sanitized_name:
            sanitized_name = "群聊"
        return sanitized_name

    def _append_usage_info(self, answer: str, metadata: Dict[str, Any]) -> str:
        """Append token/latency statistics from ``metadata["usage"]`` to the summary."""
        if not answer or not answer.strip():
            return answer
        usage = metadata.get("usage", {}) if metadata else {}
        if not usage:
            return answer

        prompt_tokens = usage.get("prompt_tokens")
        completion_tokens = usage.get("completion_tokens")
        total_tokens = usage.get("total_tokens")
        latency = usage.get("latency")

        info_parts = []
        # chat-messages style responses carry the prompt/completion breakdown.
        if prompt_tokens is not None and completion_tokens is not None:
            info_parts.append(f"输入: {prompt_tokens}")
            info_parts.append(f"生成: {completion_tokens}")
        # workflow responses commonly only have total_tokens.
        if total_tokens is not None:
            info_parts.append(f"总: {total_tokens}")
        if latency is not None:
            info_parts.append(f"耗时: {latency}s")
        if not info_parts:
            return answer

        tokens_info = "\n\n【tokens】" + " ".join(info_parts)
        return answer + tokens_info

    def _clean_summary_output(self, answer: str) -> str:
        """Remove reasoning content and trailing noise, then collapse blank-line runs."""
        if not answer:
            return answer
        cleaned = remove_reasoning_content(answer)
        cleaned = remove_trailing_content(cleaned)
        cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip()
        return cleaned

    @staticmethod
    def _summary_markdown_to_html(summary_text: str) -> str:
        """Convert summary Markdown into a basic HTML fragment for the template.

        No third-party Markdown library is used, so this renders in a minimal
        runtime. Lines are classified as heading, list item or paragraph.

        NOTE(review): the HTML tags inside this method's string literals were
        lost from the source under review; the markup below is reconstructed
        from the heading/list/paragraph rule and should be checked against the
        template's CSS.
        """
        lines = str(summary_text or "").splitlines()
        html_parts: List[str] = []
        list_items: List[str] = []

        def render_inline(text: str) -> str:
            # Escape first so model output can never inject markup, then turn
            # **bold** into <strong> (the stats line built by this plugin uses it).
            escaped = html.escape(text)
            return re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", escaped)

        def flush_list() -> None:
            if not list_items:
                return
            html_parts.append("<ul>" + "".join(f"<li>{item}</li>" for item in list_items) + "</ul>")
            list_items.clear()

        for raw_line in lines:
            line = raw_line.strip()
            if not line:
                # A blank line ends any list in progress.
                flush_list()
                continue

            heading = re.match(r"^(#{1,6})\s+(.*)$", line)
            if heading:
                flush_list()
                level = len(heading.group(1))
                html_parts.append(f"<h{level}>{render_inline(heading.group(2))}</h{level}>")
                continue

            bullet = re.match(r"^[-*+]\s+(.*)$", line)
            if bullet:
                list_items.append(render_inline(bullet.group(1)))
                continue

            flush_list()
            html_parts.append(f"<p>{render_inline(line)}</p>")

        flush_list()
        return "".join(html_parts)

    def _render_summary_template_html(self, group_name: str, summary_text: str) -> str:
        """Render the summary-card HTML from the configured template path."""
        # The template only handles presentation; the body is model output that
        # is escaped in _summary_markdown_to_html before being injected.
        renderer = HtmlTemplateRenderer()
        summary_html = self._summary_markdown_to_html(summary_text)
        return renderer.render(
            self._summary_image_template_path,
            {
                "title": f"{group_name} 群聊总结",
                "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "summary_html": Markup(summary_html),
            },
        )

    async def _render_summary_image(
            self,
            answer: str,
            group_name: str,
            output_filename: str,
            total_timeout: int,
    ) -> str:
        """Render the summary to an image: template first, Markdown as fallback.

        Args:
            answer: Summary text (Markdown).
            group_name: Used in the template title.
            output_filename: File name created under ``temp/md2image``.
            total_timeout: Overall timeout in seconds applied to each render path.

        Returns:
            The image path returned by whichever render path succeeded.
        """
        image_root = Path("temp") / "md2image"
        image_root.mkdir(parents=True, exist_ok=True)
        output_path = image_root / output_filename

        # Template rendering first, so styling can be iterated in the HTML file.
        if self._summary_image_mode != "markdown":
            try:
                html_content = self._render_summary_template_html(group_name=group_name, summary_text=answer)
                await asyncio.wait_for(html_to_image(html_content, str(output_path)), timeout=total_timeout)
                # A missing or tiny file is treated as a failed screenshot.
                if not output_path.exists() or output_path.stat().st_size < 1024:
                    raise RuntimeError("模板截图输出文件异常")
                return str(output_path.resolve())
            except Exception as template_error:
                self.LOG.warning(f"模板渲染失败,回退 Markdown 转图: {template_error}")

        # Fallback: the legacy Markdown path, so an image can still be sent.
        return await asyncio.wait_for(
            convert_md_str_to_image(
                answer,
                output_filename,
                max_retries=self._image_render_retries,
                render_timeout_seconds=self._image_render_timeout_seconds,
                html_timeout_seconds=min(30, self._image_render_timeout_seconds),
            ),
            timeout=total_timeout,
        )

    def _stringify_output(self, value: Any) -> str:
        """Normalize a workflow output value to text (JSON for dicts and lists)."""
        if value is None:
            return ""
        if isinstance(value, str):
            return value.strip()
        if isinstance(value, (dict, list)):
            return json.dumps(value, ensure_ascii=False)
        return str(value).strip()

    def _parse_workflow_response(self, response_data: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
        """Extract the answer text and usage metadata from a Dify workflow response.

        Raises:
            RuntimeError: if the payload reports a status outside the
                succeeded / success / partial-succeeded set.
        """
        # Normalize once so a None response cannot break the error path below.
        data = response_data or {}
        payload = data.get("data", {}) or {}
        status = str(payload.get("status", "")).strip().lower()
        if status and status not in {"succeeded", "success", "partial-succeeded"}:
            error_message = payload.get("error") or data.get("message") or f"workflow status={status}"
            raise RuntimeError(str(error_message))

        outputs = payload.get("outputs", {}) or {}
        answer = ""
        # The configured key wins, then the well-known keys in priority order.
        candidate_keys = [self._workflow_output_key, "text", "answer", "result_json", "result"]
        for key in candidate_keys:
            if key and outputs.get(key) is not None:
                answer = self._stringify_output(outputs.get(key))
                break
        else:
            # No known key present: take the first non-empty output value.
            for value in outputs.values():
                answer = self._stringify_output(value)
                if answer:
                    break

        metadata = {
            "usage": {
                "total_tokens": payload.get("total_tokens", 0),
                "latency": payload.get("elapsed_time", 0),
            }
        }
        return answer, metadata

    def _get_revoke_manager(self) -> Optional[MessageAutoRevoke]:
        """Return the revoke manager from the message context, else a lazily built one."""
        if self.revoke:
            return self.revoke
        # `bot` is only assigned in process_message within this class, so read it
        # defensively for the scheduled-job path.
        bot = getattr(self, "bot", None)
        if self._auto_revoke is None and bot:
            self._auto_revoke = MessageAutoRevoke(bot)
        return self._auto_revoke

    async def _send_text_with_revoke(self, target: str, content: str, delay: int) -> Tuple[int, int, int]:
        """Send a text message and register it for auto-revoke when a manager exists."""
        client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target, content)
        revoke_manager = self._get_revoke_manager()
        if revoke_manager:
            revoke_manager.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, delay)
        return client_msg_id, create_time, new_msg_id

    def _save_daily_summary_record(
            self,
            group_id: str,
            group_name: str,
            period_start: datetime,
            period_end: datetime,
            message_count: int,
            summary_text: str,
            image_path: Optional[str],
    ) -> bool:
        """Save a daily summary record; returns False when the DB operator is unset."""
        if not self.message_summary_db:
            self.LOG.warning("群总结数据库未初始化,跳过总结入库")
            return False

        period_key = period_start.strftime("%Y-%m-%d")
        summary_record = {
            "chatroom_id": group_id,
            "group_name": group_name,
            "summary_type": "daily",
            "period_key": period_key,
            "period_start": period_start.strftime("%Y-%m-%d %H:%M:%S"),
            "period_end": period_end.strftime("%Y-%m-%d %H:%M:%S"),
            "source_message_count": message_count,
            "summary_text": summary_text or "",
            "image_path": image_path,
            "meta": {
                "source": "message_summary_plugin",
                "summary_date": period_key,
                "has_image": bool(image_path),
            },
        }
        saved = self.message_summary_db.save_summary(summary_record)
        if saved:
            self.LOG.info(f"群总结入库成功: group_id={group_id}, period_key={summary_record['period_key']}")
        else:
            self.LOG.error(f"群总结入库失败: group_id={group_id}, period_key={summary_record['period_key']}")
        return saved

    async def _generate_summary(
            self,
            chat_content: str,
            group_name: str,
            message_stats: Optional[Dict[str, int]] = None,
    ) -> Tuple[str, Optional[str]]:
        """Call the LLM (with retries) and render the result to an image.

        Returns:
            ``(summary_text, image_path)``. ``image_path`` is None when rendering
            failed or the answer was empty. After all attempts fail the text is
            the sentinel ``"生成总结时出错"``.
        """
        content_compress = chat_content
        try:
            content_compress = compress_chat_data(chat_content)
            self.LOG.info(f"压缩内容成功:{len(content_compress)}--{len(chat_content)}")
        except Exception as e:
            # Compression is best-effort; continue with the raw content.
            self.LOG.error(f"压缩内容失败:{e}")

        prompt = f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}"
        inputs = {
            "query": prompt,
            "group_name": group_name,
            "chat_content": content_compress,
        }
        self.LOG.info(f"群聊总结请求准备: group={group_name}, mode={self._api_mode}, response_mode={self._response_mode}")

        max_retries = len(self._retry_delays_seconds) + 1
        for attempt in range(1, max_retries + 1):
            try:
                # The client call is synchronous; run it off the event loop.
                response = await asyncio.to_thread(
                    self.llm_client.run,
                    prompt,
                    group_name if group_name is not None else "message_summary_bot",
                    inputs,
                    f"message_summary:{group_name}",
                )
                if not response or not response.get("text"):
                    raise RuntimeError(self.llm_client.last_error or "LLM 未返回有效总结内容")

                answer = self._clean_summary_output(response.get("text", ""))
                metadata = {"usage": response.get("usage", {}) or {}}
                answer = self._prepend_stats_section(answer, message_stats or {})
                answer = self._append_usage_info(answer, metadata)

                spath = None
                if answer and len(answer.strip()) > 0:
                    try:
                        timestamp = int(time.time())
                        output_path = f"summary_{timestamp}.png"
                        self.LOG.info(f"开始生成图片: {output_path}")
                        # Extra overall timeout so a misbehaving dependency cannot
                        # stall the whole task.
                        total_timeout = max(30, self._image_render_timeout_seconds * self._image_render_retries + 10)
                        # Single rendering entry point: template first, with an
                        # automatic downgrade to the legacy Markdown path.
                        spath = await self._render_summary_image(
                            answer=answer,
                            group_name=group_name,
                            output_filename=output_path,
                            total_timeout=total_timeout,
                        )
                        self.LOG.info(f"成功生成图片: {spath}")
                    except Exception as e:
                        self.LOG.error(f"生成图片失败: {e}", exc_info=True)
                        answer = self._truncate_text(answer)
                        self.LOG.info("图片生成失败,将发送文本消息作为备选方案")
                        spath = None
                return answer, spath
            except Exception as e:
                self.LOG.error(f"处理总结时出现未知错误: attempt={attempt}/{max_retries}, error={e}")
                if attempt < max_retries:
                    delay = self._retry_delays_seconds[attempt - 1]
                    self.LOG.warning(f"群总结生成失败,准备重试: attempt={attempt}/{max_retries}, delay={delay}s")
                    await asyncio.sleep(delay)

        return self._SUMMARY_ERROR_TEXT, None

    @staticmethod
    def _prepend_stats_section(summary: str, message_stats: Dict[str, int]) -> str:
        """Prefix the summary with a "## 群概览" section built from message stats."""
        stats = message_stats or {}
        if not summary or not summary.strip():
            return summary
        pairs = [
            ("总", int(stats.get("total_count") or 0)),
            ("人数", int(stats.get("participant_count") or 0)),
            ("文本", int(stats.get("text_count") or 0)),
            ("图片", int(stats.get("image_count") or 0)),
            ("视频", int(stats.get("video_count") or 0)),
            ("链接", int(stats.get("link_count") or 0)),
            ("表情", int(stats.get("emoji_count") or 0)),
        ]
        stats_line = " · ".join([f"**{label}** {value}" for label, value in pairs])
        section_lines = [
            "## 群概览",
            stats_line,
        ]
        section = "\n".join(section_lines)
        return f"{section}\n\n{summary.strip()}"

    async def daily_summary_job(
            self,
            target_groups: Optional[List[str]] = None,
            min_message_count: int = 100,
    ) -> Dict[str, Any]:
        """Scheduled job: summarize yesterday's chat for each target group and push it.

        Args:
            target_groups: Target group ids; when empty, every group with the
                feature enabled is used.
            min_message_count: Minimum number of messages needed to summarize.

        Returns:
            Execution statistics (``total_groups``, ``sent_groups``,
            ``failed_groups``, ``skipped_groups``) for the scheduler log.
        """
        empty_result = {"total_groups": 0, "sent_groups": 0, "failed_groups": 0, "skipped_groups": 0}
        try:
            # `bot` is only assigned in process_message within this class, so it
            # may not exist yet when the scheduler fires first.
            if not getattr(self, "bot", None):
                self.LOG.warning("每日聊天总结任务跳过:bot 尚未注入")
                return dict(empty_result)

            self.LOG.info("开始执行每日聊天总结任务")

            # Yesterday's time window.
            yesterday = datetime.now() - timedelta(days=1)
            yesterday_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
            yesterday_end = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999)
            day_label = yesterday.strftime('%Y-%m-%d')
            self.LOG.info(
                f"总结时间范围: {yesterday_start.strftime('%Y-%m-%d %H:%M:%S')} 至 {yesterday_end.strftime('%Y-%m-%d %H:%M:%S')}")

            # Both "all enabled groups" and "explicit groups" are supported;
            # explicit groups are permission-checked too, to avoid a mis-push.
            if target_groups:
                enabled_groups = [
                    group_id
                    for group_id in target_groups
                    if GroupBotManager.get_group_permission(group_id, self.feature) == PermissionStatus.ENABLED
                ]
                self.LOG.info(f"按指定目标群执行总结,输入={len(target_groups)},权限通过={len(enabled_groups)}")
            else:
                all_groups = GroupBotManager.get_group_list()
                if not all_groups:
                    self.LOG.info("没有群聊启用群机器人,跳过定时总结")
                    return dict(empty_result)
                enabled_groups = [
                    group_id
                    for group_id in all_groups
                    if GroupBotManager.get_group_permission(group_id, self.feature) == PermissionStatus.ENABLED
                ]

            if not enabled_groups:
                self.LOG.info("没有群聊开启定时总结功能,跳过")
                return dict(empty_result)

            self.LOG.info(f"找到 {len(enabled_groups)} 个开启定时总结的群聊")
            sent_groups = 0
            failed_groups = 0
            skipped_groups = 0

            for group_id in enabled_groups:
                try:
                    # Count yesterday's messages first; the threshold is
                    # configurable through the backend payload.
                    message_count = self.message_storage.count_messages_by_date_range(
                        group_id, yesterday_start, yesterday_end
                    )
                    if message_count < min_message_count:
                        self.LOG.info(f"群 {group_id} 昨天只有 {message_count} 条消息,不足{min_message_count}条,跳过总结")
                        skipped_groups += 1
                        continue

                    self.LOG.info(f"群 {group_id} 昨天有 {message_count} 条消息,开始获取内容")
                    message_stats = self.message_storage.get_message_stats_by_date_range(
                        group_id,
                        yesterday_start,
                        yesterday_end,
                    )

                    group_members = ContactManager.get_instance().get_group_members(group_id)
                    group_name = ContactManager.get_instance().get_nickname(group_id)
                    group_name = self._sanitize_group_name(group_name)

                    chat_content = self.message_storage.get_messages_by_date_range(
                        group_id, group_members, yesterday_start, yesterday_end
                    )
                    if not chat_content:
                        self.LOG.info(f"群 {group_id} 昨天聊天记录为空,跳过总结")
                        # Count it, so sent + failed + skipped equals the total.
                        skipped_groups += 1
                        continue

                    self.LOG.info(
                        f"开始为群 {group_name} 生成总结,消息数量: {message_count},内容长度: {len(chat_content)}")

                    # Progress notice, auto-revoked shortly after.
                    await self._send_text_with_revoke(
                        group_id,
                        f"⏳ 正在生成 [{day_label}]聊天总结… 😊",
                        5
                    )

                    summary, image_path = await self._generate_summary(
                        chat_content,
                        group_name,
                        message_stats=message_stats,
                    )

                    if image_path:
                        # Image rendered successfully: send it.
                        await self.bot.send_image_message(group_id, Path(image_path))
                        self._save_daily_summary_record(
                            group_id,
                            group_name,
                            yesterday_start,
                            yesterday_end,
                            message_count,
                            summary,
                            str(image_path),
                        )
                        self.LOG.info(f"成功发送群 {group_name} 的昨日总结图片")
                        sent_groups += 1
                    elif summary and len(summary.strip()) > 0:
                        # No image: fall back to text.
                        summary = self._truncate_text(summary)
                        if summary.strip() == self._SUMMARY_ERROR_TEXT:
                            await self._send_text_with_revoke(
                                group_id, f"❌ [{day_label}] 聊天总结生成失败,请稍后再试", 5
                            )
                            self.LOG.warning(f"群 {group_name} 的昨日总结生成失败,已发送可撤回失败提醒")
                            failed_groups += 1
                        else:
                            await self.bot.send_text_message(group_id, summary)
                            self._save_daily_summary_record(
                                group_id,
                                group_name,
                                yesterday_start,
                                yesterday_end,
                                message_count,
                                summary,
                                None,
                            )
                            self.LOG.info(f"成功发送群 {group_name} 的昨日总结文本")
                            sent_groups += 1
                    else:
                        await self._send_text_with_revoke(
                            group_id, f"❌ [{day_label}] 聊天总结生成失败,请稍后再试", 5
                        )
                        self.LOG.warning(f"群 {group_name} 的昨日总结无有效内容,已发送可撤回失败提醒")
                        failed_groups += 1

                    # Throttle between groups to avoid sending too fast.
                    await asyncio.sleep(2)
                except Exception as e:
                    self.LOG.error(f"为群 {group_id} 生成昨日总结失败: {e}", exc_info=True)
                    failed_groups += 1
                    continue

            self.LOG.info("每日聊天总结任务执行完成")
            return {
                "total_groups": len(enabled_groups),
                "sent_groups": sent_groups,
                "failed_groups": failed_groups,
                "skipped_groups": skipped_groups,
                "min_message_count": min_message_count,
            }
        except Exception as e:
            self.LOG.error(f"每日聊天总结任务执行失败: {e}", exc_info=True)
            return {"total_groups": 0, "sent_groups": 0, "failed_groups": 1, "skipped_groups": 0}