import asyncio
import functools
import json
import os
import re
import time
from pathlib import Path
from typing import Dict, Any, Tuple, Optional, List

import requests

from utils.revoke.message_auto_revoke import MessageAutoRevoke
from utils.string_utils import remove_trailing_content
from utils.wechat.contact_manager import ContactManager
from utils.wechat.message_to_db import MessageStorage
from utils.compress_chat_data import compress_chat_data
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.points_decorator import plugin_points_cost
from utils.markdown_to_image import convert_md_str_to_image
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
from wechat_ipad import WechatAPIClient


class MessageSummaryPlugin(MessagePluginInterface):
    """Message-summary plugin: produces an AI-generated digest of a group chat.

    The plugin reacts to ``#总结`` / ``#summary`` in a group, loads the stored
    chat history, sends it to a Dify-style chat endpoint, renders the answer
    to an image and posts the image back to the group.
    """

    # Fallback HTTP timeout (seconds) for the summary request. Blocking-mode
    # LLM calls can be slow, hence the generous value; override it with the
    # optional ``api.timeout`` plugin config key.
    DEFAULT_REQUEST_TIMEOUT = 300

    @property
    def name(self) -> str:
        return "群聊总结"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def description(self) -> str:
        return "使用AI生成群聊消息总结"

    @property
    def author(self) -> str:
        return "ABOT Team"

    @property
    def command_prefix(self) -> Optional[str]:
        return "#"

    @property
    def commands(self) -> List[str]:
        return ["总结", "summary"]

    def __init__(self):
        super().__init__()
        # Both are (re)assigned from the incoming message in process_message.
        self.bot: Optional[WechatAPIClient] = None
        self.revoke: Optional[MessageAutoRevoke] = None

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Read the API settings from the plugin config and create the storage.

        Args:
            context: Initialization context supplied by the plugin host
                (not used by this plugin).

        Returns:
            True on success, False if any step raised.
        """
        try:
            # Pull the API key / URL (and optional timeout) from the plugin config.
            api_config = self._config.get("api", {})
            # NOTE(review): a credential-looking default is hard-coded here; it is
            # kept for backward compatibility but should come from configuration.
            self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo")
            self._api_url = api_config.get("api_url", "http://192.168.2.240/v1/chat-messages")
            self._timeout = api_config.get("timeout", self.DEFAULT_REQUEST_TIMEOUT)
            self.message_storage = MessageStorage()
            self.LOG.info(f"初始化 {self.name} 插件成功")
            return True
        except Exception as e:
            # LOG may not exist yet if the base class failed to set it up.
            if hasattr(self, 'LOG'):
                self.LOG.error(f"初始化 {self.name} 插件失败: {e}")
            else:
                print(f"初始化 {self.name} 插件失败: {e}")
            return False

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    @plugin_stats_decorator(plugin_name="群聊总结")
    @plugin_points_cost(10, "群聊总结消耗积分", Feature.SUMMARY_CAPABILITY)
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle an incoming message and run the summary if it is a summary command.

        Args:
            message: Message dict. Keys read here: ``content``, ``bot``,
                ``roomid``, ``revoke``, ``sender``, ``gbm``, ``all_contacts``.

        Returns:
            ``(True, "异步总结已启动")`` when a summary was generated and sent,
            otherwise ``(False, None)`` (not a command, not a group, feature
            disabled, too little history, or an error occurred).
        """
        try:
            # Is this one of our commands?
            content = message.get("content") or ""
            self.bot = message.get("bot")
            if not content.startswith(self.command_prefix):
                return False, None
            # A bare prefix ("#" or "#   ") yields no tokens; treat it as
            # "not our command" instead of raising IndexError.
            tokens = content[len(self.command_prefix):].split()
            if not tokens or tokens[0] not in self.commands:
                return False, None

            group_id = message.get("roomid")
            self.revoke = message.get("revoke")
            if not group_id:
                # No group id means there is no group to reply to, so the
                # notice goes to the sender rather than to the empty group id.
                sender = message.get("sender")
                if sender:
                    await self.bot.send_text_message(sender, "只支持群聊消息总结")
                return False, None

            # Permission check for this group.
            gbm: GroupBotManager = message.get("gbm")
            if gbm and gbm.get_group_permission(group_id, Feature.SUMMARY_CAPABILITY) == PermissionStatus.DISABLED:
                return False, None

            # Load the stored chat history for the group.
            all_contacts: dict = message.get("all_contacts") or {}
            group_members: dict = ContactManager.get_instance().get_group_members(group_id)
            chat_content = self.message_storage.get_messages(group_id, group_members)
            # Skip when there is too little history to be worth summarizing.
            if len(chat_content) < 100:
                return False, None

            # Resolve a display name for the group, falling back to its id.
            group_name = all_contacts.get(group_id, group_id)
            group_name = self._sanitize_group_name(group_name)

            await self._send_auto_revoked_text(group_id, "⏳群消息总结中… 😊")

            # Despite the status text below, generation is awaited inline
            # within this coroutine; no separate thread or task is spawned here.
            await self._async_generate_and_send_summary(chat_content, group_name, group_id, message)
            return True, "异步总结已启动"
        except Exception as e:
            self.LOG.error(f"处理消息总结命令失败: {e}")
            return False, None

    async def _send_auto_revoked_text(self, group_id: str, text: str, delay: int = 5) -> None:
        """Send *text* to *group_id* and register it for automatic revocation.

        ``delay`` is forwarded unchanged as the last argument of
        ``MessageAutoRevoke.add_message_to_revoke``. When no revoke helper
        was supplied with the message, the text is still sent.
        """
        client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, text)
        if self.revoke is not None:
            self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, delay)

    async def _async_generate_and_send_summary(self, chat_content: str, group_name: str, group_id: str,
                                               message: Dict[str, Any]):
        """Generate the summary and post it to the group as an image.

        If no image could be produced, a short failure notice is sent instead
        (and scheduled for revocation). ``message`` is accepted for interface
        compatibility and is not read here.
        """
        try:
            # Only the image path is used; the text answer is not sent.
            _summary, image_path = await self._generate_summary(chat_content, group_name)
            if image_path:
                await self.bot.send_image_message(group_id, Path(image_path))
            else:
                await self._send_auto_revoked_text(group_id, "❌ 生成总结图片失败")
        except Exception as e:
            self.LOG.error(f"异步生成总结失败: {e}")
            await self._send_auto_revoked_text(group_id, f"❌ 生成总结失败: {str(e)}")

    def _sanitize_group_name(self, group_name: str) -> str:
        """Strip special characters from the group name and cap its length.

        Keeps word characters, whitespace, CJK ideographs and basic
        punctuation, truncates to 15 characters, and falls back to ``"群聊"``
        when nothing is left.
        """
        sanitized_name = re.sub(r'[^\w\s\u4e00-\u9fff,.,。]', '', group_name)
        sanitized_name = sanitized_name[:15]
        return sanitized_name or "群聊"

    async def _generate_summary(self, chat_content: str, group_name: str) -> Tuple[str, Optional[str]]:
        """Request a summary from the chat API and render it to an image.

        Args:
            chat_content: Raw chat history to summarize.
            group_name: Sanitized group name, used in the prompt and as the
                ``user`` field of the request.

        Returns:
            ``(answer_text, image_path)``. ``image_path`` is ``None`` when the
            request, the response parsing or the image rendering failed; in
            the request/parsing failure cases ``answer_text`` holds an error
            description instead of a summary.
        """
        # Compression is best-effort: fall back to the raw content on failure.
        content_compress = chat_content
        try:
            content_compress = compress_chat_data(chat_content)
            self.LOG.info(f"压缩内容成功:{len(content_compress)}--{len(chat_content)}")
        except Exception as e:
            self.LOG.error(f"压缩内容失败:{e}")

        # Request payload.
        data = {
            "inputs": {},
            "query": f"请根据以下{group_name}群聊记录生成一份精华总结:\n\n{content_compress}",
            "response_mode": "blocking",  # blocking mode: the full answer arrives in one response
            "conversation_id": "",
            "user": group_name if group_name is not None else "message_summary_bot",
            "files": []  # no attachments
        }
        self.LOG.info(f"群聊总结内容:{data}")

        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json"
        }

        try:
            # requests is synchronous: run it in the default executor so the
            # event loop stays responsive, and bound it with a timeout so an
            # unresponsive endpoint cannot hang this handler forever.
            timeout = getattr(self, "_timeout", self.DEFAULT_REQUEST_TIMEOUT)
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                functools.partial(
                    requests.post, self._api_url, headers=headers, json=data, timeout=timeout
                ),
            )
            response.raise_for_status()

            response_data = response.json()
            self.LOG.info(f"Dify API响应状态码: {response.status_code}")
            self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}")

            # Extract the answer and strip trailing advertising content.
            answer = response_data.get("answer", "")
            answer = remove_trailing_content(answer)

            # Append token usage when the response reports it.
            metadata = response_data.get("metadata", {})
            usage = metadata.get("usage", {})
            if usage:
                prompt_tokens = usage.get("prompt_tokens", 0)
                completion_tokens = usage.get("completion_tokens", 0)
                total_tokens = usage.get("total_tokens", 0)
                tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}"
                answer += tokens_info

            try:
                # Timestamped file name for the rendered image.
                timestamp = int(time.time())
                output_path = f"summary_{timestamp}.png"
                spath = await convert_md_str_to_image(answer, output_path)
                self.LOG.info(f"成功生成图片: {spath}")
            except Exception as e:
                self.LOG.error(f"生成image失败:{e}", exc_info=True)
                spath = None

            return answer, spath

        except requests.exceptions.RequestException as e:
            self.LOG.error(f"请求Dify API时出错: {e}")
            return f"生成总结时出错: {str(e)}", None
        except json.JSONDecodeError as e:
            self.LOG.error(f"解析Dify API响应时出错: {e}")
            return "解析API响应时出错", None
        except Exception as e:
            self.LOG.error(f"处理总结时出现未知错误: {e}")
            return f"生成总结时出现未知错误: {str(e)}", None