From 994f452b994919f28b7a3808be355f6998886f1d Mon Sep 17 00:00:00 2001 From: Liu Date: Fri, 1 May 2026 12:45:28 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"=E4=BC=98=E5=8C=96=E5=BE=AE=E4=BF=A1?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E8=B6=85=E6=97=B6=E5=85=9C=E5=BA=95=E5=B9=B6?= =?UTF-8?q?=E4=B8=8B=E6=B2=89=E5=A4=B4=E5=83=8F=E7=BC=93=E5=AD=98=E9=A2=84?= =?UTF-8?q?=E7=83=AD"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit c3830d905e4ca21a8d64def44bcfec1deab6fee2. --- robot.py | 90 ++++--------------------- utils/system_jobs.py | 8 --- utils/wechat/contact_manager.py | 115 ++++++++++++++++++-------------- wechat_ipad/client/message.py | 33 +-------- wechat_ipad/errors.py | 5 -- 5 files changed, 80 insertions(+), 171 deletions(-) diff --git a/robot.py b/robot.py index 8291f77..87e5d3a 100644 --- a/robot.py +++ b/robot.py @@ -60,9 +60,6 @@ class Robot: self.nickname = None self.alias = None self.phone = None - # 连续同步超时计数单独保留,目的是区分“偶发网络抖动”和“服务端已经持续不可用”。 - # 这样后续日志可以更明确地告诉我们当前是第几次连续超时,排障时不需要手工数日志。 - self.ipad_sync_timeout_streak = 0 self.message_auto_revoke: MessageAutoRevoke = None self.LOG.debug(f"DB+REDIS 连接池开始初始化") # 使用单例模式获取实例 @@ -306,11 +303,8 @@ class Robot: # await self.ipad_bot.send_text_message("filehelper", "ipad客户端启动成功") count = 0 while True: - data_temp = await self._sync_ipad_messages_with_guard("处理堆积消息") - if data_temp is None: - continue - - data = data_temp.get("AddMsgs") + data = await self.ipad_bot.sync_message() + data = data.get("AddMsgs") if not data: if count > 2: break @@ -330,8 +324,17 @@ class Robot: # 开始处理消息 self.LOG.info("开始处理wechat_ipad消息") while self.ipad_running: - data_temp = await self._sync_ipad_messages_with_guard("消息主循环") - if data_temp is None: + try: + data_temp = await self.ipad_bot.sync_message() + except Exception as e: + self.LOG.error(f"获取新消息失败 {e}") + if "用户可能退出" in str(e): + self.LOG.error(f"用户可能退出: {e}") + self.email_sender.send_wechat_alert(self.config.email.get("alert_recipient"), + f"用户可能退出: {e}", self.wxid, + self.nickname) + await self.login_twice_auto_auth() + await asyncio.sleep(5) continue data = data_temp.get("AddMsgs") @@ -381,55 +384,6 @@ class Robot: self.LOG.exception(f"wechat_ipad客户端运行出错: {e}") self.ipad_running = False - async def _sync_ipad_messages_with_guard(self, phase: str) -> dict | None: - """统一封装 wechat_ipad 的消息同步调用。 - - 设计说明: - 1. 启动阶段清空堆积消息和运行阶段实时拉消息,本质上调用的是同一个 `/api/Msg/Sync`; - 2. 过去两处各自直接调用 `sync_message()`,导致启动阶段一旦超时就会把整个主循环打断; - 3. 现在把重试、连续超时计数、掉线自愈放在一起,后续如果策略要调整,只改这一处即可。 - """ - try: - data = await self.ipad_bot.sync_message() - if self.ipad_sync_timeout_streak > 0: - # 这里在恢复成功时主动打一条恢复日志,方便和前面的连续超时告警配对查看。 - self.LOG.info( - f"{phase}同步消息恢复正常,已清空连续超时计数: {self.ipad_sync_timeout_streak}" - ) - self.ipad_sync_timeout_streak = 0 - return data - except wechat_ipad.RequestTimeoutError as timeout_error: - # 对同步超时做“警告级别 + 连续计数”处理,而不是直接当致命错误退出: - # 1. 局域网环境下偶发抖动、服务端短暂卡顿都可能触发超时; - # 2. 这类问题大多数可通过下一轮重试自动恢复; - # 3. 只有保留连续次数,我们才能快速判断是偶发还是持续故障。 - self.ipad_sync_timeout_streak += 1 - self.LOG.warning( - f"{phase}同步消息超时,第 {self.ipad_sync_timeout_streak} 次连续超时: {timeout_error}" - ) - except Exception as e: - if self.ipad_sync_timeout_streak > 0: - # 非超时异常说明故障类型已经变化,先把超时计数归零,避免后续日志语义混乱。 - self.LOG.warning( - f"{phase}同步消息异常类型已变化,重置连续超时计数: {self.ipad_sync_timeout_streak}" - ) - self.ipad_sync_timeout_streak = 0 - - self.LOG.error(f"{phase}获取新消息失败: {e}") - if "用户可能退出" in str(e): - self.LOG.error(f"用户可能退出: {e}") - self.email_sender.send_wechat_alert( - self.config.email.get("alert_recipient"), - f"用户可能退出: {e}", - self.wxid, - self.nickname - ) - await self.login_twice_auto_auth() - - # 这里统一等待 5 秒再重试,避免在服务端异常时进入高频空转。 - await asyncio.sleep(5) - return None - # 在类里直接写一个内联 async 方法(不额外抽取新的对外方法) async def _process_with_semaphore(self, wxmsg): @@ -1133,21 +1087,3 @@ class Robot: self.message_storage.write_to_db() except Exception as e: self.LOG.error(f"write_to_db error:{e}") - - async def sync_contact_avatar_cache(self) -> None: - """系统级定时任务:增量同步联系人头像缓存。 - - 说明: - 1. 头像缓存预热从登录启动链路挪到这里,避免机器人刚上线时就批量下载头像; - 2. `ContactManager.sync_avatar_cache()` 仍是同步 I/O,因此这里用 `asyncio.to_thread` 丢到线程池执行; - 3. 这样既保留定时批处理能力,也不会阻塞 async_job 所在的事件循环。 - """ - try: - stats = await asyncio.to_thread( - self.contact_manager.sync_avatar_cache, - "system_job:sync_contact_avatar_cache", - ) - self.LOG.info(f"联系人头像缓存定时同步完成: {stats}") - except Exception as e: - self.LOG.error(f"sync_contact_avatar_cache error: {e}") - raise diff --git a/utils/system_jobs.py b/utils/system_jobs.py index 0a5a5c6..a6141cd 100644 --- a/utils/system_jobs.py +++ b/utils/system_jobs.py @@ -43,14 +43,6 @@ def get_system_job_definitions(robot) -> List[Dict[str, Any]]: "trigger_config": {"seconds": 300}, "handler": _build_process_pending_images_handler(robot), }, - { - "job_key": "sync_contact_avatar_cache", - "name": "联系人头像缓存同步", - "description": "每 30 分钟增量同步一次联系人头像缓存,避免启动时集中下载", - "trigger_type": "every_seconds", - "trigger_config": {"seconds": 1800}, - "handler": robot.sync_contact_avatar_cache, - }, ] def _build_process_pending_images_handler(robot) -> Callable[[], Awaitable[None]]: diff --git a/utils/wechat/contact_manager.py b/utils/wechat/contact_manager.py index 44630ef..d0b0225 100644 --- a/utils/wechat/contact_manager.py +++ b/utils/wechat/contact_manager.py @@ -33,6 +33,9 @@ class ContactManager: _avatar_cache_dir: Optional[Path] = None _avatar_manifest_path: Optional[Path] = None _avatar_manifest: Dict[str, Dict[str, str]] = {} + _avatar_sync_state_lock = threading.Lock() + _avatar_sync_running = False + _avatar_sync_pending = False # 定义公共好友列表 _PUBLIC_FRIENDS = { 'fmessage': '朋友推荐消息', @@ -92,14 +95,54 @@ class ContactManager: self._friends = friends self._head_images = head_imgs self._group_members = chatroom_members - # 这里不再在刷新通讯录时主动触发头像下载: - # 1. 用户当前明确希望“启动时只加载联系人元数据,不要顺手拉整批头像”; - # 2. 头像缓存改由独立定时任务统一批处理,降低启动期的网络和磁盘压力; - # 3. 若后台页面或统计图提前访问头像,仍可走 `ensure_head_image_cached()` 按需补下载。 + # 头像缓存同步改成后台异步触发: + # 1. set_contacts 常出现在“刷新通讯录”主链路里,若同步下载头像会明显阻塞系统; + # 2. 这里改成只提交后台任务,主线程先把联系人数据更新完立即返回; + # 3. 后台线程会基于当前最新 head_images 做增量同步,不会丢掉头像更新。 + self._schedule_avatar_cache_sync(reason="set_contacts") self._logger.info(f"联系人信息已更新,共 {len(contacts)} 个联系人") # 分类联系人 self._classify_contacts() + def _schedule_avatar_cache_sync(self, reason: str = "unknown") -> None: + """提交头像缓存后台同步任务,避免在主链路里同步下载头像。""" + with self._avatar_sync_state_lock: + self._avatar_sync_pending = True + if self._avatar_sync_running: + self._logger.debug(f"头像缓存后台同步已在执行,合并本次请求 reason={reason}") + return + self._avatar_sync_running = True + worker = threading.Thread( + target=self._avatar_sync_worker, + args=(reason,), + daemon=True, + name="contact-avatar-sync", + ) + worker.start() + self._logger.debug(f"头像缓存后台同步已提交 reason={reason}") + + def _avatar_sync_worker(self, initial_reason: str) -> None: + """串行消费头像缓存同步请求,确保高频更新时只跑最新一轮。""" + trigger_reason = initial_reason + while True: + with self._avatar_sync_state_lock: + self._avatar_sync_pending = False + try: + self._logger.debug(f"头像缓存后台同步开始 reason={trigger_reason}") + self._sync_avatar_cache() + except Exception as exc: + self._logger.error(f"头像缓存后台同步失败 reason={trigger_reason}: {exc}") + + with self._avatar_sync_state_lock: + # 如果在本轮执行期间又收到了新的同步请求,就立刻再跑一轮, + # 这样可以把多次 set_contacts/update_head_image 合并成少量批处理。 + if self._avatar_sync_pending: + trigger_reason = "pending_update" + continue + self._avatar_sync_running = False + self._logger.debug("头像缓存后台同步结束") + break + def _init_avatar_cache(self) -> None: """初始化头像缓存目录和 manifest。""" cache_dir = Path(__file__).resolve().parents[2] / "temp" / "contact_avatars" @@ -245,30 +288,12 @@ class ContactManager: response.close() return str(target_path) - def sync_avatar_cache(self, reason: str = "manual") -> Dict[str, int]: - """按当前头像 URL 增量同步本地缓存。 - - Args: - reason: 本次同步的触发来源,方便从日志区分是定时任务、人工触发还是别的入口。 - - Returns: - Dict[str, int]: 本轮同步统计信息,便于调用方记录日志或观察执行效果。 - """ + def _sync_avatar_cache(self) -> None: + """按当前头像 URL 增量同步本地缓存。""" with self._avatar_cache_lock: head_images_snapshot = dict(self._head_images) if not head_images_snapshot: - self._logger.debug(f"头像缓存同步跳过 reason={reason} because no head images loaded") - return { - "total_head_images": 0, - "manifest_entries": len(self._avatar_manifest), - "reuse": 0, - "downloaded": 0, - "replaced": 0, - "removed_contacts": 0, - "removed_files": 0, - "orphan_deleted": 0, - "tmp_deleted": 0, - } + return manifest_changed = False # 统计字段用于打 debug 汇总日志,方便观察“初始化第一批头像”时到底发生了什么: @@ -332,31 +357,18 @@ class ContactManager: # 无论本轮 manifest 是否有变化,都顺手做一次目录对账, # 保证历史异常中断留下的孤儿文件也能逐步被回收。 cleanup_stats = self._cleanup_avatar_cache_files() - stats = { - "total_head_images": len(active_wxids), - "manifest_entries": len(self._avatar_manifest), - "reuse": reuse_count, - "downloaded": download_count, - "replaced": replaced_count, - "removed_contacts": removed_contact_count, - "removed_files": removed_file_count, - "orphan_deleted": cleanup_stats.get("orphan_deleted", 0), - "tmp_deleted": cleanup_stats.get("tmp_deleted", 0), - } self._logger.debug( "头像缓存同步完成 " - f"reason={reason} " - f"total_head_images={stats['total_head_images']} " - f"manifest_entries={stats['manifest_entries']} " - f"reuse={stats['reuse']} " - f"downloaded={stats['downloaded']} " - f"replaced={stats['replaced']} " - f"removed_contacts={stats['removed_contacts']} " - f"removed_files={stats['removed_files']} " - f"orphan_deleted={stats['orphan_deleted']} " - f"tmp_deleted={stats['tmp_deleted']}" + f"total_head_images={len(active_wxids)} " + f"manifest_entries={len(self._avatar_manifest)} " + f"reuse={reuse_count} " + f"downloaded={download_count} " + f"replaced={replaced_count} " + f"removed_contacts={removed_contact_count} " + f"removed_files={removed_file_count} " + f"orphan_deleted={cleanup_stats.get('orphan_deleted', 0)} " + f"tmp_deleted={cleanup_stats.get('tmp_deleted', 0)}" ) - return stats def get_cached_head_image_path(self, wxid: str) -> str: """返回头像缓存本地路径,若缓存不存在则返回空字符串。""" @@ -541,10 +553,11 @@ class ContactManager: """ with self._avatar_cache_lock: self._head_images.update({wxid: head_image}) - # 单头像更新现在只更新内存里的最新远端 URL: - # 1. 实时消息链路不再顺手触发下载,避免把头像同步重新带回主链路; - # 2. 批量预热交给定时任务统一处理,减少碎片化下载; - # 3. 若某个页面立刻需要这张头像,访问链路仍会按需补下载,不影响功能可用性。 + # 单头像更新也改成后台异步: + # 1. 入群欢迎、成员变更等实时链路不能被头像下载阻塞; + # 2. 这里只更新最新远端 URL,并触发后台增量同步; + # 3. 若页面提前访问该头像,/api/avatar 仍会按需补下载,不影响可用性。 + self._schedule_avatar_cache_sync(reason=f"update_head_image:{wxid}") return True def get_group_name(self, roomid: str, wxid: str) -> str: diff --git a/wechat_ipad/client/message.py b/wechat_ipad/client/message.py index 60b6a8b..14cde47 100644 --- a/wechat_ipad/client/message.py +++ b/wechat_ipad/client/message.py @@ -18,7 +18,6 @@ import aiofiles from utils.video_utils import get_first_frame, get_first_frame_bytes from utils.trace_context import format_trace_prefix from wechat_ipad import UserLoggedOut -from wechat_ipad.errors import RequestTimeoutError from wechat_ipad.client.base import WechatAPIClientBase @@ -732,41 +731,15 @@ class MessageMixin(WechatAPIClientBase): Raises: UserLoggedOut: 未登录时调用 - RequestTimeoutError: 同步接口超时时抛出,方便上层做重试而不是直接退出主循环 根据error_handler处理错误 """ if not self.wxid: raise UserLoggedOut("请先登录") - # `/api/Msg/Sync` 在没有新消息时,服务端经常会把请求挂住一段时间再返回, - # 行为更接近“短轮询/长轮询”而不是普通的即时 RPC。 - # 因此这里不能继续沿用 10 秒总超时,否则在空闲期也会被误判成异常。 - timeout = aiohttp.ClientTimeout( - total=70, - connect=10, - sock_connect=10, - sock_read=60, - ) - async with aiohttp.ClientSession(timeout=timeout) as session: + async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session: json_param = {"Wxid": self.wxid, "Scene": 0, "Synckey": ""} - try: - response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/Sync', json=json_param) - # 某些服务端实现不会稳定返回 `application/json`, - # 这里放宽 content-type 校验,避免把“已收到响应”误判成解析失败。 - json_resp = await response.json(content_type=None) - except asyncio.TimeoutError as exc: - self.logging.warning( - "同步消息超时: wxid:{} server={}:{} connect_timeout={}s read_timeout={}s total_timeout={}s", - self.wxid, - self.ip, - self.port, - timeout.sock_connect, - timeout.sock_read, - timeout.total, - ) - raise RequestTimeoutError( - f"SyncMessage 接口调用超时,读超时={timeout.sock_read}s,总超时={timeout.total}s" - ) from exc + response = await session.post(f'http://{self.ip}:{self.port}/api/Msg/Sync', json=json_param) + json_resp = await response.json() if json_resp.get("Success"): return json_resp.get("Data") diff --git a/wechat_ipad/errors.py b/wechat_ipad/errors.py index 5e2d95b..e186583 100644 --- a/wechat_ipad/errors.py +++ b/wechat_ipad/errors.py @@ -33,8 +33,3 @@ class UserLoggedOut(Exception): class BanProtection(Exception): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - - -class RequestTimeoutError(Exception): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs)