Files
abot/robot.py
liuwei 712dda5b41 让 Robot 接入 WechatGateway 并同步推进文档
- 为 WechatGateway 增加属性写透传,兼容现有 ipad_bot 属性赋值方式
- 将 Robot 的 wechat 接入实例化入口切换为 Gateway,并默认走 legacy_855 provider
- 在适配路线图中补充当前推进状态,明确已完成项与待迁移运行时职责
2026-05-07 09:52:27 +08:00

1027 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import asyncio
import threading
import time
import tomllib
import traceback
import uuid
from collections import deque
import toml
from loguru import logger
import wechat_ipad
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from base.plugin_common.plugin_manager import PluginManager
from base.plugin_common.plugin_registry import PluginRegistry
from configuration import Config
from db.connection import DBConnectionManager
from db.contacts_db import ContactsDBOperator
from db.group_plugin_config_db import GroupPluginConfigDBOperator
from db.llm_catalog_db import LLMCatalogDBOperator
from db.plugin_schedule_db import PluginScheduleDBOperator
from db.system_job_db import SystemJobDBOperator
from utils.system_jobs import SystemJobLoader
from utils.email_util import EmailSender
from utils.group_plugin_config_service import GroupPluginConfigService
from utils.plugin_schedule_manager import PluginScheduleManager
from utils.revoke.message_auto_revoke import MessageAutoRevoke
from utils.robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
from utils.wechat.contact_manager import ContactManager
from utils.wechat.member_monitor import ChatroomMemberMonitor
from utils.wechat.message_to_db import MessageStorage
from utils.ai.llm_registry import LLMRegistry
from utils.trace_context import set_current_trace_id, reset_current_trace_id
from wechat_ipad import WechatAPIClient, WechatGateway
from wechat_ipad.models.message import WxMessage, MessageType
# 定义全局信号量,限制最大并发 10
sem = asyncio.Semaphore(20)
class Robot:
"""个性化自己的机器人
"""
def __init__(self, config: Config) -> None:
super().__init__()
self.config = config
self.LOG = logger
self.LOG.info(f"=" * 50)
# wechat_ipad 相关属性
self.ipad_bot: WechatAPIClient
self.ipad_config = None
self.ipad_running = False
self.ipad_thread = None
self.ipad_loop = None
self.wxid = None
self.nickname = None
self.alias = None
self.phone = None
self.message_auto_revoke: MessageAutoRevoke = None
self.LOG.debug(f"DB+REDIS 连接池开始初始化")
# 使用单例模式获取实例
self.db_manager = DBConnectionManager.get_instance(
mysql_config=self.config.mariadb,
redis_config=self.config.redis
)
self.LOG.debug(f"数据库连接管理器初始化完成")
# 为了兼容现有代码,保留原有的连接池
self.db_pool = self.db_manager.mysql_pool
self.redis_pool = self.db_manager.redis_pool
self.contacts_db = ContactsDBOperator(self.db_manager)
self.group_plugin_config_db = GroupPluginConfigDBOperator(self.db_manager)
self.llm_catalog_db = LLMCatalogDBOperator(self.db_manager)
self.plugin_schedule_db = PluginScheduleDBOperator(self.db_manager)
self.system_job_db = SystemJobDBOperator(self.db_manager)
self.group_plugin_config_db.init_tables()
# 新版 LLM 目录模型Provider 模板 / Dify 应用 / Scene初始化。
self.llm_catalog_db.init_tables()
self.llm_catalog_db.bootstrap_from_legacy_llm(self.config.llm)
self.group_plugin_config_service = GroupPluginConfigService(
db_operator=self.group_plugin_config_db,
redis_client=self.db_manager.get_redis_connection(),
)
# 初始化联系人管理器
self.contact_manager = ContactManager.get_instance()
self.allContacts = {} # 将在登录后填充
# 提前初始化消息存储:
# 1. DashboardServer 会在主线程里较早启动,并直接读取 robot.message_storage
# 2. 旧逻辑要等 iPad 登录成功后才赋值,导致后台在启动竞态下拿不到这个属性;
# 3. 这里先给一个可用的默认实例,后续登录成功后再注入真实 bot 覆盖即可。
self.message_storage = MessageStorage()
self.groups = {} # 存储按group_id分组的消息列表每个group_id最多保留10条消息
# 初始化插件系统
self.LOG.debug("开始初始化插件系统...")
self.plugin_registry = PluginRegistry()
self.plugin_modules = {} # 存储已加载的插件模块
self.plugins = {} # 存储已加载的插件实例
# 设置插件系统上下文
self.system_context = {
"config": config,
"plugin_registry": self.plugin_registry,
"db_manager": self.db_manager,
"db_pool": self.db_pool,
"redis_pool": self.redis_pool,
"group_plugin_config_service": self.group_plugin_config_service,
}
self.plugin_manager = PluginManager(plugin_dir=getattr(self.config, "plugin_dir", "plugins"))
self.plugin_manager.set_system_context(self.system_context)
self.plugins = self.plugin_manager.load_all_plugins()
# 插件热加载默认关闭:
# 1. 它会持续轮询插件目录并调用 discover_plugins线上运行会产生额外扫盘开销
# 2. 对当前以“稳定运行”为主的场景,这类自动热更新收益远低于成本;
# 3. 若后续确实需要在线调试插件,可通过配置重新打开,并把轮询间隔调大。
hot_reload_cfg = dict(getattr(self.config, "plugin_hot_reload", {}) or {})
if bool(hot_reload_cfg.get("enabled")):
interval_seconds = max(float(hot_reload_cfg.get("interval_seconds", 600) or 600), 60.0)
self.plugin_manager.start_hot_reload_watcher(interval_seconds=interval_seconds)
self.LOG.info(f"插件热加载监听已启用,轮询间隔 {interval_seconds}s")
else:
self.LOG.info("插件热加载监听已禁用,启动阶段不再自动扫盘检查插件变更")
self.system_job_loader = SystemJobLoader(self, self.system_job_db)
self.system_job_loader.init_and_load()
self.plugin_schedule_manager = PluginScheduleManager(self.plugin_manager, self.plugin_schedule_db)
self.plugin_schedule_manager.init_and_load()
# 将历史业务型系统任务迁移到插件调度配置,避免升级后出现“任务丢失”。
migration_result = self.plugin_schedule_manager.migrate_from_system_jobs(self.system_job_db)
if migration_result.get("migrated", 0) > 0:
self.LOG.info(f"系统任务迁移到插件任务完成: {migration_result}")
self.plugin_schedule_manager.reload_from_db()
# 迁移完成后,清理已下沉到插件层的系统任务,避免后台重复维护两套配置。
self._cleanup_migrated_system_jobs()
# 加载插件
self.LOG.debug("插件系统初始化完成")
GroupBotManager.load_local_cache()
# 权限模块加载
self.gbm = GroupBotManager()
self.email_sender = EmailSender(
smtp_server=self.config.email.get("smtp_server", "smtp.163.com"),
smtp_port=self.config.email.get("smtp_port", 465),
sender_email=self.config.email.get("sender_email", "bovine_liu@163.com"),
sender_password=self.config.email.get("sender_password", "LTS9BhmX9XhS36QS")
)
# 通过类属性设置 admin_list而不是实例属性
GroupBotManager.admin_list = self.config.wx_config.get("admin", [])
self.recent_msg_ids = deque(maxlen=20)
def apply_runtime_config(self, reload_catalog: bool = False) -> None:
"""把最新全局配置应用到当前运行中的关键对象。
说明:
1. `self.config.reload()` 只会刷新 Config 实例中的字段,不会自动更新启动时已构造的依赖对象;
2. 这里集中处理“保存配置后希望立刻生效”的轻量刷新动作,避免为大多数改动走整进程重启;
3. 该方法刻意不去重建 DB 连接、微信登录态、插件实例,尽量把影响范围控制在可热刷的配置项。
"""
# 邮件发送器在初始化时会拷贝 SMTP 参数,因此这里需要按最新配置重建一份实例。
self.email_sender = EmailSender(
smtp_server=self.config.email.get("smtp_server", "smtp.163.com"),
smtp_port=self.config.email.get("smtp_port", 465),
sender_email=self.config.email.get("sender_email", "bovine_liu@163.com"),
sender_password=self.config.email.get("sender_password", "LTS9BhmX9XhS36QS")
)
# 管理员列表走 GroupBotManager 的类级缓存;只 reload Config 不会自动回写到这里。
GroupBotManager.admin_list = self.config.wx_config.get("admin", [])
# system_context 中保存的是 config 对象引用reload 后插件读取到的是最新字段。
# 但 LLMRegistry 自己还有一层短 TTL 缓存,因此保存全局 LLM 配置后需要显式清掉。
if reload_catalog:
self.llm_catalog_db.bootstrap_from_legacy_llm(self.config.llm)
LLMRegistry.invalidate_cache()
self.LOG.info(
"运行时配置已应用: "
f"admin_count={len(GroupBotManager.admin_list)}, "
f"email_sender={'ready' if self.email_sender else 'missing'}, "
f"llm_cache_reloaded={reload_catalog}"
)
def _cleanup_migrated_system_jobs(self):
"""清理已经迁移到插件层的历史系统任务键。"""
migrated_keys = [
"news_baidu_report_auto",
"epic_free_games",
"message_ranking_push",
"sehuatang_pdf_push",
"xiuren_download",
"shenshi_r15_download",
"update_image_cache",
# 联系人头像缓存任务在 2026-05-06 调整过命名:
# 1. 旧键 `sync_contact_avatar_cache` 只存在于历史数据库配置;
# 2. 新键统一使用 `contact_avatar_cache_sync`,避免命名风格前后不一致;
# 3. 这里在启动期顺手清理旧键,避免后台任务页长期出现“数据库有记录、运行态无处理器”的幽灵任务。
"sync_contact_avatar_cache",
]
removed = 0
for job_key in migrated_keys:
try:
row = self.system_job_db.get_job(job_key)
if not row:
continue
if self.system_job_db.delete_job(job_key):
removed += 1
except Exception as e:
self.LOG.warning(f"清理迁移系统任务失败: job_key={job_key}, error={e}")
if removed > 0:
self.LOG.info(f"已清理 {removed} 个历史系统任务配置(迁移至插件任务)")
def init_wechat_ipad(self):
"""初始化wechat_ipad客户端"""
try:
# 读取config.toml文件
with open("wechat_ipad/config.toml", "rb") as f:
self.ipad_config = tomllib.load(f)
self.LOG.debug("正在初始化wechat_ipad客户端...")
# 检查必要的配置
server_url = self.ipad_config.get("server_url", "")
if server_url == "":
self.LOG.error("server_url不能为空wechat_ipad初始化失败")
return False
server_ip = self.ipad_config.get("server_ip", "")
server_port = self.ipad_config.get("server_port", 8059)
# 当前阶段先通过 Gateway 承接 provider 选择:
# 1. 默认仍走 legacy_855保持现有现网协议行为
# 2. 这里提前把入口收敛到 Gateway后续接 864 时可不再修改主链路;
# 3. `server_type` 缺失时自动回退 legacy_855兼容现有 config.toml。
# 创建事件循环
self.ipad_loop = asyncio.new_event_loop()
# 在新线程中启动wechat_ipad客户端
self.ipad_thread = threading.Thread(
target=self._run_wechat_ipad_client,
args=(server_ip, server_port),
daemon=True
)
self.ipad_thread.start()
self.LOG.debug("wechat_ipad客户端初始化完成")
return True
except Exception as e:
self.LOG.error(f"初始化wechat_ipad客户端失败: {e}")
return False
def _run_wechat_ipad_client(self, server_ip, server_port):
"""在新线程中运行wechat_ipad客户端"""
asyncio.set_event_loop(self.ipad_loop)
self.ipad_loop.run_until_complete(self._wechat_ipad_core(server_ip, server_port))
async def _wechat_ipad_core(self, server_ip, server_port):
"""wechat_ipad核心逻辑基于bot-core.py"""
try:
self.LOG.debug("启动wechat_ipad bot")
# 调用登录接口:
# 1. 这里不再直接实例化具体客户端实现,而是统一通过 Gateway 选择 provider
# 2. 第一阶段仍默认绑定 legacy_855后续接入 864 时这里只需要读新配置即可;
# 3. 通过 Gateway 的属性透传能力,先尽量保持现有 `self.ipad_bot.xxx` 写法不变。
server_type = str(self.ipad_config.get("server_type", "legacy_855") or "legacy_855").strip()
self.ipad_bot = WechatGateway(server_ip, server_port, server_type=server_type)
self.message_auto_revoke = MessageAutoRevoke(self.ipad_bot)
wxid = self.ipad_config.get("wxid", "")
device_name = self.ipad_config.get("device_name", "")
device_id = self.ipad_config.get("device_id", "")
if device_name == "":
device_name = self.ipad_bot.create_device_name()
if device_id == "":
device_id = self.ipad_bot.create_device_id()
# 登录逻辑
if not await self.ipad_bot.is_logged_in(wxid):
await self._handle_ipad_login(wxid, device_name, device_id)
else: # 已登录
self.ipad_bot.wxid = wxid
profile = await self.ipad_bot.get_profile()
self.ipad_bot.nickname = profile.get("NickName").get("string")
self.ipad_bot.alias = profile.get("Alias")
self.ipad_bot.phone = profile.get("BindMobile").get("string")
self.ipad_bot.signature = profile.get("Signature", "")
# 更新Robot类的属性
self.wxid = self.ipad_bot.wxid
self.nickname = self.ipad_bot.nickname
self.alias = self.ipad_bot.alias
self.phone = self.ipad_bot.phone
self.signature = self.ipad_bot.signature
self.LOG.info(
f"wechat_ipad登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}")
# 注入加载完成的bot
self.plugin_manager.inject_bot(self.ipad_bot)
self.LOG.info(f"wechat_ipad登录设备信息: device_name: {device_name} device_id: {device_id}")
self.LOG.info("wechat_ipad登录成功")
# 登录成功后加载联系人信息
self.allContacts = self.get_all_contacts()
friends = await self.ipad_bot.get_contract_list()
self.head_images = self.get_all_head_images()
self.all_chatroom_members = self.contacts_db.get_chatroom_member_list_name_all()
# self.LOG.debug(f"all_chatroom_members:{self.all_chatroom_members}")
self.contact_manager.set_contacts(self.allContacts, friends, self.head_images, self.all_chatroom_members)
self.message_storage = MessageStorage(self.ipad_bot)
self.member_monitor = ChatroomMemberMonitor(self.ipad_bot)
# # 获取扩展信息,显示相关内容
ext_profile = await self.ipad_bot.get_profile_info_ext()
self.ipad_bot.profile_ext = ext_profile
self.head_image = ext_profile.get("SmallHeadImgUrl")
# 先接受堆积消息
self.LOG.info("处理堆积消息中")
# await self.ipad_bot.send_text_message("filehelper", "ipad客户端启动成功")
count = 0
while True:
data = await self.ipad_bot.sync_message()
data = data.get("AddMsgs")
if not data:
if count > 2:
break
else:
count += 1
continue
self.LOG.debug(f"接受到 {len(data)} 条历史消息,开始仅落库归档")
for raw_message in data:
await self._archive_startup_history_message(raw_message)
await asyncio.sleep(1)
self.LOG.info("处理堆积消息完毕")
# 标记为运行中
self.ipad_running = True
# 开启自动心跳(作为后台任务)
heartbeat_task = asyncio.create_task(self._heartbeat_task())
heartbeat_task_long = asyncio.create_task(self._heartbeat_task_long())
# 开始处理消息
self.LOG.info("开始处理wechat_ipad消息")
while self.ipad_running:
try:
data_temp = await self.ipad_bot.sync_message()
except Exception as e:
self.LOG.error(f"获取新消息失败 {e}")
if "用户可能退出" in str(e):
self.LOG.error(f"用户可能退出: {e}")
self.email_sender.send_wechat_alert(self.config.email.get("alert_recipient"),
f"用户可能退出: {e}", self.wxid,
self.nickname)
await self.login_twice_auto_auth()
await asyncio.sleep(5)
continue
data = data_temp.get("AddMsgs")
if data:
for message in data:
# self.LOG.debug(f"sync_message.处理消息消息内容: {message}")
# 处理消息
try:
wxmsg: WxMessage = WxMessage.from_json(message)
self._attach_trace_id(wxmsg)
# 判断是否已经收到过。处理。存储最近20个msg_id处理之前判断是否在清单里面如果在这不重新处理了。
msg_id = wxmsg.msg_id
if msg_id in self.recent_msg_ids:
self.LOG.info(self._trace_message(wxmsg, f"出现重复ID消息: {msg_id}"))
continue # 已处理,跳过
self.recent_msg_ids.append(msg_id)
self.LOG.debug(
self._trace_message(
wxmsg,
f"收到消息 type={getattr(wxmsg.msg_type, 'name', wxmsg.msg_type)} "
f"sender={wxmsg.sender} room={wxmsg.roomid or '-'}"
)
)
except Exception as e:
self.LOG.error(f"WxMessage.from_json 解析失败,消息内容: {message},错误: {e}")
continue # 跳过本条消息,继续处理下一条
# 创建独立任务,不阻塞下一条消息
# 并发执行,限制最大并发数
xx = asyncio.create_task(self._process_with_semaphore(wxmsg))
else:
# 只有当 Ret 不等于 0 或者 不包含 KeyBuf 时才打印
if not (isinstance(data_temp, dict) and data_temp.get("Ret") == 0 and "KeyBuf" in data_temp):
self.LOG.debug(f"MESSAGE{data_temp}")
changed_groups = self.member_monitor.parse_mod_contacts_msg(data_temp)
if changed_groups:
self.LOG.info(f"监测到群成员变动消息,涉及群: {changed_groups}")
for group_id in changed_groups:
if self.gbm.get_group_permission(group_id,
Feature.GROUP_MEMBER_CHANGE) == PermissionStatus.ENABLED:
xx = asyncio.create_task(self.member_monitor.check_and_handle_changes(group_id))
# 使用异步睡眠替代忙等待循环
await asyncio.sleep(2)
except Exception as e:
self.LOG.exception(f"wechat_ipad客户端运行出错: {e}")
self.ipad_running = False
# 在类里直接写一个内联 async 方法(不额外抽取新的对外方法)
async def _archive_startup_history_message(self, raw_message: dict) -> None:
"""启动阶段只归档历史消息,不触发实时业务处理。
目标:
1. 保留历史消息记录,方便后台查询、总结和审计;
2. 不触发插件、副作用指令、自动回复、积分统计等实时逻辑;
3. 与实时拉流阶段共享最近消息去重队列,避免边界消息被重复处理。
"""
try:
wxmsg: WxMessage = WxMessage.from_json(raw_message)
except Exception as e:
self.LOG.error(f"启动阶段历史消息解析失败,消息内容: {raw_message},错误: {e}")
return
try:
self._attach_trace_id(wxmsg)
msg_id = wxmsg.msg_id
if msg_id in self.recent_msg_ids:
self.LOG.debug(self._trace_message(wxmsg, f"历史消息重复,跳过归档: {msg_id}"))
return
# 先放入近期去重队列:
# 1. 启动阶段拉到的最后几条消息,可能和实时阶段收到的第一批消息重叠;
# 2. 这里提前记下 msg_id可以避免后续被当成“新消息”再次触发业务逻辑
# 3. 该队列长度虽然有限,但足够覆盖启动切换期的边界重复问题。
self.recent_msg_ids.append(msg_id)
if not self.message_storage:
self.LOG.warning(self._trace_message(wxmsg, "历史消息归档跳过message_storage 尚未初始化"))
return
# 历史消息只落库,不做实时业务:
# 1. 不调用 process_plugin_message避免历史消息触发插件副作用
# 2. 不调用 process_message避免历史发言被重复计入实时统计
# 3. 不走 _process_ipad_message避免自动加群、成员变更、媒体业务等被整段回放。
self.message_storage.archive_message(wxmsg)
self.LOG.debug(
self._trace_message(
wxmsg,
f"历史消息已归档 type={getattr(wxmsg.msg_type, 'name', wxmsg.msg_type)} "
f"sender={wxmsg.sender} room={wxmsg.roomid or '-'}"
)
)
except Exception as e:
self.LOG.error(self._trace_message(wxmsg, f"历史消息归档失败 msg_id={wxmsg.msg_id}, 错误: {e}"))
async def _process_with_semaphore(self, wxmsg):
async with sem:
# 进入单条消息处理前,把 trace_id 放入当前异步上下文:
# 1. 后续插件中的 AI 调用、消息发送、子协程都可以自动继承这个 trace_id
# 2. 这样不需要给大量现有方法额外加 trace_id 参数,侵入性更小;
# 3. finally 中会回滚 token避免把当前消息的 trace_id 泄漏到下一条消息。
trace_token = set_current_trace_id(self._get_trace_id(wxmsg))
try:
await self._process_ipad_message(wxmsg)
except Exception as e:
self.LOG.error(self._trace_message(wxmsg, f"处理消息失败 msg_id={wxmsg.msg_id}, 错误: {e}"))
finally:
reset_current_trace_id(trace_token)
async def _handle_ipad_login(self, wxid, device_name, device_id):
"""处理wechat_ipad登录"""
while not await self.ipad_bot.is_logged_in(wxid):
# 需要登录
try:
if await self.ipad_bot.get_cached_info(wxid):
# 尝试唤醒登录
uuid = await self.ipad_bot.awaken_login(wxid)
self.LOG.info(f"获取到登录uuid: {uuid}")
else:
# 二维码登录
if not device_name:
device_name = self.ipad_bot.create_device_name()
if not device_id:
device_id = self.ipad_bot.create_device_id()
uuid, url = await self.ipad_bot.get_qr_code(device_id=device_id, device_name=device_name,
print_qr=True)
self.LOG.info(f"获取到登录uuid: {uuid}")
self.LOG.info(f"获取到登录二维码: {url}")
except Exception as e:
self.LOG.error(f"登录过程出错: {e}")
# 二维码登录
if not device_name:
device_name = self.ipad_bot.create_device_name()
if not device_id:
device_id = self.ipad_bot.create_device_id()
uuid, url = await self.ipad_bot.get_qr_code(device_id=device_id, device_name=device_name, print_qr=True)
self.LOG.info(f"获取到登录uuid: {uuid}")
self.LOG.info(f"获取到登录二维码: {url}")
while True:
self.LOG.info(f"uuid: {uuid}, url: {url}")
stat, data = await self.ipad_bot.check_login_uuid(uuid, device_id=device_id)
if stat:
break
self.LOG.info(f"等待登录中,过期倒计时:{data}")
await asyncio.sleep(5)
# 保存登录信息
self.ipad_config["wxid"] = self.ipad_bot.wxid
self.ipad_config["device_name"] = device_name
self.ipad_config["device_id"] = device_id
self.ipad_config["login_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
with open("wechat_ipad/config.toml", "w", encoding="utf-8") as f:
toml.dump(self.ipad_config, f)
# 获取登录账号信息
self.ipad_bot.wxid = data.get("acctSectResp").get("userName")
self.ipad_bot.nickname = data.get("acctSectResp").get("nickName")
self.ipad_bot.alias = data.get("acctSectResp").get("alias")
self.ipad_bot.phone = data.get("acctSectResp").get("bindMobile")
self.ipad_bot.signature = data.get("Signature", "")
# 更新Robot类的属性
self.wxid = self.ipad_bot.wxid
self.nickname = self.ipad_bot.nickname
self.alias = self.ipad_bot.alias
self.phone = self.ipad_bot.phone
self.signature = self.ipad_bot.signature
self.LOG.info(
f"wechat_ipad登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}")
break
async def _heartbeat_task(self):
"""wechat_ipad心跳任务"""
self.LOG.info("开启wechat_ipad心跳")
while self.ipad_running:
try:
success = await self.ipad_bot.heartbeat()
if success:
self.LOG.debug("心跳进行中")
else:
self.LOG.warning("心跳失败")
except Exception as e:
self.LOG.error(f"wechat_ipad heartbeat: {e}")
if "用户可能退出" in str(e):
self.LOG.error(f"用户可能退出: {e}")
self.email_sender.send_wechat_alert(self.config.email.get("alert_recipient"), f"用户可能退出: {e}",
self.wxid,
self.nickname)
await self.login_twice_auto_auth()
await asyncio.sleep(60)
async def _heartbeat_task_long(self):
"""wechat_ipad心跳任务"""
self.LOG.info("开启wechat_ipad长连接心跳")
while self.ipad_running:
try:
success = await self.ipad_bot.heartbeat_long()
if success:
self.LOG.debug("长连接心跳进行中")
else:
self.LOG.warning("长连接心跳失败")
except Exception as e:
self.LOG.error(f"wechat_ipad heartbeat long: {e}")
if "用户可能退出" in str(e):
self.LOG.error(f"用户可能退出: {e}")
self.email_sender.send_wechat_alert(self.config.email.get("alert_recipient"), f"用户可能退出: {e}",
self.wxid,
self.nickname)
await self.login_twice_auto_auth()
await asyncio.sleep(120)
async def _process_ipad_message(self, message: WxMessage):
"""处理wechat_ipad消息"""
try:
# self.LOG.debug(f"message: {message}")
# 消息已经是WxMessage对象直接使用其属性和方法
# 判断是否为群消息
is_group = message.from_group()
group_id = message.roomid
# 检测群聊是否已加入机器人管理
if is_group and group_id not in GroupBotManager.local_cache["group_list"]:
self.LOG.info(f"检测到新群聊: {group_id},自动添加到机器人管理列表并启用机器人功能")
# 添加群组到列表
GroupBotManager.local_cache["group_list"].add(group_id)
# 保存到Redis
redis_conn = self.db_manager.get_redis_connection()
redis_conn.sadd("group:list", group_id)
# 设置ROBOT功能为启用状态
GroupBotManager.set_group_permission(group_id, Feature.ROBOT, PermissionStatus.ENABLED)
# 获取群成员信息并更新数据库
try:
chatroom_info = await self.ipad_bot.get_chatroom_info(group_id)
self.LOG.debug(f"获取到群信息: {chatroom_info}")
self.allContacts[group_id] = chatroom_info.get('NickName').get("string", "未知群名")
if chatroom_info:
# 保存群信息到数据库
self.contacts_db.save_chatroom_info(chatroom_info)
members = await self.ipad_bot.get_chatroom_member_list(group_id)
# 保存群成员信息
if members:
# 兼容逻辑已放到save_chatroom_member_simple内部
self.contacts_db.save_chatroom_member_simple(group_id, members)
self.LOG.info(f"member_list: {members}")
# 更新联系人缓存
for member in members:
wxid = member.get("UserName", "")
nick_name = member.get("NickName", "")
displayName = member.get("DisplayName", "")
small_head_img_url = member.get("SmallHeadImgUrl", "")
# 如果displayName不为空使用displayName
if displayName:
nick_name = displayName
if wxid:
self.allContacts[wxid] = nick_name
self.head_images[wxid] = small_head_img_url
friends = await self.ipad_bot.get_contract_list()
self.all_chatroom_members = self.contacts_db.get_chatroom_member_list_name_all()
self.contact_manager.set_contacts(self.allContacts, friends, self.head_images,
self.all_chatroom_members)
self.LOG.info(f"已更新群 {group_id} 的成员信息")
except Exception as e:
self.LOG.error(f"获取群成员信息失败: {e}")
# 尝试使用插件处理消息
await self.process_plugin_message(message)
if is_group:
self.LOG.debug(f"入库和记录群消息: {message}")
# 调用统计逻辑进行聊天数据统计:
try:
if message.sender != self.wxid:
self.message_storage.process_message(message)
except Exception as e:
self.LOG.error(self._trace_message(message, f"process_message error: {e}"))
# # 聊天记录入库动作:
try:
self.message_storage.archive_message(message)
# 单独处理图片消息 后续写定时任务自动完成下载。延时处理。
if message.msg_type == MessageType.IMAGE: # 图片消息类型
self.message_storage.process_image(message)
except Exception as e:
self.LOG.error(self._trace_message(message, f"archive_message error: {e}"))
except Exception as e:
self.LOG.error(self._trace_message(message, f"处理wechat_ipad消息出错: {e}"))
def stop_wechat_ipad(self):
"""停止wechat_ipad客户端"""
self.ipad_running = False
if self.ipad_loop:
self.ipad_loop.stop()
self.LOG.info("wechat_ipad客户端已停止")
def keep_running_and_block_process(self) -> None:
"""
保持机器人运行,不让进程退出
"""
while True:
time.sleep(1)
async def process_plugin_message(self, msg) -> bool:
"""使用插件处理消息"""
# 获取所有消息处理插件
# 关闭00:30-05:00的系统交互降低被风控风险
current_hour = time.localtime().tm_hour
current_minute = time.localtime().tm_min
is_sleep_time = (current_hour == 0 and current_minute >= 30) or (1 <= current_hour < 5)
if is_sleep_time:
# 只处理特定消息,如管理员消息或紧急消息
self.LOG.info(self._trace_message(msg, f"夜间休眠时间(00:30-05:00),忽略消息: {msg}"))
return False
message_plugins = self.plugin_registry.get_plugins_by_type(MessagePluginInterface)
message_plugins = self._sort_message_plugins(message_plugins)
if not message_plugins:
return False
# 依次尝试处理消息
for plugin in message_plugins:
if plugin.status != PluginStatus.RUNNING:
continue
# 这里在进入插件前统一准备统计上下文:
# 1. 事件系统删除后,插件调用统计需要直接在主链路埋点;
# 2. 提前抽出 room_id / sender / command后续无论成功还是异常都能复用
# 3. 这样可以保证观测逻辑收口在一处,避免每个插件自己重复埋点。
room_id = msg.roomid if msg.from_group() else ""
sender = msg.sender
command_name = self._extract_plugin_command(msg)
started_at = time.perf_counter()
try:
# 转换消息为插件可处理的格式
plugin_msg = {
"type": msg.msg_type,
"content": msg.content.clean_content,
"sender": sender,
"roomid": room_id,
"is_at": msg.is_at(self.wxid),
"timestamp": time.time(),
"trace_id": self._get_trace_id(msg),
"all_contacts": self.allContacts,
"full_wx_msg": msg,
"gbm": self.gbm,
"bot": self.ipad_bot,
"revoke": self.message_auto_revoke
}
# 检查插件是否可以处理该消息
if plugin.can_process(plugin_msg):
processed, _ = await plugin.process_message(plugin_msg)
self._record_plugin_call_result(
plugin=plugin,
msg=msg,
command_name=command_name,
# 这里把“无异常执行完成”视为统计意义上的成功:
# 1. 很多插件返回 False 只是表示“本次不拦截”或“异步排队后继续放行”;
# 2. 若直接把 processed=False 记成失败,会把成功率统计严重拉低;
# 3. 真正的失败已经会走异常分支,因此统计层这里按“未抛错即成功”更合理。
process_result=True,
process_time_ms=self._elapsed_ms(started_at),
)
if processed:
self.LOG.info(
self._trace_message(
msg,
f"插件命中 plugin={plugin.name} command={command_name} "
f"cost_ms={self._elapsed_ms(started_at)}"
)
)
return True
except Exception as e:
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=e,
)
self.LOG.error(self._trace_message(msg, f"插件 {plugin.name} 处理消息失败: {e}"))
return False
def _attach_trace_id(self, msg: WxMessage) -> str:
"""为消息对象附加稳定 trace_id便于后续全链路关联。"""
trace_id = self._get_trace_id(msg)
if trace_id:
return trace_id
msg_id = str(getattr(msg, "msg_id", "") or "0")
create_time = str(getattr(msg, "create_time", "") or "0")
sender_tail = str(getattr(msg, "sender", "") or "")[-6:] or "unknown"
random_tail = uuid.uuid4().hex[:6]
trace_id = f"wx-{msg_id}-{create_time}-{sender_tail}-{random_tail}"
setattr(msg, "trace_id", trace_id)
return trace_id
@staticmethod
def _get_trace_id(msg: WxMessage) -> str:
"""读取消息对象上的 trace_id若不存在则返回空字符串。"""
return str(getattr(msg, "trace_id", "") or "").strip()
def _trace_message(self, msg: WxMessage, message: str) -> str:
"""为日志消息统一追加 trace_id 前缀。"""
trace_id = self._get_trace_id(msg)
if not trace_id:
return message
return f"[trace_id={trace_id}] {message}"
@staticmethod
def _elapsed_ms(started_at: float) -> float:
"""把 monotonic 起始时间转换为毫秒耗时。"""
return round((time.perf_counter() - started_at) * 1000, 2)
@staticmethod
def _extract_plugin_command(msg: WxMessage) -> str:
"""尽力从消息内容中提取一个可读的“触发命令”。"""
# 这里不追求把所有命令解析得非常精确,只要能满足后台统计可读性即可:
# 1. 文本消息优先取第一段词,避免把整句长文本都记成 command
# 2. 非文本消息统一落到消息类型名,便于区分“文本触发”和“链接触发”等场景;
# 3. 空内容时返回通用占位,避免统计表出现 NULL / 空字符串。
raw_content = str(getattr(getattr(msg, "content", None), "clean_content", "") or "").strip()
if raw_content:
first_token = raw_content.split()[0].strip()
return first_token[:50] if first_token else "[文本消息]"
msg_type = getattr(getattr(msg, "msg_type", None), "name", "")
return f"[{msg_type or 'UNKNOWN'}]"
def _get_stats_collector_plugin(self):
"""获取运行中的统计收集插件实例。"""
# 统计插件已经从“事件订阅”切到“主链路直接回调”,
# 因此每次埋点前都需要安全地确认插件实例是否存在且处于运行态。
plugin = self.plugin_manager.plugins.get("指令记录")
if not plugin:
return None
if getattr(plugin, "status", None) != PluginStatus.RUNNING:
return None
return plugin
def _record_plugin_call_result(
self,
*,
plugin,
msg: WxMessage,
command_name: str,
process_result: bool,
process_time_ms: float,
) -> None:
"""将插件执行结果直接写入统计插件。"""
stats_plugin = self._get_stats_collector_plugin()
if not stats_plugin or not hasattr(stats_plugin, "record_plugin_call"):
return
try:
stats_plugin.record_plugin_call(
plugin_name=plugin.name,
command=command_name,
user_id=msg.sender,
group_id=msg.roomid if msg.from_group() else None,
is_group=msg.from_group(),
process_result=process_result,
process_time_ms=process_time_ms,
trace_id=self._get_trace_id(msg),
)
except Exception as stats_error:
self.LOG.error(self._trace_message(msg, f"记录插件调用统计失败: plugin={plugin.name}, error={stats_error}"))
def _record_plugin_call_error(
self,
*,
plugin,
msg: WxMessage,
command_name: str,
error: Exception,
) -> None:
"""将插件执行异常直接写入统计插件。"""
stats_plugin = self._get_stats_collector_plugin()
if not stats_plugin or not hasattr(stats_plugin, "record_plugin_error"):
return
try:
stats_plugin.record_plugin_error(
plugin_name=plugin.name,
command=command_name,
user_id=msg.sender,
group_id=msg.roomid if msg.from_group() else None,
is_group=msg.from_group(),
error_message=str(error),
trace_id=self._get_trace_id(msg),
# 这里保留完整堆栈,便于后台直接查看异常上下文,而不必只看摘要日志。
stack_trace=traceback.format_exc(),
)
except Exception as stats_error:
self.LOG.error(self._trace_message(msg, f"记录插件异常统计失败: plugin={plugin.name}, error={stats_error}"))
@staticmethod
def _sort_message_plugins(message_plugins):
"""将兜底型插件放到最后执行,避免影响其他插件命中。"""
if not message_plugins:
return []
def is_fallback_plugin(plugin):
feature_key = str(getattr(plugin, "feature_key", "") or "").strip().upper()
module_name = str(getattr(plugin.__class__, "__module__", "") or "").lower()
plugin_name = str(getattr(plugin, "name", "") or "").strip().lower()
return (
feature_key == "AI_AUTO_RESPONSE"
or "plugins.ai_auto_response" in module_name
or plugin_name in {"小牛群聊bot", "ai_auto_response"}
)
normal_plugins = [plugin for plugin in message_plugins if not is_fallback_plugin(plugin)]
fallback_plugins = [plugin for plugin in message_plugins if is_fallback_plugin(plugin)]
return normal_plugins + fallback_plugins
def get_all_contacts(self) -> dict:
"""获取所有联系人信息并返回字典格式 {wxid: nickname}"""
try:
# 从数据库获取联系人信息
contacts = self.contacts_db.get_all_contacts()
return contacts
except Exception as e:
self.LOG.error(f"获取联系人信息失败: {e}")
return {}
def get_all_head_images(self) -> dict:
"""获取所有的联系人头像信息"""
try:
# 从数据库获取所有联系人的头像信息
head_images = self.contacts_db.get_all_contacts_avatar()
return head_images
except Exception as e:
self.LOG.error(f"获取所有联系人头像信息失败: {e}")
return {}
async def refresh_contacts_db(self):
"""刷新联系人信息"""
self.LOG.info("开始刷新联系人信息")
contacts = await self.ipad_bot.get_contract_list()
self.LOG.debug(f"获取到的联系人:{contacts}")
batch_size = 20
discovered_groups = set()
for i in range(0, len(contacts), batch_size):
batch_contacts = contacts[i:i + batch_size]
contact_info = await self.ipad_bot.get_contract_detail(batch_contacts)
self.LOG.debug(f"获取到的联系人详细信息数量:{len(contact_info)}")
friend_contacts = []
official_contacts = []
for contact in contact_info:
user_name = contact.get("UserName")
if isinstance(user_name, dict):
user_name = user_name.get("string", "")
user_name = user_name or ""
if not user_name:
continue
if user_name.endswith("@chatroom"):
discovered_groups.add(user_name)
# 群资料这里不能只在“首次发现”时写入:
# 1. 群头像、小群名、公告等字段都可能在微信侧发生变化;
# 2. 如果只插入不更新,后续头像缓存拿到的仍然会是旧 URL
# 3. 因此每次刷新通讯录都做一次 upsert确保群资料是最新的。
self.contacts_db.save_chatroom_info(contact)
continue
if user_name.startswith("gh_"):
official_contacts.append(contact)
else:
friend_contacts.append(contact)
# 联系人详情这里必须允许覆盖更新:
# 1. get_contract_detail 已经重新向远端拿到了最新昵称、签名、头像 URL
# 2. 如果 still only_insert=True库里旧联系人将永远保留历史头像地址
# 3. 改成 upsert 后,后续头像缓存同步才能真正拿到最新 URL 并下载新头像。
if friend_contacts:
self.contacts_db.save_contacts(friend_contacts, "friends", only_insert=False)
if official_contacts:
self.contacts_db.save_contacts(official_contacts, "ghs", only_insert=False)
groups = self.contacts_db.get_chatroom_list()
for group in groups:
group_id = group["chatroom_id"]
discovered_groups.add(group_id)
chatroom_info = await self.ipad_bot.get_chatroom_info(group_id)
self.LOG.debug(f"获取到的群成员信息:{chatroom_info}")
if chatroom_info.get("UserName", ""):
members = await self.ipad_bot.get_chatroom_member_list(group_id)
if members:
active_member_wxids = []
for member in members:
wxid = member.get("UserName", "")
if isinstance(wxid, dict):
wxid = wxid.get("string", "")
if wxid:
active_member_wxids.append(wxid)
self.contacts_db.mark_chatroom_members_active(group_id, active_member_wxids)
self.contacts_db.mark_chatroom_members_left(group_id, active_member_wxids)
# 群成员头像 URL 同样需要覆盖更新:
# 1. 群成员换头像后,成员表里的 small_head_img_url 会变;
# 2. 若只做 INSERT IGNORE则历史记录不会被刷新
# 3. 这里改成 upsert保证后台通讯录与头像缓存都能感知到最新头像地址。
self.contacts_db.save_chatroom_member_simple(group_id, members, only_insert=False)
self.LOG.info(f"已增量同步群 {group_id} 的成员信息")
else:
self.contacts_db.mark_chatroom_members_left(group_id, [])
self.LOG.warning(f"{group_id} 当前未获取到成员列表,已将历史成员标记为已退群")
else:
self.LOG.warning(f"获取群 {group_id} 信息失败,保留群资料并将成员标记为已退群。")
self.contacts_db.mark_chatroom_members_left(group_id, [])
for group_id in discovered_groups:
if not self.contacts_db.get_chatroom_info(group_id):
chatroom_info = await self.ipad_bot.get_chatroom_info(group_id)
if chatroom_info.get("UserName", ""):
self.contacts_db.save_chatroom_info(chatroom_info)
members = await self.ipad_bot.get_chatroom_member_list(group_id)
if members:
self.contacts_db.save_chatroom_member_simple(group_id, members, only_insert=False)
friends = await self.ipad_bot.get_contract_list()
self.allContacts = self.get_all_contacts()
self.head_images = self.get_all_head_images()
self.all_chatroom_members = self.contacts_db.get_chatroom_member_list_name_all()
self.contact_manager.set_contacts(self.allContacts, friends, self.head_images,
self.all_chatroom_members)
self.LOG.info("联系人信息刷新完成")
async def login_twice_auto_auth(self) -> None:
try:
self.LOG.info(f"定时进行二次登录动作")
resp = await self.ipad_bot.twice_auto_auth()
if resp:
self.LOG.info(f"定时二次登录成功!")
if self.ipad_running:
self.LOG.info(f"ipad_wechat running:{self.ipad_running}")
else:
self.ipad_running = True
self.LOG.info(f"ipad_wechat stopped change running:{self.ipad_running}")
else:
self.LOG.error(f"定时二次登录失败!")
self.ipad_running = False
except Exception as e:
self.LOG.error(f"login_twice_auto_auth error: {e}")
# ============================================== 系统级任务(刚需)==========================================================
async def message_count_to_db(self):
try:
self.message_storage.write_to_db()
except Exception as e:
self.LOG.error(f"write_to_db error{e}")