920 lines
45 KiB
Python
920 lines
45 KiB
Python
# -*- coding: utf-8 -*-
|
||
import json
|
||
import re
|
||
import xml.etree.ElementTree as ET
|
||
from collections import Counter
|
||
from datetime import datetime, timedelta
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
from loguru import logger
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from db.connection import DBConnectionManager
|
||
from db.contacts_db import ContactsDBOperator
|
||
from db.member_digest_db import MemberDigestDBOperator
|
||
from db.member_context_db import MemberContextDBOperator
|
||
from db.message_storage import MessageStorageDB
|
||
from db.message_summary_db import MessageSummaryDBOperator
|
||
from plugins.ai_auto_response.memory.group_memory_profile import GroupMemoryService
|
||
from plugins.member_context.service import MemberContextService
|
||
from utils.ai.unified_llm import UnifiedLLMClient
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.decorator.points_decorator import plugin_points_cost
|
||
from utils.decorator.rate_limit_decorator import group_feature_rate_limit, user_feature_rate_limit
|
||
from utils.revoke.message_auto_revoke import MessageAutoRevoke
|
||
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
|
||
from utils.wechat.contact_manager import ContactManager
|
||
from wechat_ipad import WechatAPIClient
|
||
from wechat_ipad.models.message import WxMessage
|
||
|
||
|
||
class MemberRoastService:
    """Assemble the profile material used to roast a group member.

    Design goals (from the original author's notes):
    1. Reuse the existing member-profile, group-profile and recent-message
       facilities instead of building a parallel profiling system.
    2. Prefer the stable profile and supplement it with recent messages so
       the roast also reflects the member's current state.
    3. Keep the roast grounded: neither random insults nor fully bound to an
       outdated profile.
    """

    # Timestamp layouts accepted by ``_safe_parse_datetime``, tried in order.
    # The microsecond / "T" variants cover ``str(datetime)`` and ISO output.
    _DATETIME_FORMATS = (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.%f",
    )

    # Keys of a digest's ``structured`` dict that are aggregated as tag lists.
    _DIGEST_LIST_KEYS = (
        "long_term_topics", "stable_topics", "topics", "identity_traits", "skill_profile",
        "problem_solving_profile", "habit_patterns", "expression_profile", "recent_state",
        "engagement_traits", "reply_preferences", "long_term_reply_preferences",
    )

    # Keys of a digest's ``structured`` dict that are aggregated as single phrases.
    _DIGEST_PHRASE_KEYS = (
        "group_role", "temperament_tendency", "decision_profile", "message_pattern",
        "interaction_style", "phase_state",
    )

    def __init__(self, db_manager: DBConnectionManager, plugin_config: Optional[Dict[str, Any]] = None):
        """Wire up the DB operators / services and read the ``profile`` config section.

        Args:
            db_manager: Connection manager handed to every DB operator.
            plugin_config: Plugin configuration; only its ``profile`` section
                is read here. ``None`` is treated as an empty config.
        """
        self.db_manager = db_manager
        self.plugin_config = plugin_config or {}
        self.contacts_db = ContactsDBOperator(db_manager)
        self.member_digest_db = MemberDigestDBOperator(db_manager)
        self.member_context_db = MemberContextDBOperator(db_manager)
        self.message_db = MessageStorageDB(db_manager)
        self.message_summary_db = MessageSummaryDBOperator(db_manager)
        self.member_context_service = MemberContextService(db_manager, plugin_config or {})
        # Original author's note: the group-memory service already has its own
        # snapshot refresh logic, so it is reused as-is (with an empty config).
        self.group_memory_service = GroupMemoryService(db_manager, {})
        self.LOG = logger

        profile_cfg = self.plugin_config.get("profile", {}) or {}
        read = self._positive_int
        self.sample_days = read(profile_cfg, "sample_days", 30)
        # The recent-message sample defaults to 200: roasting relies on catch
        # phrases and repeated behaviour, and a small sample makes the model
        # over-fit to a few incidental messages.
        self.message_limit = read(profile_cfg, "message_limit", 200)
        self.min_message_count = read(profile_cfg, "min_message_count", 8)
        self.context_stale_hours = read(profile_cfg, "context_stale_hours", 24)
        self.history_profile_days = read(profile_cfg, "history_profile_days", 60)
        self.history_member_digest_limit = read(profile_cfg, "history_member_digest_limit", 16)
        self.history_group_summary_limit = read(profile_cfg, "history_group_summary_limit", 10)

    @staticmethod
    def _positive_int(cfg: Dict[str, Any], key: str, default: int) -> int:
        """Read ``cfg[key]`` as an int >= 1, using ``default`` when missing, falsy or not numeric."""
        try:
            value = int(cfg.get(key, default) or default)
        except (TypeError, ValueError):
            # A malformed config value should not prevent the service from starting.
            value = default
        return max(value, 1)

    def build_roast_payload(self, group_id: str, target_wxid: str) -> Tuple[bool, Dict[str, Any]]:
        """Build the complete payload needed to roast one member.

        Returns:
            ``(True, payload)`` on success, or ``(False, {"error": message})``
            when fewer than ``min_message_count`` recent messages were found.
        """
        member_info = self.contacts_db.get_chatroom_member_info(group_id, target_wxid) or {}
        # The ContactManager lookup only runs when the DB row has no usable name.
        display_name = (
            member_info.get("display_name")
            or member_info.get("nick_name")
            or ContactManager.get_instance().get_group_name(group_id, target_wxid)
            or target_wxid
        )
        group_profile = self.contacts_db.get_chatroom_profile(group_id) or {}
        group_name = str(group_profile.get("nick_name") or "").strip()

        recent_messages = self.message_db.get_member_recent_messages(
            group_id,
            target_wxid,
            days=self.sample_days,
            limit=self.message_limit,
            include_today=True,
        ) or []

        if len(recent_messages) < self.min_message_count:
            return False, {
                "error": (
                    f"素材不够,最近只找到 {len(recent_messages)} 条有效发言,"
                    f"至少需要 {self.min_message_count} 条才能锐评得像样。"
                )
            }

        member_context = self._load_or_refresh_member_context(group_id, target_wxid)
        historical_member_profile = self._build_historical_member_profile(group_id, target_wxid)
        group_memory_profile = self.group_memory_service.build_group_memory_profile(group_id, group_name=group_name)
        historical_group_profile = self._build_historical_group_profile(group_id)
        active_dates = self.message_db.get_member_active_dates(group_id, target_wxid, days=min(self.sample_days, 180)) or []

        payload = {
            "group_id": group_id,
            "group_name": group_name,
            "target_wxid": target_wxid,
            "display_name": display_name,
            "member_context": member_context or {},
            "historical_member_profile": historical_member_profile or {},
            "group_memory_profile": group_memory_profile or {},
            "historical_group_profile": historical_group_profile or {},
            "recent_messages": recent_messages,
            "message_count": len(recent_messages),
            "active_days": len(active_dates),
            # NOTE(review): assumes the last element is the most recent message — confirm
            # the ordering returned by get_member_recent_messages.
            "last_active_at": recent_messages[-1].get("timestamp") if recent_messages else "",
        }
        return True, payload

    def _rebuild_member_context(self, group_id: str, target_wxid: str, *, ensure_group_daily: bool) -> Dict[str, Any]:
        """Build a fresh member context and persist it when it is non-empty."""
        refreshed = self.member_context_service.build_member_context(
            group_id,
            target_wxid,
            days=self.sample_days,
            limit=self.message_limit,
            ensure_group_daily=ensure_group_daily,
            enable_weekly_digest=True,
            enable_monthly_digest=True,
        )
        if refreshed:
            self.member_context_db.save_member_context(refreshed)
        return refreshed or {}

    def _load_or_refresh_member_context(self, group_id: str, target_wxid: str) -> Dict[str, Any]:
        """Load the stored member context, rebuilding it only when needed.

        1. An existing, fresh context is returned as-is to keep the call fast.
        2. A missing context, or one older than ``context_stale_hours``, is rebuilt.
        3. A failed rebuild never aborts the roast: the stale context (or an
           empty dict when nothing was stored) is returned instead, because
           recent messages alone are still enough for a usable roast.
        """
        stored = self.member_context_db.get_member_context(group_id, target_wxid) or {}
        if not stored:
            try:
                return self._rebuild_member_context(group_id, target_wxid, ensure_group_daily=True)
            except Exception as e:
                self.LOG.warning(f"[成员锐评] 画像缺失且构建失败,本次不使用成员画像: group={group_id}, wxid={target_wxid}, error={e}")
                return {}

        last_profiled_at = self._safe_parse_datetime(str(stored.get("last_profiled_at", "") or ""))
        if not last_profiled_at:
            # Without a usable timestamp staleness cannot be judged; keep what we have.
            return stored
        if datetime.now() - last_profiled_at <= timedelta(hours=self.context_stale_hours):
            return stored

        try:
            return self._rebuild_member_context(group_id, target_wxid, ensure_group_daily=False) or stored
        except Exception as e:
            self.LOG.warning(f"[成员锐评] 画像过期后刷新失败,回退旧画像: group={group_id}, wxid={target_wxid}, error={e}")
            return stored

    @staticmethod
    def _safe_parse_datetime(value: str) -> Optional[datetime]:
        """Parse a timestamp string, returning ``None`` instead of raising.

        Accepts the layouts in ``_DATETIME_FORMATS``; the result is always a
        naive ``datetime`` so it can be compared with ``datetime.now()``.
        """
        text = str(value or "").strip()
        if not text:
            return None
        for fmt in MemberRoastService._DATETIME_FORMATS:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        return None

    @staticmethod
    def _bump(counter: Counter, value: Any, weight: int = 1) -> None:
        """Add ``weight`` to the stripped string form of ``value``; blank values are ignored."""
        label = str(value or "").strip()
        if label:
            counter[label] += weight

    @staticmethod
    def _top_labels(counter: Dict[str, int], limit: int = 4) -> List[str]:
        """Return up to ``limit`` labels, highest weight first, shorter label first on ties."""
        ordered = sorted(counter.items(), key=lambda pair: (-pair[1], len(pair[0])))
        return [label for label, _ in ordered[:limit]]

    def _build_historical_member_profile(self, group_id: str, target_wxid: str) -> Dict[str, Any]:
        """Summarise the member's digests from the last ``history_profile_days`` days.

        Monthly, weekly and daily digests are aggregated with weights 3, 2 and
        1 respectively, and only the top few labels per field are kept so the
        raw digest texts never reach the model.
        """
        since_time = datetime.now() - timedelta(days=self.history_profile_days)
        digest_limit = self.history_member_digest_limit

        def recent_digests(kind: str, limit: int) -> List[Dict[str, Any]]:
            rows = self.member_digest_db.list_digests(group_id, target_wxid, kind, limit=limit) or []
            kept = []
            for row in rows:
                end_time = self._safe_parse_datetime(str(row.get("period_end", "") or ""))
                # Digests without a parseable end time are dropped.
                if end_time and end_time >= since_time:
                    kept.append(row)
            return kept

        weekly_digests = recent_digests("weekly", digest_limit)
        monthly_digests = recent_digests("monthly", max(digest_limit, 8))
        daily_digests = recent_digests("daily", min(digest_limit, 8))

        list_counter: Dict[str, Counter] = {key: Counter() for key in self._DIGEST_LIST_KEYS}
        phrase_counter: Dict[str, Counter] = {key: Counter() for key in self._DIGEST_PHRASE_KEYS}
        timeline: List[str] = []

        for digests, weight in ((monthly_digests, 3), (weekly_digests, 2), (daily_digests, 1)):
            for item in digests:
                structured = item.get("structured", {}) or {}
                summary_text = str(item.get("summary_text", "") or "").strip()
                if summary_text:
                    period_key = str(item.get("period_key", "") or "")
                    timeline.append(f"{period_key}: {summary_text[:80]}")

                for key in self._DIGEST_LIST_KEYS:
                    values = structured.get(key, []) or []
                    if not isinstance(values, list):
                        values = [values]
                    for value in values:
                        self._bump(list_counter[key], value, weight)

                for key in self._DIGEST_PHRASE_KEYS:
                    self._bump(phrase_counter[key], structured.get(key, ""), weight)

        top = self._top_labels
        return {
            "history_days": self.history_profile_days,
            "source_counts": {
                "monthly": len(monthly_digests),
                "weekly": len(weekly_digests),
                "daily": len(daily_digests),
            },
            # Fall back through progressively less specific topic fields.
            "stable_topics": top(list_counter["long_term_topics"] or list_counter["stable_topics"] or list_counter["topics"]),
            "identity_traits": top(list_counter["identity_traits"]),
            "skill_profile": top(list_counter["skill_profile"]),
            "problem_solving_profile": top(list_counter["problem_solving_profile"]),
            "habit_patterns": top(list_counter["habit_patterns"]),
            "expression_profile": top(list_counter["expression_profile"]),
            "recent_state": top(list_counter["recent_state"]),
            "engagement_traits": top(list_counter["engagement_traits"]),
            "reply_preferences": top(list_counter["reply_preferences"] or list_counter["long_term_reply_preferences"]),
            "group_role": top(phrase_counter["group_role"], limit=2),
            "temperament_tendency": top(phrase_counter["temperament_tendency"], limit=2),
            "decision_profile": top(phrase_counter["decision_profile"], limit=2),
            "message_pattern": top(phrase_counter["message_pattern"], limit=2),
            "interaction_style": top(phrase_counter["interaction_style"], limit=2),
            "phase_state": top(phrase_counter["phase_state"], limit=3),
            "timeline": timeline[:8],
        }

    def _build_historical_group_profile(self, group_id: str) -> Dict[str, Any]:
        """Summarise the group's recent summaries into top topics and a short timeline."""
        rows = self._load_recent_group_summaries(group_id)
        topic_counter: Counter = Counter()
        timeline: List[str] = []

        for row in rows:
            period_key = str(row.get("period_key", "") or "")
            summary_text = str(row.get("summary_text", "") or "").strip()
            if summary_text:
                timeline.append(f"{period_key}: {summary_text[:90]}")
            meta = row.get("meta", {}) or {}
            for key in ("topics", "focus_topics", "top_topics", "keywords"):
                values = meta.get(key, []) or []
                if not isinstance(values, list):
                    values = [values]
                for value in values:
                    self._bump(topic_counter, value)

        return {
            "history_days": self.history_profile_days,
            "summary_count": len(rows),
            "focus_topics": self._top_labels(topic_counter, limit=8),
            "timeline": timeline[:8],
        }

    def _load_recent_group_summaries(self, group_id: str) -> List[Dict[str, Any]]:
        """Load the group's daily/manual summaries whose period ended within the history window."""
        since_time = datetime.now() - timedelta(days=self.history_profile_days)
        sql = """
            SELECT *
            FROM t_message_summary
            WHERE chatroom_id = %s
              AND summary_type IN ('daily', 'manual')
              AND period_end >= %s
            ORDER BY period_end DESC, update_time DESC
            LIMIT %s
        """
        rows = self.message_summary_db.execute_query(
            sql,
            (group_id, since_time.strftime("%Y-%m-%d %H:%M:%S"), self.history_group_summary_limit),
        ) or []
        return [self.message_summary_db._deserialize_row(dict(row)) or {} for row in rows]
|
||
|
||
|
||
class MemberRoastPlugin(MessagePluginInterface):
|
||
"""成员锐评插件。
|
||
|
||
用户场景:
|
||
1. 锐评一下 @某人
|
||
2. 锐评一下我
|
||
|
||
玩法目标:
|
||
1. 利用现有成员画像和群画像,让模型“骂得像认识这个人”;
|
||
2. 保持群聊传播性,输出足够犀利,但不要跨到恶意辱骂;
|
||
3. 尽量做到一句命令就能引爆围观,而不需要复杂多轮交互。
|
||
"""
|
||
|
||
FEATURE_KEY = "MEMBER_ROAST"
FEATURE_DESCRIPTION = "🗡️ 成员锐评 [锐评一下 @某人]"

# Filler words dropped during keyword extraction so they do not dominate
# the "high-frequency keyword" list handed to the model.
RECENT_MESSAGE_STOPWORDS = {
    # Chinese function words and generic fillers
    "这个", "那个", "就是", "然后", "但是", "还是",
    "我们", "你们", "他们", "自己", "一下",
    "已经", "没有", "一个", "可以", "什么", "怎么",
    "今天", "昨天", "现在", "时候", "知道",
    "觉得", "真的", "感觉", "不是", "还有", "因为",
    "所以", "这里", "那里", "一下子", "的话",
    # English fillers
    "and", "the", "for", "with", "that",
    "this", "from", "have", "just", "like",
}

# Caps for the condensed recent-message material live in one place so that
# token cost can be tuned without hunting for hard-coded numbers. They are
# deliberately conservative: stable traits matter more than volume.
RECENT_REPEAT_LIMIT = 6
RECENT_KEYWORD_LIMIT = 12
RECENT_SAMPLE_LIMIT = 8
PROMPT_TIMELINE_LIMIT = 6
PROMPT_TEXT_LIMIT = 120
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "成员锐评"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "结合成员画像与最近发言,对指定群成员进行幽默锐评。"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "ABOT Team"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return ""
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return self._commands
|
||
|
||
@property
|
||
def feature_key(self) -> Optional[str]:
|
||
return self.FEATURE_KEY
|
||
|
||
@property
|
||
def feature_description(self) -> Optional[str]:
|
||
return self.FEATURE_DESCRIPTION
|
||
|
||
def __init__(self):
    """Create the plugin with safe defaults; the real wiring happens in ``initialize``."""
    super().__init__()
    self.feature = self.register_feature()

    # Collaborators: filled in by ``initialize`` or taken from each message.
    self.service: Optional[MemberRoastService] = None
    self.llm_client: Optional[UnifiedLLMClient] = None
    self.bot: Optional[WechatAPIClient] = None
    self.revoke: Optional[MessageAutoRevoke] = None
    self.contacts_db: Optional[ContactsDBOperator] = None

    # Behaviour switches and output-style defaults.
    self.enable = True
    self.command_format = ""
    self.max_output_chars = 320
    self.min_output_chars = 140
    self.sharpness_level = "high"

    # Default for the history window: ``_build_user_prompt`` reads this
    # attribute directly, so it must exist even if the plugin is used before
    # ``initialize`` overrides it from the configuration.
    self.history_profile_days = 60
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
    """Read the plugin configuration and create the service objects.

    Args:
        context: Runtime context; must provide ``db_manager``.

    Returns:
        ``True`` when initialisation succeeded, ``False`` when ``db_manager``
        is missing from ``context``.
    """
    self.LOG = logger
    self.LOG.debug(f"正在初始化 {self.name} 插件...")
    cfg = self._config.get("MemberRoast", {}) or {}
    self.enable = bool(cfg.get("enable", True))
    self._commands = cfg.get("command", ["锐评一下", "锐评", "吐槽一下", "锐评我", "吐槽我"])
    self.command_format = str(
        cfg.get(
            "command_format",
            "锐评插件指令:\n锐评一下 @某人\n锐评一下我",
        )
    )

    style_cfg = self._config.get("style", {}) or {}
    self.max_output_chars = max(int(style_cfg.get("max_output_chars", 320) or 320), 80)
    self.min_output_chars = max(int(style_cfg.get("min_output_chars", 140) or 140), 40)
    # Both limits are interpolated into the system prompt as "min 到 max";
    # clamp so a misconfiguration cannot yield an impossible length range.
    self.min_output_chars = min(self.min_output_chars, self.max_output_chars)
    self.sharpness_level = str(style_cfg.get("sharpness_level", "high") or "high").strip().lower()

    profile_cfg = self._config.get("profile", {}) or {}
    # The history window is mirrored onto the plugin itself because prompt
    # assembly reads it directly; it uses the same config key as the service
    # so the two windows cannot drift apart.
    self.history_profile_days = max(int(profile_cfg.get("history_profile_days", 60) or 60), 1)

    db_manager = context.get("db_manager")
    if not db_manager:
        self.LOG.error(f"[{self.name}] 缺少 db_manager,初始化失败")
        return False

    self.contacts_db = ContactsDBOperator(db_manager)
    self.service = MemberRoastService(db_manager, self._config)
    self.llm_client = UnifiedLLMClient(self._config.get("llm", {}) or {"scene": "chat.main"})
    self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
    return True
|
||
|
||
def start(self) -> bool:
    """Log the start, flag the plugin as running and report success."""
    self.LOG.debug(f"[{self.name}] 插件已启动")
    self.status = PluginStatus.RUNNING
    return True
|
||
|
||
def stop(self) -> bool:
    """Log the stop, flag the plugin as stopped and report success."""
    self.LOG.info(f"[{self.name}] 插件已停止")
    self.status = PluginStatus.STOPPED
    return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""判断消息是否可能触发锐评。
|
||
|
||
触发策略这里刻意收紧成“文本命令开头”:
|
||
1. 不再支持“@机器人 锐评一下 ...”,避免和群聊类 AI 插件抢消息;
|
||
2. 也不再做“句中出现锐评”这类宽匹配,避免普通聊天误触;
|
||
3. 只有以“锐评/吐槽”开头的文本指令,才进入后续处理。
|
||
"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
roomid = str(message.get("roomid", "") or "").strip()
|
||
if not roomid:
|
||
return False
|
||
|
||
content = self._normalize_text(message.get("content", ""))
|
||
if not content:
|
||
return False
|
||
|
||
return self._is_text_command_trigger(content)
|
||
|
||
@plugin_stats_decorator(plugin_name="成员锐评")
@plugin_points_cost(10, "成员锐评消耗积分", FEATURE_KEY)
@group_feature_rate_limit(max_per_minute=12, feature_key=FEATURE_KEY)
@user_feature_rate_limit(max_per_minute=4, feature_key=FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Handle one roast request end to end.

    Flow: validate prerequisites, resolve the target member, send a
    short-lived "working on it" notice, build the profile payload, ask the
    model for the roast and post the cleaned result.

    Returns:
        ``(True, "锐评成功")`` on success; otherwise ``(False, reason)``,
        where ``reason`` is ``None`` when the text is not a roast command.
    """
    content = self._normalize_text(message.get("content", ""))
    sender = str(message.get("sender", "") or "").strip()
    roomid = str(message.get("roomid", "") or "").strip()
    self.bot = message.get("bot")
    self.revoke = message.get("revoke")
    gbm: GroupBotManager = message.get("gbm")

    # --- prerequisites -------------------------------------------------
    if not roomid:
        return False, "仅支持群聊"
    if not self.bot:
        return False, "bot 未初始化"
    if not (self.service and self.llm_client and self.contacts_db):
        return False, "服务未初始化"
    # ``roomid`` is known to be non-empty here, so only ``gbm`` needs checking.
    if gbm and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
        return False, "没有权限"

    # Only plain-text commands at the start of the message are accepted;
    # anything else is passed over silently so other plugins and normal
    # chat are not disturbed.
    if not self._is_text_command_trigger(content):
        return False, None

    def revoke_later(sent, delay_seconds):
        """Schedule auto-revocation of a sent message when a revoker is available."""
        client_msg_id, create_time, new_msg_id = sent
        if self.revoke:
            self.revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, delay_seconds)

    # --- target resolution ---------------------------------------------
    target_ok, target_payload = self._resolve_target_member(message, content)
    if not target_ok:
        reason = target_payload.get("error", "目标解析失败")
        await self._send_fail_message(roomid, sender, reason)
        return False, reason

    target_wxid = str(target_payload.get("target_wxid") or "").strip()
    target_name = str(target_payload.get("target_name") or target_wxid).strip()
    if not target_wxid:
        await self._send_fail_message(roomid, sender, "没找到要锐评的人。")
        return False, "未找到目标"

    # --- progress notice -----------------------------------------------
    wait_msg = f"🧐 正在翻 {target_name} 的群聊黑历史,稍等我组织一下语言…"
    revoke_later(await self.bot.send_text_message(roomid, wait_msg, sender), 5)

    # --- material + generation -----------------------------------------
    ok, roast_payload = self.service.build_roast_payload(roomid, target_wxid)
    if not ok:
        await self._send_fail_message(roomid, sender, roast_payload.get("error", "素材不足,今天先放他一马。"))
        return False, roast_payload.get("error", "素材不足")

    requester_name = ContactManager.get_instance().get_group_name(roomid, sender) or sender
    roast_text = self._generate_roast_text(roast_payload, requester_name=requester_name)
    if not roast_text:
        await self._send_fail_message(roomid, sender, "模型今天嘴有点钝,稍后再来试试。")
        return False, "模型输出为空"

    # --- delivery ------------------------------------------------------
    final_text = self._post_process_roast_text(roast_text, target_name)
    revoke_later(await self.bot.send_at_message(roomid, final_text, [sender]), 120)
    return True, "锐评成功"
|
||
|
||
def _resolve_target_member(self, message: Dict[str, Any], content: str) -> Tuple[bool, Dict[str, Any]]:
    """Work out which member this request wants roasted.

    Resolution order:
    1. Exactly one @-mentioned member (the bot itself is ignored) wins.
    2. Otherwise a "roast me" phrase targets the sender.
    3. Anything else is rejected; plain names are deliberately unsupported
       to avoid mismatches between members with the same name.

    Returns:
        ``(True, {"target_wxid": ..., "target_name": ...})`` on success, or
        ``(False, {"error": message})``.
    """
    sender = str(message.get("sender", "") or "").strip()
    roomid = str(message.get("roomid", "") or "").strip()
    wx_msg: WxMessage = message.get("full_wx_msg")
    # Tolerate a message object without ``msg_source`` instead of raising.
    raw_xml = (getattr(wx_msg, "msg_source", "") or "") if wx_msg else ""
    at_users = self._parse_at_users(raw_xml) or []
    bot_wxid = str(getattr(self.bot, "wxid", "") or "").strip()

    # Drop the bot itself and collapse repeated mentions of the same member,
    # so mentioning one person twice is not mistaken for several targets.
    target_at_users = list(dict.fromkeys(uid for uid in at_users if uid and uid != bot_wxid))
    if len(target_at_users) > 1:
        return False, {"error": "一次只能锐评一个人,别让我开群体AOE。"}
    if target_at_users:
        target_wxid = target_at_users[0]
        return True, {
            "target_wxid": target_wxid,
            "target_name": self._get_member_display_name(roomid, target_wxid),
        }

    # No real @ target: support the "roast me" forms.
    normalized = content.replace(" ", "").replace("\u2005", "")
    if any(keyword in normalized for keyword in ("锐评一下我", "锐评我", "吐槽我", "吐槽一下我")):
        return True, {"target_wxid": sender, "target_name": self._get_member_display_name(roomid, sender)}

    return False, {"error": "请使用“锐评一下 @某人”或“锐评一下我”,不支持直接写名字。"}
|
||
|
||
@staticmethod
|
||
def _is_text_command_trigger(content: str) -> bool:
|
||
"""判断是否是插件允许的文本命令开头。"""
|
||
normalized = str(content or "").strip()
|
||
if not normalized:
|
||
return False
|
||
return bool(re.match(r"^(锐评(?:一下)?|吐槽(?:一下)?)(?:我|\s|$)", normalized))
|
||
|
||
def _generate_roast_text(self, payload: Dict[str, Any], requester_name: str = "") -> str:
|
||
"""调用大模型生成锐评文案。"""
|
||
system_prompt = self._build_system_prompt()
|
||
user_prompt = self._build_user_prompt(payload, requester_name=requester_name)
|
||
text = self.llm_client.chat(
|
||
system_prompt=system_prompt,
|
||
user_prompt=user_prompt,
|
||
user_id=f"member_roast::{payload.get('group_id', '')}::{payload.get('target_wxid', '')}",
|
||
)
|
||
return str(text or "").strip()
|
||
|
||
def _build_system_prompt(self) -> str:
|
||
"""构建系统提示词。
|
||
|
||
这份提示词重点控制三件事:
|
||
1. 输出要像群友毒舌,而不是客服分析;
|
||
2. 必须建立在已有素材上,不能瞎编私货;
|
||
3. 锐评要有梗,但不能跨到恶意羞辱。
|
||
"""
|
||
sharpness_hint = "允许明显犀利、能扎心,但必须像熟人调侃,不能恶毒。"
|
||
if self.sharpness_level == "medium":
|
||
sharpness_hint = "允许轻到中度犀利,以调侃为主,不要真伤人。"
|
||
elif self.sharpness_level == "low":
|
||
sharpness_hint = "以幽默吐槽为主,别太重。"
|
||
|
||
return f"""
|
||
你是微信群里的“人设观察员”,擅长用幽默、犀利、带梗的口吻总结一个人的群聊人设。
|
||
|
||
你的任务不是分析报告,而是写一段会让群友围观、会心一笑、觉得“这人还真就是这样”的锐评。
|
||
|
||
核心要求:
|
||
1. 只能基于给定的成员画像、群画像和最近发言素材输出,不要编造未出现过的经历、人设、职业、家庭、隐私。
|
||
2. {sharpness_hint}
|
||
3. 可以用捧杀、阴阳、轻微毒舌、反转、梗化总结,但不能出现脏话、恶意辱骂、羞辱外貌、攻击疾病、地域、性别、民族、宗教等受保护属性。
|
||
4. 不要写成正经心理测评,不要出现“根据数据分析”“从画像看”这种机器味句式。
|
||
5. 优先抓:群聊人设、常见话术、反复出现的行为模式、擅长点、翻车点、群内角色。
|
||
6. 如果素材不够扎实,就收一点,写成“轻评版”,不要硬编大招。
|
||
|
||
输出格式要求:
|
||
1. 第一行固定为:`【锐评 目标昵称】`
|
||
2. 正文写 4 到 6 行,每行一句,适合直接发群里。
|
||
3. 最后一行必须是“一句话绝杀”式收尾。
|
||
4. 总长度控制在 {self.min_output_chars} 到 {self.max_output_chars} 字之间,宁可短狠,不要注水。
|
||
""".strip()
|
||
|
||
def _build_user_prompt(self, payload: Dict[str, Any], requester_name: str = "") -> str:
|
||
"""拼装用户提示词。"""
|
||
member_context = payload.get("member_context", {}) or {}
|
||
group_memory_profile = payload.get("group_memory_profile", {}) or {}
|
||
historical_member_profile = payload.get("historical_member_profile", {}) or {}
|
||
historical_group_profile = payload.get("historical_group_profile", {}) or {}
|
||
recent_messages = payload.get("recent_messages", []) or []
|
||
meta = member_context.get("meta", {}) or {}
|
||
group_style = group_memory_profile.get("style_profile", {}) or {}
|
||
|
||
# 最近 200 条消息不再原样整包塞给模型:
|
||
# 1. 原样传会让 token 体积迅速膨胀;
|
||
# 2. 模型也容易被偶发句子带偏,出现“抓住一条就开始瞎判”的幻觉;
|
||
# 3. 这里先做结构化提炼,再保留少量代表句,既省 token,也更稳。
|
||
recent_message_profile = self._build_recent_message_profile(recent_messages)
|
||
|
||
prompt_payload = {
|
||
"任务说明": "请基于以下真实素材,为目标成员写一段有传播性的群聊锐评。",
|
||
"目标成员": {
|
||
"昵称": payload.get("display_name", ""),
|
||
"最近样本消息数": payload.get("message_count", 0),
|
||
"活跃天数": payload.get("active_days", 0),
|
||
"最后活跃时间": payload.get("last_active_at", ""),
|
||
},
|
||
"成员长期画像": {
|
||
"摘要": member_context.get("summary_text", ""),
|
||
"活跃等级": member_context.get("activity_level", ""),
|
||
"发言模式": member_context.get("message_pattern", ""),
|
||
"互动风格": member_context.get("interaction_style", ""),
|
||
"回复建议": member_context.get("response_style_hint", ""),
|
||
"长期主题": member_context.get("topics_of_interest", []),
|
||
"近期关注": member_context.get("recent_focus", []),
|
||
"稳定特征": meta.get("stable_traits", []),
|
||
"习惯模式": meta.get("habit_patterns", []),
|
||
"表达标记": meta.get("expression_profile", []),
|
||
"技能侧重点": meta.get("skill_profile", []),
|
||
"群内角色": meta.get("group_role", ""),
|
||
"气质倾向": meta.get("temperament_tendency", ""),
|
||
"近期状态": meta.get("recent_state", []),
|
||
},
|
||
"成员近两个月历史画像": {
|
||
"窗口天数": historical_member_profile.get("history_days", self.history_profile_days),
|
||
"摘要来源数": historical_member_profile.get("source_counts", {}),
|
||
"稳定主题": historical_member_profile.get("stable_topics", []),
|
||
"身份特征": historical_member_profile.get("identity_traits", []),
|
||
"技能画像": historical_member_profile.get("skill_profile", []),
|
||
"处理问题方式": historical_member_profile.get("problem_solving_profile", []),
|
||
"习惯模式": historical_member_profile.get("habit_patterns", []),
|
||
"表达标签": historical_member_profile.get("expression_profile", []),
|
||
"互动特征": historical_member_profile.get("engagement_traits", []),
|
||
"长期回复偏好": historical_member_profile.get("reply_preferences", []),
|
||
"长期群内角色": historical_member_profile.get("group_role", []),
|
||
"长期气质倾向": historical_member_profile.get("temperament_tendency", []),
|
||
"长期发言模式": historical_member_profile.get("message_pattern", []),
|
||
"长期互动风格": historical_member_profile.get("interaction_style", []),
|
||
"阶段变化轨迹": historical_member_profile.get("phase_state", []),
|
||
"历史时间线": (historical_member_profile.get("timeline", []) or [])[: self.PROMPT_TIMELINE_LIMIT],
|
||
},
|
||
"群聊背景": {
|
||
"群名": payload.get("group_name", ""),
|
||
"长期主题": group_memory_profile.get("focus_topics", []),
|
||
"群风格": group_style,
|
||
"群摘要": group_memory_profile.get("summary_text", ""),
|
||
},
|
||
"群近两个月历史背景": {
|
||
"窗口天数": historical_group_profile.get("history_days", self.history_profile_days),
|
||
"总结条数": historical_group_profile.get("summary_count", 0),
|
||
"历史关注主题": historical_group_profile.get("focus_topics", []),
|
||
"历史时间线": (historical_group_profile.get("timeline", []) or [])[: self.PROMPT_TIMELINE_LIMIT],
|
||
},
|
||
"最近200条发言提炼": recent_message_profile,
|
||
"额外要求": [
|
||
"要像熟人看破不说破,不要像机器写分析。",
|
||
"可以先抬后杀,也可以连续抓几个典型症状。",
|
||
"如果这人明显偏技术、答疑、摸鱼、抽象、嘴硬、复读机、群气氛组,请点出来。",
|
||
"必须同时参考“当前成员画像”和“近两个月历史画像”,如果两者有反差,要把这种反差写出来。",
|
||
"优先依据“高频模式、重复措辞、代表句”下结论,不要因为单条偶发发言脑补大设定。",
|
||
"如果最近发言提炼和长期画像冲突,允许描述为“最近状态跑偏了”,不要硬判成永久特征。",
|
||
f"发起请求的人是:{requester_name}" if requester_name else "",
|
||
],
|
||
}
|
||
# 最终再做一次 prompt 压缩:
|
||
# 1. 去掉空字段,避免模型看到大量“空壳键名”;
|
||
# 2. 限制超长文本和列表,防止历史摘要把上下文挤爆;
|
||
# 3. 使用紧凑 JSON,而不是缩进版,直接减少 token。
|
||
compact_payload = self._compact_prompt_payload(prompt_payload)
|
||
return json.dumps(compact_payload, ensure_ascii=False, separators=(",", ":"))
|
||
|
||
def _compact_prompt_payload(self, value: Any) -> Any:
|
||
"""压缩 prompt 载荷,减少无效 token 并降低幻觉诱因。"""
|
||
if isinstance(value, dict):
|
||
compact_dict: Dict[str, Any] = {}
|
||
for key, item in value.items():
|
||
compact_item = self._compact_prompt_payload(item)
|
||
if compact_item in ("", [], {}, None):
|
||
continue
|
||
compact_dict[key] = compact_item
|
||
return compact_dict
|
||
|
||
if isinstance(value, list):
|
||
compact_list: List[Any] = []
|
||
for item in value:
|
||
compact_item = self._compact_prompt_payload(item)
|
||
if compact_item in ("", [], {}, None):
|
||
continue
|
||
compact_list.append(compact_item)
|
||
return compact_list
|
||
|
||
if isinstance(value, str):
|
||
text = re.sub(r"\s+", " ", value).strip()
|
||
if len(text) <= self.PROMPT_TEXT_LIMIT:
|
||
return text
|
||
# 长文本只保留前半段关键信息:
|
||
# 1. 这里主要针对摘要、时间线这类字段;
|
||
# 2. 它们的作用是“提供背景”,不是让模型逐字精读;
|
||
# 3. 截断后仍然保留前部主结论,性价比更高。
|
||
return text[: self.PROMPT_TEXT_LIMIT].rstrip(",,;;、 ") + "…"
|
||
|
||
return value
|
||
|
||
def _build_recent_message_profile(self, recent_messages: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||
"""把最近消息压缩成更适合给模型的结构化画像。
|
||
|
||
压缩目标:
|
||
1. 尽量保留“重复出现的稳定模式”,而不是平均分配注意力给 200 条原文;
|
||
2. 用高频短语、关键词、问句/感叹句比例、代表句来降低模型幻觉概率;
|
||
3. 让模型先看结论,再看少量样本佐证,减少 token 消耗。
|
||
"""
|
||
normalized_texts: List[str] = []
|
||
repeated_sentence_counter: Counter[str] = Counter()
|
||
keyword_counter: Counter[str] = Counter()
|
||
punct_counter: Counter[str] = Counter()
|
||
representative_samples: List[str] = []
|
||
|
||
for item in recent_messages:
|
||
text = str(item.get("content", "") or "").replace("\n", " ").strip()
|
||
if not text:
|
||
continue
|
||
text = re.sub(r"\s+", " ", text)
|
||
normalized_texts.append(text)
|
||
|
||
# 统计“几乎原样重复”的短句,这类内容对群聊人设识别价值很高,
|
||
# 比如复读某个梗、固定口头禅、常见抱怨模板。
|
||
repeat_key = re.sub(r"\s+", "", text)
|
||
if 2 <= len(repeat_key) <= 24:
|
||
repeated_sentence_counter[repeat_key] += 1
|
||
|
||
for token in self._extract_recent_message_tokens(text):
|
||
keyword_counter[token] += 1
|
||
|
||
punct_counter["question"] += text.count("?") + text.count("?")
|
||
punct_counter["exclaim"] += text.count("!") + text.count("!")
|
||
punct_counter["ellipsis"] += text.count("…") + text.count("...")
|
||
|
||
representative_samples = self._pick_representative_samples(normalized_texts)
|
||
total = max(len(normalized_texts), 1)
|
||
|
||
return {
|
||
"样本条数": len(normalized_texts),
|
||
"高频短句": [
|
||
item[: self.PROMPT_TEXT_LIMIT]
|
||
for item, count in repeated_sentence_counter.most_common(self.RECENT_REPEAT_LIMIT)
|
||
if count >= 2
|
||
],
|
||
"高频关键词": [
|
||
item
|
||
for item, count in keyword_counter.most_common(self.RECENT_KEYWORD_LIMIT)
|
||
if count >= 2
|
||
],
|
||
"近期语气指标": {
|
||
"问句占比": round(punct_counter["question"] / total, 3),
|
||
"感叹句占比": round(punct_counter["exclaim"] / total, 3),
|
||
"省略号占比": round(punct_counter["ellipsis"] / total, 3),
|
||
},
|
||
# 代表句只保留少量,有利于模型“看证据”,又不至于把 token 烧在长聊天流水上。
|
||
"代表句样本": representative_samples[: self.RECENT_SAMPLE_LIMIT],
|
||
}
|
||
|
||
def _extract_recent_message_tokens(self, text: str) -> List[str]:
|
||
"""从单条消息中提取较稳定的关键词。
|
||
|
||
规则尽量保守:
|
||
1. 中文按 2~6 字连续片段抓取,避免单字噪声;
|
||
2. 英文/数字词保留长度 >= 3 的 token;
|
||
3. 过滤掉常见虚词,减少模型被“这个、那个、然后”之类词误导。
|
||
"""
|
||
tokens: List[str] = []
|
||
ascii_tokens = re.findall(r"[A-Za-z0-9_./-]{3,32}", text)
|
||
chinese_tokens = re.findall(r"[\u4e00-\u9fa5]{2,6}", text)
|
||
|
||
for token in ascii_tokens + chinese_tokens:
|
||
normalized = str(token or "").strip().lower()
|
||
if not normalized:
|
||
continue
|
||
if normalized in self.RECENT_MESSAGE_STOPWORDS:
|
||
continue
|
||
if normalized.isdigit():
|
||
continue
|
||
tokens.append(normalized)
|
||
return tokens
|
||
|
||
@staticmethod
|
||
def _pick_representative_samples(texts: List[str]) -> List[str]:
|
||
"""挑选少量最能体现人设的代表句。
|
||
|
||
选取策略不追求复杂模型,只做确定性压缩:
|
||
1. 先保留问句、感叹句、较长句、包含“技术/问题/吐槽”味道的句子;
|
||
2. 再做去重,避免 12 条样本里 8 条都是同一种废话。
|
||
"""
|
||
scored: List[Tuple[int, str]] = []
|
||
for text in texts:
|
||
score = 0
|
||
if "?" in text or "?" in text:
|
||
score += 3
|
||
if "!" in text or "!" in text:
|
||
score += 2
|
||
if len(text) >= 18:
|
||
score += 2
|
||
if any(keyword in text.lower() for keyword in ["报错", "问题", "哈哈", "笑死", "离谱", "摸鱼", "接口", "配置", "版本", "怎么"]):
|
||
score += 2
|
||
scored.append((score, text))
|
||
|
||
scored.sort(key=lambda item: (-item[0], -len(item[1])))
|
||
result: List[str] = []
|
||
seen = set()
|
||
for _, text in scored:
|
||
normalized = re.sub(r"\s+", "", text)
|
||
if normalized in seen:
|
||
continue
|
||
seen.add(normalized)
|
||
result.append(text[: MemberRoastPlugin.PROMPT_TEXT_LIMIT])
|
||
if len(result) >= MemberRoastPlugin.RECENT_SAMPLE_LIMIT:
|
||
break
|
||
return result
|
||
|
||
def _post_process_roast_text(self, roast_text: str, target_name: str) -> str:
|
||
"""清洗模型输出,统一成更适合直接发群的格式。"""
|
||
text = str(roast_text or "").strip()
|
||
text = re.sub(r"^```(?:json|markdown|text)?", "", text, flags=re.IGNORECASE).strip()
|
||
text = re.sub(r"```$", "", text).strip()
|
||
text = re.sub(r"\n{3,}", "\n\n", text)
|
||
|
||
# 如果模型忘了带头标题,这里兜底补上,保证最终输出辨识度稳定。
|
||
if not text.startswith("【锐评"):
|
||
text = f"【锐评 {target_name}】\n{text}"
|
||
|
||
# 模型有时会过度铺陈,这里做一次保底裁切,优先保持群聊传播性。
|
||
if len(text) > self.max_output_chars + 80:
|
||
text = text[: self.max_output_chars + 80].rstrip(",,、;; ").rstrip() + "。"
|
||
return text
|
||
|
||
async def _send_fail_message(self, roomid: str, sender: str, fail_text: str) -> None:
    """Send a failure notice to the room, @-mentioning the requester.

    The notice is ``fail_text`` prefixed with "❌ ". When ``self.revoke`` is
    set, the sent message is also registered with it for automatic revocation.

    Args:
        roomid: Target chat room id.
        sender: Id of the user to @-mention.
        fail_text: Human-readable failure reason.
    """
    sent = await self.bot.send_at_message(roomid, f"❌ {fail_text}", [sender])
    client_msg_id, create_time, new_msg_id = sent
    if not self.revoke:
        return
    # NOTE(review): the trailing 8 is presumably the revoke delay; confirm
    # against MessageAutoRevoke.add_message_to_revoke.
    self.revoke.add_message_to_revoke(roomid, client_msg_id, create_time, new_msg_id, 8)
|
||
|
||
def _get_member_display_name(self, roomid: str, wxid: str) -> str:
    """Resolve the name to show for a member of a chat room.

    Lookup order: the member's ``display_name`` from ``self.contacts_db``,
    then its ``nick_name``, then ``ContactManager.get_group_name``, and
    finally the raw ``wxid``.

    Args:
        roomid: Chat room id.
        wxid: Member id.

    Returns:
        The first non-empty name found, or ``wxid`` when nothing is known.
    """
    if self.contacts_db:
        info = self.contacts_db.get_chatroom_member_info(roomid, wxid) or {}
        raw_name = info.get("display_name") or info.get("nick_name") or ""
        stored_name = str(raw_name).strip()
        if stored_name:
            return stored_name

    fallback_name = ContactManager.get_instance().get_group_name(roomid, wxid)
    return fallback_name or wxid
|
||
|
||
@staticmethod
|
||
def _parse_at_users(raw_xml: str) -> List[str]:
|
||
"""解析消息 XML 里的 @用户列表。"""
|
||
raw_xml = str(raw_xml or "").strip()
|
||
if not raw_xml:
|
||
return []
|
||
|
||
at_user_list_text = ""
|
||
try:
|
||
root = ET.fromstring(raw_xml)
|
||
node = root.find(".//atuserlist")
|
||
if node is not None and node.text:
|
||
at_user_list_text = str(node.text).strip()
|
||
except Exception:
|
||
match = re.search(r"<atuserlist><!\[CDATA\[(.*?)\]\]></atuserlist>", raw_xml, flags=re.IGNORECASE | re.DOTALL)
|
||
if match:
|
||
at_user_list_text = str(match.group(1) or "").strip()
|
||
|
||
if not at_user_list_text:
|
||
return []
|
||
|
||
seen = set()
|
||
result: List[str] = []
|
||
for user_id in re.split(r"[,\s;]+", at_user_list_text):
|
||
normalized = str(user_id or "").strip()
|
||
if not normalized or normalized in seen:
|
||
continue
|
||
seen.add(normalized)
|
||
result.append(normalized)
|
||
return result
|
||
|
||
@staticmethod
|
||
def _normalize_text(value: Any) -> str:
|
||
"""清洗文本中的多余空白。"""
|
||
text = str(value or "").replace("\u2005", " ").replace("\xa0", " ")
|
||
text = re.sub(r"\s+", " ", text)
|
||
return text.strip()
|