From d6168460983aeae0f6807aa11096156f11e801f9 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 7 Apr 2026 11:15:29 +0800 Subject: [PATCH] refactor ai_auto_response into xiaoniu group bot --- plugins/ai_auto_response/README.md | 897 +++++++++++++++++++ plugins/ai_auto_response/bot_ai.py | 193 ---- plugins/ai_auto_response/config.toml | 148 +-- plugins/ai_auto_response/context_builder.py | 71 ++ plugins/ai_auto_response/flow_manager.py | 103 +++ plugins/ai_auto_response/llm_client.py | 166 ++++ plugins/ai_auto_response/main.py | 624 +++++++++---- plugins/ai_auto_response/memory_store.py | 89 ++ plugins/ai_auto_response/persona/xiaoniu.txt | 23 + plugins/ai_auto_response/persona_engine.py | 33 + plugins/ai_auto_response/response_planner.py | 23 + plugins/ai_auto_response/triggers.py | 87 ++ plugins/ai_auto_response/vector_memory.py | 138 +++ 13 files changed, 2150 insertions(+), 445 deletions(-) create mode 100644 plugins/ai_auto_response/README.md delete mode 100644 plugins/ai_auto_response/bot_ai.py create mode 100644 plugins/ai_auto_response/context_builder.py create mode 100644 plugins/ai_auto_response/flow_manager.py create mode 100644 plugins/ai_auto_response/llm_client.py create mode 100644 plugins/ai_auto_response/memory_store.py create mode 100644 plugins/ai_auto_response/persona/xiaoniu.txt create mode 100644 plugins/ai_auto_response/persona_engine.py create mode 100644 plugins/ai_auto_response/response_planner.py create mode 100644 plugins/ai_auto_response/triggers.py create mode 100644 plugins/ai_auto_response/vector_memory.py diff --git a/plugins/ai_auto_response/README.md b/plugins/ai_auto_response/README.md new file mode 100644 index 0000000..d7d806f --- /dev/null +++ b/plugins/ai_auto_response/README.md @@ -0,0 +1,897 @@ +# 小牛群聊 BOT 重构说明 + +## 目标 + +把当前 `ai_auto_response` 从“随机插话的自动回复”升级成一个真正适合微信群长期在线的拟人 BOT `小牛`: + +- 在群里像一个真实成员,而不是每次都像客服或问答机 +- 能及时回答明确问题,减少“看见了但不接话”的情况 +- 能根据群氛围决定什么时候主动参与,什么时候少说话 +- 能对不同群、不同成员表现出稳定的人设和长期记忆感 +- 能复用项目里已经存在的消息存档、成员画像、群总结、权限控制、后台管理能力 +- 能处理“某个老成员很久不说话,突然回来发言”的场景,不显得失忆 + +这份文档不是泛泛而谈的产品介绍,而是基于当前仓库现状整理的一份可落地实现方案。 + +--- + +## 当前实现现状 + +当前插件入口: + +- [`plugins/ai_auto_response/main.py`](/d:/learn/abot/plugins/ai_auto_response/main.py) +- [`plugins/ai_auto_response/bot_ai.py`](/d:/learn/abot/plugins/ai_auto_response/bot_ai.py) +- [`plugins/ai_auto_response/config.toml`](/d:/learn/abot/plugins/ai_auto_response/config.toml) + +现有版本已经具备这些基础能力: + +- 能监听群消息 +- 能缓存最近一段群聊文本 +- 能基于关键词、时间窗口、参与度、体力值决定是否插话 +- 能把最近几条消息拼成上下文后调用大模型 API 生成回复 + +但它目前仍然偏简单,核心问题主要有: + +1. 回复触发逻辑偏“概率型” + 现在更像“有没有兴致插一句”,而不是“是否有人在明确问它问题”。 + +2. 缺少答疑优先级 + 群里一旦出现明确求助、@机器人、连续追问,应该优先快速答复,而不是继续走随机参与逻辑。 + +3. 缺少长期人格 + 目前只有 prompt 里的简短口语化约束,还没有稳定的人设、口头习惯、边界感、偏好表达方式。 + +4. 缺少成员级长期记忆 + 仓库里已经有成员画像能力,但 `ai_auto_response` 还没有接进去,所以机器人对“这个人平时怎么说话、关注什么、适合怎么回”没有利用起来。 + +5. 缺少群级场景区分 + 不同群应该有不同模式,例如技术群偏答疑、闲聊群偏陪聊、交易群偏信息明确、熟人群偏轻松互动。 + +6. 缺少回复分层 + 并不是所有回复都应该走同一条 prompt。问答、接梗、安慰、提醒、总结、纠错,生成策略应该分开。 + +--- + +## 可以直接复用的现有技术能力 + +这个项目其实已经有很多“拟人 BOT”需要的基础设施,不需要从零重造: + +### 1. 消息接入与发送 + +- 主机器人入口:[`robot.py`](/d:/learn/abot/robot.py) +- 微信客户端:[`wechat_ipad/`](/d:/learn/abot/wechat_ipad) + +可直接复用消息接收、发群消息、发图片、联系人同步、群成员信息读取能力。 + +### 2. 权限和群功能开关 + +- 群功能控制:`GroupBotManager` + +这意味着“哪些群启用拟人 BOT”“哪些群仅答疑不闲聊”“哪些群完全关闭”都已经有基础能力。 + +### 3. 消息存档与历史上下文 + +- 消息存储:[`utils/wechat/message_to_db.py`](/d:/learn/abot/utils/wechat/message_to_db.py) +- 数据表操作:[`db/message_storage.py`](/d:/learn/abot/db/message_storage.py) + +这部分非常关键,可以让 BOT 不只看最近 10 句话,而是按需回看更长的上下文。 + +### 4. 成员画像与长期交互记忆 + +- 插件:[`plugins/member_context/main.py`](/d:/learn/abot/plugins/member_context/main.py) +- 服务:[`plugins/member_context/service.py`](/d:/learn/abot/plugins/member_context/service.py) +- 提示词构建:[`plugins/member_context/prompt_builder.py`](/d:/learn/abot/plugins/member_context/prompt_builder.py) + +这里已经有: + +- 成员日 / 周 / 月摘要 +- 兴趣主题 +- 互动风格 +- 回复偏好 +- 群内角色 +- 技能画像 + +这正是“拟人回复”最需要的长期上下文。 + +### 5. 群总结与压缩上下文 + +- 群总结:[`plugins/message_summary/main.py`](/d:/learn/abot/plugins/message_summary/main.py) +- 对话压缩:[`utils/compress_chat_data.py`](/d:/learn/abot/utils/compress_chat_data.py) + +长群聊可以先压缩再喂给模型,减少 token 压力。 + +### 6. 管理后台 + +- 后台目录:[`admin/dashboard/`](/d:/learn/abot/admin/dashboard) + +后续可以把“人设配置、群模式、回复频率、黑名单、禁聊时段、答疑策略”做成后台管理项。 + +--- + +## 新版本插件定位 + +建议把这个插件的定位改成: + +`小牛:一个有稳定人格、会看场合、能优先答问题、在群里长期在线的虚拟群成员` + +它不是纯陪聊,也不是纯问答助手,而是两种模式同时存在: + +### 1. 拟人参与模式 + +适合熟人群、日常聊天群、兴趣群: + +- 偶尔接话 +- 顺着上下文说话 +- 有自己的语气和偏好 +- 不抢话,不刷屏 + +### 2. 实时答疑模式 + +适合技术群、项目群、问答群: + +- 发现问题句、求助句、@机器人时优先响应 +- 响应速度快于闲聊逻辑 +- 回答尽量明确、可执行 +- 不懂就直接说不确定,不硬编 + +--- + +## 推荐架构 + +建议把新版本拆成 8 个层次,而不是把所有逻辑都放在 `main.py` 里。 + +### 1. Message Intake + +负责接收消息、标准化消息结构、过滤无效消息: + +- 是否群聊 +- 是否自己发的 +- 是否文本 / 图片标题 / 链接卡片 +- 是否命中黑名单 +- 是否命中禁用群 + +### 2. Trigger Router + +负责判断“为什么这次应该回”。 + +建议至少拆成以下触发源: + +- `at_trigger` + `@bot` 或明确点名 BOT + +- `question_trigger` + 明显的问题句,例如“怎么弄”“有人知道吗”“这个报错啥意思” + +- `followup_trigger` + 上一轮已经在和 BOT 对话,用户继续追问 + +- `topic_trigger` + 命中 BOT 擅长或关注的话题 + +- `social_trigger` + 打招呼、起哄、接梗、点名、夸它、吐槽它 + +- `silence_break_trigger` + 群里沉默较久后,用很轻的方式恢复气氛 + +其中优先级应为: + +`@提问 > 明确求助 > 连续追问 > 互动点名 > 普通插话` + +### 3. Flow Manager + +心流系统是小牛的“实时参与状态机”。 + +它解决的不是“记不记得人”,而是“现在要不要继续聊、聊多深、聊多久”。 + +建议按群维度维护 `flow_state` 和 `flow_score`,而不是全局只有一个热度值。 + +推荐状态: + +- `idle` + 低参与,主要观察,除非被点名或明确提问,否则不主动插话 + +- `warming` + 话题开始吸引小牛,可以做轻量接话 + +- `engaged` + 已进入连续互动,优先接住上下文 + +- `deep_engaged` + 正在进行高质量答疑或多人围绕同一主题连续互动 + +- `cooling` + 一轮互动结束后逐步退出,避免刷屏 + +- `silent` + 深夜、敏感话题、连续被忽略、系统限流时进入静默 + +建议影响心流的事件: + +- 提高心流: + `@小牛`、明确提问、连续追问、命中擅长话题、老成员回归、机器人发言后有人接话 + +- 降低心流: + 回复后无人接话、话题转移、连续回复过多、深夜、敏感话题、群里进入无关刷屏 + +建议先用简单的事件加减分模型: + +- `@小牛`:`+40` +- 明确提问:`+30` +- 连续追问:`+20` +- 话题命中:`+15` +- 回归成员:`+10` +- 机器人发言后有人接话:`+15` +- 机器人发言后没人接话:`-20` +- 连续回复过多:`-15` +- 深夜:`-30` + +建议状态阈值: + +- `<20` -> `idle` +- `20~39` -> `warming` +- `40~69` -> `engaged` +- `>=70` -> `deep_engaged` + +同时配合自然衰减,让心流值按分钟回落。 + +### 4. Context Builder + +负责为本次回复准备上下文,建议分四层: + +- 最近 20~50 条群消息 +- 当天压缩摘要 +- 当前发言人的成员画像 +- 当前群的人设配置和行为模式 + +建议输出统一上下文对象: + +```python +{ + "group_profile": {}, + "speaker_profile": {}, + "recent_messages": [], + "recent_summary": "", + "trigger_type": "question_trigger", + "reply_mode": "qa_fast" +} +``` + +### 5. Long-Term Memory Engine + +这是小牛和普通自动回复插件真正拉开差距的地方。 + +建议把记忆拆成四层,而不是只保留最近聊天记录: + +- `session_memory` + 最近一次连续对话的上下文,生命周期 5~15 分钟 + +- `daily_memory` + 今天这个群在聊什么、谁和谁正在互动、当前气氛如何 + +- `member_memory` + 某个成员长期关注的话题、典型说话风格、历史上经常问的问题、适合的回复方式 + +- `group_memory` + 这个群的长期主题、说话节奏、禁忌、常见梗、对小牛的接受度 + +其中 `member_memory` 和 `group_memory` 是解决“老成员突然回归”最关键的部分。 + +当某个成员很久没发言又突然出现时,不应该只看他刚发的这一句,而应该补充这些信息: + +- 这个人上次活跃是什么时候 +- 过去常聊什么 +- 过去在群里的角色更像提问者、答疑者还是气氛组 +- 过去和小牛是否有连续互动 +- 这次回归是轻松冒泡、直接求助、还是延续旧话题 + +建议为这类场景增加专门状态: + +- `returning_member` + 7 天以上未发言后再次出现 + +- `long_absent_member` + 30 天以上未发言后再次出现 + +- `reactivated_topic` + 当前话题与该成员历史关注主题高度相关 + +针对这类状态,小牛的回复要遵循两个原则: + +1. 记得这个人,但不要过度热情到像监控 + 可以自然表现出“你又出现了”“这个话题你之前也挺关注”,但不要直接说出太细的时间和行为记录。 + +2. 优先续接熟悉话题 + 如果该成员回归后直接提问,优先用他的长期主题和历史偏好组织答案,这样会更像“真的认识这个人”。 + +### 6. Persona Engine + +这里是“拟人感”的核心,不应该只靠一句 prompt。 + +建议把人格拆成结构化配置: + +- 名字 +- 年龄感 +- 说话风格 +- 常用语气词 +- 擅长话题 +- 不擅长话题 +- 回避边界 +- 幽默程度 +- 主动程度 +- 回复长度偏好 +- 是否喜欢反问 +- 是否会使用表情 + +建议新增独立人设文件,例如: + +- `persona_name` +- `core_identity` +- `tone_rules` +- `reply_rules` +- `taboo_rules` +- `example_replies` + +目录下现有的 [`plugins/ai_auto_response/瑞依.txt`](/d:/learn/abot/plugins/ai_auto_response/瑞依.txt) 只作为参考语料,不直接作为最终人格文件。 + +新版本应建立 `小牛` 的独立人格设定,建议固定为: + +- 名字:小牛 +- 角色感:群里常驻、靠谱、自然、不端着 +- 回答风格:先解决问题,再决定要不要延伸 +- 社交风格:熟人感轻一点,不装熟,不过分卖萌 +- 记忆风格:对老成员有熟悉感,但不过度暴露“系统知道很多” + +### 7. Response Planner + +不要让模型每次自由发挥,先确定回复策略,再生成内容。 + +推荐回复模式: + +- `qa_fast` + 用于明确问题,答案优先,少废话 + +- `qa_with_context` + 用于结合群聊历史、成员长期记忆或旧话题回答 + +- `social_short` + 用于轻量接话,1 句就够 + +- `comfort_mode` + 用于安慰、缓和、给建议 + +- `humor_mode` + 用于熟人群轻松互动 + +- `refuse_or_skip` + 不适合接话时直接不回,或仅给非常短的反馈 + +心流状态会直接影响回复策略: + +- `idle` + 只处理 `@bot`、明确问题、强触发事件 + +- `warming` + 允许 `social_short` + +- `engaged` + 提高 `qa_with_context` 和连续追问的响应率 + +- `deep_engaged` + 允许更完整的答疑和多轮连续互动 + +- `cooling` + 优先短回复或收口 + +- `silent` + 除非强触发,否则不回复 + +### 8. Safety and Rate Control + +拟人 BOT 最大的风险不是“答不出来”,而是“太像人却太爱说话”。 + +所以必须保留这些机制: + +- 每群独立冷却 +- 连续回复衰减 +- 被人无视后降低主动率 +- 深夜低活跃模式 +- 敏感词 / 风险话题降级 +- 管理员强制关闭 + +当前 `bot_ai.py` 里的“体力值 + 参与度”可以保留,但应降级为“主动聊天限流器”,而不是总入口。 + +--- + +## 向量记忆设计 + +当前环境里已经有可用的向量能力: + +- 向量库:`Qdrant` +- 向量模型服务:`Ollama` +- 适合接入位置:`Long-Term Memory Engine` + +这里的设计原则不是“所有回复都查向量库”,而是: + +`Qdrant 作为长期记忆召回层,member_context 和消息摘要作为稳定记忆层` + +也就是说: + +- `member_context` + 负责回答“这个人是谁,长期是什么风格” + +- `Qdrant` + 负责回答“这个人以前聊过什么类似内容” + +### 什么时候介入最合适 + +最合适的方式是第二阶段开始接入,但只用于特定场景,不作为所有回复的必经链路。 + +优先介入以下场景: + +- `returning_member` + 用户很久没发言后重新出现 + +- `long_absent_member` + 用户长期沉默后突然出现 + +- `qa_with_context` + 当前问题可能和历史问答或长期兴趣相关 + +- `reactivated_topic` + 当前话题和用户过去长期关注主题高度相关 + +普通闲聊、轻量接话、气氛互动不建议默认查向量库。 + +### 为什么不建议一开始全量依赖向量库 + +因为群聊拟人 BOT 最怕的不是“想不起来”,而是“乱想起来”。 + +如果每次都查向量库,容易出现: + +- 回复变慢 +- 召回结果不稳定 +- 机器人突然提旧事,像在翻聊天记录 +- 轻松闲聊也被过度结构化 + +所以更合理的方式是: + +- 平时主要依赖最近上下文和成员画像 +- 需要“找回记忆”时再触发向量召回 + +### 最适合写入 Qdrant 的内容 + +不建议先把全部原始聊天消息无差别写进向量库。 + +更推荐写入“记忆单元”: + +- 成员日摘要 +- 成员周摘要 +- 成员月摘要 +- 群日摘要 +- 重要问答对 +- 用户长期偏好卡片 +- 小牛与某成员的关键互动片段 + +这样做有几个好处: + +- 噪音更少 +- 召回更稳定 +- Token 更省 +- 更适合长期维护 + +### 推荐的 Qdrant Payload + +每条向量建议至少带这些字段: + +- `chatroom_id` +- `wxid` +- `memory_type` +- `topic_tags` +- `created_at` +- `last_active_at` +- `source_id` +- `content_summary` + +建议的 `memory_type` 包括: + +- `member_daily_digest` +- `member_weekly_digest` +- `member_monthly_digest` +- `group_daily_digest` +- `qa_pair` +- `interaction_memory` +- `preference_card` + +### 推荐查询策略 + +建议按下面顺序查,而不是直接全库语义搜: + +1. 先按 `chatroom_id` 过滤 +2. 如果目标明确,再按 `wxid` 过滤 +3. 再按 `memory_type` 过滤 +4. 最后做语义相似度检索 +5. `top_k` 建议先控制在 `3~5` + +这能显著降低错召回。 + +### 与 Ollama 向量模型的配合方式 + +你现有的 Ollama 小向量模型是可以直接用的,只要满足一个原则: + +`写入和查询必须使用同一个 embedding 模型` + +对“小牛”这种群聊记忆系统来说,小型 embedding 模型反而通常更合适,因为需要的是: + +- 响应快 +- 成本低 +- 稳定检索成员历史主题和问答片段 + +而不是做极重的通用语义推理。 + +### 推荐的接入方式 + +建议新增一个独立的记忆召回模块,例如: + +- `memory_store.py` + 负责写入、查询、过滤、召回排序 + +它的职责建议分成四块: + +- `upsert_memory` + 把摘要、问答、关键互动写入 Qdrant + +- `search_member_memory` + 查询某个成员的长期相关记忆 + +- `search_group_memory` + 查询当前群的历史相关记忆 + +- `build_memory_prompt` + 把召回结果压缩成可以送给模型的 prompt 片段 + +向量召回和心流系统的配合建议是: + +- 长期记忆负责“这个人以前是谁、聊过什么” +- 心流系统负责“这次值不值得进入连续互动” + +两者一起工作时,小牛才会既像“记得人”,又像“会看场合”。 + +### 小牛里最适合触发向量召回的时机 + +推荐在这些判断通过后才查 Qdrant: + +- 用户超过 `7` 天未发言重新出现 +- 用户超过 `30` 天未发言后提问 +- 当前问题命中“历史上经常问的主题” +- 最近上下文不够,但成员长期画像显示该用户过去反复讨论过此类话题 +- BOT 判断这是“旧话题延续”而不是新话题 + +### 记忆使用边界 + +向量召回的结果只应该作为“小牛知道哪些历史背景”的参考,而不是原样往外说。 + +生成回复时建议遵守: + +- 不直接暴露精确历史记录 +- 不直接说“你上次在几月几号说过” +- 不在轻量闲聊里强行提旧事 +- 只在确实有帮助时,让回复带一点自然熟悉感 + +理想效果是: + +- 用户觉得“小牛记得我” +- 但不会觉得“小牛在翻档案” + +--- + +## 推荐实现方案 + +### 第一阶段:把“随机插话”升级成“有优先级的触发回复” + +先不追求复杂人格,先解决“及时回答问题”: + +1. 新增问题检测 + 识别问号、求助句式、报错句式、`有人知道`、`怎么`、`为啥`、`?`、`??` + +2. 新增 `@bot` 强制响应 + 只要被明确点名,优先进入快速答疑链路 + +3. 新增会话延续窗口 + 机器人回复后 2~5 分钟内,如果同一人继续追问,应提高响应概率甚至直接响应 + +4. 闲聊逻辑与答疑逻辑分离 + 闲聊继续走拟人策略,答疑直接走高优先级策略 + +5. 引入群级心流系统 + 用 `flow_state` 替代旧的随机插话感,让小牛知道什么时候进入、什么时候退出对话 + +这一阶段完成后,体验会立刻提升很多。 + +### 第二阶段:接入长期记忆、成员画像和群模式 + +把仓库现有能力接进来: + +1. 从 `member_context` 读取当前发言人的画像 +2. 为每个成员建立最近会话缓存和长期记忆快照 +3. 给“久未发言再次出现”的成员增加回归识别逻辑 +4. 给不同群配置不同模式 +5. 在 prompt 中加入“这个人平时更喜欢什么风格的回复” +6. 在技术群中提高问题响应率,在闲聊群中降低长篇回答频率 +7. 在 `returning_member` 和 `qa_with_context` 场景接入 Qdrant 召回 +8. 让回归成员和旧话题召回同时提升群级心流,进入更自然的连续互动状态 + +### 第三阶段:做人设稳定化 + +这一阶段重点不是“更聪明”,而是“像同一个人”: + +1. 固化角色设定 +2. 固化用词习惯 +3. 固化情绪边界 +4. 固化“知道什么 / 不知道什么”的表达方式 +5. 给出少量 few-shot 回复样例 + +### 长期记忆专项:解决“很久不说话突然出现”的问题 + +这是新版本必须明确支持的场景。 + +推荐处理流程: + +1. 识别用户是否为回归成员 + 根据消息库和成员画像判断其最近一次活跃时间 + +2. 如果是回归成员,额外加载长期记忆 + 包括历史关注主题、常见问题类型、群内角色、和小牛过去互动风格 + +3. 生成时增加“轻微熟悉感” + 回复表现得像“记得这个人”,但不要像读档案 + +4. 如果该成员这次是来提问 + 则优先进入 `qa_with_context`,让回答带上他历史关注方向 + +5. 如果该成员只是冒泡 + 则只做轻量社交回应,不强行提旧事 + +一个好的感觉是: + +- 用户会觉得“小牛好像一直在群里” +- 但不会觉得“小牛在偷偷监控每个人” + +### 第四阶段:做后台配置化 + +建议把这些项做成可配置: + +- 每个群是否启用 +- 群模式 +- 人设模板 +- 回复频率 +- 工作时间 / 静默时间 +- 是否允许主动插话 +- 是否允许使用表情 +- 是否允许引用长期记忆 +- 回归成员识别阈值 +- 长期记忆回看天数 +- 回归成员的回复热度上限 + +--- + +## 群聊 BOT 的最小落地版本 + +如果你希望先做一个能用的版本,而不是一次性重构太大,推荐最小实现如下: + +### 必做 + +- 保留当前插件入口不变 +- 新增 `trigger_type` 判定 +- 新增 `flow_state` / `flow_score` 判定 +- 新增 `reply_mode` 判定 +- `@bot` / 提问类消息直接优先回复 +- 上下文从最近 10 条提升到最近 20~30 条 +- 人设文件从单段 prompt 改成结构化配置 + +### 优先做 + +- 接 `member_context` +- 给群配置模式 +- 给回复加冷却和连续会话窗口 +- 给回归成员场景接 Qdrant 召回 +- 给不同心流状态配置不同回复强度 + +### 后续再做 + +- 后台管理页 +- 不同人格模板 +- 记忆纠偏 +- 多模型路由 + +--- + +## 建议目录演进 + +建议把 `plugins/ai_auto_response/` 逐步整理成下面这种结构: + +```text +plugins/ai_auto_response/ +├── __init__.py +├── main.py # 插件入口,只做调度 +├── config.toml # 插件配置 +├── README.md # 本文档 +├── persona/ +│ ├── xiaoniu.txt +│ └── tech_helper.txt +├── flow_manager.py # 群级心流状态机 +├── memory_store.py # 长期记忆读取与装配 +├── vector_memory.py # Qdrant / Ollama 召回层 +├── triggers.py # 触发判定 +├── context_builder.py # 上下文构建 +├── persona_engine.py # 人设装配 +├── response_planner.py # 回复策略选择 +├── llm_client.py # OpenAI兼容 API / 其他模型调用 +└── rate_control.py # 冷却、频率、主动度控制 +``` + +这样以后维护会比现在轻松很多。 + +--- + +## 建议配置项 + +建议在 `config.toml` 后续补充这些内容: + +```toml +enable = true + +[mode] +group_default_mode = "social" +question_reply_timeout_sec = 12 +followup_session_window_sec = 300 +recent_context_size = 30 +allow_proactive_reply = true +returning_member_days = 7 +long_absent_member_days = 30 +memory_lookback_days = 180 + +[flow] +enable_flow_state = true +flow_decay_per_minute = 8 +idle_threshold = 20 +warming_threshold = 40 +engaged_threshold = 70 +at_bot_boost = 40 +question_boost = 30 +followup_boost = 20 +topic_boost = 15 +returning_member_boost = 10 +response_accepted_boost = 15 +ignored_reply_penalty = 20 +over_reply_penalty = 15 +night_penalty = 30 + +[persona] +name = "小牛" +style = "自然、口语化、像群友" +emoji_probability = 0.25 +max_reply_sentences = 3 + +[memory] +enable_vector_memory = true +vector_provider = "qdrant" +embedding_provider = "ollama" +qdrant_url = "http://127.0.0.1:6333" +qdrant_collection = "abot_xiaoniu_memory" +ollama_base_url = "http://192.168.2.50:11434" +embedding_model = "your_embedding_model" +vector_top_k = 5 +vector_min_score = 0.65 +vector_trigger_modes = ["returning_member", "long_absent_member", "qa_with_context", "reactivated_topic"] + +[priority] +at_bot = 1.0 +explicit_question = 0.95 +followup = 0.9 +social_call = 0.65 +casual_topic = 0.35 + +[cooldown] +group_reply_cooldown_sec = 45 +same_user_followup_cooldown_sec = 10 +night_silent_hours = ["01:00-07:30"] +``` + +--- + +## Prompt 设计建议 + +新版本 prompt 不建议再只写“简短、口语化”这种通用要求,而要明确四件事: + +1. 你是谁 + 你在这个群里的身份、语气、边界、说话节奏 + +2. 你为什么这次要回复 + 是因为被 @、被提问、正在连续对话、还是轻微接话,以及当前心流状态是否支持继续参与 + +3. 你现在掌握了什么上下文 + 最近群聊、成员画像、长期记忆、群模式、心流状态、历史摘要 + +4. 这次回复的目标 + 是回答问题、接一句、安慰、澄清、提醒,还是保持沉默 + +建议最终 prompt 由以下片段拼装: + +- `system_persona` +- `memory_prompt` +- `group_mode_prompt` +- `flow_prompt` +- `speaker_profile_prompt` +- `trigger_prompt` +- `recent_context_prompt` +- `response_rule_prompt` + +其中: + +- `memory_prompt` + 优先来自 `member_context` 的稳定画像 + +- `vector_memory_prompt` + 只在命中特定场景时从 Qdrant 召回并追加 + +--- + +## 成功标准 + +如果这个插件升级成功,应该能达到下面这些效果: + +### 拟人感 + +- 说话前后风格一致 +- 不会每次都像在写标准答案 +- 会看群气氛,不乱抢话 +- 被调侃时能自然接住 +- 会自然进入和退出对话,不会像开关一样突兀 + +### 答疑能力 + +- 被 @ 时基本能及时回复 +- 明确问题能优先答复 +- 回答比现在更聚焦、更短、更有执行性 +- 不确定时会明确说明 + +### 长期记忆 + +- 对活跃成员和沉默很久后回归的成员都能保持连续感 +- 不会把短期状态误认为长期人格 +- 能识别老成员的长期关注主题 +- 回归成员发言时,小牛的回复会有自然熟悉感 +- Qdrant 召回只在需要时介入,不会让普通闲聊变得迟钝和奇怪 + +### 工程可维护性 + +- 触发逻辑、上下文逻辑、生成逻辑分层 +- 心流逻辑独立成层,不和长期记忆混在一起 +- 可接入成员画像 +- 可配置不同群模式 +- 可通过后台持续调参 + +--- + +## 推荐开发顺序 + +1. 保留当前插件名和入口,先完成触发路由重构 +2. 把群级心流系统做出来,替换旧的随机插话逻辑 +3. 把“答疑优先”做出来,解决及时回复问题 +4. 把长期记忆层接进来,先解决回归成员场景 +5. 把人设配置从自由文本升级成结构化配置,并固定为小牛 +6. 接入 `member_context` 做成员级回复优化 +7. 接入 Qdrant + Ollama,先只服务回归成员和旧话题召回 +8. 增加群模式配置 +9. 最后再做后台配置和更细的人格控制 + +--- + +## 一句话结论 + +你现在这个 `ai_auto_response` 已经有“群里自动说话”的雏形了,但如果目标是“小牛”这种真正长期在线的群聊拟人 BOT,核心不在于继续调概率,而在于把它升级成: + +`触发有优先级、心流会收放、上下文有层次、长期记忆可用、人格固定为小牛、答疑能优先、群模式可配置` + +这样它才会既像群友,又真的有用。 diff --git a/plugins/ai_auto_response/bot_ai.py b/plugins/ai_auto_response/bot_ai.py deleted file mode 100644 index b3efeda..0000000 --- a/plugins/ai_auto_response/bot_ai.py +++ /dev/null @@ -1,193 +0,0 @@ -import re -import random -from datetime import datetime, time, timedelta -import toml -import os -from loguru import logger - -class RoomState: - """每个群的独立状态""" - def __init__(self): - self.participation_score = 0.0 - self.last_active_time = datetime.now() - self.last_bot_reply_time = None # 上次机器人回复的时间 - -class InterventionBot: - def __init__(self, config_path=None): - # 加载配置 - self.config = {} - if config_path and os.path.exists(config_path): - self.config = toml.load(config_path) - - # 从配置中获取关键词 - keywords = self.config.get("Keywords", {}) - self.emojis = keywords.get("emojis", []) - self.hot_topics = keywords.get("hot_topics", []) - self.fish_keywords = keywords.get("fish_keywords", []) - self.tech_keywords = keywords.get("tech_keywords", []) - self.news_keywords = keywords.get("news_keywords",[]) - - # 拟人化配置 - hl_config = self.config.get("HumanLike", {}) - self.max_energy = hl_config.get("max_energy", 100.0) - self.energy_recovery_rate = hl_config.get("energy_recovery_per_minute", 1.0) - self.energy_cost = hl_config.get("energy_cost_per_reply", 15.0) - - self.participation_inc = hl_config.get("participation_increase_per_msg", 5.0) - self.topic_bonus = hl_config.get("topic_match_bonus", 15.0) - self.participation_threshold = hl_config.get("participation_threshold", 20.0) - self.participation_drop = hl_config.get("participation_drop_factor", 0.8) - self.base_prob = hl_config.get("base_reply_probability", 0.6) - - # 机器人全局状态 - self.current_energy = self.max_energy - self.last_energy_update_time = datetime.now() - - # 群组状态 {room_id: RoomState} - self.room_states = {} - - # 辅助功能:早晨时间窗口 - time_window = self.config.get("TimeWindow", {}) - self.morning_window = ( - time(time_window.get("morning_start_hour", 8), time_window.get("morning_start_minute", 0)), - time(time_window.get("morning_end_hour", 8), time_window.get("morning_end_minute", 30)) - ) - - def _get_room_state(self, room_id): - if room_id not in self.room_states: - self.room_states[room_id] = RoomState() - return self.room_states[room_id] - - def _update_energy(self): - """更新全局体力值""" - now = datetime.now() - minutes_passed = (now - self.last_energy_update_time).total_seconds() / 60.0 - - recovered = minutes_passed * self.energy_recovery_rate - self.current_energy = min(self.max_energy, self.current_energy + recovered) - self.last_energy_update_time = now - - logger.debug(f"[Energy] Recovered {recovered:.2f}, Current: {self.current_energy:.2f}") - - def detect_topic(self, message): - if not isinstance(message, str): - return None - message_lower = message.lower() - if any(keyword in message_lower for keyword in self.fish_keywords): - return "fish" - if any(keyword in message_lower for keyword in self.tech_keywords): - return "tech" - if any(keyword in message_lower for keyword in self.news_keywords): - return "news" - if any(keyword in message_lower for keyword in self.hot_topics): - return "hot_topic" - return None - - def is_morning_window(self, timestamp): - try: - # 简化时间处理,这里假设timestamp通常是当前时间附近 - now = datetime.now() - return self.morning_window[0] <= now.time() <= self.morning_window[1] - except: - return False - - def calculate_participation_boost(self, message, messages, is_at=False): - """计算这条消息带来的参与度提升""" - if is_at: - return 100.0 # 被AT直接拉满 - - boost = self.participation_inc - - # 话题加成 - topic = self.detect_topic(message) - if topic: - boost += self.topic_bonus - - # 关键词加成 (早安/表情等) - if any(e in message for e in self.emojis): - boost += 5 - - if "签到" in message or "早" in message: - if self.is_morning_window(None): - boost += 20 # 早上问好权重高 - - return boost - - def should_intervene(self, room_id, timestamp, message, messages, chat_log, is_at=False): - """ - 核心判定逻辑 - :param room_id: 群ID - :param timestamp: 消息时间 - :param message: 当前消息内容 - :param messages: 最近消息列表(文本) - :param chat_log: 完整聊天记录对象 - :param is_at: 是否被AT - """ - self._update_energy() - state = self._get_room_state(room_id) - - # 1. 增加参与度(Listening) - boost = self.calculate_participation_boost(message, messages, is_at) - - # 连续对话奖励:如果机器人在最近 2 分钟内回复过,说明可能在对话中,参与度增加翻倍 - if state.last_bot_reply_time: - time_since_last_reply = (datetime.now() - state.last_bot_reply_time).total_seconds() - if time_since_last_reply < 120: # 2分钟内 - boost *= 2.0 - logger.debug(f"[{room_id}] 连续对话奖励触发 (上次回复 {int(time_since_last_reply)}s 前)") - - state.participation_score += boost - state.last_active_time = datetime.now() - - logger.debug(f"[{room_id}] 收到消息: '{message}' | 参与度+{boost} -> {state.participation_score:.2f} | 体力: {self.current_energy:.2f}") - - # 2. 检查阈值 - if state.participation_score < self.participation_threshold: - return False - - # 3. 检查体力 - if self.current_energy < self.energy_cost: - logger.debug(f"[{room_id}] 体力不足 ({self.current_energy:.2f} < {self.energy_cost}),跳过") - return False - - # 4. 概率判定 - # 参与度越高,概率越高;体力越高,概率越高 - # 归一化因子 - participation_factor = min(state.participation_score / 100.0, 1.5) # 上限1.5倍 - energy_factor = self.current_energy / self.max_energy - - final_prob = self.base_prob * participation_factor * energy_factor - - # 被AT必然回复 - if is_at: - final_prob = 1.0 - - # 随机判定 - rand_val = random.random() - should_reply = rand_val < final_prob - - logger.debug(f"[{room_id}] 判定: Prob={final_prob:.2f} (Base={self.base_prob} * Part={participation_factor:.2f} * Energy={energy_factor:.2f}) vs Rand={rand_val:.2f} -> {should_reply}") - - if should_reply: - # 扣除消耗 - self.current_energy -= self.energy_cost - - # 更新状态 - state.last_bot_reply_time = datetime.now() - - # 降低参与度(满足了表达欲) - # 改为减法,保留部分参与度以便连续对话 - # 如果是高参与度(>50),减去 30;否则减半 - if state.participation_score > 50: - state.participation_score = max(0, state.participation_score - 40) - else: - state.participation_score *= 0.5 - - return True - - return False - - def rule_high_reply_rate(self, timestamp, chat_log): - # 保留这个方法以兼容 main.py 的调用,或者在 main.py 中移除 - # 这里我们可以简单的返回 False,因为新的逻辑已经包含了频率控制(通过体力值) - return False diff --git a/plugins/ai_auto_response/config.toml b/plugins/ai_auto_response/config.toml index 170f71c..c082605 100644 --- a/plugins/ai_auto_response/config.toml +++ b/plugins/ai_auto_response/config.toml @@ -1,72 +1,88 @@ enable = true -dify_api_url = "http://192.168.2.240/v1/chat-messages" -dify_api_key = "app-oDHbln5CzBLt3uS9bIBlJjhZ" # 请在此处填入您的DIFY API密钥 +[persona] +name = "小牛" +persona_file = "persona/xiaoniu.txt" +style = "自然、口语化、像群友,先回答问题,再决定是否延伸" +emoji_probability = 0.18 +max_reply_sentences = 3 +familiarity_hint = "有熟悉感,但不过度装熟" -[Keywords] -# 表情符号库 -emojis = ["[捂脸]", "[奸笑]", "[可怜]", "[擦汗]", "[发呆]", "[抠鼻]", "[破涕为笑]", "[旺柴]"] -# 话题关键词 -hot_topics = ["咖啡", "手机", "小米", "华为", "苹果", "价格", "流畅", "螺蛳粉", "外卖"] -fish_keywords = [ - "鱼缸", "缸", "鱼", "鱼苗", "热带鱼", "金鱼", "观赏鱼", "罗汉鱼", "斗鱼", "孔雀鱼", "神仙鱼", "鹦鹉鱼", - "灯科", "龙鱼", "地图鱼", "魟鱼", "草缸", "海缸", "水草", "水泵", "滤材", "硝化", "过滤", "缸体", - "缸底", "底砂", "加热棒", "冷水鱼", "水温", "水质", "换水", "晒水", "PH值", "亚硝酸盐", "硝酸盐", - "鱼食", "投喂", "喂食", "缸养", "寄生虫", "白点", "病鱼", "翻肚", "缸爆", "沉底", "氧气泵", - "打氧", "造景", "生化球", "过滤桶", "外置过滤", "过滤棉", "清缸", "拉线", "鱼便", "崩缸", - "饲养", "养水", "循环系统", "水妖精", "进水口", "出水口", "滴流盒", "蛋分", "藻类", "绿水", - "爆藻", "石头", "沉木", "CO2", "定时器" +[api] +provider = "openai_compatible" +api_base_url = "https://api.xinmeng.dpdns.org/v1" +endpoint = "chat/completions" +api_key = "sk-XTWwXIgo2QMyX8AwBg0NQrxaDkvQiCX8rfylfmnHID5zdjMt" +model = "gpt-5.4" +timeout_seconds = 45 +temperature = 0.7 +max_tokens = 500 +stream = true + +[mode] +group_default_mode = "social" +question_reply_timeout_sec = 12 +followup_session_window_sec = 300 +recent_context_size = 30 +allow_proactive_reply = true +returning_member_days = 7 +long_absent_member_days = 30 +memory_lookback_days = 180 +active_context_hours = 8 + +[priority] +at_bot = 1.0 +explicit_question = 0.95 +followup = 0.90 +social_call = 0.65 +casual_topic = 0.35 + +[flow] +enable_flow_state = true +flow_decay_per_minute = 8 +idle_threshold = 20 +warming_threshold = 40 +engaged_threshold = 70 +at_bot_boost = 40 +question_boost = 30 +followup_boost = 20 +topic_boost = 15 +returning_member_boost = 10 +response_accepted_boost = 15 +ignored_reply_penalty = 20 +over_reply_penalty = 15 +night_penalty = 30 +max_bot_reply_streak = 3 + +[cooldown] +group_reply_cooldown_sec = 45 +same_user_followup_cooldown_sec = 10 +night_silent_hours = ["01:00-07:30"] + +[memory] +enable_member_context = true +enable_vector_memory = true +vector_provider = "qdrant" +embedding_provider = "ollama" +qdrant_url = "http://127.0.0.1:6333" +qdrant_collection = "abot_xiaoniu_memory" +ollama_base_url = "http://192.168.2.50:11434" +embedding_model = "bge-m3" +vector_top_k = 5 +vector_min_score = 0.65 +vector_trigger_modes = ["returning_member", "long_absent_member", "qa_with_context", "reactivated_topic"] + +[topics] +focus = [ + "技术", "开发", "程序", "python", "微信机器人", "脚本", "报错", "部署", + "服务器", "docker", "数据库", "redis", "mysql", "qdrant", "ollama", "dify", + "ai", "大模型", "接口", "插件", "自动化" ] -tech_keywords = [ - # 品牌/系统相关 - "MIUI", "鸿蒙", "iOS", "iPhone", "安卓", "Android", "Windows", "Mac", "Linux", - "小米", "华为", "荣耀", "苹果", "三星", "vivo", "OPPO", "realme", "魅族", "一加", "红米", "中兴", "联想", - # 功能/硬件组件 - "推送", "充电", "屏幕", "电池", "信号", "发热", "卡顿", "刷新率", "像素", "拍照", "音质", - "NFC", "红外", "扬声器", "马达", "快充", "耗电", "续航", "UI", "指纹", "面部识别", "解锁", - "摄像头", "前摄", "后摄", "内存", "闪存", "存储空间", "RAM", "ROM", "屏占比", "刘海屏", "挖孔屏", "折叠屏", +[filters] +ignore_prefixes = ["/", "#"] +ignore_exact = ["收到", "好的", "嗯", "哦", "6", "1", "?", "?"] +min_text_length = 1 - # 芯片/性能 - "骁龙", "天玑", "麒麟", "A系列", "SoC", "处理器", "芯片", "跑分", "安兔兔", "鲁大师", "GPU", "CPU", "核心", "制程", - - # 系统/应用/开发 - "系统更新", "OTA", "开源", "固件", "刷机", "root", "bootloader", "ADB", "Xposed", "系统权限", "卡刷", "线刷", - - # 科技趋势/智能化 - "AI", "人工智能", "大模型", "GPT", "ChatGPT", "语音助手", "Siri", "小爱同学", "小艺", "算法", "智能识别", - "自动驾驶", "感应器", "人脸识别", "语音识别", "AR", "VR", "混合现实", "穿戴设备", "智能手表", "手环" -] -mechanism_keywords = [] -news_keywords = ["新闻", "骨灰房", "法院", "判决", "住建局"] - -[TimeWindow] -# 早晨签到时间窗口(8:00-8:30) -morning_start_hour = 8 -morning_start_minute = 0 -morning_end_hour = 8 -morning_end_minute = 30 - -[ReplyThreshold] -# 每分钟消息数阈值,超过此值将触发AI介入 -messages_per_minute_threshold = 3 -# 分析窗口大小(分钟) -analysis_window_minutes = 5 - -[HumanLike] -# 最大体力值 -max_energy = 100.0 -# 体力恢复速度(每分钟) -energy_recovery_per_minute = 1.0 -# 每次回复消耗体力 -energy_cost_per_reply = 15.0 -# 基础参与度增加(每收到一条群消息) -participation_increase_per_msg = 5.0 -# 话题相关参与度奖励 -topic_match_bonus = 15.0 -# 触发回复的参与度阈值(只有参与度高于此值才可能回复) -participation_threshold = 20.0 -# 每次回复后参与度降低比例 (0.0 - 1.0, 1.0表示清零) -participation_drop_factor = 0.8 -# 基础回复概率 (0.0 - 1.0) - 当满足阈值时,基于此概率和体力值计算最终概率 -base_reply_probability = 0.6 \ No newline at end of file +[logging] +debug = true diff --git a/plugins/ai_auto_response/context_builder.py b/plugins/ai_auto_response/context_builder.py new file mode 100644 index 0000000..ae398ae --- /dev/null +++ b/plugins/ai_auto_response/context_builder.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +from typing import Dict, List + + +class ContextBuilder: + def build( + self, + *, + room_id: str, + sender: str, + sender_name: str, + content: str, + recent_messages: List[Dict], + member_context: Dict, + trigger: Dict, + flow_state: str, + reply_mode: str, + vector_memories: List[Dict], + ) -> Dict: + recent_lines = [] + for item in recent_messages[-30:]: + msg_sender = item.get("sender_name") or item.get("sender") or "未知成员" + msg_content = item.get("content") or item.get("message") or "" + if msg_content: + recent_lines.append(f"{msg_sender}: {msg_content}") + return { + "group_profile": {"room_id": room_id}, + "speaker_profile": { + "wxid": sender, + "display_name": sender_name, + "member_context": member_context or {}, + }, + "recent_messages": recent_lines, + "recent_summary": "", + "trigger_type": trigger.get("trigger_type", "none"), + "reply_mode": reply_mode, + "flow_state": flow_state, + "memory_prompt": self._build_member_memory_prompt(member_context), + "vector_memory_prompt": self._build_vector_memory_prompt(vector_memories), + "current_message": f"{sender_name}: {content}", + } + + @staticmethod + def _build_member_memory_prompt(member_context: Dict) -> str: + if not member_context: + return "暂无稳定成员画像。" + meta = member_context.get("meta", {}) or {} + topics = member_context.get("topics_of_interest", []) or [] + recent_focus = member_context.get("recent_focus", []) or [] + lines = [ + f"成员摘要:{member_context.get('summary_text', '')}".strip(), + f"互动风格:{member_context.get('interaction_style', '')}".strip(), + f"回复偏好:{member_context.get('response_style_hint', '')}".strip(), + f"长期主题:{', '.join(topics[:5])}" if topics else "", + f"近期关注:{', '.join(recent_focus[:4])}" if recent_focus else "", + f"群内角色:{meta.get('group_role', '')}".strip(), + ] + return "\n".join([line for line in lines if line]) + + @staticmethod + def _build_vector_memory_prompt(vector_memories: List[Dict]) -> str: + if not vector_memories: + return "" + lines = [] + for item in vector_memories[:5]: + summary = item.get("content_summary") or item.get("summary_text") or item.get("text") or "" + memory_type = item.get("memory_type", "memory") + if summary: + lines.append(f"[{memory_type}] {summary}") + return "\n".join(lines) diff --git a/plugins/ai_auto_response/flow_manager.py b/plugins/ai_auto_response/flow_manager.py new file mode 100644 index 0000000..5be7588 --- /dev/null +++ b/plugins/ai_auto_response/flow_manager.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, time +from typing import Dict + + +@dataclass +class FlowState: + room_id: str + score: float = 0.0 + state: str = "idle" + last_update: datetime = field(default_factory=datetime.now) + last_bot_reply_at: datetime | None = None + last_human_message_at: datetime | None = None + bot_reply_streak: int = 0 + ignored_reply_count: int = 0 + accepted_reply_count: int = 0 + last_topic: str = "" + + +class FlowManager: + def __init__(self, config: Dict): + self.config = config or {} + self.states: Dict[str, FlowState] = {} + + def get_state(self, room_id: str) -> FlowState: + if room_id not in self.states: + self.states[room_id] = FlowState(room_id=room_id) + return self.states[room_id] + + def decay(self, room_id: str) -> FlowState: + state = self.get_state(room_id) + now = datetime.now() + elapsed_minutes = max((now - state.last_update).total_seconds() / 60.0, 0.0) + decay = elapsed_minutes * float(self.config.get("flow_decay_per_minute", 8)) + state.score = max(0.0, state.score - decay) + state.last_update = now + state.state = self._score_to_state(state.score) + return state + + def apply_message_event(self, room_id: str, event: Dict) -> FlowState: + state = self.decay(room_id) + now = datetime.now() + if self._is_night_silent(now.time()): + state.score = max(0.0, state.score - float(self.config.get("night_penalty", 30))) + if event.get("is_at"): + state.score += float(self.config.get("at_bot_boost", 40)) + if event.get("is_question"): + state.score += float(self.config.get("question_boost", 30)) + if event.get("is_followup"): + state.score += float(self.config.get("followup_boost", 20)) + if event.get("topic_hit"): + state.score += float(self.config.get("topic_boost", 15)) + if event.get("is_returning_member"): + state.score += float(self.config.get("returning_member_boost", 10)) + if state.last_bot_reply_at: + since_reply = (now - state.last_bot_reply_at).total_seconds() + if since_reply <= 180 and event.get("message_after_bot"): + state.score += float(self.config.get("response_accepted_boost", 15)) + state.accepted_reply_count += 1 + state.last_human_message_at = now + state.last_topic = event.get("topic") or state.last_topic + state.state = self._score_to_state(state.score) + return state + + def note_bot_reply(self, room_id: str) -> FlowState: + state = self.decay(room_id) + state.last_bot_reply_at = datetime.now() + state.bot_reply_streak += 1 + max_streak = int(self.config.get("max_bot_reply_streak", 3)) + if state.bot_reply_streak > max_streak: + state.score = max(0.0, state.score - float(self.config.get("over_reply_penalty", 15))) + state.state = self._score_to_state(state.score) + return state + + def _score_to_state(self, score: float) -> str: + idle_threshold = float(self.config.get("idle_threshold", 20)) + warming_threshold = float(self.config.get("warming_threshold", 40)) + engaged_threshold = float(self.config.get("engaged_threshold", 70)) + if score < idle_threshold: + return "idle" + if score < warming_threshold: + return "warming" + if score < engaged_threshold: + return "engaged" + return "deep_engaged" + + def _is_night_silent(self, current_time: time) -> bool: + for window in self.config.get("night_silent_hours", []): + try: + start_str, end_str = window.split("-", 1) + start = time.fromisoformat(start_str) + end = time.fromisoformat(end_str) + if start <= end: + if start <= current_time <= end: + return True + else: + if current_time >= start or current_time <= end: + return True + except Exception: + continue + return False diff --git a/plugins/ai_auto_response/llm_client.py b/plugins/ai_auto_response/llm_client.py new file mode 100644 index 0000000..fcce89f --- /dev/null +++ b/plugins/ai_auto_response/llm_client.py @@ -0,0 +1,166 @@ +from __future__ import annotations + +import json +from typing import Dict, List + +import requests + + +class LLMClient: + def __init__(self, config: Dict): + self.config = config or {} + self.provider = self.config.get("provider", "openai_compatible") + self.base_url = str(self.config.get("api_base_url", "")).rstrip("/") + self.endpoint = str(self.config.get("endpoint", "chat/completions")).lstrip("/") + self.api_key = self.config.get("api_key", "") + self.model = self.config.get("model", "") + self.timeout_seconds = int(self.config.get("timeout_seconds", 45)) + self.temperature = float(self.config.get("temperature", 0.7)) + self.max_tokens = int(self.config.get("max_tokens", 500)) + self.stream = bool(self.config.get("stream", True)) + self.last_error = "" + + def chat(self, system_prompt: str, user_prompt: str, user_id: str) -> str: + self.last_error = "" + if not self.base_url: + self.last_error = "empty_base_url" + return "" + if self.provider == "openai_compatible": + return self._chat_openai_compatible(system_prompt, user_prompt, user_id) + self.last_error = f"unsupported_provider:{self.provider}" + return "" + + def _chat_openai_compatible(self, system_prompt: str, user_prompt: str, user_id: str) -> str: + if not self.model: + return "" + + payload = { + "model": self.model, + "messages": self._build_messages(system_prompt, user_prompt), + "temperature": self.temperature, + "max_tokens": self.max_tokens, + "user": user_id, + } + if self.stream: + payload["stream"] = True + headers = { + "Content-Type": "application/json", + } + if self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + + try: + if self.stream: + return self._chat_streaming(payload, headers) + response = requests.post( + f"{self.base_url}/{self.endpoint}", + json=payload, + headers=headers, + timeout=self.timeout_seconds, + ) + response.raise_for_status() + data = response.json() + text = self._extract_text(data) + if text: + return text + self.last_error = f"empty_model_output:{self.model}" + return "" + except Exception as exc: + self.last_error = f"request_failed:{exc}" + return "" + + def _chat_streaming(self, payload: Dict, headers: Dict[str, str]) -> str: + chunks: List[str] = [] + with requests.post( + f"{self.base_url}/{self.endpoint}", + json=payload, + headers=headers, + timeout=self.timeout_seconds, + stream=True, + ) as response: + response.raise_for_status() + buffer = b"" + for part in response.iter_content(chunk_size=None): + if not part: + continue + buffer += part + while b"\n\n" in buffer: + event, buffer = buffer.split(b"\n\n", 1) + try: + event_text = event.decode("utf-8") + except UnicodeDecodeError: + buffer = event + b"\n\n" + buffer + break + text_piece, done = self._parse_sse_event(event_text) + if text_piece: + chunks.append(text_piece) + if done: + final_text = "".join(chunks).strip() + if final_text: + return final_text + self.last_error = f"empty_stream_output:{self.model}" + return "" + final_text = "".join(chunks).strip() + if final_text: + return final_text + self.last_error = f"empty_stream_output:{self.model}" + return "" + + @staticmethod + def _build_messages(system_prompt: str, user_prompt: str) -> List[Dict[str, str]]: + return [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] + + @staticmethod + def _extract_text(data: Dict) -> str: + choices = data.get("choices") or [] + if choices: + message = choices[0].get("message", {}) or {} + content = message.get("content") + if isinstance(content, str) and content.strip(): + return content.strip() + if isinstance(content, list): + parts = [] + for item in content: + if isinstance(item, dict): + text = item.get("text") or item.get("content") + if isinstance(text, str) and text.strip(): + parts.append(text.strip()) + if parts: + return "\n".join(parts).strip() + for key in ("reasoning_content", "text", "output_text"): + value = message.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + for key in ("output_text", "text", "answer", "response"): + value = data.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + return "" + + @classmethod + def _parse_sse_event(cls, event_text: str) -> tuple[str, bool]: + lines = [line.strip() for line in event_text.splitlines() if line.strip()] + data_lines = [line[5:].strip() for line in lines if line.startswith("data:")] + if not data_lines: + return "", False + data = "\n".join(data_lines) + if data == "[DONE]": + return "", True + obj = json.loads(data) + choice = (obj.get("choices") or [{}])[0] + delta = choice.get("delta") or {} + content = delta.get("content") + if isinstance(content, str): + return content, False + if isinstance(content, list): + parts = [] + for item in content: + if isinstance(item, dict): + text = item.get("text") or item.get("content") + if isinstance(text, str): + parts.append(text) + return "".join(parts), False + return "", False diff --git a/plugins/ai_auto_response/main.py b/plugins/ai_auto_response/main.py index 43ffd34..f670a6d 100644 --- a/plugins/ai_auto_response/main.py +++ b/plugins/ai_auto_response/main.py @@ -1,49 +1,56 @@ +from __future__ import annotations + +import re +import time +import xml.etree.ElementTree as ET +from typing import Any, Dict, List, Optional, Tuple + from loguru import logger -import os -import requests -from typing import Dict, Any, List, Optional, Tuple from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus -from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager +from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from utils.wechat.contact_manager import ContactManager from wechat_ipad import WechatAPIClient from wechat_ipad.models.message import MessageType -import xml.etree.ElementTree as ET -from .bot_ai import InterventionBot +from .context_builder import ContextBuilder +from .flow_manager import FlowManager +from .llm_client import LLMClient +from .memory_store import MemoryStore +from .persona_engine import PersonaEngine +from .response_planner import ResponsePlanner +from .triggers import TriggerRouter +from .vector_memory import VectorMemoryStore class AIAutoResponsePlugin(MessagePluginInterface): - """AI自动对话插件""" - - # 功能权限常量 FEATURE_KEY = "AI_AUTO_RESPONSE" - FEATURE_DESCRIPTION = "🤖 AI自动对话功能 [自动对话]" + FEATURE_DESCRIPTION = "🐮 小牛拟人群聊BOT [群聊拟真、及时答疑、长期记忆]" @property def name(self) -> str: - return "AI自动对话" + return "小牛群聊BOT" @property def version(self) -> str: - return "1.0.0" + return "2.0.0" @property def description(self) -> str: - return "提供AI自动对话功能,可以在群聊中自动介入对话" + return "拟人化群聊BOT,支持心流、长期记忆和回归成员识别" @property def author(self) -> str: - return "liu.wei" + return "ABOT Team" @property def command_prefix(self) -> Optional[str]: - return "" # 不需要前缀,直接匹配命令 + return None @property def commands(self) -> List[str]: - return self._commands + return [] @property def feature_key(self) -> Optional[str]: @@ -55,207 +62,452 @@ class AIAutoResponsePlugin(MessagePluginInterface): def __init__(self): super().__init__() - self.intervention_bot = None - self.group_messages = {} # 存储每个群的最近消息 - self.max_messages = 100 # 每个群最多存储的消息数量 - # 注册功能权限 self.feature = self.register_feature() - - # DIFY API配置 - self.dify_api_url = "" - self.dify_api_key = "" # 需要在配置文件中设置 + self.group_messages: Dict[str, List[Dict]] = {} + self.enable = True + self.last_reply_at: Dict[str, float] = {} def initialize(self, context: Dict[str, Any]) -> bool: - """初始化插件""" self.LOG = logger - self.LOG.debug(f"正在初始化 {self.name} 插件...") - - # 保存上下文对象 - self.event_system = context.get("event_system") - - # 加载配置 - config_path = os.path.join(os.path.dirname(__file__), "config.toml") - self.enable = self._config.get("enable", True) - - # 从配置中获取DIFY API密钥 - self.dify_api_key = self._config.get("dify_api_key", "") - self.dify_api_url = self._config.get("dify_api_url", "") - - # 初始化介入机器人 - self.intervention_bot = InterventionBot(config_path) - + self.db_manager = context.get("db_manager") + self.enable = bool(self._config.get("enable", True)) + self.persona_engine = PersonaEngine(self.get_plugin_path(), self._config.get("persona", {})) + self.flow_manager = FlowManager({ + **(self._config.get("flow", {}) or {}), + "night_silent_hours": (self._config.get("cooldown", {}) or {}).get("night_silent_hours", []), + }) + merged_trigger_config = dict(self._config.get("priority", {}) or {}) + merged_trigger_config.update(self._config.get("topics", {}) or {}) + self.trigger_router = TriggerRouter(merged_trigger_config) + merged_memory_config = dict(self._config.get("mode", {}) or {}) + merged_memory_config.update(self._config.get("memory", {}) or {}) + self.memory_store = MemoryStore(self.db_manager, merged_memory_config) + self.vector_memory = VectorMemoryStore(self._config.get("memory", {}) or {}) + self.context_builder = ContextBuilder() + self.response_planner = ResponsePlanner() + self.llm_client = LLMClient(self._config.get("api", {}) or {}) + self.filters = self._config.get("filters", {}) or {} + self.mode_config = self._config.get("mode", {}) or {} + self.cooldown_config = self._config.get("cooldown", {}) or {} + self._synced_member_context_versions: Dict[str, str] = {} + self.log_debug = bool((self._config.get("logging", {}) or {}).get("debug", True)) + self.LOG.debug(f"[{self.name}] 初始化完成") return True def start(self) -> bool: - """启动插件""" - self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: - """停止插件""" - self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: - """检查是否可以处理该消息""" if not self.enable: return False - - content = str(message.get("content", "")).strip() - roomid = message.get("roomid", "") - - if GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: + room_id = message.get("roomid", "") + if not room_id: + return False + if GroupBotManager.get_group_permission(room_id, self.feature) == PermissionStatus.DISABLED: return False - # 如果是群消息,且该群启用了自动回复,则处理 - if roomid: - self.LOG.debug(f"[{roomid}] 进入AI自动回复逻辑") - # 存储消息 - if roomid not in self.group_messages: - self.group_messages[roomid] = [] - msg_type = message.get("type") - # 获取发送者昵称 - sender_id = message.get("sender", "") - try: - members = ContactManager.get_instance().get_group_members(roomid) - sender_name = members.get(sender_id, sender_id) - except Exception: - sender_name = sender_id + msg_type = message.get("type") + if msg_type not in (MessageType.TEXT, MessageType.APP): + return False - # 仅追加文本(1)与应用消息(49),并对49提取标题 - content_to_store = None - try: - if msg_type == MessageType.TEXT: - content_to_store = content - elif msg_type == MessageType.APP: - try: - root = ET.fromstring(content) - title_elem = root.find('.//title') - if title_elem is not None and title_elem.text: - content_to_store = title_elem.text - else: - content_to_store = "[应用消息]" - except Exception as e: - self.LOG.error(f"解析消息类型49出错: {e}") - content_to_store = "[应用消息]" - except Exception as e: - self.LOG.error(f"处理消息类型出错: {e}") - content_to_store = None + full_msg = message.get("full_wx_msg") + if full_msg and full_msg.from_self(): + return False - if content_to_store is not None: - # 添加新消息 - current_message = { - "timestamp": message.get("timestamp", ""), - "message": content_to_store, - "sender": sender_id, - "sender_name": sender_name - } - - # 添加新消息 - if content_to_store is not None: - self.group_messages[roomid].append(current_message) - - # 限制消息数量 - if len(self.group_messages[roomid]) > self.max_messages: - self.group_messages[roomid] = self.group_messages[roomid][-self.max_messages:] - - # 判断是否需要介入 - messages = [msg["message"] for msg in self.group_messages[roomid]] - timestamp = message.get("timestamp", "") - # 传递完整的聊天记录给should_intervene方法 - can = self.intervention_bot.should_intervene(roomid, timestamp, content, messages, self.group_messages[roomid]) - if can: - self.LOG.debug(f"[{roomid}] 触发自动回复规则,准备生成回复") - return True - else: - self.LOG.debug(f"[{roomid}] 跳过聊天") - return False - return False + content = self._normalize_content(message) + if not content: + return False + if self._should_ignore(content): + return False + return True async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """处理消息""" - content = str(message.get("content", "")).strip() - self.LOG.debug(f"插件执行: {self.name}:{content}") - sender = message.get("sender") - roomid = message.get("roomid", "") + room_id = message.get("roomid", "") + sender = message.get("sender", "") bot: WechatAPIClient = message.get("bot") - # 检查权限 - if roomid and GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: - return False, "没有权限" - # 处理自动回复 - try: - # 获取最近的消息 (完整的消息对象) - chat_history = self.group_messages[roomid] - timestamp = message.get("timestamp", "") + content = self._normalize_content(message) + sender_name = self._get_sender_name(room_id, sender) + self._log_event( + "recv", + room_id=room_id, + sender=sender, + sender_name=sender_name, + is_at=message.get("is_at", False), + content_preview=self._preview(content), + msg_type=str(message.get("type")), + ) - # 记录触发原因 - if self.intervention_bot.rule_high_reply_rate(timestamp, self.group_messages[roomid]): - self.LOG.info(f"[{roomid}] 触发高频率回复规则,准备生成回复") + normalized_message = { + "sender": sender, + "sender_name": sender_name, + "content": content, + "timestamp": message.get("timestamp"), + } + self._append_group_message(room_id, normalized_message) - # 生成回复 - response = self._generate_response_with_dify(content, chat_history) - if response: - # 发送回复 - await bot.send_text_message(roomid, response, sender) - return False, "自动回复成功" - else: - return False, "生成回复失败" + memory_hints = self.memory_store.build_memory_hints(room_id, sender) + self._sync_member_memory(room_id, sender, sender_name, memory_hints.get("member_context", {})) + self._log_event( + "memory", + room_id=room_id, + sender=sender, + returning_state=memory_hints.get("returning_member_state", "") or "none", + has_member_context=bool(memory_hints.get("member_context")), + is_followup=memory_hints.get("is_followup", False), + last_active_at=memory_hints.get("last_active_at", "") or "", + ) + trigger = self.trigger_router.route(message | {"content": content}, memory_hints) + flow_state = self.flow_manager.apply_message_event(room_id, { + "is_at": message.get("is_at", False), + "is_question": trigger.is_question, + "is_followup": trigger.is_followup, + "topic_hit": bool(trigger.topic), + "topic": trigger.topic, + "is_returning_member": trigger.is_returning_member, + "message_after_bot": True, + }) + self._log_event( + "decision", + room_id=room_id, + sender=sender, + trigger_type=trigger.trigger_type, + priority=trigger.priority, + reasons="|".join(trigger.reasons), + flow_state=flow_state.state, + flow_score=round(flow_state.score, 2), + topic=trigger.topic or "", + ) - except Exception as e: - self.LOG.error(f"处理AI自动对话出错: {e}") - return False, f"处理出错: {e}" - - def _generate_response_with_dify(self, current_message: str, chat_history: List[Dict[str, Any]]) -> str: - """使用DIFY API生成自动回复内容""" - try: - # 构建上下文消息 - # 取更多上下文以帮助理解语境 - recent_msgs = chat_history[-10:] if len(chat_history) > 10 else chat_history - - context_str_list = [] - for msg in recent_msgs: - # 优先使用昵称,如果没有则使用sender ID - sender = msg.get("sender_name") or msg.get("sender", "Unknown") - content = msg.get("message", "") - context_str_list.append(f"{sender}: {content}") - - context = "\n".join(context_str_list) - - # 构建提示词 - 增强拟人化指令 - prompt = ( - f"当前群聊上下文(格式为 '发言人: 内容',最后一句是最新消息):\n{context}\n\n" - f"指令:\n" - f"1. 参考上下文。\n" - f"2. 保持简短(1-2句话),口语化,不要长篇大论。\n" - f"3. 不要重复之前的回复。\n" - f"4. 如果最后一句不是对你说的,且你觉得没必要强行接话,可以回个表情或简短的语气词,或者委婉结束话题。\n" - f"请生成回复:" + allow_proactive = bool(self.mode_config.get("allow_proactive_reply", True)) + reply_mode = self.response_planner.choose_reply_mode(trigger.__dict__, flow_state.state) + should_reply = self.response_planner.should_reply(trigger.__dict__, flow_state.state, allow_proactive) + if not should_reply: + self._log_event( + "skip", + room_id=room_id, + sender=sender, + reason="planner_skip", + trigger_type=trigger.trigger_type, + reply_mode=reply_mode, + flow_state=flow_state.state, ) + return False, "skip" + if not self._pass_cooldown(room_id, trigger.__dict__): + self._log_event( + "skip", + room_id=room_id, + sender=sender, + reason="cooldown", + trigger_type=trigger.trigger_type, + reply_mode=reply_mode, + ) + return False, "cooldown" - # 调用DIFY API - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {self.dify_api_key}" - } + recent_messages = self.group_messages.get(room_id) or self.memory_store.get_recent_messages(room_id) + vector_memories = [] + if self.vector_memory.should_search(reply_mode, trigger.trigger_type, memory_hints.get("returning_member_state", "")): + vector_memories = self.vector_memory.search(content, room_id, sender) + self._log_event( + "context", + room_id=room_id, + sender=sender, + reply_mode=reply_mode, + recent_message_count=len(recent_messages), + vector_hit_count=len(vector_memories), + ) - payload = { - "inputs": {}, - "query": prompt, - "response_mode": "blocking", - "user": "ai_auto_response" - } + context = self.context_builder.build( + room_id=room_id, + sender=sender, + sender_name=sender_name, + content=content, + recent_messages=recent_messages, + member_context=memory_hints.get("member_context", {}), + trigger=trigger.__dict__, + flow_state=flow_state.state, + reply_mode=reply_mode, + vector_memories=vector_memories, + ) - response = requests.post(self.dify_api_url, headers=headers, json=payload) + system_prompt = self.persona_engine.build_system_prompt() + user_prompt = self._build_user_prompt(context, memory_hints) + response = self._sanitize_response(self.llm_client.chat(system_prompt, user_prompt, user_id=f"{room_id}:{sender}")) + if not response: + self._log_event( + "model_empty", + room_id=room_id, + sender=sender, + model=self.llm_client.model, + last_error=self.llm_client.last_error, + reply_mode=reply_mode, + ) + return False, "empty_response" - if response.status_code == 200: - result = response.json() - return result.get("answer", "") - else: - self.LOG.error(f"DIFY API调用失败: {response.status_code} - {response.text}") - return "" + await bot.send_text_message(room_id, response, sender) + self.last_reply_at[room_id] = time.time() + self.flow_manager.note_bot_reply(room_id) + self.memory_store.note_bot_reply(room_id, sender, trigger.topic) + self._upsert_interaction_memory(room_id, sender, sender_name, content, response, trigger.trigger_type, trigger.topic) + self._log_event( + "sent", + room_id=room_id, + sender=sender, + sender_name=sender_name, + trigger_type=trigger.trigger_type, + reply_mode=reply_mode, + response_preview=self._preview(response), + response_len=len(response), + ) + return False, "replied" - except Exception as e: - self.LOG.error(f"生成回复出错: {e}") + def _append_group_message(self, room_id: str, message: Dict) -> None: + items = self.group_messages.setdefault(room_id, []) + items.append(message) + size = int(self.mode_config.get("recent_context_size", 30)) + if len(items) > size: + self.group_messages[room_id] = items[-size:] + + def _normalize_content(self, message: Dict[str, Any]) -> str: + msg_type = message.get("type") + content = str(message.get("content", "")).strip() + if msg_type == MessageType.TEXT: + return self._strip_at_prefix(content) + if msg_type == MessageType.APP: + try: + root = ET.fromstring(content) + title = root.find(".//title") + return (title.text or "").strip() if title is not None else "[应用消息]" + except Exception: + return "[应用消息]" + return content + + @staticmethod + def _strip_at_prefix(content: str) -> str: + return re.sub(r"@.*?[\u2005\s]+", "", content).strip() + + def _should_ignore(self, content: str) -> bool: + if len(content) < int(self.filters.get("min_text_length", 1)): + return True + if content in set(self.filters.get("ignore_exact", [])): + return True + return any(content.startswith(prefix) for prefix in self.filters.get("ignore_prefixes", [])) + + def _get_sender_name(self, room_id: str, sender: str) -> str: + try: + members = ContactManager.get_instance().get_group_members(room_id) + return members.get(sender, sender) + except Exception: + return sender + + def _pass_cooldown(self, room_id: str, trigger: Dict) -> bool: + current_ts = time.time() + room_cd = int(self.cooldown_config.get("group_reply_cooldown_sec", 45)) + user_cd = int(self.cooldown_config.get("same_user_followup_cooldown_sec", 10)) + last_room_reply = self.last_reply_at.get(room_id, 0.0) + if trigger.get("is_question") or trigger.get("is_followup") or trigger.get("trigger_type") == "at_trigger": + return (current_ts - last_room_reply) >= user_cd + return (current_ts - last_room_reply) >= room_cd + + def _build_user_prompt(self, context: Dict, memory_hints: Dict) -> str: + recent_text = "\n".join(context.get("recent_messages", [])[-20:]) or "暂无" + reply_mode = context.get("reply_mode", "social_short") + length_rule = self._build_length_rule(reply_mode) + return ( + f"当前群聊消息:\n{recent_text}\n\n" + f"当前发言:{context.get('current_message', '')}\n" + f"触发类型:{context.get('trigger_type', 'none')}\n" + f"回复模式:{context.get('reply_mode', 'social_short')}\n" + f"当前心流状态:{context.get('flow_state', 'idle')}\n" + f"成员稳定记忆:\n{context.get('memory_prompt', '暂无')}\n\n" + f"向量召回记忆:\n{context.get('vector_memory_prompt', '') or '暂无'}\n\n" + f"补充信息:回归状态={memory_hints.get('returning_member_state', '') or 'none'}\n" + f"要求:\n" + f"1. 如果是明确问题,先给清楚答案。\n" + f"2. 如果只是轻量接话,保持自然短句。\n" + f"3. 不要暴露系统记忆来源。\n" + f"4. 如果信息不足,不要硬编。\n" + f"5. 输出最终可直接发到群里的内容,不要解释你的思路。\n" + f"6. {length_rule}\n" + ) + + @staticmethod + def _sanitize_response(response: str) -> str: + if not response: return "" + response = response.strip() + response = re.sub(r"\n{3,}", "\n\n", response) + return response[:500].strip() + + @staticmethod + def _build_length_rule(reply_mode: str) -> str: + if reply_mode == "social_short": + return "默认只回一句短话,最好控制在2到12个字,除非非常不自然。" + if reply_mode == "qa_fast": + return "尽量只回1句话,必要时最多2句,先给结论,不要展开成长教程。" + if reply_mode == "qa_with_context": + return "优先控制在1到2句,除非对方明显在等详细步骤。" + return "尽量短,像群友临时接一句,不要长篇大论。" + + + def _sync_member_memory(self, room_id: str, sender: str, sender_name: str, member_context: Dict) -> None: + if not member_context: + return + version = str(member_context.get("last_profiled_at", "")) + cache_key = f"{room_id}:{sender}" + if version and self._synced_member_context_versions.get(cache_key) == version: + return + text = self.context_builder._build_member_memory_prompt(member_context) + if not text or text == "暂无稳定成员画像。": + return + payload = { + "chatroom_id": room_id, + "wxid": sender, + "display_name": sender_name, + "memory_type": "member_context_snapshot", + "source_id": cache_key, + "last_active_at": member_context.get("last_profiled_at", ""), + "topic_tags": member_context.get("topics_of_interest", [])[:5], + "summary_text": member_context.get("summary_text", ""), + } + ok = self.vector_memory.upsert_memory(f"member_context:{cache_key}:{version}", text, payload) + self._log_event( + "memory_upsert", + room_id=room_id, + sender=sender, + memory_type="member_context_snapshot", + ok=ok, + ) + if ok and version: + self._synced_member_context_versions[cache_key] = version + + def _upsert_interaction_memory( + self, + room_id: str, + sender: str, + sender_name: str, + content: str, + response: str, + trigger_type: str, + topic: str, + ) -> None: + text = f"{sender_name}说:{content}\n小牛回复:{response}" + payload = { + "chatroom_id": room_id, + "wxid": sender, + "display_name": sender_name, + "memory_type": "interaction_memory", + "topic_tags": [item for item in [topic, trigger_type] if item], + "created_at": time.strftime("%Y-%m-%d %H:%M:%S"), + "source_id": f"{room_id}:{sender}:{int(time.time())}", + "summary_text": text[:500], + } + ok = self.vector_memory.upsert_memory(payload["source_id"], text, payload) + self._log_event( + "memory_upsert", + room_id=room_id, + sender=sender, + memory_type="interaction_memory", + ok=ok, + trigger_type=trigger_type, + ) + + def _log_event(self, event: str, **kwargs: Any) -> None: + if not self.log_debug: + return + summary = self._build_log_summary(event, kwargs) + self.LOG.info(summary) + + @staticmethod + def _preview(text: str, limit: int = 80) -> str: + text = (text or "").replace("\n", "\\n").strip() + if len(text) <= limit: + return text + return text[: limit - 3] + "..." + + def _build_log_summary(self, event: str, data: Dict[str, Any]) -> str: + room = self._short_id(data.get("room_id", "")) + sender_name = data.get("sender_name", "") or self._short_id(data.get("sender", "")) + sender = self._short_id(data.get("sender", "")) + + if event == "recv": + return ( + f"[XIAONIU] RECV room={room} user={sender_name}/{sender} " + f"at={self._yn(data.get('is_at'))} msg={data.get('content_preview', '')}" + ).strip() + + if event == "memory": + return ( + f"[XIAONIU] MEMORY room={room} user={sender} " + f"ctx={self._yn(data.get('has_member_context'))} " + f"follow={self._yn(data.get('is_followup'))} " + f"return={data.get('returning_state', 'none')}" + ).strip() + + if event == "decision": + return ( + f"[XIAONIU] DECIDE room={room} user={sender} " + f"trigger={data.get('trigger_type', 'none')} " + f"flow={data.get('flow_state', '')}:{data.get('flow_score', '')} " + f"topic={data.get('topic', '-') or '-'} " + f"reasons={data.get('reasons', '-') or '-'}" + ).strip() + + if event == "skip": + return ( + f"[XIAONIU] SKIP room={room} user={sender} " + f"reason={data.get('reason', '')} " + f"trigger={data.get('trigger_type', 'none')} " + f"mode={data.get('reply_mode', '')}" + ).strip() + + if event == "context": + return ( + f"[XIAONIU] CTX room={room} user={sender} " + f"mode={data.get('reply_mode', '')} " + f"recent={data.get('recent_message_count', 0)} " + f"vector={data.get('vector_hit_count', 0)}" + ).strip() + + if event == "model_empty": + return ( + f"[XIAONIU] MODEL_EMPTY room={room} user={sender} " + f"model={data.get('model', '')} " + f"mode={data.get('reply_mode', '')} " + f"err={data.get('last_error', '')}" + ).strip() + + if event == "sent": + return ( + f"[XIAONIU] SENT room={room} user={sender_name}/{sender} " + f"trigger={data.get('trigger_type', 'none')} " + f"mode={data.get('reply_mode', '')} " + f"len={data.get('response_len', 0)} " + f"reply={data.get('response_preview', '')}" + ).strip() + + if event == "memory_upsert": + return ( + f"[XIAONIU] MEM_UPSERT room={room} user={sender} " + f"type={data.get('memory_type', '')} " + f"ok={self._yn(data.get('ok'))} " + f"trigger={data.get('trigger_type', '-') or '-'}" + ).strip() + + compact = " ".join(f"{key}={data[key]}" for key in sorted(data) if data.get(key) not in (None, "")) + return f"[XIAONIU] {event.upper()} {compact}".strip() + + @staticmethod + def _yn(value: Any) -> str: + return "Y" if bool(value) else "N" + + @staticmethod + def _short_id(value: str) -> str: + value = str(value or "") + if len(value) <= 10: + return value + return value[:4] + "..." + value[-4:] diff --git a/plugins/ai_auto_response/memory_store.py b/plugins/ai_auto_response/memory_store.py new file mode 100644 index 0000000..4c0b248 --- /dev/null +++ b/plugins/ai_auto_response/memory_store.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Dict, List, Optional + +from db.member_context_db import MemberContextDBOperator +from db.message_storage import MessageStorageDB + + +class MemoryStore: + def __init__(self, db_manager, config: Dict): + self.config = config or {} + self.message_db = MessageStorageDB(db_manager) + self.member_context_db = MemberContextDBOperator(db_manager) + self.followup_sessions: Dict[str, Dict] = {} + + def get_recent_messages(self, room_id: str) -> List[Dict]: + hours = int(self.config.get("active_context_hours", 8)) + recent = self.message_db.get_recent_messages(room_id, hours_ago=hours, min_content_length=0) or [] + size = int(self.config.get("recent_context_size", 30)) + return recent[-size:] + + def get_member_context(self, room_id: str, wxid: str) -> Optional[Dict]: + if not self.config.get("enable_member_context", True): + return None + return self.member_context_db.get_member_context(room_id, wxid) + + def build_memory_hints(self, room_id: str, wxid: str) -> Dict: + lookback_days = int(self.config.get("memory_lookback_days", 180)) + returning_days = int(self.config.get("returning_member_days", 7)) + long_absent_days = int(self.config.get("long_absent_member_days", 30)) + active_dates = self.message_db.get_member_active_dates(room_id, wxid, days=lookback_days) or [] + member_context = self.get_member_context(room_id, wxid) + + last_active_at = "" + returning_state = "" + days_since_active = None + if active_dates: + last_item = active_dates[-1] + last_active_at = last_item.get("last_message_time") or "" + parsed = self._parse_datetime(last_active_at) + if parsed: + days_since_active = max((datetime.now() - parsed).days, 0) + if days_since_active >= long_absent_days: + returning_state = "long_absent_member" + elif days_since_active >= returning_days: + returning_state = "returning_member" + + followup = self._get_followup_state(room_id, wxid) + return { + "member_context": member_context or {}, + "last_active_at": last_active_at, + "days_since_active": days_since_active, + "returning_member_state": returning_state, + "is_followup": followup, + } + + def note_bot_reply(self, room_id: str, wxid: str, topic: str = "") -> None: + key = self._followup_key(room_id, wxid) + self.followup_sessions[key] = { + "last_bot_reply_at": datetime.now(), + "topic": topic, + } + + def _get_followup_state(self, room_id: str, wxid: str) -> bool: + key = self._followup_key(room_id, wxid) + state = self.followup_sessions.get(key) + if not state: + return False + timeout = int(self.config.get("followup_session_window_sec", 300)) + last_reply_at = state.get("last_bot_reply_at") + if not last_reply_at: + return False + return (datetime.now() - last_reply_at).total_seconds() <= timeout + + @staticmethod + def _followup_key(room_id: str, wxid: str) -> str: + return f"{room_id}:{wxid}" + + @staticmethod + def _parse_datetime(value: str) -> Optional[datetime]: + if not value: + return None + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"): + try: + return datetime.strptime(value, fmt) + except ValueError: + continue + return None diff --git a/plugins/ai_auto_response/persona/xiaoniu.txt b/plugins/ai_auto_response/persona/xiaoniu.txt new file mode 100644 index 0000000..d9b2409 --- /dev/null +++ b/plugins/ai_auto_response/persona/xiaoniu.txt @@ -0,0 +1,23 @@ +你叫小牛,是微信群里的常驻虚拟成员,不是客服,不是公告机,也不是冷冰冰的机器人。 + +你的人设要求: +1. 说话自然、口语化、像群友,不端着。 +2. 优先解决问题,尤其是有人明确提问、@你、或者继续追问时。 +3. 有熟悉感,但不过分装熟,不要让人觉得你在翻旧档案。 +4. 轻松聊天时尽量短,答疑时尽量清楚。 +5. 不确定就直接说不确定,不编。 +6. 不抢话,不刷屏,不要每条都接。 +7. 在群里更像一个靠谱、反应快、稍微有点温度的成员。 + +你的表达偏好: +- 能一句说清就别说三句 +- 避免客服腔、教程腔、模板腔 +- 除非很有必要,不要长篇大论 +- 允许少量语气词,但不要太油腻 +- 面对回归成员时,可以表现出轻微熟悉感,但不要直接暴露细粒度历史记录 + +你的边界: +- 不要假装知道不存在的上下文 +- 不要把系统记忆原样说给用户听 +- 不要在闲聊里强行翻旧事 +- 敏感、风险、不确定的话题要收敛 diff --git a/plugins/ai_auto_response/persona_engine.py b/plugins/ai_auto_response/persona_engine.py new file mode 100644 index 0000000..25ee4de --- /dev/null +++ b/plugins/ai_auto_response/persona_engine.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Dict + + +class PersonaEngine: + def __init__(self, plugin_path: str, config: Dict): + self.plugin_path = Path(plugin_path) + self.config = config or {} + self.persona_text = self._load_persona() + + def build_system_prompt(self) -> str: + name = self.config.get("name", "小牛") + style = self.config.get("style", "") + familiarity = self.config.get("familiarity_hint", "") + max_sentences = self.config.get("max_reply_sentences", 3) + return ( + f"{self.persona_text}\n\n" + f"补充约束:\n" + f"- 你当前对外名称固定为{name}\n" + f"- 整体风格:{style}\n" + f"- 熟悉感边界:{familiarity}\n" + f"- 一般最多输出{max_sentences}句\n" + f"- 优先根据场景决定是答疑、接话还是不说话\n" + ) + + def _load_persona(self) -> str: + persona_file = self.config.get("persona_file", "persona/xiaoniu.txt") + persona_path = self.plugin_path / persona_file + if persona_path.exists(): + return persona_path.read_text(encoding="utf-8").strip() + return "你叫小牛,是一个自然、靠谱、会看场合的群聊成员。" diff --git a/plugins/ai_auto_response/response_planner.py b/plugins/ai_auto_response/response_planner.py new file mode 100644 index 0000000..7b78ccc --- /dev/null +++ b/plugins/ai_auto_response/response_planner.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from typing import Dict + + +class ResponsePlanner: + def choose_reply_mode(self, trigger: Dict, flow_state: str) -> str: + if trigger.get("is_question"): + return "qa_with_context" if flow_state in {"engaged", "deep_engaged"} else "qa_fast" + if trigger.get("is_followup"): + return "qa_with_context" + if trigger.get("is_social_call"): + return "social_short" + if trigger.get("is_returning_member"): + return "social_short" + return "social_short" if flow_state in {"warming", "engaged"} else "refuse_or_skip" + + def should_reply(self, trigger: Dict, flow_state: str, allow_proactive: bool) -> bool: + if trigger.get("should_respond"): + return True + if not allow_proactive: + return False + return flow_state in {"warming", "engaged", "deep_engaged"} and trigger.get("priority", 0) >= 0.35 diff --git a/plugins/ai_auto_response/triggers.py b/plugins/ai_auto_response/triggers.py new file mode 100644 index 0000000..5c8f294 --- /dev/null +++ b/plugins/ai_auto_response/triggers.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from typing import Dict, List + + +QUESTION_PATTERNS = [ + r"\?$", r"?$", r"怎么", r"如何", r"咋弄", r"为啥", r"为什么", + r"有人知道", r"谁知道", r"能不能", r"可以吗", r"报错", r"怎么解决", +] +SOCIAL_PATTERNS = [r"小牛", r"在吗", r"出来", r"帮忙看", r"看看"] + + +@dataclass +class TriggerResult: + trigger_type: str = "none" + priority: float = 0.0 + is_question: bool = False + is_followup: bool = False + is_social_call: bool = False + is_returning_member: bool = False + should_respond: bool = False + topic: str = "" + reasons: List[str] = field(default_factory=list) + + +class TriggerRouter: + def __init__(self, config: Dict): + self.config = config or {} + self.topic_keywords = [str(item).lower() for item in self.config.get("focus", [])] + + def route(self, message: Dict, memory_hints: Dict) -> TriggerResult: + content = str(message.get("content", "")).strip() + content_lower = content.lower() + result = TriggerResult() + if message.get("is_at"): + result.trigger_type = "at_trigger" + result.priority = float(self.config.get("at_bot", 1.0)) + result.should_respond = True + result.reasons.append("is_at") + if self._is_question(content): + if result.priority < float(self.config.get("explicit_question", 0.95)): + result.trigger_type = "question_trigger" + result.priority = float(self.config.get("explicit_question", 0.95)) + result.is_question = True + result.should_respond = True + result.reasons.append("question") + if memory_hints.get("is_followup"): + if result.priority < float(self.config.get("followup", 0.90)): + result.trigger_type = "followup_trigger" + result.priority = float(self.config.get("followup", 0.90)) + result.is_followup = True + result.should_respond = True + result.reasons.append("followup") + topic = self._detect_topic(content_lower) + if topic: + result.topic = topic + result.reasons.append("topic") + if result.priority < float(self.config.get("casual_topic", 0.35)): + result.trigger_type = result.trigger_type if result.trigger_type != "none" else "topic_trigger" + result.priority = max(result.priority, float(self.config.get("casual_topic", 0.35))) + if self._is_social_call(content_lower): + if result.priority < float(self.config.get("social_call", 0.65)): + result.trigger_type = result.trigger_type if result.trigger_type != "none" else "social_trigger" + result.priority = max(result.priority, float(self.config.get("social_call", 0.65))) + result.is_social_call = True + result.reasons.append("social_call") + if memory_hints.get("returning_member_state") in {"returning_member", "long_absent_member"}: + result.is_returning_member = True + result.reasons.append(memory_hints.get("returning_member_state")) + if result.trigger_type == "none": + result.trigger_type = "returning_member" + result.priority = max(result.priority, float(self.config.get("casual_topic", 0.35))) + return result + + def _is_question(self, content: str) -> bool: + return any(re.search(pattern, content, flags=re.IGNORECASE) for pattern in QUESTION_PATTERNS) + + def _is_social_call(self, content: str) -> bool: + return any(re.search(pattern, content, flags=re.IGNORECASE) for pattern in SOCIAL_PATTERNS) + + def _detect_topic(self, content_lower: str) -> str: + for keyword in self.topic_keywords: + if keyword and keyword in content_lower: + return keyword + return "" diff --git a/plugins/ai_auto_response/vector_memory.py b/plugins/ai_auto_response/vector_memory.py new file mode 100644 index 0000000..997fadf --- /dev/null +++ b/plugins/ai_auto_response/vector_memory.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +import hashlib +from typing import Dict, List + +import requests + + +class VectorMemoryStore: + def __init__(self, config: Dict): + self.config = config or {} + self.enabled = bool(self.config.get("enable_vector_memory")) + self.qdrant_url = str(self.config.get("qdrant_url", "")).rstrip("/") + self.collection = self.config.get("qdrant_collection", "") + self.ollama_base_url = str(self.config.get("ollama_base_url", "")).rstrip("/") + self.embedding_model = self.config.get("embedding_model", "") + self.top_k = int(self.config.get("vector_top_k", 5)) + self.min_score = float(self.config.get("vector_min_score", 0.65)) + self.collection_ready = False + self.last_error = "" + + def should_search(self, reply_mode: str, trigger_type: str, returning_state: str) -> bool: + modes = set(self.config.get("vector_trigger_modes", [])) + return any(item in modes for item in [reply_mode, trigger_type, returning_state] if item) + + def search(self, query: str, room_id: str, wxid: str = "") -> List[Dict]: + self.last_error = "" + if not self.enabled or not self.qdrant_url or not self.collection or not self.embedding_model: + self.last_error = "vector_disabled_or_incomplete" + return [] + embedding = self._embed(query) + if not embedding: + self.last_error = "embed_failed" + return [] + self._ensure_collection(len(embedding)) + must = [{"key": "chatroom_id", "match": {"value": room_id}}] + payload = { + "vector": embedding, + "limit": self.top_k, + "with_payload": True, + "score_threshold": self.min_score, + "filter": {"must": must}, + } + if wxid: + payload["filter"]["should"] = [{"key": "wxid", "match": {"value": wxid}}] + try: + response = requests.post( + f"{self.qdrant_url}/collections/{self.collection}/points/search", + json=payload, + timeout=15, + ) + response.raise_for_status() + items = response.json().get("result", []) or [] + return [item.get("payload", {}) for item in items if item.get("payload")] + except Exception as exc: + self.last_error = f"search_failed:{exc}" + return [] + + def upsert_memory(self, memory_id: str, text: str, payload: Dict) -> bool: + self.last_error = "" + if not self.enabled or not text or not self.qdrant_url or not self.collection or not self.embedding_model: + self.last_error = "vector_disabled_or_incomplete" + return False + embedding = self._embed(text) + if not embedding: + self.last_error = "embed_failed" + return False + if not self._ensure_collection(len(embedding)): + self.last_error = "ensure_collection_failed" + return False + point = { + "points": [ + { + "id": self._stable_id(memory_id), + "vector": embedding, + "payload": payload | {"content_summary": text}, + } + ] + } + try: + response = requests.put( + f"{self.qdrant_url}/collections/{self.collection}/points", + json=point, + timeout=15, + ) + response.raise_for_status() + return True + except Exception as exc: + self.last_error = f"upsert_failed:{exc}" + return False + + def _embed(self, query: str) -> List[float]: + try: + response = requests.post( + f"{self.ollama_base_url}/api/embeddings", + json={"model": self.embedding_model, "prompt": query}, + timeout=20, + ) + response.raise_for_status() + return response.json().get("embedding") or [] + except Exception: + return [] + + def _ensure_collection(self, vector_size: int) -> bool: + if self.collection_ready: + return True + try: + response = requests.get( + f"{self.qdrant_url}/collections/{self.collection}", + timeout=10, + ) + if response.status_code == 200: + self.collection_ready = True + return True + except Exception: + pass + + try: + response = requests.put( + f"{self.qdrant_url}/collections/{self.collection}", + json={ + "vectors": { + "size": vector_size, + "distance": "Cosine", + } + }, + timeout=15, + ) + response.raise_for_status() + self.collection_ready = True + return True + except Exception: + return False + + @staticmethod + def _stable_id(memory_id: str) -> int: + digest = hashlib.md5(memory_id.encode("utf-8")).hexdigest()[:15] + return int(digest, 16)