收口855 provider运行时并同步适配路线图
This commit is contained in:
367
robot.py
367
robot.py
@@ -6,8 +6,6 @@ import tomllib
|
||||
import traceback
|
||||
import uuid
|
||||
from collections import deque
|
||||
|
||||
import toml
|
||||
from loguru import logger
|
||||
|
||||
import wechat_ipad
|
||||
@@ -271,145 +269,119 @@ class Robot:
|
||||
server_type = str(self.ipad_config.get("server_type", "legacy_855") or "legacy_855").strip()
|
||||
self.ipad_bot = WechatGateway(server_ip, server_port, server_type=server_type)
|
||||
self.message_auto_revoke = MessageAutoRevoke(self.ipad_bot)
|
||||
wxid = self.ipad_config.get("wxid", "")
|
||||
device_name = self.ipad_config.get("device_name", "")
|
||||
device_id = self.ipad_config.get("device_id", "")
|
||||
|
||||
if device_name == "":
|
||||
device_name = self.ipad_bot.create_device_name()
|
||||
if device_id == "":
|
||||
device_id = self.ipad_bot.create_device_id()
|
||||
|
||||
# 登录逻辑
|
||||
if not await self.ipad_bot.is_logged_in(wxid):
|
||||
await self._handle_ipad_login(wxid, device_name, device_id)
|
||||
else: # 已登录
|
||||
self.ipad_bot.wxid = wxid
|
||||
profile = await self.ipad_bot.get_profile()
|
||||
self.ipad_bot.nickname = profile.get("NickName").get("string")
|
||||
self.ipad_bot.alias = profile.get("Alias")
|
||||
self.ipad_bot.phone = profile.get("BindMobile").get("string")
|
||||
self.ipad_bot.signature = profile.get("Signature", "")
|
||||
# 更新Robot类的属性
|
||||
self.wxid = self.ipad_bot.wxid
|
||||
self.nickname = self.ipad_bot.nickname
|
||||
self.alias = self.ipad_bot.alias
|
||||
self.phone = self.ipad_bot.phone
|
||||
self.signature = self.ipad_bot.signature
|
||||
|
||||
self.LOG.info(
|
||||
f"wechat_ipad登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}")
|
||||
|
||||
# 注入加载完成的bot
|
||||
self.plugin_manager.inject_bot(self.ipad_bot)
|
||||
self.LOG.info(f"wechat_ipad登录设备信息: device_name: {device_name} device_id: {device_id}")
|
||||
self.LOG.info("wechat_ipad登录成功")
|
||||
|
||||
# 登录成功后加载联系人信息
|
||||
self.allContacts = self.get_all_contacts()
|
||||
friends = await self.ipad_bot.get_contract_list()
|
||||
self.head_images = self.get_all_head_images()
|
||||
self.all_chatroom_members = self.contacts_db.get_chatroom_member_list_name_all()
|
||||
# self.LOG.debug(f"all_chatroom_members:{self.all_chatroom_members}")
|
||||
self.contact_manager.set_contacts(self.allContacts, friends, self.head_images, self.all_chatroom_members)
|
||||
|
||||
self.message_storage = MessageStorage(self.ipad_bot)
|
||||
self.member_monitor = ChatroomMemberMonitor(self.ipad_bot)
|
||||
# # 获取扩展信息,显示相关内容
|
||||
ext_profile = await self.ipad_bot.get_profile_info_ext()
|
||||
self.ipad_bot.profile_ext = ext_profile
|
||||
self.head_image = ext_profile.get("SmallHeadImgUrl")
|
||||
|
||||
# 先接受堆积消息
|
||||
self.LOG.info("处理堆积消息中")
|
||||
|
||||
# await self.ipad_bot.send_text_message("filehelper", "ipad客户端启动成功")
|
||||
count = 0
|
||||
while True:
|
||||
data = await self.ipad_bot.sync_message()
|
||||
data = data.get("AddMsgs")
|
||||
if not data:
|
||||
if count > 2:
|
||||
break
|
||||
else:
|
||||
count += 1
|
||||
continue
|
||||
|
||||
self.LOG.debug(f"接受到 {len(data)} 条历史消息,开始仅落库归档")
|
||||
for raw_message in data:
|
||||
await self._archive_startup_history_message(raw_message)
|
||||
await asyncio.sleep(1)
|
||||
self.LOG.info("处理堆积消息完毕")
|
||||
|
||||
# 标记为运行中
|
||||
self.ipad_running = True
|
||||
# 开启自动心跳(作为后台任务)
|
||||
heartbeat_task = asyncio.create_task(self._heartbeat_task())
|
||||
heartbeat_task_long = asyncio.create_task(self._heartbeat_task_long())
|
||||
# 开始处理消息
|
||||
self.LOG.info("开始处理wechat_ipad消息")
|
||||
while self.ipad_running:
|
||||
try:
|
||||
data_temp = await self.ipad_bot.sync_message()
|
||||
except Exception as e:
|
||||
self.LOG.error(f"获取新消息失败 {e}")
|
||||
if "用户可能退出" in str(e):
|
||||
self.LOG.error(f"用户可能退出: {e}")
|
||||
self.email_sender.send_wechat_alert(self.config.email.get("alert_recipient"),
|
||||
f"用户可能退出: {e}", self.wxid,
|
||||
self.nickname)
|
||||
await self.login_twice_auto_auth()
|
||||
await asyncio.sleep(5)
|
||||
continue
|
||||
|
||||
data = data_temp.get("AddMsgs")
|
||||
if data:
|
||||
for message in data:
|
||||
# self.LOG.debug(f"sync_message.处理消息消息内容: {message}")
|
||||
# 处理消息
|
||||
try:
|
||||
wxmsg: WxMessage = WxMessage.from_json(message)
|
||||
self._attach_trace_id(wxmsg)
|
||||
# 判断是否已经收到过。处理。存储最近20个msg_id,处理之前判断是否在清单里面,如果在,这不重新处理了。
|
||||
msg_id = wxmsg.msg_id
|
||||
if msg_id in self.recent_msg_ids:
|
||||
self.LOG.info(self._trace_message(wxmsg, f"出现重复ID消息: {msg_id}"))
|
||||
continue # 已处理,跳过
|
||||
self.recent_msg_ids.append(msg_id)
|
||||
self.LOG.debug(
|
||||
self._trace_message(
|
||||
wxmsg,
|
||||
f"收到消息 type={getattr(wxmsg.msg_type, 'name', wxmsg.msg_type)} "
|
||||
f"sender={wxmsg.sender} room={wxmsg.roomid or '-'}"
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
self.LOG.error(f"WxMessage.from_json 解析失败,消息内容: {message},错误: {e}")
|
||||
continue # 跳过本条消息,继续处理下一条
|
||||
# 创建独立任务,不阻塞下一条消息
|
||||
# 并发执行,限制最大并发数
|
||||
xx = asyncio.create_task(self._process_with_semaphore(wxmsg))
|
||||
else:
|
||||
# 只有当 Ret 不等于 0 或者 不包含 KeyBuf 时才打印
|
||||
if not (isinstance(data_temp, dict) and data_temp.get("Ret") == 0 and "KeyBuf" in data_temp):
|
||||
self.LOG.debug(f"MESSAGE:{data_temp}")
|
||||
|
||||
changed_groups = self.member_monitor.parse_mod_contacts_msg(data_temp)
|
||||
if changed_groups:
|
||||
self.LOG.info(f"监测到群成员变动消息,涉及群: {changed_groups}")
|
||||
for group_id in changed_groups:
|
||||
if self.gbm.get_group_permission(group_id,
|
||||
Feature.GROUP_MEMBER_CHANGE) == PermissionStatus.ENABLED:
|
||||
xx = asyncio.create_task(self.member_monitor.check_and_handle_changes(group_id))
|
||||
|
||||
# 使用异步睡眠替代忙等待循环
|
||||
await asyncio.sleep(2)
|
||||
# 855 provider 现在自行承接运行时模型:
|
||||
# 1. provider 内部负责登录、历史消息拉取、心跳、长心跳、掉线恢复与实时轮询;
|
||||
# 2. Robot 只注册业务回调,继续处理联系人初始化、消息归档、插件调度等项目内逻辑;
|
||||
# 3. 这样未来切到 864 时,主链路只需要替换 provider,而不是继续改这里的大循环。
|
||||
await self.ipad_bot.run_runtime(
|
||||
ipad_config=self.ipad_config,
|
||||
config_path="wechat_ipad/config.toml",
|
||||
logger=self.LOG,
|
||||
on_login_ready=self._on_ipad_login_ready,
|
||||
on_history_message=self._archive_startup_history_message,
|
||||
on_message=self._handle_runtime_message,
|
||||
on_idle_payload=self._handle_runtime_idle_payload,
|
||||
on_logout=self._handle_ipad_logout,
|
||||
on_runtime_state_change=self._handle_runtime_state_change,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
self.LOG.exception(f"wechat_ipad客户端运行出错: {e}")
|
||||
self.ipad_running = False
|
||||
|
||||
# 在类里直接写一个内联 async 方法(不额外抽取新的对外方法)
|
||||
async def _on_ipad_login_ready(self, login_identity: dict) -> None:
|
||||
"""处理 provider 登录成功后的项目侧初始化动作。
|
||||
|
||||
这里保留在 Robot 的原因很明确:
|
||||
1. 联系人缓存、插件注入、消息归档器、成员监控器都属于项目业务层能力;
|
||||
2. provider 不应该知道本项目有哪些数据库表、后台缓存或插件系统;
|
||||
3. 因此登录“流程”放到 provider,登录后的“业务初始化”继续留在 Robot。
|
||||
"""
|
||||
self.wxid = login_identity.get("wxid", "")
|
||||
self.nickname = login_identity.get("nickname", "")
|
||||
self.alias = login_identity.get("alias", "")
|
||||
self.phone = login_identity.get("phone", "")
|
||||
self.signature = login_identity.get("signature", "")
|
||||
|
||||
# 这里同时把 Robot 侧的身份信息镜像回 bot,保证旧代码仍可从 `self.ipad_bot.xxx` 读取。
|
||||
self.ipad_bot.wxid = self.wxid
|
||||
self.ipad_bot.nickname = self.nickname
|
||||
self.ipad_bot.alias = self.alias
|
||||
self.ipad_bot.phone = self.phone
|
||||
self.ipad_bot.signature = self.signature
|
||||
self.LOG.info(
|
||||
f"wechat_ipad登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}"
|
||||
)
|
||||
|
||||
self.plugin_manager.inject_bot(self.ipad_bot)
|
||||
self.allContacts = self.get_all_contacts()
|
||||
friends = await self.ipad_bot.get_contract_list()
|
||||
self.head_images = self.get_all_head_images()
|
||||
self.all_chatroom_members = self.contacts_db.get_chatroom_member_list_name_all()
|
||||
self.contact_manager.set_contacts(self.allContacts, friends, self.head_images, self.all_chatroom_members)
|
||||
|
||||
self.message_storage = MessageStorage(self.ipad_bot)
|
||||
self.member_monitor = ChatroomMemberMonitor(self.ipad_bot)
|
||||
ext_profile = await self.ipad_bot.get_profile_info_ext()
|
||||
self.ipad_bot.profile_ext = ext_profile
|
||||
self.head_image = ext_profile.get("SmallHeadImgUrl")
|
||||
|
||||
async def _handle_runtime_message(self, raw_message: dict) -> None:
|
||||
"""处理 provider 交付的单条实时原始消息。"""
|
||||
try:
|
||||
wxmsg: WxMessage = WxMessage.from_json(raw_message)
|
||||
self._attach_trace_id(wxmsg)
|
||||
msg_id = wxmsg.msg_id
|
||||
if msg_id in self.recent_msg_ids:
|
||||
self.LOG.info(self._trace_message(wxmsg, f"出现重复ID消息: {msg_id}"))
|
||||
return
|
||||
self.recent_msg_ids.append(msg_id)
|
||||
self.LOG.debug(
|
||||
self._trace_message(
|
||||
wxmsg,
|
||||
f"收到消息 type={getattr(wxmsg.msg_type, 'name', wxmsg.msg_type)} "
|
||||
f"sender={wxmsg.sender} room={wxmsg.roomid or '-'}"
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
self.LOG.error(f"WxMessage.from_json 解析失败,消息内容: {raw_message},错误: {e}")
|
||||
return
|
||||
|
||||
# 这里继续沿用“单条消息单独起任务 + 信号量限流”的项目策略:
|
||||
# 1. 保持与现网处理吞吐一致;
|
||||
# 2. 避免 provider 轮询被某条耗时消息阻塞;
|
||||
# 3. 也不把并发控制职责再塞回 provider,边界更清楚。
|
||||
asyncio.create_task(self._process_with_semaphore(wxmsg))
|
||||
|
||||
async def _handle_runtime_idle_payload(self, data_temp: dict) -> None:
|
||||
"""处理 855 空轮询之外的补充同步负载,例如群成员变更通知。"""
|
||||
if isinstance(data_temp, dict) and data_temp.get("Ret") == 0 and "KeyBuf" in data_temp:
|
||||
return
|
||||
|
||||
self.LOG.debug(f"MESSAGE:{data_temp}")
|
||||
changed_groups = self.member_monitor.parse_mod_contacts_msg(data_temp)
|
||||
if changed_groups:
|
||||
self.LOG.info(f"监测到群成员变动消息,涉及群: {changed_groups}")
|
||||
for group_id in changed_groups:
|
||||
if self.gbm.get_group_permission(
|
||||
group_id,
|
||||
Feature.GROUP_MEMBER_CHANGE,
|
||||
) == PermissionStatus.ENABLED:
|
||||
asyncio.create_task(self.member_monitor.check_and_handle_changes(group_id))
|
||||
|
||||
async def _handle_ipad_logout(self, reason: str) -> None:
|
||||
"""处理 provider 识别到的掉线事件,仅负责业务侧告警。"""
|
||||
self.LOG.error(f"用户可能退出: {reason}")
|
||||
self.email_sender.send_wechat_alert(
|
||||
self.config.email.get("alert_recipient"),
|
||||
f"用户可能退出: {reason}",
|
||||
self.wxid,
|
||||
self.nickname,
|
||||
)
|
||||
|
||||
async def _handle_runtime_state_change(self, running: bool) -> None:
|
||||
"""镜像 provider 运行态到 Robot,供后台与运维逻辑读取。"""
|
||||
self.ipad_running = running
|
||||
|
||||
async def _archive_startup_history_message(self, raw_message: dict) -> None:
|
||||
"""启动阶段只归档历史消息,不触发实时业务处理。
|
||||
@@ -471,109 +443,6 @@ class Robot:
|
||||
finally:
|
||||
reset_current_trace_id(trace_token)
|
||||
|
||||
async def _handle_ipad_login(self, wxid, device_name, device_id):
|
||||
"""处理wechat_ipad登录"""
|
||||
while not await self.ipad_bot.is_logged_in(wxid):
|
||||
# 需要登录
|
||||
try:
|
||||
if await self.ipad_bot.get_cached_info(wxid):
|
||||
# 尝试唤醒登录
|
||||
uuid = await self.ipad_bot.awaken_login(wxid)
|
||||
self.LOG.info(f"获取到登录uuid: {uuid}")
|
||||
else:
|
||||
# 二维码登录
|
||||
if not device_name:
|
||||
device_name = self.ipad_bot.create_device_name()
|
||||
if not device_id:
|
||||
device_id = self.ipad_bot.create_device_id()
|
||||
uuid, url = await self.ipad_bot.get_qr_code(device_id=device_id, device_name=device_name,
|
||||
print_qr=True)
|
||||
self.LOG.info(f"获取到登录uuid: {uuid}")
|
||||
self.LOG.info(f"获取到登录二维码: {url}")
|
||||
except Exception as e:
|
||||
self.LOG.error(f"登录过程出错: {e}")
|
||||
# 二维码登录
|
||||
if not device_name:
|
||||
device_name = self.ipad_bot.create_device_name()
|
||||
if not device_id:
|
||||
device_id = self.ipad_bot.create_device_id()
|
||||
uuid, url = await self.ipad_bot.get_qr_code(device_id=device_id, device_name=device_name, print_qr=True)
|
||||
self.LOG.info(f"获取到登录uuid: {uuid}")
|
||||
self.LOG.info(f"获取到登录二维码: {url}")
|
||||
|
||||
while True:
|
||||
self.LOG.info(f"uuid: {uuid}, url: {url}")
|
||||
stat, data = await self.ipad_bot.check_login_uuid(uuid, device_id=device_id)
|
||||
if stat:
|
||||
break
|
||||
self.LOG.info(f"等待登录中,过期倒计时:{data}")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
# 保存登录信息
|
||||
self.ipad_config["wxid"] = self.ipad_bot.wxid
|
||||
self.ipad_config["device_name"] = device_name
|
||||
self.ipad_config["device_id"] = device_id
|
||||
self.ipad_config["login_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
with open("wechat_ipad/config.toml", "w", encoding="utf-8") as f:
|
||||
toml.dump(self.ipad_config, f)
|
||||
|
||||
# 获取登录账号信息
|
||||
self.ipad_bot.wxid = data.get("acctSectResp").get("userName")
|
||||
self.ipad_bot.nickname = data.get("acctSectResp").get("nickName")
|
||||
self.ipad_bot.alias = data.get("acctSectResp").get("alias")
|
||||
self.ipad_bot.phone = data.get("acctSectResp").get("bindMobile")
|
||||
self.ipad_bot.signature = data.get("Signature", "")
|
||||
|
||||
# 更新Robot类的属性
|
||||
self.wxid = self.ipad_bot.wxid
|
||||
self.nickname = self.ipad_bot.nickname
|
||||
self.alias = self.ipad_bot.alias
|
||||
self.phone = self.ipad_bot.phone
|
||||
self.signature = self.ipad_bot.signature
|
||||
self.LOG.info(
|
||||
f"wechat_ipad登录账号信息: wxid: {self.wxid} 昵称: {self.nickname} 微信号: {self.alias} 手机号: {self.phone}")
|
||||
break
|
||||
|
||||
async def _heartbeat_task(self):
|
||||
"""wechat_ipad心跳任务"""
|
||||
self.LOG.info("开启wechat_ipad心跳!")
|
||||
while self.ipad_running:
|
||||
try:
|
||||
success = await self.ipad_bot.heartbeat()
|
||||
if success:
|
||||
self.LOG.debug("心跳进行中")
|
||||
else:
|
||||
self.LOG.warning("心跳失败")
|
||||
except Exception as e:
|
||||
self.LOG.error(f"wechat_ipad heartbeat: {e}")
|
||||
if "用户可能退出" in str(e):
|
||||
self.LOG.error(f"用户可能退出: {e}")
|
||||
self.email_sender.send_wechat_alert(self.config.email.get("alert_recipient"), f"用户可能退出: {e}",
|
||||
self.wxid,
|
||||
self.nickname)
|
||||
await self.login_twice_auto_auth()
|
||||
await asyncio.sleep(60)
|
||||
|
||||
async def _heartbeat_task_long(self):
|
||||
"""wechat_ipad心跳任务"""
|
||||
self.LOG.info("开启wechat_ipad长连接心跳!")
|
||||
while self.ipad_running:
|
||||
try:
|
||||
success = await self.ipad_bot.heartbeat_long()
|
||||
if success:
|
||||
self.LOG.debug("长连接心跳进行中")
|
||||
else:
|
||||
self.LOG.warning("长连接心跳失败")
|
||||
except Exception as e:
|
||||
self.LOG.error(f"wechat_ipad heartbeat long: {e}")
|
||||
if "用户可能退出" in str(e):
|
||||
self.LOG.error(f"用户可能退出: {e}")
|
||||
self.email_sender.send_wechat_alert(self.config.email.get("alert_recipient"), f"用户可能退出: {e}",
|
||||
self.wxid,
|
||||
self.nickname)
|
||||
await self.login_twice_auto_auth()
|
||||
await asyncio.sleep(120)
|
||||
|
||||
async def _process_ipad_message(self, message: WxMessage):
|
||||
"""处理wechat_ipad消息"""
|
||||
try:
|
||||
@@ -658,6 +527,8 @@ class Robot:
|
||||
def stop_wechat_ipad(self):
|
||||
"""停止wechat_ipad客户端"""
|
||||
self.ipad_running = False
|
||||
if hasattr(self, "ipad_bot") and self.ipad_bot and hasattr(self.ipad_bot, "stop_runtime"):
|
||||
self.ipad_bot.stop_runtime()
|
||||
if self.ipad_loop:
|
||||
self.ipad_loop.stop()
|
||||
self.LOG.info("wechat_ipad客户端已停止")
|
||||
@@ -999,24 +870,6 @@ class Robot:
|
||||
self.all_chatroom_members)
|
||||
self.LOG.info("联系人信息刷新完成")
|
||||
|
||||
async def login_twice_auto_auth(self) -> None:
|
||||
try:
|
||||
self.LOG.info(f"定时进行二次登录动作")
|
||||
resp = await self.ipad_bot.twice_auto_auth()
|
||||
if resp:
|
||||
self.LOG.info(f"定时二次登录成功!")
|
||||
if self.ipad_running:
|
||||
self.LOG.info(f"ipad_wechat running:{self.ipad_running}")
|
||||
else:
|
||||
self.ipad_running = True
|
||||
self.LOG.info(f"ipad_wechat stopped change running:{self.ipad_running}")
|
||||
else:
|
||||
self.LOG.error(f"定时二次登录失败!")
|
||||
self.ipad_running = False
|
||||
|
||||
except Exception as e:
|
||||
self.LOG.error(f"login_twice_auto_auth error: {e}")
|
||||
|
||||
# ============================================== 系统级任务(刚需)==========================================================
|
||||
|
||||
async def message_count_to_db(self):
|
||||
|
||||
Reference in New Issue
Block a user