# Files
# abot/plugins/message_summary/main.py
#
# 193 lines
# 7.1 KiB
# Python

import json
from typing import Dict, Any, Tuple, Optional, List
import requests
from message_storage.message_to_db import MessageStorage
from utils.compress_chat_data import compress_chat_data
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.markdown_to_image import convert_md_str_to_image
from plugins.stats_collector.decorators import plugin_stats_decorator
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
class MessageSummaryPlugin(MessagePluginInterface):
    """Message plugin that replies to ``#总结`` / ``#summary`` with a chat summary.

    On a matching command sent in a group chat, the plugin loads the stored
    messages of that group, sends them to a Dify-style ``chat-messages`` HTTP
    endpoint, renders the returned markdown answer to an image and sends the
    image file back to the group.
    """

    # NOTE(review): these fallbacks embed a credential and an internal address
    # in source control. They are kept only so existing deployments without an
    # "api" config section keep working; prefer supplying them via config and
    # rotating this key.
    _DEFAULT_API_KEY = "app-McGLzBhBjeBCSEi7n83MtuTo"
    _DEFAULT_API_URL = "http://192.168.2.240/v1/chat-messages"

    # (connect, read) timeout in seconds for the summary request. The request
    # uses blocking response mode, so the read timeout must cover the whole
    # generation time. Overridable through the "timeout" key of the "api" config.
    _DEFAULT_TIMEOUT = (10, 300)

    # Minimum ``len()`` of the stored chat content required before a summary
    # is attempted (presumably a character count -- verify against
    # MessageStorage.get_messages).
    _MIN_CHAT_CONTENT_LEN = 100

    @property
    def name(self) -> str:
        """Human-readable plugin name."""
        return "群聊总结"

    @property
    def version(self) -> str:
        """Plugin version string."""
        return "1.0.0"

    @property
    def description(self) -> str:
        """Short description of what the plugin does."""
        return "使用AI生成群聊消息总结"

    @property
    def author(self) -> str:
        """Plugin author."""
        return "WeChatRobot Team"

    @property
    def command_prefix(self) -> Optional[str]:
        """Prefix a message must start with to be treated as a command."""
        return "#"

    @property
    def commands(self) -> List[str]:
        """Command words (without prefix) handled by this plugin."""
        return ["总结", "summary"]

    def __init__(self):
        super().__init__()

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Read the API settings and create the message storage.

        Args:
            context: Initialization context supplied by the plugin host
                (not used by this plugin).

        Returns:
            True on success, False if any step raised.
        """
        try:
            # API key / URL / timeout come from the "api" section of the
            # plugin configuration (self._config is presumably populated by
            # the base class -- verify).
            api_config = self._config.get("api", {})
            self._api_key = api_config.get("api_key", self._DEFAULT_API_KEY)
            self._api_url = api_config.get("api_url", self._DEFAULT_API_URL)
            timeout = api_config.get("timeout", self._DEFAULT_TIMEOUT)
            # JSON/YAML configs yield lists; requests only accepts a number
            # or a (connect, read) tuple.
            self._timeout = tuple(timeout) if isinstance(timeout, list) else timeout
            self.message_storage = MessageStorage()
            self.LOG.info(f"初始化 {self.name} 插件成功")
            return True
        except Exception as e:
            # The logger itself may be missing if base initialization failed.
            if hasattr(self, 'LOG'):
                self.LOG.error(f"初始化 {self.name} 插件失败: {e}")
            else:
                print(f"初始化 {self.name} 插件失败: {e}")
            return False

    def start(self) -> bool:
        """Mark the plugin as running. Always returns True."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped. Always returns True."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    @plugin_stats_decorator(plugin_name="群聊总结")
    def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle a summary command contained in ``message``.

        Args:
            message: Incoming message dict. Keys read here: ``content``,
                ``roomid``, ``sender``, ``wcf`` (client used to send replies),
                ``gbm`` (group permission manager) and ``all_contacts``.

        Returns:
            ``(True, None)`` when the command was recognised and dealt with
            (including "not a group chat" and "feature disabled for group");
            ``(False, None)`` when the message is not a summary command, the
            stored history is too short, or an error occurred.
        """
        try:
            # Is this one of our commands?
            content = message.get("content") or ""
            prefix = self.command_prefix
            if not content.startswith(prefix):
                return False, None
            # A bare prefix ("#" or "#   ") has no command word; treat it as
            # "not ours" instead of raising IndexError.
            words = content[len(prefix):].split()
            if not words or words[0] not in self.commands:
                return False, None

            wcf = message.get("wcf")

            # Summaries only make sense for group chats.
            group_id = message.get("roomid")
            if not group_id:
                if wcf:
                    wcf.send_text("只支持群聊消息总结", message.get("sender"))
                return True, None

            # Permission check: silently swallow the command when the
            # feature is disabled for this group.
            gbm: Optional[GroupBotManager] = message.get("gbm")
            if gbm and gbm.get_group_permission(group_id, Feature.SUMMARY_CAPABILITY) == PermissionStatus.DISABLED:
                return True, None

            # Load the stored chat history of the group.
            all_contacts: Optional[dict] = message.get("all_contacts")
            chat_content = self.message_storage.get_messages(group_id, all_contacts)
            if not chat_content or len(chat_content) < self._MIN_CHAT_CONTENT_LEN:
                return False, None

            # Prefer the group's entry in the contact map; fall back to the raw
            # id when the map is missing or has no entry for it.
            group_name = (all_contacts or {}).get(group_id, group_id)
            _summary, image_path = self._generate_summary(chat_content, group_name)

            # Only the rendered image is sent; the text summary is
            # intentionally not posted to the group.
            if wcf and image_path:
                wcf.send_file(image_path, group_id)
            return True, None
        except Exception as e:
            self.LOG.error(f"处理消息总结命令失败: {e}")
            return False, None

    @staticmethod
    def _append_token_usage(answer: str, response_data: Dict[str, Any]) -> str:
        """Return ``answer`` with a token-usage footer appended when available."""
        metadata = response_data.get("metadata") or {}
        usage = metadata.get("usage") or {}
        if not usage:
            return answer
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)
        total_tokens = usage.get("total_tokens", 0)
        return answer + f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}"

    def _generate_summary(self, chat_content: str, group_id: str) -> Tuple[str, Optional[str]]:
        """Ask the Dify API for a summary and render it to an image.

        Args:
            chat_content: Chat history to summarise.
            group_id: Group label inserted into the prompt and used as the
                API ``user`` field. ``process_message`` passes the group's
                entry from the contact map when there is one, else the raw id.

        Returns:
            ``(text, image_path)``. On success ``text`` is the answer (plus a
            token-usage footer) and ``image_path`` is what
            ``convert_md_str_to_image`` returned, or ``""`` if rendering
            failed. On request/parse failure ``text`` is an error message and
            ``image_path`` is ``None``.
        """
        # Compression is best-effort: fall back to the raw content on failure.
        content_compress = chat_content
        try:
            content_compress = compress_chat_data(chat_content)
            self.LOG.info(f"压缩内容成功:{len(content_compress)}--{len(chat_content)}")
        except Exception as e:
            self.LOG.error(f"压缩内容失败:{e}")

        data = {
            "inputs": {},
            "query": f"请根据以下{group_id}群聊记录生成一份精华总结:\n\n{content_compress}",
            "response_mode": "blocking",  # wait for the complete answer
            "conversation_id": "",
            "user": group_id if group_id is not None else "message_summary_bot",
            "files": [],  # no attachments
        }
        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
        }

        try:
            # Step 1: HTTP request. A timeout is mandatory, otherwise a
            # stalled server blocks the message handler indefinitely.
            try:
                response = requests.post(
                    self._api_url, headers=headers, json=data, timeout=self._timeout
                )
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                self.LOG.error(f"请求Dify API时出错: {e}")
                return f"生成总结时出错: {str(e)}", None

            # Step 2: decode the body. Handled separately because newer
            # requests versions raise a JSONDecodeError that is *also* a
            # RequestException; every variant is a ValueError.
            try:
                response_data = response.json()
            except ValueError as e:
                self.LOG.error(f"解析Dify API响应时出错: {e}")
                return "解析API响应时出错", None

            self.LOG.info(f"Dify API响应状态码: {response.status_code}")
            self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}")

            # Step 3: build the final text and render it.
            answer = self._append_token_usage(response_data.get("answer") or "", response_data)
            spath = ""
            try:
                spath = convert_md_str_to_image(answer, "output.png")
            except Exception as e:
                self.LOG.error(f"生成image失败:{e}")
            return answer, spath
        except Exception as e:
            self.LOG.error(f"处理总结时出现未知错误: {e}")
            return f"生成总结时出现未知错误: {str(e)}", None