# -*- coding: utf-8 -*- import json import re import xml.etree.ElementTree as ET from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple from loguru import logger from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from db.connection import DBConnectionManager from db.contacts_db import ContactsDBOperator from db.member_context_db import MemberContextDBOperator from db.message_storage import MessageStorageDB from plugins.ai_auto_response.memory.group_memory_profile import GroupMemoryService from plugins.member_context.service import MemberContextService from utils.ai.unified_llm import UnifiedLLMClient from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.decorator.rate_limit_decorator import group_feature_rate_limit, user_feature_rate_limit from utils.revoke.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from utils.wechat.contact_manager import ContactManager from wechat_ipad import WechatAPIClient from wechat_ipad.models.message import WxMessage class MemberRoastService: """成员锐评服务。 设计目标: 1. 尽量复用现有“成员画像 + 群画像 + 最近消息”能力,避免再做一套平行画像系统; 2. 锐评时优先吃稳定画像,再辅以最近 50 条发言做“当期状态”补充; 3. 这样生成出来的内容既不像随机骂人,也不会完全被旧画像绑死。 """ def __init__(self, db_manager: DBConnectionManager, plugin_config: Optional[Dict[str, Any]] = None): self.db_manager = db_manager self.plugin_config = plugin_config or {} self.contacts_db = ContactsDBOperator(db_manager) self.member_context_db = MemberContextDBOperator(db_manager) self.message_db = MessageStorageDB(db_manager) self.member_context_service = MemberContextService(db_manager, plugin_config or {}) # 群画像服务本身已经带“按天/按总结刷新”的快照逻辑,直接复用即可。 self.group_memory_service = GroupMemoryService(db_manager, {}) self.LOG = logger profile_cfg = self.plugin_config.get("profile", {}) or {} self.sample_days = max(int(profile_cfg.get("sample_days", 30) or 30), 1) self.message_limit = max(int(profile_cfg.get("message_limit", 50) or 50), 1) self.min_message_count = max(int(profile_cfg.get("min_message_count", 8) or 8), 1) self.context_stale_hours = max(int(profile_cfg.get("context_stale_hours", 24) or 24), 1) def build_roast_payload(self, group_id: str, target_wxid: str) -> Tuple[bool, Dict[str, Any]]: """构建锐评所需的完整画像载荷。""" member_info = self.contacts_db.get_chatroom_member_info(group_id, target_wxid) or {} display_name = ( member_info.get("display_name") or member_info.get("nick_name") or ContactManager.get_instance().get_group_name(group_id, target_wxid) or target_wxid ) group_profile = self.contacts_db.get_chatroom_profile(group_id) or {} group_name = str(group_profile.get("nick_name") or "").strip() recent_messages = self.message_db.get_member_recent_messages( group_id, target_wxid, days=self.sample_days, limit=self.message_limit, include_today=True, ) or [] if len(recent_messages) < self.min_message_count: return False, { "error": ( f"素材不够,最近只找到 {len(recent_messages)} 条有效发言," f"至少需要 {self.min_message_count} 条才能锐评得像样。" ) } member_context = self._load_or_refresh_member_context(group_id, target_wxid) group_memory_profile = self.group_memory_service.build_group_memory_profile(group_id, group_name=group_name) active_dates = self.message_db.get_member_active_dates(group_id, target_wxid, days=min(self.sample_days, 180)) or [] payload = { "group_id": group_id, "group_name": group_name, "target_wxid": target_wxid, "display_name": display_name, "member_context": member_context or {}, "group_memory_profile": group_memory_profile or {}, "recent_messages": recent_messages, "message_count": len(recent_messages), "active_days": len(active_dates), "last_active_at": recent_messages[-1].get("timestamp") if recent_messages else "", } return True, payload def _load_or_refresh_member_context(self, group_id: str, target_wxid: str) -> Dict[str, Any]: """读取成员画像,必要时做一次轻刷新。 这里不强制每次都实时重建: 1. 优先使用已有画像,保证调用速度; 2. 只有画像不存在,或者已明显过期时,才重新构建; 3. 这样既兼顾体验,也能让锐评尽量吃到比较新的“人设信息”。 """ member_context = self.member_context_db.get_member_context(group_id, target_wxid) or {} if not member_context: refreshed = self.member_context_service.build_member_context( group_id, target_wxid, days=self.sample_days, limit=self.message_limit, ensure_group_daily=True, enable_weekly_digest=True, enable_monthly_digest=True, ) self.member_context_db.save_member_context(refreshed) return refreshed last_profiled_at = self._safe_parse_datetime(str(member_context.get("last_profiled_at", "") or "")) if not last_profiled_at: return member_context if datetime.now() - last_profiled_at <= timedelta(hours=self.context_stale_hours): return member_context try: refreshed = self.member_context_service.build_member_context( group_id, target_wxid, days=self.sample_days, limit=self.message_limit, ensure_group_daily=False, enable_weekly_digest=True, enable_monthly_digest=True, ) self.member_context_db.save_member_context(refreshed) return refreshed except Exception as e: # 画像刷新失败时回退旧画像: # 1. 锐评功能本身不应因为画像补刷新失败而整体不可用; # 2. 旧画像 + 最近消息 仍然足够支撑一版可用输出。 self.LOG.warning(f"[成员锐评] 画像过期后刷新失败,回退旧画像: group={group_id}, wxid={target_wxid}, error={e}") return member_context @staticmethod def _safe_parse_datetime(value: str) -> Optional[datetime]: """安全解析时间字符串。""" text = str(value or "").strip() if not text: return None for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"): try: return datetime.strptime(text, fmt) except ValueError: continue return None class MemberRoastPlugin(MessagePluginInterface): """成员锐评插件。 用户场景: 1. @机器人 锐评一下 @某人 2. @机器人 锐评一下我 3. @机器人 锐评一下 张三 玩法目标: 1. 利用现有成员画像和群画像,让模型“骂得像认识这个人”; 2. 保持群聊传播性,输出足够犀利,但不要跨到恶意辱骂; 3. 尽量做到一句命令就能引爆围观,而不需要复杂多轮交互。 """ FEATURE_KEY = "MEMBER_ROAST" FEATURE_DESCRIPTION = "🗡️ 成员锐评 [@机器人 锐评一下 @某人]" @property def name(self) -> str: return "成员锐评" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "结合成员画像与最近发言,对指定群成员进行幽默锐评。" @property def author(self) -> str: return "ABOT Team" @property def command_prefix(self) -> Optional[str]: return "" @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.feature = self.register_feature() self.service: Optional[MemberRoastService] = None self.llm_client: Optional[UnifiedLLMClient] = None self.bot: Optional[WechatAPIClient] = None self.revoke: Optional[MessageAutoRevoke] = None self.contacts_db: Optional[ContactsDBOperator] = None self.enable = True self.command_format = "" self.max_output_chars = 320 self.min_output_chars = 140 self.sharpness_level = "high" self.name_match_min_chars = 2 def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件。""" self.LOG = logger self.LOG.debug(f"正在初始化 {self.name} 插件...") cfg = self._config.get("MemberRoast", {}) or {} self.enable = bool(cfg.get("enable", True)) self._commands = cfg.get("command", ["锐评一下", "锐评", "吐槽一下", "锐评我", "吐槽我"]) self.command_format = str( cfg.get( "command_format", "锐评插件指令:\n@机器人 锐评一下 @某人\n@机器人 锐评一下我", ) ) style_cfg = self._config.get("style", {}) or {} self.max_output_chars = max(int(style_cfg.get("max_output_chars", 320) or 320), 80) self.min_output_chars = max(int(style_cfg.get("min_output_chars", 140) or 140), 40) self.sharpness_level = str(style_cfg.get("sharpness_level", "high") or "high").strip().lower() profile_cfg = self._config.get("profile", {}) or {} self.name_match_min_chars = max(int(profile_cfg.get("name_match_min_chars", 2) or 2), 1) db_manager = context.get("db_manager") if not db_manager: self.LOG.error(f"[{self.name}] 缺少 db_manager,初始化失败") return False self.contacts_db = ContactsDBOperator(db_manager) self.service = MemberRoastService(db_manager, self._config) self.llm_client = UnifiedLLMClient(self._config.get("llm", {}) or {"scene": "chat.main"}) self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """判断消息是否可能触发锐评。 这里不只看首词: 1. 用户实际会说“@机器人 锐评一下 @张三”; 2. 这时首词往往是 @机器人,而不是“锐评一下”; 3. 因此需要做关键字级别的宽匹配。 """ if not self.enable: return False roomid = str(message.get("roomid", "") or "").strip() if not roomid: return False content = self._normalize_text(message.get("content", "")) if not content: return False return bool(re.search(r"(锐评(?:一下)?|吐槽一下|锐评我|吐槽我)", content)) @plugin_stats_decorator(plugin_name="成员锐评") @plugin_points_cost(10, "成员锐评消耗积分", FEATURE_KEY) @group_feature_rate_limit(max_per_minute=12, feature_key=FEATURE_KEY) @user_feature_rate_limit(max_per_minute=4, feature_key=FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理锐评请求。""" content = self._normalize_text(message.get("content", "")) sender = str(message.get("sender", "") or "").strip() roomid = str(message.get("roomid", "") or "").strip() self.bot = message.get("bot") self.revoke = message.get("revoke") gbm: GroupBotManager = message.get("gbm") if not roomid: return False, "仅支持群聊" if not self.bot: return False, "bot 未初始化" if not self.service or not self.llm_client or not self.contacts_db: return False, "服务未初始化" if gbm and roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" # 群里更推荐 @机器人 触发,避免普通聊天里出现“锐评”二字时误打到插件。 if not bool(message.get("is_at", False)) and not content.startswith(("锐评", "吐槽")): return False, None target_ok, target_payload = self._resolve_target_member(message, content) if not target_ok: await self._send_fail_message(roomid, sender, target_payload.get("error", "目标解析失败")) return False, target_payload.get("error", "目标解析失败") target_wxid = str(target_payload.get("target_wxid") or "").strip() target_name = str(target_payload.get("target_name") or target_wxid).strip() if not target_wxid: await self._send_fail_message(roomid, sender, "没找到要锐评的人。") return False, "未找到目标" wait_msg = f"🧐 正在翻 {target_name} 的群聊黑历史,稍等我组织一下语言…" client_msg_id, create_time, new_msg_id = await self.bot.send_text_message(roomid, wait_msg, sender) if self.revoke: self.revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 5) ok, roast_payload = self.service.build_roast_payload(roomid, target_wxid) if not ok: await self._send_fail_message(roomid, sender, roast_payload.get("error", "素材不足,今天先放他一马。")) return False, roast_payload.get("error", "素材不足") requester_name = ContactManager.get_instance().get_group_name(roomid, sender) or sender roast_text = self._generate_roast_text(roast_payload, requester_name=requester_name) if not roast_text: await self._send_fail_message(roomid, sender, "模型今天嘴有点钝,稍后再来试试。") return False, "模型输出为空" final_text = self._post_process_roast_text(roast_text, target_name) client_msg_id, create_time, new_msg_id = await self.bot.send_at_message(roomid, final_text, [sender]) if self.revoke: self.revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 120) return True, "锐评成功" def _resolve_target_member(self, message: Dict[str, Any], content: str) -> Tuple[bool, Dict[str, Any]]: """解析本次要被锐评的目标成员。""" sender = str(message.get("sender", "") or "").strip() roomid = str(message.get("roomid", "") or "").strip() wx_msg: WxMessage = message.get("full_wx_msg") raw_xml = wx_msg.msg_source if wx_msg else "" at_users = self._parse_at_users(raw_xml) bot_wxid = str(getattr(self.bot, "wxid", "") or "").strip() # @列表里如果带机器人自己,需要先排除掉,再决定是否还有真正的目标成员。 target_at_users = [uid for uid in at_users if uid and uid != bot_wxid] if len(target_at_users) > 1: return False, {"error": "一次只能锐评一个人,别让我开群体AOE。"} if len(target_at_users) == 1: target_wxid = target_at_users[0] target_name = self._get_member_display_name(roomid, target_wxid) return True, {"target_wxid": target_wxid, "target_name": target_name} # 没有真正的 @目标时,优先支持“锐评一下我”。 normalized = content.replace(" ", "").replace("\u2005", "") if any(keyword in normalized for keyword in ("锐评一下我", "锐评我", "吐槽我", "吐槽一下我")): return True, {"target_wxid": sender, "target_name": self._get_member_display_name(roomid, sender)} candidate_name = self._extract_name_candidate(content) if not candidate_name: return False, {"error": "请 @一个人,或者直接说“锐评一下我”。"} matched = self._match_member_by_name(roomid, candidate_name) if not matched: return False, {"error": f"没在群里找到“{candidate_name}”这个人。"} if len(matched) > 1: names = "、".join([item["target_name"] for item in matched[:3]]) return False, {"error": f"“{candidate_name}”对应了多个人:{names},建议直接 @TA。"} return True, matched[0] def _extract_name_candidate(self, content: str) -> str: """从文本里提取未 @ 时的目标名称候选。""" text = self._normalize_text(content) # 先去掉常见的 @机器人文案残留,尽量只保留命令后面真正的目标名。 text = re.sub(r"^@\S+\s*", "", text) text = re.sub(r"(锐评(?:一下)?|吐槽一下|吐槽我|锐评我)", " ", text, count=1) text = re.sub(r"[@#::,,。.!!??\[\]\(\)]", " ", text) text = re.sub(r"\s+", " ", text).strip() if len(text) < self.name_match_min_chars: return "" return text def _match_member_by_name(self, roomid: str, candidate_name: str) -> List[Dict[str, str]]: """按群昵称/微信昵称模糊匹配成员。 匹配优先级: 1. 完全匹配; 2. 去空格后完全匹配; 3. 包含匹配。 """ if not self.contacts_db: return [] candidate = self._normalize_name(candidate_name) if not candidate: return [] members = self.contacts_db.get_chatroom_member_list(roomid) or [] exact_matches: List[Dict[str, str]] = [] contains_matches: List[Dict[str, str]] = [] for member in members: wxid = str(member.get("wxid") or "").strip() if not wxid: continue display_name = str(member.get("display_name") or "").strip() nick_name = str(member.get("nick_name") or "").strip() for raw_name in [display_name, nick_name]: normalized = self._normalize_name(raw_name) if not normalized: continue payload = {"target_wxid": wxid, "target_name": display_name or nick_name or wxid} if normalized == candidate: exact_matches.append(payload) break if candidate in normalized: contains_matches.append(payload) break if exact_matches: return self._dedup_target_list(exact_matches) return self._dedup_target_list(contains_matches) @staticmethod def _dedup_target_list(items: List[Dict[str, str]]) -> List[Dict[str, str]]: """按 wxid 去重。""" result: List[Dict[str, str]] = [] seen = set() for item in items: wxid = str(item.get("target_wxid") or "").strip() if not wxid or wxid in seen: continue seen.add(wxid) result.append(item) return result def _generate_roast_text(self, payload: Dict[str, Any], requester_name: str = "") -> str: """调用大模型生成锐评文案。""" system_prompt = self._build_system_prompt() user_prompt = self._build_user_prompt(payload, requester_name=requester_name) text = self.llm_client.chat( system_prompt=system_prompt, user_prompt=user_prompt, user_id=f"member_roast::{payload.get('group_id', '')}::{payload.get('target_wxid', '')}", ) return str(text or "").strip() def _build_system_prompt(self) -> str: """构建系统提示词。 这份提示词重点控制三件事: 1. 输出要像群友毒舌,而不是客服分析; 2. 必须建立在已有素材上,不能瞎编私货; 3. 锐评要有梗,但不能跨到恶意羞辱。 """ sharpness_hint = "允许明显犀利、能扎心,但必须像熟人调侃,不能恶毒。" if self.sharpness_level == "medium": sharpness_hint = "允许轻到中度犀利,以调侃为主,不要真伤人。" elif self.sharpness_level == "low": sharpness_hint = "以幽默吐槽为主,别太重。" return f""" 你是微信群里的“人设观察员”,擅长用幽默、犀利、带梗的口吻总结一个人的群聊人设。 你的任务不是分析报告,而是写一段会让群友围观、会心一笑、觉得“这人还真就是这样”的锐评。 核心要求: 1. 只能基于给定的成员画像、群画像和最近发言素材输出,不要编造未出现过的经历、人设、职业、家庭、隐私。 2. {sharpness_hint} 3. 可以用捧杀、阴阳、轻微毒舌、反转、梗化总结,但不能出现脏话、恶意辱骂、羞辱外貌、攻击疾病、地域、性别、民族、宗教等受保护属性。 4. 不要写成正经心理测评,不要出现“根据数据分析”“从画像看”这种机器味句式。 5. 优先抓:群聊人设、常见话术、反复出现的行为模式、擅长点、翻车点、群内角色。 6. 如果素材不够扎实,就收一点,写成“轻评版”,不要硬编大招。 输出格式要求: 1. 第一行固定为:`【锐评 目标昵称】` 2. 正文写 4 到 6 行,每行一句,适合直接发群里。 3. 最后一行必须是“一句话绝杀”式收尾。 4. 总长度控制在 {self.min_output_chars} 到 {self.max_output_chars} 字之间,宁可短狠,不要注水。 """.strip() def _build_user_prompt(self, payload: Dict[str, Any], requester_name: str = "") -> str: """拼装用户提示词。""" member_context = payload.get("member_context", {}) or {} group_memory_profile = payload.get("group_memory_profile", {}) or {} recent_messages = payload.get("recent_messages", []) or [] meta = member_context.get("meta", {}) or {} group_style = group_memory_profile.get("style_profile", {}) or {} # 最近 50 条发言是这次锐评最关键的“即时素材”: # 1. 画像决定“这个人长期像谁”; # 2. 最近发言决定“这阵子他又在发什么病”; # 3. 两者结合,模型才更容易产出既稳定又有当期节目效果的锐评。 recent_lines = [] for idx, item in enumerate(recent_messages, start=1): ts = item.get("timestamp") if isinstance(ts, datetime): ts_text = ts.strftime("%m-%d %H:%M") else: ts_text = str(ts or "")[5:16] if str(ts or "") else "" content = str(item.get("content", "") or "").replace("\n", " ").strip() if not content: continue recent_lines.append(f"{idx}. [{ts_text}] {content}") prompt_payload = { "任务说明": "请基于以下真实素材,为目标成员写一段有传播性的群聊锐评。", "目标成员": { "昵称": payload.get("display_name", ""), "最近样本消息数": payload.get("message_count", 0), "活跃天数": payload.get("active_days", 0), "最后活跃时间": payload.get("last_active_at", ""), }, "成员长期画像": { "摘要": member_context.get("summary_text", ""), "活跃等级": member_context.get("activity_level", ""), "发言模式": member_context.get("message_pattern", ""), "互动风格": member_context.get("interaction_style", ""), "回复建议": member_context.get("response_style_hint", ""), "长期主题": member_context.get("topics_of_interest", []), "近期关注": member_context.get("recent_focus", []), "稳定特征": meta.get("stable_traits", []), "习惯模式": meta.get("habit_patterns", []), "表达标记": meta.get("expression_profile", []), "技能侧重点": meta.get("skill_profile", []), "群内角色": meta.get("group_role", ""), "气质倾向": meta.get("temperament_tendency", ""), "近期状态": meta.get("recent_state", []), }, "群聊背景": { "群名": payload.get("group_name", ""), "长期主题": group_memory_profile.get("focus_topics", []), "群风格": group_style, "群摘要": group_memory_profile.get("summary_text", ""), }, "最近发言样本": recent_lines, "额外要求": [ "要像熟人看破不说破,不要像机器写分析。", "可以先抬后杀,也可以连续抓几个典型症状。", "如果这人明显偏技术、答疑、摸鱼、抽象、嘴硬、复读机、群气氛组,请点出来。", f"发起请求的人是:{requester_name}" if requester_name else "", ], } return json.dumps(prompt_payload, ensure_ascii=False, indent=2) def _post_process_roast_text(self, roast_text: str, target_name: str) -> str: """清洗模型输出,统一成更适合直接发群的格式。""" text = str(roast_text or "").strip() text = re.sub(r"^```(?:json|markdown|text)?", "", text, flags=re.IGNORECASE).strip() text = re.sub(r"```$", "", text).strip() text = re.sub(r"\n{3,}", "\n\n", text) # 如果模型忘了带头标题,这里兜底补上,保证最终输出辨识度稳定。 if not text.startswith("【锐评"): text = f"【锐评 {target_name}】\n{text}" # 模型有时会过度铺陈,这里做一次保底裁切,优先保持群聊传播性。 if len(text) > self.max_output_chars + 80: text = text[: self.max_output_chars + 80].rstrip(",,、;; ").rstrip() + "。" return text async def _send_fail_message(self, roomid: str, sender: str, fail_text: str) -> None: """统一发送失败提示。""" client_msg_id, create_time, new_msg_id = await self.bot.send_at_message(roomid, f"❌ {fail_text}", [sender]) if self.revoke: self.revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 8) def _get_member_display_name(self, roomid: str, wxid: str) -> str: """获取群内成员展示名。""" if self.contacts_db: member = self.contacts_db.get_chatroom_member_info(roomid, wxid) or {} display_name = str(member.get("display_name") or member.get("nick_name") or "").strip() if display_name: return display_name return ContactManager.get_instance().get_group_name(roomid, wxid) or wxid @staticmethod def _parse_at_users(raw_xml: str) -> List[str]: """解析消息 XML 里的 @用户列表。""" raw_xml = str(raw_xml or "").strip() if not raw_xml: return [] at_user_list_text = "" try: root = ET.fromstring(raw_xml) node = root.find(".//atuserlist") if node is not None and node.text: at_user_list_text = str(node.text).strip() except Exception: match = re.search(r"", raw_xml, flags=re.IGNORECASE | re.DOTALL) if match: at_user_list_text = str(match.group(1) or "").strip() if not at_user_list_text: return [] seen = set() result: List[str] = [] for user_id in re.split(r"[,\s;]+", at_user_list_text): normalized = str(user_id or "").strip() if not normalized or normalized in seen: continue seen.add(normalized) result.append(normalized) return result @staticmethod def _normalize_text(value: Any) -> str: """清洗文本中的多余空白。""" text = str(value or "").replace("\u2005", " ").replace("\xa0", " ") text = re.sub(r"\s+", " ", text) return text.strip() @staticmethod def _normalize_name(value: str) -> str: """规范化昵称用于模糊匹配。""" text = str(value or "").strip().lower() text = re.sub(r"\s+", "", text) return text