- 重新新增群总结数据库操作类并自动建表 t_message_summary - 在群总结插件初始化时接入总结入库数据库对象 - 定时总结成功发送后自动写入数据库,保留文本结果、图片路径和消息数量 - 失败提醒不入库,避免脏数据进入总结表
497 lines
23 KiB
Python
497 lines
23 KiB
Python
import asyncio
|
||
import json
|
||
import re
|
||
import time
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from typing import Dict, Any, Tuple, Optional, List
|
||
|
||
import aiohttp
|
||
from aiohttp import ClientTimeout
|
||
from loguru import logger
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from db.message_summary_db import MessageSummaryDBOperator
|
||
from utils.compress_chat_data import compress_chat_data
|
||
from utils.decorator.async_job import async_job
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.decorator.points_decorator import plugin_points_cost
|
||
from utils.decorator.rate_limit_decorator import group_feature_rate_limit
|
||
from utils.markdown_to_image import convert_md_str_to_image
|
||
from utils.revoke.message_auto_revoke import MessageAutoRevoke
|
||
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
|
||
from utils.string_utils import remove_trailing_content
|
||
from utils.wechat.contact_manager import ContactManager
|
||
from utils.wechat.message_to_db import MessageStorage
|
||
from wechat_ipad import WechatAPIClient
|
||
|
||
|
||
class MessageSummaryPlugin(MessagePluginInterface):
|
||
"""消息总结插件,用于生成群聊消息总结"""
|
||
|
||
# 功能权限常量
|
||
FEATURE_KEY = "SUMMARY_CAPABILITY"
|
||
FEATURE_DESCRIPTION = "📝 群总结能力 [#总结]"
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "群聊总结"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "使用AI生成群聊消息总结"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "ABOT Team"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return "#"
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return ["总结", "summary"]
|
||
|
||
@property
|
||
def feature_key(self) -> Optional[str]:
|
||
return self.FEATURE_KEY
|
||
|
||
@property
|
||
def feature_description(self) -> Optional[str]:
|
||
return self.FEATURE_DESCRIPTION
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.message_storage = None
|
||
self.message_summary_db = None
|
||
self.revoke = None
|
||
self._auto_revoke = None
|
||
# 注册功能权限
|
||
self.feature = self.register_feature()
|
||
# 注册定时任务:每天早上9点总结昨天的聊天信息
|
||
async_job.at_times(["09:00"])(self.daily_summary_job)
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
try:
|
||
# 从插件配置中获取API密钥和URL
|
||
api_config = self._config.get("api", {})
|
||
self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo")
|
||
self._api_url = api_config.get("api_url", "http://192.168.2.240/v1/chat-messages")
|
||
self.message_storage = MessageStorage()
|
||
db_manager = context.get("db_manager")
|
||
if db_manager:
|
||
self.message_summary_db = MessageSummaryDBOperator(db_manager)
|
||
|
||
self.LOG.debug(f"初始化 {self.name} 插件成功")
|
||
return True
|
||
except Exception as e:
|
||
if hasattr(self, 'LOG'):
|
||
self.LOG.error(f"初始化 {self.name} 插件失败: {e}")
|
||
else:
|
||
print(f"初始化 {self.name} 插件失败: {e}")
|
||
return False
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.debug(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
@plugin_stats_decorator(plugin_name="群聊总结")
|
||
@plugin_points_cost(10, "群聊总结消耗积分", FEATURE_KEY)
|
||
@group_feature_rate_limit(max_per_minute=30, feature_key=FEATURE_KEY)
|
||
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
try:
|
||
# 检查是否是总结命令
|
||
content = message.get("content", "")
|
||
self.bot: WechatAPIClient = message.get("bot")
|
||
if not content.startswith(self.command_prefix):
|
||
return False, None
|
||
|
||
command = content[len(self.command_prefix):].split()[0]
|
||
if command not in self.commands:
|
||
return False, None
|
||
# 获取需要总结的内容
|
||
group_id = message.get("roomid")
|
||
|
||
self.revoke: MessageAutoRevoke = message.get("revoke")
|
||
if not group_id:
|
||
await self.bot.send_text_message(group_id, "只支持群聊消息总结", message.get("sender"))
|
||
return False, None
|
||
# 权限判断
|
||
gbm: GroupBotManager = message.get("gbm")
|
||
if gbm and gbm.get_group_permission(group_id, self.feature) == PermissionStatus.DISABLED:
|
||
return False, None
|
||
# 从消息历史中获取群聊记录
|
||
all_contacts: dict = message.get("all_contacts")
|
||
group_members: dict = ContactManager.get_instance().get_group_members(group_id)
|
||
|
||
chat_content = self.message_storage.get_messages(group_id, group_members)
|
||
if len(chat_content) < 100:
|
||
return False, None
|
||
|
||
# 获取群名并处理
|
||
group_name = all_contacts.get(group_id, group_id)
|
||
group_name = self._sanitize_group_name(group_name)
|
||
|
||
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, "⏳群消息总结中… 😊")
|
||
self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5)
|
||
# 创建线程异步处理总结生成和发送
|
||
res = await self._async_generate_and_send_summary(chat_content, group_name, group_id,
|
||
message)
|
||
if res:
|
||
return True, "异步总结已启动"
|
||
else:
|
||
return False, "总结失败"
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"处理消息总结命令失败: {e}")
|
||
return False, None
|
||
|
||
async def _async_generate_and_send_summary(self, chat_content: str, group_name: str, group_id: str,
|
||
message: Dict[str, Any]):
|
||
"""异步生成并发送总结"""
|
||
try:
|
||
# 生成总结
|
||
summary, image_path = await self._generate_summary(chat_content, group_name)
|
||
|
||
if image_path:
|
||
# 图片生成成功,发送图片
|
||
await self.bot.send_image_message(group_id, Path(image_path))
|
||
self.LOG.info(f"成功发送图片总结到群 {group_id}")
|
||
return True
|
||
else:
|
||
# 图片生成失败,发送文本消息
|
||
if summary and len(summary.strip()) > 0:
|
||
# 截断过长的文本
|
||
max_length = 2000
|
||
if len(summary) > max_length:
|
||
summary = summary[:max_length] + "\n\n... (内容过长,已截断)"
|
||
|
||
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, summary)
|
||
self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 30)
|
||
self.LOG.info(f"图片生成失败,已发送文本总结到群 {group_id}")
|
||
return True
|
||
else:
|
||
# 连文本内容都没有
|
||
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id,
|
||
"❌ 生成总结失败,请稍后再试!")
|
||
self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5)
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"异步生成总结失败: {e}")
|
||
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id,
|
||
f"❌ 生成总结失败,请稍后再试")
|
||
self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5)
|
||
return False
|
||
|
||
def _sanitize_group_name(self, group_name: str) -> str:
|
||
"""处理群名,去除特殊字符并限制长度"""
|
||
# 去除特殊字符,只保留字母、数字、中文和基本标点
|
||
sanitized_name = re.sub(r'[^\w\s\u4e00-\u9fff,.,。]', '', group_name)
|
||
# 限制长度为15个字符
|
||
if len(sanitized_name) > 15:
|
||
sanitized_name = sanitized_name[:15]
|
||
# 如果处理后为空,则使用默认名称
|
||
if not sanitized_name:
|
||
sanitized_name = "群聊"
|
||
return sanitized_name
|
||
|
||
def _get_revoke_manager(self) -> Optional[MessageAutoRevoke]:
|
||
"""优先使用消息上下文中的撤回器,定时任务场景则懒初始化一个"""
|
||
if self.revoke:
|
||
return self.revoke
|
||
if self._auto_revoke is None and self.bot:
|
||
self._auto_revoke = MessageAutoRevoke(self.bot)
|
||
return self._auto_revoke
|
||
|
||
async def _send_text_with_revoke(self, target: str, content: str, delay: int) -> Tuple[int, int, int]:
|
||
"""发送文本并按需登记自动撤回"""
|
||
client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(target, content)
|
||
revoke_manager = self._get_revoke_manager()
|
||
if revoke_manager:
|
||
revoke_manager.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, delay)
|
||
return client_msg_id, create_time, new_msg_id
|
||
|
||
def _save_daily_summary_record(
|
||
self,
|
||
group_id: str,
|
||
group_name: str,
|
||
period_start: datetime,
|
||
period_end: datetime,
|
||
message_count: int,
|
||
summary_text: str,
|
||
image_path: Optional[str],
|
||
) -> bool:
|
||
"""保存每日总结到数据库"""
|
||
if not self.message_summary_db:
|
||
self.LOG.warning("群总结数据库未初始化,跳过总结入库")
|
||
return False
|
||
|
||
summary_record = {
|
||
"chatroom_id": group_id,
|
||
"group_name": group_name,
|
||
"summary_type": "daily",
|
||
"period_key": period_start.strftime("%Y-%m-%d"),
|
||
"period_start": period_start.strftime("%Y-%m-%d %H:%M:%S"),
|
||
"period_end": period_end.strftime("%Y-%m-%d %H:%M:%S"),
|
||
"source_message_count": message_count,
|
||
"summary_text": summary_text or "",
|
||
"image_path": image_path,
|
||
"meta": {
|
||
"source": "message_summary_plugin",
|
||
"summary_date": period_start.strftime("%Y-%m-%d"),
|
||
"has_image": bool(image_path),
|
||
},
|
||
}
|
||
saved = self.message_summary_db.save_summary(summary_record)
|
||
if saved:
|
||
self.LOG.info(f"群总结入库成功: group_id={group_id}, period_key={summary_record['period_key']}")
|
||
else:
|
||
self.LOG.error(f"群总结入库失败: group_id={group_id}, period_key={summary_record['period_key']}")
|
||
return saved
|
||
|
||
async def _generate_summary(self, chat_content: str, group_name: str) -> Tuple[str, Optional[str]]:
|
||
"""生成总结"""
|
||
# Dify API配置
|
||
content_compress = chat_content
|
||
try:
|
||
content_compress = compress_chat_data(chat_content)
|
||
self.LOG.info(f"压缩内容成功:{len(content_compress)}--{len(chat_content)}")
|
||
except Exception as e:
|
||
self.LOG.error(f"压缩内容失败:{e}")
|
||
|
||
# 准备请求数据
|
||
data = {
|
||
"inputs": {},
|
||
"query": f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}",
|
||
"response_mode": "blocking", # 使用阻塞模式,直接获取完整响应
|
||
"conversation_id": "",
|
||
"user": group_name if group_name is not None else "message_summary_bot",
|
||
"files": [] # 不包含文件
|
||
}
|
||
|
||
self.LOG.info(f"群聊总结内容:{data}")
|
||
# 设置请求头
|
||
headers = {
|
||
"Authorization": f"Bearer {self._api_key}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
max_retries = 3
|
||
retry_delays = [2, 4]
|
||
|
||
for attempt in range(1, max_retries + 1):
|
||
try:
|
||
custom_timeout = ClientTimeout(total=None, connect=10, sock_read=300)
|
||
conn = aiohttp.TCPConnector(keepalive_timeout=60) # 保持连接活跃
|
||
async with aiohttp.ClientSession(connector=conn, timeout=custom_timeout) as session:
|
||
async with session.post(self._api_url, headers=headers, json=data) as response:
|
||
response.raise_for_status() # 检查请求是否成功
|
||
response_data = await response.json()
|
||
|
||
self.LOG.info(f"Dify API响应状态码: {response.status}, attempt={attempt}")
|
||
self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}")
|
||
|
||
# 提取回答内容
|
||
answer = response_data.get("answer", "")
|
||
# 去除广告内容pollinations.ai 的广告
|
||
# answer = remove_trailing_content(answer)
|
||
spath = ""
|
||
# 提取token使用情况
|
||
metadata = response_data.get("metadata", {})
|
||
usage = metadata.get("usage", {})
|
||
|
||
if usage:
|
||
prompt_tokens = usage.get("prompt_tokens", 0)
|
||
completion_tokens = usage.get("completion_tokens", 0)
|
||
total_tokens = usage.get("total_tokens", 0)
|
||
|
||
# 添加token信息
|
||
tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}"
|
||
answer += tokens_info
|
||
try:
|
||
# 使用唯一文件名并指定完整路径
|
||
timestamp = int(time.time())
|
||
output_path = f"summary_{timestamp}.png"
|
||
# 构建完整的输出路径
|
||
self.LOG.info(f"开始生成图片: {output_path}")
|
||
spath = await convert_md_str_to_image(answer, output_path)
|
||
self.LOG.info(f"成功生成图片: {spath}")
|
||
except Exception as e:
|
||
self.LOG.error(f"生成图片失败: {e}", exc_info=True)
|
||
# 如果图片生成失败,尝试发送纯文本消息
|
||
try:
|
||
# 截断过长的文本,避免消息太长
|
||
max_length = 2000
|
||
if len(answer) > max_length:
|
||
answer = answer[:max_length] + "\n\n... (内容过长,已截断)"
|
||
self.LOG.info("图片生成失败,将发送文本消息作为备选方案")
|
||
spath = None # 设置为None,让调用方知道需要发送文本
|
||
except Exception as fallback_error:
|
||
self.LOG.error(f"备选文本发送也失败: {fallback_error}")
|
||
spath = None
|
||
# 返回文本内容和图片路径
|
||
return answer, spath
|
||
|
||
except aiohttp.ClientError as e:
|
||
self.LOG.error(f"请求Dify API时出错: attempt={attempt}/{max_retries}, error={e}")
|
||
except json.JSONDecodeError as e:
|
||
self.LOG.error(f"解析Dify API响应时出错: attempt={attempt}/{max_retries}, error={e}")
|
||
except Exception as e:
|
||
self.LOG.error(f"处理总结时出现未知错误: attempt={attempt}/{max_retries}, error={e}")
|
||
|
||
if attempt < max_retries:
|
||
delay = retry_delays[attempt - 1] if attempt - 1 < len(retry_delays) else retry_delays[-1]
|
||
self.LOG.warning(f"群总结生成失败,准备重试: attempt={attempt}/{max_retries}, delay={delay}s")
|
||
await asyncio.sleep(delay)
|
||
|
||
return "生成总结时出错", None
|
||
|
||
async def daily_summary_job(self):
|
||
"""定时任务:每天早上9点总结昨天的聊天信息"""
|
||
try:
|
||
self.LOG.info("开始执行每日聊天总结任务")
|
||
|
||
# 计算昨天的时间范围
|
||
yesterday = datetime.now() - timedelta(days=1)
|
||
yesterday_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
|
||
yesterday_end = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999)
|
||
|
||
self.LOG.info(
|
||
f"总结时间范围: {yesterday_start.strftime('%Y-%m-%d %H:%M:%S')} 至 {yesterday_end.strftime('%Y-%m-%d %H:%M:%S')}")
|
||
|
||
# 获取所有启用了群机器人的群聊
|
||
all_groups = GroupBotManager.get_group_list()
|
||
|
||
if not all_groups:
|
||
self.LOG.info("没有群聊启用群机器人,跳过定时总结")
|
||
return
|
||
|
||
# 筛选出开启了总结功能的群聊
|
||
enabled_groups = []
|
||
for group_id in all_groups:
|
||
if GroupBotManager.get_group_permission(group_id, self.feature) == PermissionStatus.ENABLED:
|
||
enabled_groups.append(group_id)
|
||
|
||
if not enabled_groups:
|
||
self.LOG.info("没有群聊开启定时总结功能,跳过")
|
||
return
|
||
|
||
self.LOG.info(f"找到 {len(enabled_groups)} 个开启定时总结的群聊")
|
||
|
||
# 为每个群生成总结
|
||
for group_id in enabled_groups:
|
||
try:
|
||
# 先统计昨天的消息数量
|
||
message_count = self.message_storage.count_messages_by_date_range(
|
||
group_id,
|
||
yesterday_start,
|
||
yesterday_end
|
||
)
|
||
|
||
# 消息少于50条,跳过总结
|
||
if message_count < 100:
|
||
self.LOG.info(f"群 {group_id} 昨天只有 {message_count} 条消息,不足50条,跳过总结")
|
||
continue
|
||
|
||
self.LOG.info(f"群 {group_id} 昨天有 {message_count} 条消息,开始获取内容")
|
||
|
||
# 获取群成员信息
|
||
group_members = ContactManager.get_instance().get_group_members(group_id)
|
||
|
||
# 获取群名
|
||
group_name = ContactManager.get_instance().get_nickname(group_id)
|
||
group_name = self._sanitize_group_name(group_name)
|
||
|
||
# 获取昨天的聊天记录
|
||
chat_content = self.message_storage.get_messages_by_date_range(
|
||
group_id,
|
||
group_members,
|
||
yesterday_start,
|
||
yesterday_end
|
||
)
|
||
|
||
if not chat_content:
|
||
self.LOG.info(f"群 {group_id} 昨天聊天记录为空,跳过总结")
|
||
continue
|
||
|
||
self.LOG.info(
|
||
f"开始为群 {group_name} 生成总结,消息数量: {message_count},内容长度: {len(chat_content)}")
|
||
|
||
# 发送提示消息
|
||
await self._send_text_with_revoke(
|
||
group_id,
|
||
f"⏳ 正在生成 [{yesterday.strftime('%Y-%m-%d')}]聊天总结… 😊",
|
||
5
|
||
)
|
||
|
||
# 生成总结
|
||
summary, image_path = await self._generate_summary(chat_content, group_name)
|
||
|
||
if image_path:
|
||
# 图片生成成功,发送图片
|
||
await self.bot.send_image_message(group_id, Path(image_path))
|
||
self._save_daily_summary_record(
|
||
group_id,
|
||
group_name,
|
||
yesterday_start,
|
||
yesterday_end,
|
||
message_count,
|
||
summary,
|
||
str(image_path),
|
||
)
|
||
self.LOG.info(f"成功发送群 {group_name} 的昨日总结图片")
|
||
else:
|
||
# 图片生成失败,发送文本消息
|
||
if summary and len(summary.strip()) > 0:
|
||
max_length = 2000
|
||
if len(summary) > max_length:
|
||
summary = summary[:max_length] + "\n\n... (内容过长,已截断)"
|
||
|
||
if summary.strip() == "生成总结时出错":
|
||
await self._send_text_with_revoke(group_id, f"❌ [{yesterday.strftime('%Y-%m-%d')}] 聊天总结生成失败,请稍后再试", 5)
|
||
self.LOG.warning(f"群 {group_name} 的昨日总结生成失败,已发送可撤回失败提醒")
|
||
else:
|
||
await self.bot.send_text_message(group_id, summary)
|
||
self._save_daily_summary_record(
|
||
group_id,
|
||
group_name,
|
||
yesterday_start,
|
||
yesterday_end,
|
||
message_count,
|
||
summary,
|
||
None,
|
||
)
|
||
self.LOG.info(f"成功发送群 {group_name} 的昨日总结文本")
|
||
else:
|
||
await self._send_text_with_revoke(group_id, f"❌ [{yesterday.strftime('%Y-%m-%d')}] 聊天总结生成失败,请稍后再试", 5)
|
||
self.LOG.warning(f"群 {group_name} 的昨日总结无有效内容,已发送可撤回失败提醒")
|
||
|
||
# 避免请求过快
|
||
await asyncio.sleep(2)
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"为群 {group_id} 生成昨日总结失败: {e}", exc_info=True)
|
||
continue
|
||
|
||
self.LOG.info("每日聊天总结任务执行完成")
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"每日聊天总结任务执行失败: {e}", exc_info=True)
|