import asyncio import json import re import time from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Any, Tuple, Optional, List import aiohttp from loguru import logger from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.compress_chat_data import compress_chat_data from utils.decorator.async_job import async_job from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.decorator.rate_limit_decorator import group_feature_rate_limit from utils.markdown_to_image import convert_md_str_to_image from utils.revoke.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from utils.string_utils import remove_trailing_content from utils.wechat.contact_manager import ContactManager from utils.wechat.message_to_db import MessageStorage from wechat_ipad import WechatAPIClient class MessageSummaryPlugin(MessagePluginInterface): """消息总结插件,用于生成群聊消息总结""" # 功能权限常量 FEATURE_KEY = "SUMMARY_CAPABILITY" FEATURE_DESCRIPTION = "📝 群总结能力 [#总结]" @property def name(self) -> str: return "群聊总结" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "使用AI生成群聊消息总结" @property def author(self) -> str: return "ABOT Team" @property def command_prefix(self) -> Optional[str]: return "#" @property def commands(self) -> List[str]: return ["总结", "summary"] @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.message_storage = None self.revoke = None # 注册功能权限 self.feature = self.register_feature() # 注册定时任务:每天早上9点总结昨天的聊天信息 async_job.at_times(["09:00"])(self.daily_summary_job) def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" try: # 从插件配置中获取API密钥和URL api_config = self._config.get("api", {}) self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo") self._api_url = api_config.get("api_url", "http://192.168.2.240/v1/chat-messages") self.message_storage = MessageStorage() self.LOG.info(f"初始化 {self.name} 插件成功") return True except Exception as e: if hasattr(self, 'LOG'): self.LOG.error(f"初始化 {self.name} 插件失败: {e}") else: print(f"初始化 {self.name} 插件失败: {e}") return False def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True @plugin_stats_decorator(plugin_name="群聊总结") @plugin_points_cost(10, "群聊总结消耗积分", FEATURE_KEY) @group_feature_rate_limit(max_per_minute=1, feature_key=FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" try: # 检查是否是总结命令 content = message.get("content", "") self.bot: WechatAPIClient = message.get("bot") if not content.startswith(self.command_prefix): return False, None command = content[len(self.command_prefix):].split()[0] if command not in self.commands: return False, None # 获取需要总结的内容 group_id = message.get("roomid") self.revoke: MessageAutoRevoke = message.get("revoke") if not group_id: await self.bot.send_text_message(group_id, "只支持群聊消息总结", message.get("sender")) return False, None # 权限判断 gbm: GroupBotManager = message.get("gbm") if gbm and gbm.get_group_permission(group_id, self.feature) == PermissionStatus.DISABLED: return False, None # 从消息历史中获取群聊记录 all_contacts: dict = message.get("all_contacts") group_members: dict = ContactManager.get_instance().get_group_members(group_id) chat_content = self.message_storage.get_messages(group_id, group_members) if len(chat_content) < 100: return False, None # 获取群名并处理 group_name = all_contacts.get(group_id, group_id) group_name = self._sanitize_group_name(group_name) client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, "⏳群消息总结中… 😊") self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) # 创建线程异步处理总结生成和发送 res = await self._async_generate_and_send_summary(chat_content, group_name, group_id, message) if res: return True, "异步总结已启动" else: return False, "总结失败" except Exception as e: self.LOG.error(f"处理消息总结命令失败: {e}") return False, None async def _async_generate_and_send_summary(self, chat_content: str, group_name: str, group_id: str, message: Dict[str, Any]): """异步生成并发送总结""" try: # 生成总结 summary, image_path = await self._generate_summary(chat_content, group_name) if image_path: # 图片生成成功,发送图片 await self.bot.send_image_message(group_id, Path(image_path)) self.LOG.info(f"成功发送图片总结到群 {group_id}") return True else: # 图片生成失败,发送文本消息 if summary and len(summary.strip()) > 0: # 截断过长的文本 max_length = 2000 if len(summary) > max_length: summary = summary[:max_length] + "\n\n... (内容过长,已截断)" client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, summary) self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 30) self.LOG.info(f"图片生成失败,已发送文本总结到群 {group_id}") return True else: # 连文本内容都没有 client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, "❌ 生成总结失败,请稍后再试!") self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) return False except Exception as e: self.LOG.error(f"异步生成总结失败: {e}") client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, f"❌ 生成总结失败,请稍后再试") self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) return False def _sanitize_group_name(self, group_name: str) -> str: """处理群名,去除特殊字符并限制长度""" # 去除特殊字符,只保留字母、数字、中文和基本标点 sanitized_name = re.sub(r'[^\w\s\u4e00-\u9fff,.,。]', '', group_name) # 限制长度为15个字符 if len(sanitized_name) > 15: sanitized_name = sanitized_name[:15] # 如果处理后为空,则使用默认名称 if not sanitized_name: sanitized_name = "群聊" return sanitized_name async def _generate_summary(self, chat_content: str, group_name: str) -> Tuple[str, Optional[str]]: """生成总结""" # Dify API配置 content_compress = chat_content try: content_compress = compress_chat_data(chat_content) self.LOG.info(f"压缩内容成功:{len(content_compress)}--{len(chat_content)}") except Exception as e: self.LOG.error(f"压缩内容失败:{e}") # 准备请求数据 data = { "inputs": {}, "query": f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}", "response_mode": "blocking", # 使用阻塞模式,直接获取完整响应 "conversation_id": "", "user": group_name if group_name is not None else "message_summary_bot", "files": [] # 不包含文件 } self.LOG.info(f"群聊总结内容:{data}") # 设置请求头 headers = { "Authorization": f"Bearer {self._api_key}", "Content-Type": "application/json" } try: async with aiohttp.ClientSession() as session: async with session.post(self._api_url, headers=headers, json=data) as response: response.raise_for_status() # 检查请求是否成功 response_data = await response.json() self.LOG.info(f"Dify API响应状态码: {response.status}") self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}") # 提取回答内容 answer = response_data.get("answer", "") # 去除广告内容pollinations.ai 的广告 # answer = remove_trailing_content(answer) spath = "" # 提取token使用情况 metadata = response_data.get("metadata", {}) usage = metadata.get("usage", {}) if usage: prompt_tokens = usage.get("prompt_tokens", 0) completion_tokens = usage.get("completion_tokens", 0) total_tokens = usage.get("total_tokens", 0) # 添加token信息 tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}" answer += tokens_info try: # 使用唯一文件名并指定完整路径 timestamp = int(time.time()) output_path = f"summary_{timestamp}.png" # 构建完整的输出路径 self.LOG.info(f"开始生成图片: {output_path}") spath = await convert_md_str_to_image(answer, output_path) self.LOG.info(f"成功生成图片: {spath}") except Exception as e: self.LOG.error(f"生成图片失败: {e}", exc_info=True) # 如果图片生成失败,尝试发送纯文本消息 try: # 截断过长的文本,避免消息太长 max_length = 2000 if len(answer) > max_length: answer = answer[:max_length] + "\n\n... (内容过长,已截断)" self.LOG.info("图片生成失败,将发送文本消息作为备选方案") spath = None # 设置为None,让调用方知道需要发送文本 except Exception as fallback_error: self.LOG.error(f"备选文本发送也失败: {fallback_error}") spath = None # 返回文本内容和图片路径 return answer, spath except aiohttp.ClientError as e: self.LOG.error(f"请求Dify API时出错: {e}") return f"生成总结时出错: {str(e)}", None except json.JSONDecodeError as e: self.LOG.error(f"解析Dify API响应时出错: {e}") return "解析API响应时出错", None except Exception as e: self.LOG.error(f"处理总结时出现未知错误: {e}") return f"生成总结时出现未知错误: {str(e)}", None async def daily_summary_job(self): """定时任务:每天早上9点总结昨天的聊天信息""" try: self.LOG.info("开始执行每日聊天总结任务") # 计算昨天的时间范围 yesterday = datetime.now() - timedelta(days=1) yesterday_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0) yesterday_end = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999) self.LOG.info(f"总结时间范围: {yesterday_start.strftime('%Y-%m-%d %H:%M:%S')} 至 {yesterday_end.strftime('%Y-%m-%d %H:%M:%S')}") # 获取所有启用了群机器人的群聊 all_groups = GroupBotManager.get_group_list() if not all_groups: self.LOG.info("没有群聊启用群机器人,跳过定时总结") return # 筛选出开启了总结功能的群聊 enabled_groups = [] for group_id in all_groups: if GroupBotManager.get_group_permission(group_id, self.feature) == PermissionStatus.ENABLED: enabled_groups.append(group_id) if not enabled_groups: self.LOG.info("没有群聊开启定时总结功能,跳过") return self.LOG.info(f"找到 {len(enabled_groups)} 个开启定时总结的群聊") # 获取所有联系人 all_contacts = ContactManager.get_instance().get_all_contacts() # 为每个群生成总结 for group_id in enabled_groups: try: # 先统计昨天的消息数量 message_count = self.message_storage.count_messages_by_date_range( group_id, yesterday_start, yesterday_end ) # 消息少于50条,跳过总结 if message_count < 50: self.LOG.info(f"群 {group_id} 昨天只有 {message_count} 条消息,不足50条,跳过总结") continue self.LOG.info(f"群 {group_id} 昨天有 {message_count} 条消息,开始获取内容") # 获取群名 group_name = all_contacts.get(group_id, group_id) group_name = self._sanitize_group_name(group_name) # 获取昨天的聊天记录 chat_content = self.message_storage.get_messages_by_date_range( group_id, all_contacts, yesterday_start, yesterday_end ) if not chat_content: self.LOG.info(f"群 {group_id} 昨天聊天记录为空,跳过总结") continue self.LOG.info(f"开始为群 {group_name} 生成总结,消息数量: {message_count},内容长度: {len(chat_content)}") # 发送提示消息 await self.bot.send_text_message( group_id, f"⏳ 正在生成昨日聊天总结 ({yesterday.strftime('%Y-%m-%d')})… 😊" ) # 生成总结 summary, image_path = await self._generate_summary(chat_content, group_name) if image_path: # 图片生成成功,发送图片 await self.bot.send_image_message(group_id, Path(image_path)) self.LOG.info(f"成功发送群 {group_name} 的昨日总结图片") else: # 图片生成失败,发送文本消息 if summary and len(summary.strip()) > 0: max_length = 2000 if len(summary) > max_length: summary = summary[:max_length] + "\n\n... (内容过长,已截断)" await self.bot.send_text_message(group_id, summary) self.LOG.info(f"成功发送群 {group_name} 的昨日总结文本") # 避免请求过快 await asyncio.sleep(2) except Exception as e: self.LOG.error(f"为群 {group_id} 生成昨日总结失败: {e}", exc_info=True) continue self.LOG.info("每日聊天总结任务执行完成") except Exception as e: self.LOG.error(f"每日聊天总结任务执行失败: {e}", exc_info=True)