From 4dbf390c6544b76b62554127d36cd7ecc872ddae Mon Sep 17 00:00:00 2001 From: liuwei Date: Mon, 27 Apr 2026 09:30:21 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86=E9=80=9A=E8=AE=AF=E5=BD=95=E5=88=B7?= =?UTF-8?q?=E6=96=B0=E4=B8=8E=E5=A4=B4=E5=83=8F=E7=BC=93=E5=AD=98=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E6=94=B9=E4=B8=BA=E5=BC=82=E6=AD=A5=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- admin/dashboard/blueprints/contacts.py | 40 ++++- .../templates/contacts_management.html | 6 +- utils/wechat/contact_manager.py | 144 ++++++++++++------ 3 files changed, 138 insertions(+), 52 deletions(-) diff --git a/admin/dashboard/blueprints/contacts.py b/admin/dashboard/blueprints/contacts.py index 2f1e757..27a11c5 100644 --- a/admin/dashboard/blueprints/contacts.py +++ b/admin/dashboard/blueprints/contacts.py @@ -18,6 +18,8 @@ message_thread_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="mes # 创建共享的事件循环 shared_loop = None loop_lock = threading.Lock() +contacts_refresh_lock = threading.Lock() +contacts_refresh_running = False _EMOJI_MD5_RE = re.compile(r'md5\s*=\s*[\"\']([0-9a-fA-F]{16,64})[\"\']', re.IGNORECASE) _EMOJI_TOTALLEN_RE = re.compile(r'(?:totallen|total_len|len)\s*=\s*[\"\'](\d+)[\"\']', re.IGNORECASE) @@ -70,6 +72,30 @@ def run_member_context_refresh_in_thread(func, *args, **kwargs): message_thread_pool.submit(run) +def run_contacts_refresh_in_thread(server): + """将通讯录刷新放到后台线程执行,避免阻塞后台请求与系统主链路。""" + global contacts_refresh_running + with contacts_refresh_lock: + if contacts_refresh_running: + return False + contacts_refresh_running = True + + def run(): + global contacts_refresh_running + try: + logger.info("通讯录后台刷新任务开始执行") + asyncio.run(server.robot.refresh_contacts_db()) + logger.info("通讯录后台刷新任务执行完成") + except Exception as e: + logger.error(f"通讯录后台刷新任务执行失败: {e}") + finally: + with contacts_refresh_lock: + contacts_refresh_running = False + + message_thread_pool.submit(run) + return True + + def _safe_text(value): return "" if value is None else str(value) @@ -616,12 +642,14 @@ def api_contacts_update(): """更新通讯录信息API""" try: server = current_app.dashboard_server - # 假设 contact_manager 有 update_contacts 方法用于同步通讯录 - result = asyncio.run(server.robot.refresh_contacts_db()) - if result: - return jsonify({"success": True, "message": "通讯录更新成功"}) - else: - return jsonify({"success": False, "message": "通讯录更新失败"}), 500 + # 通讯录刷新改成后台异步任务: + # 1. 远端拉联系人详情 + 群成员详情 + 头像缓存同步都可能较慢; + # 2. 若接口同步等待,会直接卡住后台请求线程,影响整体使用体验; + # 3. 这里提交任务后立刻返回,等后台线程慢慢完成刷新。 + submitted = run_contacts_refresh_in_thread(server) + if submitted: + return jsonify({"success": True, "message": "通讯录更新任务已提交,请稍后手动刷新查看结果"}) + return jsonify({"success": True, "message": "通讯录更新任务已在执行中,请稍后再刷新"}) except Exception as e: logger.error(f"更新通讯录失败: {e}") return jsonify({"success": False, "message": f"更新通讯录失败: {str(e)}"}), 500 diff --git a/admin/dashboard/templates/contacts_management.html b/admin/dashboard/templates/contacts_management.html index c091756..63b0196 100644 --- a/admin/dashboard/templates/contacts_management.html +++ b/admin/dashboard/templates/contacts_management.html @@ -964,7 +964,11 @@ }, updateContacts() { this.$message.info('正在更新通讯录...'); - axios.post('/contacts/api/update').then(res => { if (res.data.success) { this.$message.success('通讯录更新成功!'); this.refreshContacts(); } else { this.$message.error(res.data.message || '通讯录更新失败'); } }).catch(() => { this.$message.error('通讯录更新请求失败'); }); + // 通讯录刷新已改成后台异步任务: + // 1. 点击后只提交任务,不再阻塞等待全部联系人与头像同步完成; + // 2. 因为结果是异步落库,这里不再立刻 refreshContacts,避免用户误以为没生效; + // 3. 任务完成后用户手动点“刷新数据”即可看到最新结果。 + axios.post('/contacts/api/update').then(res => { if (res.data.success) { this.$message.success(res.data.message || '通讯录更新任务已提交'); } else { this.$message.error(res.data.message || '通讯录更新失败'); } }).catch(() => { this.$message.error('通讯录更新请求失败'); }); }, refreshContacts() { this.loadContactsData(); this.$message.success('联系人数据已刷新'); }, handleTabClick() { this.currentPage = 1; }, diff --git a/utils/wechat/contact_manager.py b/utils/wechat/contact_manager.py index 762ce81..d0b0225 100644 --- a/utils/wechat/contact_manager.py +++ b/utils/wechat/contact_manager.py @@ -33,6 +33,9 @@ class ContactManager: _avatar_cache_dir: Optional[Path] = None _avatar_manifest_path: Optional[Path] = None _avatar_manifest: Dict[str, Dict[str, str]] = {} + _avatar_sync_state_lock = threading.Lock() + _avatar_sync_running = False + _avatar_sync_pending = False # 定义公共好友列表 _PUBLIC_FRIENDS = { 'fmessage': '朋友推荐消息', @@ -85,19 +88,61 @@ class ContactManager: "gender": gender} chatroom_members: 所有的群成员昵称信息 """ - self._contacts = contacts - self._friends = friends - self._head_images = head_imgs - self._group_members = chatroom_members - # 通讯录刷新后顺手做一次头像缓存增量同步: - # 1. 只处理新增头像或 URL 已变化的联系人; - # 2. 不改动业务侧原有头像 URL 存储方式,避免影响其他调用链; - # 3. 让后台展示和后续图片渲染都能尽量命中本地缓存。 - self._sync_avatar_cache() + # 这些内存态会被后台头像同步线程读取,因此更新时统一加锁, + # 避免“刷新通讯录”和“后台下载头像”并发时出现字典遍历被修改的问题。 + with self._avatar_cache_lock: + self._contacts = contacts + self._friends = friends + self._head_images = head_imgs + self._group_members = chatroom_members + # 头像缓存同步改成后台异步触发: + # 1. set_contacts 常出现在“刷新通讯录”主链路里,若同步下载头像会明显阻塞系统; + # 2. 这里改成只提交后台任务,主线程先把联系人数据更新完立即返回; + # 3. 后台线程会基于当前最新 head_images 做增量同步,不会丢掉头像更新。 + self._schedule_avatar_cache_sync(reason="set_contacts") self._logger.info(f"联系人信息已更新,共 {len(contacts)} 个联系人") # 分类联系人 self._classify_contacts() + def _schedule_avatar_cache_sync(self, reason: str = "unknown") -> None: + """提交头像缓存后台同步任务,避免在主链路里同步下载头像。""" + with self._avatar_sync_state_lock: + self._avatar_sync_pending = True + if self._avatar_sync_running: + self._logger.debug(f"头像缓存后台同步已在执行,合并本次请求 reason={reason}") + return + self._avatar_sync_running = True + worker = threading.Thread( + target=self._avatar_sync_worker, + args=(reason,), + daemon=True, + name="contact-avatar-sync", + ) + worker.start() + self._logger.debug(f"头像缓存后台同步已提交 reason={reason}") + + def _avatar_sync_worker(self, initial_reason: str) -> None: + """串行消费头像缓存同步请求,确保高频更新时只跑最新一轮。""" + trigger_reason = initial_reason + while True: + with self._avatar_sync_state_lock: + self._avatar_sync_pending = False + try: + self._logger.debug(f"头像缓存后台同步开始 reason={trigger_reason}") + self._sync_avatar_cache() + except Exception as exc: + self._logger.error(f"头像缓存后台同步失败 reason={trigger_reason}: {exc}") + + with self._avatar_sync_state_lock: + # 如果在本轮执行期间又收到了新的同步请求,就立刻再跑一轮, + # 这样可以把多次 set_contacts/update_head_image 合并成少量批处理。 + if self._avatar_sync_pending: + trigger_reason = "pending_update" + continue + self._avatar_sync_running = False + self._logger.debug("头像缓存后台同步结束") + break + def _init_avatar_cache(self) -> None: """初始化头像缓存目录和 manifest。""" cache_dir = Path(__file__).resolve().parents[2] / "temp" / "contact_avatars" @@ -245,7 +290,9 @@ class ContactManager: def _sync_avatar_cache(self) -> None: """按当前头像 URL 增量同步本地缓存。""" - if not self._head_images: + with self._avatar_cache_lock: + head_images_snapshot = dict(self._head_images) + if not head_images_snapshot: return manifest_changed = False @@ -259,41 +306,45 @@ class ContactManager: replaced_count = 0 removed_contact_count = 0 removed_file_count = 0 - with self._avatar_cache_lock: - for wxid, avatar_url in self._head_images.items(): - remote_url = str(avatar_url or "").strip() - if not remote_url: - continue - manifest_item = self._avatar_manifest.get(wxid, {}) - old_file_name = str(manifest_item.get("file_name") or "").strip() - cached_path = self.get_cached_head_image_path(wxid) - # 只有“URL 变了”或“本地文件丢了”才重新下载,避免刷新通讯录时重复打远端。 - if manifest_item.get("remote_url") == remote_url and cached_path: - reuse_count += 1 - continue - downloaded_path = self._download_avatar_to_cache(wxid, remote_url) - if not downloaded_path: - self._logger.debug(f"头像缓存同步跳过 wxid={wxid} reason=download_failed") - continue + for wxid, avatar_url in head_images_snapshot.items(): + remote_url = str(avatar_url or "").strip() + if not remote_url: + continue + with self._avatar_cache_lock: + manifest_item = dict(self._avatar_manifest.get(wxid, {}) or {}) + old_file_name = str(manifest_item.get("file_name") or "").strip() + cached_path = self.get_cached_head_image_path(wxid) + # 只有“URL 变了”或“本地文件丢了”才重新下载,避免刷新通讯录时重复打远端。 + if manifest_item.get("remote_url") == remote_url and cached_path: + reuse_count += 1 + continue + downloaded_path = self._download_avatar_to_cache(wxid, remote_url) + if not downloaded_path: + self._logger.debug(f"头像缓存同步跳过 wxid={wxid} reason=download_failed") + continue + with self._avatar_cache_lock: self._avatar_manifest[wxid] = { "file_name": Path(downloaded_path).name, "remote_url": remote_url, } - download_count += 1 - # 同一联系人头像地址变化后,旧文件已经失去引用,这里立刻删掉旧版本。 - if old_file_name and old_file_name != Path(downloaded_path).name: - if self._delete_avatar_file_by_name(old_file_name): - removed_file_count += 1 - replaced_count += 1 - self._logger.debug( - f"头像缓存已替换 wxid={wxid} old={old_file_name} new={Path(downloaded_path).name}" - ) - else: - self._logger.debug(f"头像缓存已下载 wxid={wxid} file={Path(downloaded_path).name}") - manifest_changed = True + download_count += 1 + # 同一联系人头像地址变化后,旧文件已经失去引用,这里立刻删掉旧版本。 + if old_file_name and old_file_name != Path(downloaded_path).name: + if self._delete_avatar_file_by_name(old_file_name): + removed_file_count += 1 + replaced_count += 1 + self._logger.debug( + f"头像缓存已替换 wxid={wxid} old={old_file_name} new={Path(downloaded_path).name}" + ) + else: + self._logger.debug(f"头像缓存已下载 wxid={wxid} file={Path(downloaded_path).name}") + manifest_changed = True - # 把已经不在通讯录中的头像记录清理掉,避免 manifest 无限增长。 - removed_wxids = [wxid for wxid in self._avatar_manifest.keys() if wxid not in self._head_images] + with self._avatar_cache_lock: + # 删除记录时以“当前最新 head_images”为准,而不是旧快照, + # 避免同步过程中恰好有新联系人写入时被上一轮误删。 + active_wxids = set(self._head_images.keys()) + removed_wxids = [wxid for wxid in self._avatar_manifest.keys() if wxid not in active_wxids] for wxid in removed_wxids: removed_meta = self._avatar_manifest.pop(wxid, None) or {} removed_contact_count += 1 @@ -308,7 +359,7 @@ class ContactManager: cleanup_stats = self._cleanup_avatar_cache_files() self._logger.debug( "头像缓存同步完成 " - f"total_head_images={len(self._head_images)} " + f"total_head_images={len(active_wxids)} " f"manifest_entries={len(self._avatar_manifest)} " f"reuse={reuse_count} " f"downloaded={download_count} " @@ -500,10 +551,13 @@ class ContactManager: Returns: 对应的头像,如果不存在这返回"" """ - self._head_images.update({wxid: head_image}) - # 群成员头像变化通常意味着微信端已经给了新的资源地址, - # 这里即时重拉一次本地缓存,保证通讯录和后续渲染尽快拿到最新头像。 - self.ensure_head_image_cached(wxid) + with self._avatar_cache_lock: + self._head_images.update({wxid: head_image}) + # 单头像更新也改成后台异步: + # 1. 入群欢迎、成员变更等实时链路不能被头像下载阻塞; + # 2. 这里只更新最新远端 URL,并触发后台增量同步; + # 3. 若页面提前访问该头像,/api/avatar 仍会按需补下载,不影响可用性。 + self._schedule_avatar_cache_sync(reason=f"update_head_image:{wxid}") return True def get_group_name(self, roomid: str, wxid: str) -> str: