diff --git a/plugins/message_summary/main.py b/plugins/message_summary/main.py index eae0d6c..ae9ff61 100644 --- a/plugins/message_summary/main.py +++ b/plugins/message_summary/main.py @@ -16,199 +16,112 @@ from utils.compress_chat_data import compress_chat_data from utils.decorator.async_job import async_job from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost -from utils.decorator.rate_limit_decorator import group_feature_rate_limit +from utils.decorator.rate_limit_decorator import rate_limit from utils.markdown_to_image import convert_md_str_to_image -from utils.revoke.message_auto_revoke import MessageAutoRevoke +from utils.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from utils.string_utils import remove_trailing_content from utils.wechat.contact_manager import ContactManager from utils.wechat.message_to_db import MessageStorage -from wechat_ipad import WechatAPIClient +from wechat_ipad import WechatAPI +@plugin_stats_decorator class MessageSummaryPlugin(MessagePluginInterface): - """消息总结插件,用于生成群聊消息总结""" - - # 功能权限常量 - FEATURE_KEY = "SUMMARY_CAPABILITY" - FEATURE_DESCRIPTION = "📝 群总结能力 [#总结]" - - @property - def name(self) -> str: - return "群聊总结" - - @property - def version(self) -> str: - return "1.0.0" - - @property - def description(self) -> str: - return "使用AI生成群聊消息总结" - - @property - def author(self) -> str: - return "ABOT Team" - - @property - def command_prefix(self) -> Optional[str]: - return "#" - - @property - def commands(self) -> List[str]: - return ["总结", "summary"] - - @property - def feature_key(self) -> Optional[str]: - return self.FEATURE_KEY - - @property - def feature_description(self) -> Optional[str]: - return self.FEATURE_DESCRIPTION + description = "消息总结" + author = "Liu" + version = "0.0.8" def __init__(self): super().__init__() - self.message_storage = None - self.revoke = None - # 注册功能权限 - self.feature = self.register_feature() - # 注册定时任务:每天早上9点总结昨天的聊天信息 - # async_job.at_times(["09:00"])(self.daily_summary_job) + self.plugin_name = "message_summary" + self.feature = "summary" + self.message_storage = MessageStorage.get_instance() + self.revoke = MessageAutoRevoke.get_instance() + self._api_url = None + self._api_key = None - def initialize(self, context: Dict[str, Any]) -> bool: - """初始化插件""" - try: - # 从插件配置中获取API密钥和URL - api_config = self._config.get("api", {}) - self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo") - self._api_url = api_config.get("api_url", "http://192.168.2.240/v1/chat-messages") - self.message_storage = MessageStorage() - - self.LOG.debug(f"初始化 {self.name} 插件成功") - return True - except Exception as e: - if hasattr(self, 'LOG'): - self.LOG.error(f"初始化 {self.name} 插件失败: {e}") - else: - print(f"初始化 {self.name} 插件失败: {e}") + async def initialize(self, bot: WechatAPI, config: dict) -> bool: + self.bot = bot + self.config = config + self._api_url = config.get("api_url") + self._api_key = config.get("api_key") + if not self._api_url or not self._api_key: + logger.error("Dify API配置缺失") return False - - def start(self) -> bool: - """启动插件""" - self.LOG.debug(f"[{self.name}] 插件已启动") - self.status = PluginStatus.RUNNING return True - def stop(self) -> bool: - """停止插件""" - self.LOG.info(f"[{self.name}] 插件已停止") - self.status = PluginStatus.STOPPED - return True + @property + def status(self) -> PluginStatus: + return PluginStatus.ENABLED - @plugin_stats_decorator(plugin_name="群聊总结") - @plugin_points_cost(10, "群聊总结消耗积分", FEATURE_KEY) - @group_feature_rate_limit(max_per_minute=30, feature_key=FEATURE_KEY) - async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """处理消息""" - try: - # 检查是否是总结命令 - content = message.get("content", "") - self.bot: WechatAPIClient = message.get("bot") - if not content.startswith(self.command_prefix): - return False, None + @property + def keywords(self) -> List[str]: + return ["总结", "summary"] - command = content[len(self.command_prefix):].split()[0] - if command not in self.commands: - return False, None - # 获取需要总结的内容 - group_id = message.get("roomid") - - self.revoke: MessageAutoRevoke = message.get("revoke") - if not group_id: - await self.bot.send_text_message(group_id, "只支持群聊消息总结", message.get("sender")) - return False, None - # 权限判断 - gbm: GroupBotManager = message.get("gbm") - if gbm and gbm.get_group_permission(group_id, self.feature) == PermissionStatus.DISABLED: - return False, None - # 从消息历史中获取群聊记录 - all_contacts: dict = message.get("all_contacts") - group_members: dict = ContactManager.get_instance().get_group_members(group_id) - - chat_content = self.message_storage.get_messages(group_id, group_members) - if len(chat_content) < 100: - return False, None - - # 获取群名并处理 - group_name = all_contacts.get(group_id, group_id) - group_name = self._sanitize_group_name(group_name) - - client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, "⏳群消息总结中… 😊") - self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) - # 创建线程异步处理总结生成和发送 - res = await self._async_generate_and_send_summary(chat_content, group_name, group_id, - message) - if res: - return True, "异步总结已启动" - else: - return False, "总结失败" - - except Exception as e: - self.LOG.error(f"处理消息总结命令失败: {e}") + @rate_limit(calls=1, period=10) + @plugin_points_cost(cost=0) + async def run(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + content = message.get("content", "") + sender = message.get("sender") + group_id = message.get("group_id") + if not content or not group_id: + return False, None + if content.lstrip("/") not in self.keywords: return False, None - async def _async_generate_and_send_summary(self, chat_content: str, group_name: str, group_id: str, - message: Dict[str, Any]): - """异步生成并发送总结""" - try: - # 生成总结 - summary, image_path = await self._generate_summary(chat_content, group_name) + gbm: GroupBotManager = message.get("gbm") + if gbm and gbm.get_group_permission(group_id, self.feature) == PermissionStatus.DISABLED: + return False, None + self.LOG.info(f"收到群 {group_id} 总结请求") + all_contacts: dict = message.get("all_contacts") + group_members: dict = ContactManager.get_instance().get_group_members(group_id) + chat_content = self.message_storage.get_messages(group_id, group_members) + if len(chat_content) < 100: + return False, None + + group_name = all_contacts.get(group_id, group_id) + group_name = self._sanitize_group_name(group_name) + res = await self._async_generate_and_send_summary(chat_content, group_name, group_id, sender) + return True, res + + async def _async_generate_and_send_summary(self, chat_content: str, group_name: str, group_id: str, + sender: str = None): + try: + summary, image_path = await self._generate_summary(chat_content, group_name) if image_path: - # 图片生成成功,发送图片 await self.bot.send_image_message(group_id, Path(image_path)) - self.LOG.info(f"成功发送图片总结到群 {group_id}") return True else: - # 图片生成失败,发送文本消息 if summary and len(summary.strip()) > 0: - # 截断过长的文本 max_length = 2000 if len(summary) > max_length: summary = summary[:max_length] + "\n\n... (内容过长,已截断)" - client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, summary) - self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 30) - self.LOG.info(f"图片生成失败,已发送文本总结到群 {group_id}") + self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 10) return True - else: - # 连文本内容都没有 - client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, - "❌ 生成总结失败,请稍后再试!") - self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) - return False - + client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, + f"❌ 生成总结失败,请稍后再试") + self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) + return False except Exception as e: - self.LOG.error(f"异步生成总结失败: {e}") + self.LOG.error(f"生成或发送总结失败: {e}", exc_info=True) client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(group_id, f"❌ 生成总结失败,请稍后再试") self.revoke.add_message_to_revoke(group_id, client_msg_id, create_time, new_msg_id, 5) return False def _sanitize_group_name(self, group_name: str) -> str: - """处理群名,去除特殊字符并限制长度""" - # 去除特殊字符,只保留字母、数字、中文和基本标点 sanitized_name = re.sub(r'[^\w\s\u4e00-\u9fff,.,。]', '', group_name) - # 限制长度为15个字符 if len(sanitized_name) > 15: sanitized_name = sanitized_name[:15] - # 如果处理后为空,则使用默认名称 if not sanitized_name: sanitized_name = "群聊" return sanitized_name async def _generate_summary(self, chat_content: str, group_name: str) -> Tuple[str, Optional[str]]: """生成总结""" - # Dify API配置 content_compress = chat_content try: content_compress = compress_chat_data(chat_content) @@ -216,18 +129,16 @@ class MessageSummaryPlugin(MessagePluginInterface): except Exception as e: self.LOG.error(f"压缩内容失败:{e}") - # 准备请求数据 data = { "inputs": {}, "query": f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}", - "response_mode": "blocking", # 使用阻塞模式,直接获取完整响应 + "response_mode": "blocking", "conversation_id": "", "user": group_name if group_name is not None else "message_summary_bot", - "files": [] # 不包含文件 + "files": [] } self.LOG.info(f"群聊总结内容:{data}") - # 设置请求头 headers = { "Authorization": f"Bearer {self._api_key}", "Content-Type": "application/json" @@ -235,21 +146,17 @@ class MessageSummaryPlugin(MessagePluginInterface): try: custom_timeout = ClientTimeout(total=None, connect=10, sock_read=300) - conn = aiohttp.TCPConnector(keepalive_timeout=60) # 保持连接活跃 + conn = aiohttp.TCPConnector(keepalive_timeout=60) async with aiohttp.ClientSession(connector=conn, timeout=custom_timeout) as session: async with session.post(self._api_url, headers=headers, json=data) as response: - response.raise_for_status() # 检查请求是否成功 + response.raise_for_status() response_data = await response.json() self.LOG.info(f"Dify API响应状态码: {response.status}") self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}") - # 提取回答内容 answer = response_data.get("answer", "") - # 去除广告内容pollinations.ai 的广告 - # answer = remove_trailing_content(answer) spath = "" - # 提取token使用情况 metadata = response_data.get("metadata", {}) usage = metadata.get("usage", {}) @@ -257,32 +164,25 @@ class MessageSummaryPlugin(MessagePluginInterface): prompt_tokens = usage.get("prompt_tokens", 0) completion_tokens = usage.get("completion_tokens", 0) total_tokens = usage.get("total_tokens", 0) - - # 添加token信息 tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}" answer += tokens_info try: - # 使用唯一文件名并指定完整路径 timestamp = int(time.time()) output_path = f"summary_{timestamp}.png" - # 构建完整的输出路径 self.LOG.info(f"开始生成图片: {output_path}") spath = await convert_md_str_to_image(answer, output_path) self.LOG.info(f"成功生成图片: {spath}") except Exception as e: self.LOG.error(f"生成图片失败: {e}", exc_info=True) - # 如果图片生成失败,尝试发送纯文本消息 try: - # 截断过长的文本,避免消息太长 max_length = 2000 if len(answer) > max_length: answer = answer[:max_length] + "\n\n... (内容过长,已截断)" self.LOG.info("图片生成失败,将发送文本消息作为备选方案") - spath = None # 设置为None,让调用方知道需要发送文本 + spath = None except Exception as fallback_error: self.LOG.error(f"备选文本发送也失败: {fallback_error}") spath = None - # 返回文本内容和图片路径 return answer, spath except aiohttp.ClientError as e: @@ -301,104 +201,65 @@ class MessageSummaryPlugin(MessagePluginInterface): """定时任务:每天早上9点总结昨天的聊天信息""" try: self.LOG.info("开始执行每日聊天总结任务") - - # 计算昨天的时间范围 yesterday = datetime.now() - timedelta(days=1) yesterday_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0) yesterday_end = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999) - self.LOG.info( f"总结时间范围: {yesterday_start.strftime('%Y-%m-%d %H:%M:%S')} 至 {yesterday_end.strftime('%Y-%m-%d %H:%M:%S')}") - - # 获取所有启用了群机器人的群聊 all_groups = GroupBotManager.get_group_list() - if not all_groups: self.LOG.info("没有群聊启用群机器人,跳过定时总结") return - - # 筛选出开启了总结功能的群聊 enabled_groups = [] for group_id in all_groups: if GroupBotManager.get_group_permission(group_id, self.feature) == PermissionStatus.ENABLED: enabled_groups.append(group_id) - if not enabled_groups: self.LOG.info("没有群聊开启定时总结功能,跳过") return - self.LOG.info(f"找到 {len(enabled_groups)} 个开启定时总结的群聊") - - # 为每个群生成总结 for group_id in enabled_groups: try: - # 先统计昨天的消息数量 message_count = self.message_storage.count_messages_by_date_range( group_id, yesterday_start, yesterday_end ) - - # 消息少于50条,跳过总结 if message_count < 100: self.LOG.info(f"群 {group_id} 昨天只有 {message_count} 条消息,不足50条,跳过总结") continue - self.LOG.info(f"群 {group_id} 昨天有 {message_count} 条消息,开始获取内容") - - # 获取群成员信息 group_members = ContactManager.get_instance().get_group_members(group_id) - - # 获取群名 group_name = ContactManager.get_instance().get_nickname(group_id) group_name = self._sanitize_group_name(group_name) - - # 获取昨天的聊天记录 chat_content = self.message_storage.get_messages_by_date_range( group_id, group_members, yesterday_start, yesterday_end ) - if not chat_content: - self.LOG.info(f"群 {group_id} 昨天聊天记录为空,跳过总结") + self.LOG.info(f"群 {group_name} 昨天没有有效消息,跳过") continue - + self.LOG.info( + f"获取到 {message_count} 条消息(时间范围:{yesterday_start.strftime('%Y-%m-%d %H:%M:%S')} 至 {yesterday_end.strftime('%Y-%m-%d %H:%M:%S')}),格式化后长度: {len(chat_content)}") self.LOG.info( f"开始为群 {group_name} 生成总结,消息数量: {message_count},内容长度: {len(chat_content)}") - - # 发送提示消息 - await self.bot.send_text_message( - group_id, - f"⏳ 正在生成 [{yesterday.strftime('%Y-%m-%d')}]聊天总结… 😊" - ) - - # 生成总结 summary, image_path = await self._generate_summary(chat_content, group_name) - if image_path: - # 图片生成成功,发送图片 await self.bot.send_image_message(group_id, Path(image_path)) self.LOG.info(f"成功发送群 {group_name} 的昨日总结图片") else: - # 图片生成失败,发送文本消息 if summary and len(summary.strip()) > 0: max_length = 2000 if len(summary) > max_length: summary = summary[:max_length] + "\n\n... (内容过长,已截断)" - await self.bot.send_text_message(group_id, summary) self.LOG.info(f"成功发送群 {group_name} 的昨日总结文本") - - # 避免请求过快 await asyncio.sleep(2) - - except Exception as e: - self.LOG.error(f"为群 {group_id} 生成昨日总结失败: {e}", exc_info=True) + except Exception as group_error: + self.LOG.error(f"处理群 {group_id} 总结时出错: {group_error}", exc_info=True) continue - self.LOG.info("每日聊天总结任务执行完成") - except Exception as e: - self.LOG.error(f"每日聊天总结任务执行失败: {e}", exc_info=True) + self.LOG.error(f"执行每日聊天总结任务时出错: {e}", exc_info=True) diff --git a/utils/markdown_to_image.py b/utils/markdown_to_image.py index f2adb07..709e87e 100644 --- a/utils/markdown_to_image.py +++ b/utils/markdown_to_image.py @@ -10,6 +10,8 @@ import asyncio import re from loguru import logger +META_KEYWORDS = ["群", "群名", "时间", "日期", "成员", "消息", "统计", "总结", "来源", "生成", "记录"] + async def safe_close_browser(browser, timeout: float = 4.0) -> None: if not browser: @@ -67,35 +69,44 @@ async def safe_close_browser(browser, timeout: float = 4.0) -> None: logger.warning(f"force kill failed: {e}") +def _clean_text(html: str) -> str: + return re.sub(r'\s+', ' ', re.sub(r'<.*?>', ' ', html)).strip() + + +def _looks_like_meta(html: str) -> bool: + clean = _clean_text(html) + if not clean: + return False + if any(k in clean for k in META_KEYWORDS): + return True + return len(clean) <= 80 + + def _split_hero(html_body: str): title_match = re.search(r'
(.*?)
', remain, re.S | re.I) - meta_parts = [] - used = 0 - for para in paragraphs[:3]: - clean = re.sub(r'<.*?>', '', para).strip() - if not clean: - continue - if len(clean) <= 80 or any(k in clean for k in ["群", "时间", "日期", "成员", "消息", "统计", "总结", "来源"]): - meta_parts.append(para.strip()) - used += 1 - else: + block_pattern = re.compile(r'^\s*(<(?:p|blockquote|ul|ol)[^>]*>.*?(?:p|blockquote|ul|ol)>)', re.S | re.I) + meta_blocks = [] + for _ in range(4): + m = block_pattern.match(remain) + if not m: break + block = m.group(1) + if not _looks_like_meta(block): + break + meta_blocks.append(block.strip()) + remain = remain[m.end():].strip() - hero_meta = ".*?
){' + str(used) + r'}', '', remain, count=1, flags=re.S | re.I).strip() - - return hero_title, hero_meta, remain + hero_meta = ''.join(meta_blocks) + hero_enabled = bool(title_match or meta_blocks) + return hero_title, hero_meta, remain, hero_enabled async def md_str_to_html_content(md_content): - """将 Markdown 字符串转换为更美观的 HTML 内容。""" html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite']) - hero_title, hero_meta, remain_html = _split_hero(html_body) + hero_title, hero_meta, remain_html, hero_enabled = _split_hero(html_body) css = """ """ + hero_html = '' + content_class = 'content hero-active' if hero_enabled else 'content' + if hero_enabled: + hero_html = f''' +