import asyncio import json import re import time from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Any, Tuple, Optional, List import aiohttp from aiohttp import ClientTimeout from loguru import logger from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.compress_chat_data import compress_chat_data from utils.decorator.async_job import async_job from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.decorator.rate_limit_decorator import rate_limit from utils.markdown_to_image import convert_md_str_to_image from utils.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from utils.string_utils import remove_trailing_content from utils.wechat.contact_manager import ContactManager from utils.wechat.message_to_db import MessageStorage from wechat_ipad import WechatAPI @plugin_stats_decorator class MessageSummaryPlugin(MessagePluginInterface): description = "消息总结" author = "Liu" version = "0.0.8" def __init__(self): super().__init__() self.plugin_name = "message_summary" self.feature = "summary" self.message_storage = MessageStorage.get_instance() self.revoke = MessageAutoRevoke.get_instance() self._api_url = None self._api_key = None async def initialize(self, bot: WechatAPI, config: dict) -> bool: self.bot = bot self.config = config self._api_url = config.get("api_url") self._api_key = config.get("api_key") if not self._api_url or not self._api_key: logger.error("Dify API配置缺失") return False return True @property def status(self) -> PluginStatus: return PluginStatus.ENABLED @property def keywords(self) -> List[str]: return ["总结", "summary"] @rate_limit(calls=1, period=10) @plugin_points_cost(cost=0) async def run(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: content = message.get("content", "") sender = message.get("sender") group_id = message.get("group_id") if not content or not group_id: return False, None if content.lstrip("/") not in self.keywords: return False, None gbm: GroupBotManager = message.get("gbm") if gbm and gbm.get_group_permission(group_id, self.feature) == PermissionStatus.DISABLED: return False, None self.LOG.info(f"收到群 {group_id} 总结请求") all_contacts: dict = message.get("all_contacts") group_members: dict = ContactManager.get_instance().get_group_members(group_id) chat_content = self.message_storage.get_messages(group_id, group_members) if len(chat_content) < 100: return False, None group_name = all_contacts.get(group_id, group_id) group_name = self._sanitize_group_name(group_name) res = await self._async_generate_and_send_summary(chat_content, group_name, group_id, sender) return True, res async def _async_generate_and_send_summary(self, chat_content: str, group_name: str, group_id: str, sender: str = None): try: summary, image_path = await self._generate_summary(chat_content, group_name) if image_path: await self.bot.send_image_message(group_id, Path(image_path)) return True else: if summary and len(summary.strip()) > 0: max_length = 2000 if len(summary) > max_length: summary = summary[:max_length] + "\n\n... (内容过长,已截断)" client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, summary) self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 10) return True client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, f"❌ 生成总结失败,请稍后再试") self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) return False except Exception as e: self.LOG.error(f"生成或发送总结失败: {e}", exc_info=True) client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, f"❌ 生成总结失败,请稍后再试") self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) return False def _sanitize_group_name(self, group_name: str) -> str: sanitized_name = re.sub(r'[^\w\s\u4e00-\u9fff,.,。]', '', group_name) if len(sanitized_name) > 15: sanitized_name = sanitized_name[:15] if not sanitized_name: sanitized_name = "群聊" return sanitized_name async def _generate_summary(self, chat_content: str, group_name: str) -> Tuple[str, Optional[str]]: """生成总结""" content_compress = chat_content try: content_compress = compress_chat_data(chat_content) self.LOG.info(f"压缩内容成功:{len(content_compress)}--{len(chat_content)}") except Exception as e: self.LOG.error(f"压缩内容失败:{e}") data = { "inputs": {}, "query": f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}", "response_mode": "blocking", "conversation_id": "", "user": group_name if group_name is not None else "message_summary_bot", "files": [] } self.LOG.info(f"群聊总结内容:{data}") headers = { "Authorization": f"Bearer {self._api_key}", "Content-Type": "application/json" } try: custom_timeout = ClientTimeout(total=None, connect=10, sock_read=300) conn = aiohttp.TCPConnector(keepalive_timeout=60) async with aiohttp.ClientSession(connector=conn, timeout=custom_timeout) as session: async with session.post(self._api_url, headers=headers, json=data) as response: response.raise_for_status() response_data = await response.json() self.LOG.info(f"Dify API响应状态码: {response.status}") self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}") answer = response_data.get("answer", "") spath = "" metadata = response_data.get("metadata", {}) usage = metadata.get("usage", {}) if usage: prompt_tokens = usage.get("prompt_tokens", 0) completion_tokens = usage.get("completion_tokens", 0) total_tokens = usage.get("total_tokens", 0) tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}" answer += tokens_info try: timestamp = int(time.time()) output_path = f"summary_{timestamp}.png" self.LOG.info(f"开始生成图片: {output_path}") spath = await convert_md_str_to_image(answer, output_path) self.LOG.info(f"成功生成图片: {spath}") except Exception as e: self.LOG.error(f"生成图片失败: {e}", exc_info=True) try: max_length = 2000 if len(answer) > max_length: answer = answer[:max_length] + "\n\n... (内容过长,已截断)" self.LOG.info("图片生成失败,将发送文本消息作为备选方案") spath = None except Exception as fallback_error: self.LOG.error(f"备选文本发送也失败: {fallback_error}") spath = None return answer, spath except aiohttp.ClientError as e: self.LOG.error(f"请求Dify API时出错: {e}") return f"生成总结时出错", None except json.JSONDecodeError as e: self.LOG.error(f"解析Dify API响应时出错: {e}") return "解析API响应时出错", None except Exception as e: self.LOG.error(f"处理总结时出现未知错误: {e}") return f"生成总结时出现未知错误", None async def daily_summary_job(self): """定时任务:每天早上9点总结昨天的聊天信息""" try: self.LOG.info("开始执行每日聊天总结任务") yesterday = datetime.now() - timedelta(days=1) yesterday_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0) yesterday_end = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999) self.LOG.info( f"总结时间范围: {yesterday_start.strftime('%Y-%m-%d %H:%M:%S')} 至 {yesterday_end.strftime('%Y-%m-%d %H:%M:%S')}") all_groups = GroupBotManager.get_group_list() if not all_groups: self.LOG.info("没有群聊启用群机器人,跳过定时总结") return enabled_groups = [] for group_id in all_groups: if GroupBotManager.get_group_permission(group_id, self.feature) == PermissionStatus.ENABLED: enabled_groups.append(group_id) if not enabled_groups: self.LOG.info("没有群聊开启定时总结功能,跳过") return self.LOG.info(f"找到 {len(enabled_groups)} 个开启定时总结的群聊") for group_id in enabled_groups: try: message_count = self.message_storage.count_messages_by_date_range( group_id, yesterday_start, yesterday_end ) if message_count < 100: self.LOG.info(f"群 {group_id} 昨天只有 {message_count} 条消息,不足50条,跳过总结") continue self.LOG.info(f"群 {group_id} 昨天有 {message_count} 条消息,开始获取内容") group_members = ContactManager.get_instance().get_group_members(group_id) group_name = ContactManager.get_instance().get_nickname(group_id) group_name = self._sanitize_group_name(group_name) chat_content = self.message_storage.get_messages_by_date_range( group_id, group_members, yesterday_start, yesterday_end ) if not chat_content: self.LOG.info(f"群 {group_name} 昨天没有有效消息,跳过") continue self.LOG.info( f"获取到 {message_count} 条消息(时间范围:{yesterday_start.strftime('%Y-%m-%d %H:%M:%S')} 至 {yesterday_end.strftime('%Y-%m-%d %H:%M:%S')}),格式化后长度: {len(chat_content)}") self.LOG.info( f"开始为群 {group_name} 生成总结,消息数量: {message_count},内容长度: {len(chat_content)}") summary, image_path = await self._generate_summary(chat_content, group_name) if image_path: await self.bot.send_image_message(group_id, Path(image_path)) self.LOG.info(f"成功发送群 {group_name} 的昨日总结图片") else: if summary and len(summary.strip()) > 0: max_length = 2000 if len(summary) > max_length: summary = summary[:max_length] + "\n\n... (内容过长,已截断)" await self.bot.send_text_message(group_id, summary) self.LOG.info(f"成功发送群 {group_name} 的昨日总结文本") await asyncio.sleep(2) except Exception as group_error: self.LOG.error(f"处理群 {group_id} 总结时出错: {group_error}", exc_info=True) continue self.LOG.info("每日聊天总结任务执行完成") except Exception as e: self.LOG.error(f"执行每日聊天总结任务时出错: {e}", exc_info=True)