""" 群聊总结插件 基于AI的群聊消息总结功能,支持定时总结和手动触发 """ import tomllib import asyncio import aiohttp import pymysql from pathlib import Path from datetime import datetime, timedelta from loguru import logger from typing import List, Dict, Optional from utils.plugin_base import PluginBase from utils.decorators import on_text_message, schedule class ChatRoomSummary(PluginBase): """群聊总结插件""" description = "AI群聊总结 - 自动总结群聊话题和活跃度" author = "Assistant" version = "1.0.0" def __init__(self): super().__init__() self.config = None self.db_config = None async def async_init(self): """异步初始化""" try: config_path = Path(__file__).parent / "config.toml" if not config_path.exists(): logger.error(f"群聊总结插件配置文件不存在: {config_path}") return with open(config_path, "rb") as f: self.config = tomllib.load(f) self.db_config = self.config["database"] logger.success("群聊总结插件已加载") # 测试数据库连接 try: with self.get_db_connection() as conn: logger.info("数据库连接测试成功") except Exception as e: logger.error(f"数据库连接测试失败: {e}") except Exception as e: logger.error(f"群聊总结插件初始化失败: {e}") self.config = None def get_db_connection(self): """获取数据库连接""" return pymysql.connect( host=self.db_config["host"], port=self.db_config["port"], user=self.db_config["user"], password=self.db_config["password"], database=self.db_config["database"], charset=self.db_config["charset"], autocommit=True ) @on_text_message(priority=85) async def handle_summary_command(self, bot, message: dict): """处理总结命令""" if self.config is None: return True content = message.get("Content", "").strip() from_wxid = message.get("FromWxid", "") sender_wxid = message.get("SenderWxid", "") is_group = message.get("IsGroup", False) if not is_group or not (content in ["/总结", "/群聊总结", "/昨日总结"] or content.startswith("/总结 ")): return True if not self.config["behavior"]["enabled"]: return True enabled_groups = self.config["behavior"]["enabled_groups"] disabled_groups = self.config["behavior"]["disabled_groups"] if from_wxid in disabled_groups or (enabled_groups and from_wxid not in enabled_groups): return True is_today = content.startswith("/总结 今日") summary_type = "今日" if is_today else "昨日" logger.info(f"收到{summary_type}群聊总结请求: {from_wxid}") await bot.send_text(from_wxid, f"🤖 正在生成{summary_type}群聊总结,请稍候...") try: summary = await self._generate_summary(from_wxid, is_today) if summary: await self._send_summary_as_chat_record(bot, from_wxid, summary, summary_type) else: await bot.send_text(from_wxid, f"❌ 生成{summary_type}总结失败,可能是聊天记录不足或AI服务异常") except Exception as e: logger.error(f"{summary_type}群聊总结失败: {e}") await bot.send_text(from_wxid, f"❌ {summary_type}总结失败: {str(e)}") return False @schedule('cron', hour=9, minute=0) async def daily_summary_task(self, bot): """每日定时总结任务""" if not self.config or not self.config["behavior"]["auto_summary_enabled"]: return logger.info("开始执行每日群聊总结任务") enabled_groups = self.config["behavior"]["enabled_groups"] disabled_groups = self.config["behavior"]["disabled_groups"] if not enabled_groups: logger.info("未配置启用自动总结的群聊,跳过任务") return success_count = 0 fail_count = 0 for group_id in enabled_groups: if group_id in disabled_groups: continue try: logger.info(f"为群聊 {group_id} 生成总结") summary = await asyncio.wait_for( self._generate_summary(group_id), timeout=600 ) if summary: await self._send_summary_as_chat_record(bot, group_id, summary, "昨日") logger.success(f"群聊 {group_id} 总结发送成功") success_count += 1 else: logger.warning(f"群聊 {group_id} 总结生成失败(消息不足或其他原因)") fail_count += 1 await asyncio.sleep(60) except asyncio.TimeoutError: logger.error(f"群聊 {group_id} 总结超时(>600s)") fail_count += 1 except Exception as e: logger.error(f"群聊 {group_id} 总结失败: {e}") import traceback logger.error(traceback.format_exc()) fail_count += 1 logger.info(f"每日群聊总结任务完成 - 成功: {success_count}, 失败: {fail_count}") async def _generate_summary(self, group_id: str, is_today: bool = False) -> Optional[str]: """生成群聊总结""" try: if is_today: messages = await self._get_today_messages(group_id) time_desc = "今日" else: messages = await self._get_yesterday_messages(group_id) time_desc = "昨日" if len(messages) < self.config["behavior"]["min_messages"]: logger.info(f"群聊 {group_id} {time_desc}消息数量不足 ({len(messages)} < {self.config['behavior']['min_messages']})") return None formatted_messages = self._format_messages(messages) summary = await self._call_ai_api(formatted_messages, group_id, time_desc) return summary except Exception as e: logger.error(f"生成总结失败: {e}") return None async def _get_yesterday_messages(self, group_id: str) -> List[Dict]: """获取昨日群聊消息""" try: today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) yesterday_start = today - timedelta(days=1) yesterday_end = today with self.get_db_connection() as conn: with conn.cursor(pymysql.cursors.DictCursor) as cursor: sql = """ SELECT sender_wxid, nickname, content, create_time, msg_type FROM messages WHERE group_id = %s AND is_group = 1 AND msg_type = 'text' AND create_time >= %s AND create_time < %s AND LENGTH(TRIM(content)) > 0 ORDER BY create_time ASC """ cursor.execute(sql, (group_id, yesterday_start, yesterday_end)) messages = cursor.fetchall() logger.info(f"获取到群聊 {group_id} 昨日消息 {len(messages)} 条") return messages except Exception as e: logger.error(f"获取昨日消息失败: {e}") return [] async def _get_today_messages(self, group_id: str) -> List[Dict]: """获取今日群聊消息""" try: today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) tomorrow = today + timedelta(days=1) with self.get_db_connection() as conn: with conn.cursor(pymysql.cursors.DictCursor) as cursor: sql = """ SELECT sender_wxid, nickname, content, create_time, msg_type FROM messages WHERE group_id = %s AND is_group = 1 AND msg_type = 'text' AND create_time >= %s AND create_time < %s AND LENGTH(TRIM(content)) > 0 ORDER BY create_time ASC """ cursor.execute(sql, (group_id, today, tomorrow)) messages = cursor.fetchall() logger.info(f"获取到群聊 {group_id} 今日消息 {len(messages)} 条") return messages except Exception as e: logger.error(f"获取今日消息失败: {e}") return [] def _format_messages(self, messages: List[Dict]) -> str: """格式化消息为AI可理解的格式""" formatted_lines = [] for msg in messages: create_time = msg['create_time'] if isinstance(create_time, datetime): time_str = create_time.strftime("%H:%M:%S") else: time_str = str(create_time) nickname = msg.get('nickname') or msg['sender_wxid'][-8:] content = msg['content'].replace('\n', '。').strip() if len(content) > 200: content = content[:200] + "..." formatted_line = f'[{time_str}] {{"{nickname}": "{content}"}}--end--' formatted_lines.append(formatted_line) return '\n'.join(formatted_lines) async def _call_ai_api(self, formatted_messages: str, group_id: str, time_desc: str = "昨日") -> Optional[str]: """调用AI API生成总结""" try: system_prompt = """你是一个中文的群聊总结的助手,你可以为一个微信的群聊记录,提取并总结每个时间段大家在重点讨论的话题内容。 每一行代表一个人的发言,每一行的格式为: [time] {"nickname": "content"}--end-- 请帮我将给出的群聊内容总结成一个今日的群聊报告,包含不多于10个的话题的总结(如果还有更多话题,可以在后面简单补充)。每个话题包含以下内容: - 话题名(50字以内,带序号1️⃣2️⃣3️⃣,同时附带热度,以🔥数量表示) - 参与者(不超过5个人,将重复的人名去重) - 时间段(从几点到几点) - 过程(50到200字左右) - 评价(50字以下) - 分割线: ------------ 另外有以下要求: 1. 每个话题结束使用 ------------ 分割 2. 使用中文冒号 3. 无需大标题 4. 开始给出本群讨论风格的整体评价,例如活跃、太水、太黄、太暴力、话题不集中、无聊诸如此类""" user_content = f"群聊记录如下:\n{formatted_messages}" api_config = self.config["api"] payload = { "model": api_config["model"], "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "max_tokens": api_config["max_tokens"], "temperature": api_config.get("temperature", 0.7) } headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_config['api_key']}" } timeout = aiohttp.ClientTimeout(total=api_config["timeout"]) proxy = None proxy_config = self.config.get("proxy", {}) if proxy_config.get("enabled", False): proxy_type = proxy_config.get("type", "http") proxy_host = proxy_config.get("host", "127.0.0.1") proxy_port = proxy_config.get("port", 7890) proxy = f"{proxy_type}://{proxy_host}:{proxy_port}" async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post( api_config["base_url"], json=payload, headers=headers, proxy=proxy ) as resp: if resp.status != 200: error_text = await resp.text() logger.error(f"AI API 错误: {resp.status}, {error_text}") return None result = await resp.json() if "choices" not in result or not result["choices"]: logger.error("AI API 返回格式错误") return None content = result["choices"][0]["message"]["content"] return content.strip() except Exception as e: logger.error(f"调用AI API失败: {e}") return None async def _send_summary_as_chat_record(self, bot, group_id: str, summary: str, summary_type: str): """将总结以聊天记录格式发送""" try: stats = await self._get_chat_stats(group_id, summary_type == "今日") header = f"📊 #{summary_type}群聊总结\n让我们一起来看看群友们都聊了什么有趣的话题吧~\n" if stats: stats_text = f"\n📈 数据统计:\n• 发言人数:{stats['active_users']} 人\n• 消息总数:{stats['total_messages']} 条\n• 平均每人:{stats['avg_messages']:.1f} 条\n\n" else: stats_text = "\n" full_message = header + stats_text + summary # 直接使用聊天记录格式 await self._send_chat_records(bot, group_id, f"{summary_type}群聊总结", full_message) logger.success(f"{summary_type}群聊总结已发送到 {group_id}") except Exception as e: logger.error(f"发送{summary_type}总结失败: {e}") async def _generate_md_screenshot(self, group_id: str, summary_type: str, content: str) -> Optional[str]: """生成MD文件并截图""" try: from PIL import Image, ImageDraw, ImageFont # 创建输出目录(插件文件夹内) plugin_dir = Path(__file__).parent output_dir = plugin_dir / "images" output_dir.mkdir(exist_ok=True) # 清理表情符号 clean_content = content emoji_replacements = { '📊': '', '📈': '', '•': '·', '🔥': '', '1️⃣': '1.', '2️⃣': '2.', '3️⃣': '3.', '4️⃣': '4.', '5️⃣': '5.', '❌': '×', '✅': '√' } for emoji, replacement in emoji_replacements.items(): clean_content = clean_content.replace(emoji, replacement) lines = clean_content.split('\n') # 图片设置 width = 800 padding = 30 line_height = 28 card_padding = 20 # 计算足够的高度 estimated_height = len(lines) * line_height * 2 + 300 # 增加更多空间 img = Image.new('RGB', (width, estimated_height), '#f5f5f5') draw = ImageDraw.Draw(img) # 字体设置 try: title_font = ImageFont.truetype("msyhbd.ttc", 22) header_font = ImageFont.truetype("msyhbd.ttc", 18) content_font = ImageFont.truetype("msyh.ttc", 16) except: title_font = ImageFont.load_default() header_font = ImageFont.load_default() content_font = ImageFont.load_default() # 绘制顶部标题栏 header_height = 60 draw.rectangle([(0, 0), (width, header_height)], fill='#ffd700') # 标题文字 main_title = "群聊总结" title_bbox = draw.textbbox((0, 0), main_title, font=title_font) title_width = title_bbox[2] - title_bbox[0] title_x = (width - title_width) // 2 draw.text((title_x, 18), main_title, fill='#333333', font=title_font) # 内容区域 y = header_height + padding for line in lines: if not line.strip(): y += line_height // 2 continue # 去掉开头的表情符号标记 display_line = line.strip() if '总结' in line and line.startswith('#'): # 主标题 - 跳过,已在顶部显示 continue elif '数据统计' in line: # 数据统计卡片 card_height = 120 draw.rectangle([(padding, y), (width-padding, y+card_height)], fill='#e8f5e8', outline='#27ae60', width=2) draw.text((padding+card_padding, y+15), "数据统计", fill='#27ae60', font=header_font) y += 45 elif display_line.startswith(('·', '-')) and ('发言人数' in line or '消息总数' in line or '平均每人' in line): # 统计数据 draw.text((padding+card_padding, y), display_line, fill='#333333', font=content_font) y += line_height elif display_line.startswith(('1.', '2.', '3.', '4.', '5.')): # 话题标题卡片 if y > header_height + padding + 50: # 不是第一个卡片 y += 15 # 卡片间距 card_height = 40 draw.rectangle([(padding, y), (width-padding, y+card_height)], fill='#e3f2fd', outline='#2196f3', width=2) draw.text((padding+card_padding, y+10), display_line, fill='#1976d2', font=header_font) y += card_height + 10 elif display_line.startswith('参与者:'): wrapped_lines = self._wrap_text(display_line, width-padding*2-card_padding*2, content_font, draw) for wrapped_line in wrapped_lines: draw.text((padding+card_padding, y), wrapped_line, fill='#666666', font=content_font) y += line_height elif display_line.startswith('时间段:'): wrapped_lines = self._wrap_text(display_line, width-padding*2-card_padding*2, content_font, draw) for wrapped_line in wrapped_lines: draw.text((padding+card_padding, y), wrapped_line, fill='#666666', font=content_font) y += line_height elif display_line.startswith('过程:'): wrapped_lines = self._wrap_text(display_line, width-padding*2-card_padding*2, content_font, draw) for wrapped_line in wrapped_lines: draw.text((padding+card_padding, y), wrapped_line, fill='#333333', font=content_font) y += line_height elif display_line.startswith('评价:'): wrapped_lines = self._wrap_text(display_line, width-padding*2-card_padding*2, content_font, draw) for wrapped_line in wrapped_lines: draw.text((padding+card_padding, y), wrapped_line, fill='#666666', font=content_font) y += line_height y += 10 # 额外间距 elif '----' in line: # 分割线 draw.line([(padding+20, y+5), (width-padding-20, y+5)], fill='#cccccc', width=1) y += 15 else: # 其他内容也进行换行处理 wrapped_lines = self._wrap_text(display_line, width-padding*2-card_padding*2, content_font, draw) for wrapped_line in wrapped_lines: draw.text((padding+card_padding, y), wrapped_line, fill='#333333', font=content_font) y += line_height # 保存图片 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") group_short = group_id.split('@')[0][-8:] image_path = output_dir / f"summary_{group_short}_{timestamp}.jpg" img.save(image_path, 'JPEG', quality=90) logger.success(f"截图已生成: {image_path}") return str(image_path) except ImportError: logger.error("需要安装: pip install Pillow") return None except Exception as e: logger.error(f"截图失败: {e}") return None def _wrap_text(self, text, max_width, font, draw): """文本换行处理""" words = text.split() lines = [] current_line = "" for word in words: test_line = current_line + (" " if current_line else "") + word bbox = draw.textbbox((0, 0), test_line, font=font) if bbox[2] - bbox[0] <= max_width: current_line = test_line else: if current_line: lines.append(current_line) current_line = word if current_line: lines.append(current_line) return lines if lines else [text] async def _send_chat_records(self, bot, from_wxid: str, title: str, content: str): """发送聊天记录格式消息""" try: import uuid import time import hashlib import xml.etree.ElementTree as ET is_group = from_wxid.endswith("@chatroom") max_length = 800 content_parts = [] if len(content) <= max_length: content_parts = [content] else: lines = content.split('\n') current_part = "" for line in lines: if len(current_part + line + '\n') > max_length: if current_part: content_parts.append(current_part.strip()) current_part = line + '\n' else: content_parts.append(line[:max_length]) current_part = line[max_length:] + '\n' else: current_part += line + '\n' if current_part.strip(): content_parts.append(current_part.strip()) recordinfo = ET.Element("recordinfo") info_el = ET.SubElement(recordinfo, "info") info_el.text = title is_group_el = ET.SubElement(recordinfo, "isChatRoom") is_group_el.text = "1" if is_group else "0" datalist = ET.SubElement(recordinfo, "datalist") datalist.set("count", str(len(content_parts))) desc_el = ET.SubElement(recordinfo, "desc") desc_el.text = title fromscene_el = ET.SubElement(recordinfo, "fromscene") fromscene_el.text = "3" for i, part in enumerate(content_parts): di = ET.SubElement(datalist, "dataitem") di.set("datatype", "1") di.set("dataid", uuid.uuid4().hex) src_local_id = str((int(time.time() * 1000) % 90000) + 10000) new_msg_id = str(int(time.time() * 1000) + i) create_time = str(int(time.time()) - len(content_parts) + i) ET.SubElement(di, "srcMsgLocalid").text = src_local_id ET.SubElement(di, "sourcetime").text = time.strftime("%Y-%m-%d %H:%M", time.localtime(int(create_time))) ET.SubElement(di, "fromnewmsgid").text = new_msg_id ET.SubElement(di, "srcMsgCreateTime").text = create_time ET.SubElement(di, "sourcename").text = "AI助手" ET.SubElement(di, "sourceheadurl").text = "" ET.SubElement(di, "datatitle").text = part ET.SubElement(di, "datadesc").text = part ET.SubElement(di, "datafmt").text = "text" ET.SubElement(di, "ischatroom").text = "1" if is_group else "0" dataitemsource = ET.SubElement(di, "dataitemsource") ET.SubElement(dataitemsource, "hashusername").text = hashlib.sha256(from_wxid.encode("utf-8")).hexdigest() record_xml = ET.tostring(recordinfo, encoding="unicode") appmsg_parts = [ "", f"{title}", f"{title}", "19", "https://support.weixin.qq.com/cgi-bin/mmsupport-bin/readtemplate?t=page/favorite_record__w_unsupport", "", f"", "0", "" ] appmsg_xml = "".join(appmsg_parts) await bot._send_data_async(11214, {"to_wxid": from_wxid, "content": appmsg_xml}) logger.success(f"已发送聊天记录: {title}") except Exception as e: logger.error(f"发送聊天记录失败: {e}") async def _get_chat_stats(self, group_id: str, is_today: bool = False) -> Optional[Dict]: """获取群聊统计信息""" try: if is_today: today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) tomorrow = today + timedelta(days=1) start_time, end_time = today, tomorrow else: today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) yesterday_start = today - timedelta(days=1) start_time, end_time = yesterday_start, today with self.get_db_connection() as conn: with conn.cursor() as cursor: sql = """ SELECT COUNT(DISTINCT sender_wxid) as active_users, COUNT(*) as total_messages FROM messages WHERE group_id = %s AND is_group = 1 AND msg_type = 'text' AND create_time >= %s AND create_time < %s """ cursor.execute(sql, (group_id, start_time, end_time)) result = cursor.fetchone() if result: active_users, total_messages = result avg_messages = total_messages / max(active_users, 1) return { "active_users": active_users, "total_messages": total_messages, "avg_messages": avg_messages } except Exception as e: logger.error(f"获取群聊统计失败: {e}") return None def get_llm_tools(self) -> List[dict]: """返回LLM工具定义,供AIChat插件调用""" return [ { "type": "function", "function": { "name": "generate_summary", "description": "生成群聊总结,可以选择今日或昨日的聊天记录", "parameters": { "type": "object", "properties": { "time_period": { "type": "string", "description": "时间段,可选值:'today'(今日)或'yesterday'(昨日)", "enum": ["today", "yesterday"], "default": "yesterday" } }, "required": [] } } } ] async def execute_llm_tool(self, tool_name: str, arguments: dict, bot, from_wxid: str) -> dict: """执行LLM工具调用,供AIChat插件调用""" try: if self.config is None: return {"success": False, "message": "群聊总结插件未初始化"} if not self.config["behavior"]["enabled"]: return {"success": False, "message": "群聊总结插件未启用"} # 检查群聊权限 is_group = from_wxid.endswith("@chatroom") if not is_group: return {"success": False, "message": "群聊总结只能在群聊中使用"} enabled_groups = self.config["behavior"]["enabled_groups"] disabled_groups = self.config["behavior"]["disabled_groups"] if from_wxid in disabled_groups or (enabled_groups and from_wxid not in enabled_groups): return {"success": False, "message": "此群聊未启用总结功能"} if tool_name == "generate_summary": time_period = arguments.get("time_period", "yesterday") is_today = time_period == "today" summary_type = "今日" if is_today else "昨日" logger.info(f"LLM工具调用生成{summary_type}群聊总结: {from_wxid}") await bot.send_text(from_wxid, f"🤖 正在生成{summary_type}群聊总结,请稍候...") summary = await self._generate_summary(from_wxid, is_today) if summary: await self._send_summary_as_chat_record(bot, from_wxid, summary, summary_type) return {"success": True, "message": f"{summary_type}群聊总结已生成并发送"} else: await bot.send_text(from_wxid, f"❌ 生成{summary_type}总结失败,可能是聊天记录不足或AI服务异常") return {"success": False, "message": f"生成{summary_type}总结失败"} else: return None # 不是本插件的工具,返回None让其他插件处理 except Exception as e: logger.error(f"LLM工具执行失败: {e}") return {"success": False, "message": f"执行失败: {str(e)}"}