将通讯录刷新与头像缓存同步改为异步处理
This commit is contained in:
@@ -18,6 +18,8 @@ message_thread_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="mes
|
||||
# 创建共享的事件循环
|
||||
shared_loop = None
|
||||
loop_lock = threading.Lock()
|
||||
contacts_refresh_lock = threading.Lock()
|
||||
contacts_refresh_running = False
|
||||
_EMOJI_MD5_RE = re.compile(r'md5\s*=\s*[\"\']([0-9a-fA-F]{16,64})[\"\']', re.IGNORECASE)
|
||||
_EMOJI_TOTALLEN_RE = re.compile(r'(?:totallen|total_len|len)\s*=\s*[\"\'](\d+)[\"\']', re.IGNORECASE)
|
||||
|
||||
@@ -70,6 +72,30 @@ def run_member_context_refresh_in_thread(func, *args, **kwargs):
|
||||
message_thread_pool.submit(run)
|
||||
|
||||
|
||||
def run_contacts_refresh_in_thread(server):
|
||||
"""将通讯录刷新放到后台线程执行,避免阻塞后台请求与系统主链路。"""
|
||||
global contacts_refresh_running
|
||||
with contacts_refresh_lock:
|
||||
if contacts_refresh_running:
|
||||
return False
|
||||
contacts_refresh_running = True
|
||||
|
||||
def run():
|
||||
global contacts_refresh_running
|
||||
try:
|
||||
logger.info("通讯录后台刷新任务开始执行")
|
||||
asyncio.run(server.robot.refresh_contacts_db())
|
||||
logger.info("通讯录后台刷新任务执行完成")
|
||||
except Exception as e:
|
||||
logger.error(f"通讯录后台刷新任务执行失败: {e}")
|
||||
finally:
|
||||
with contacts_refresh_lock:
|
||||
contacts_refresh_running = False
|
||||
|
||||
message_thread_pool.submit(run)
|
||||
return True
|
||||
|
||||
|
||||
def _safe_text(value):
|
||||
return "" if value is None else str(value)
|
||||
|
||||
@@ -616,12 +642,14 @@ def api_contacts_update():
|
||||
"""更新通讯录信息API"""
|
||||
try:
|
||||
server = current_app.dashboard_server
|
||||
# 假设 contact_manager 有 update_contacts 方法用于同步通讯录
|
||||
result = asyncio.run(server.robot.refresh_contacts_db())
|
||||
if result:
|
||||
return jsonify({"success": True, "message": "通讯录更新成功"})
|
||||
else:
|
||||
return jsonify({"success": False, "message": "通讯录更新失败"}), 500
|
||||
# 通讯录刷新改成后台异步任务:
|
||||
# 1. 远端拉联系人详情 + 群成员详情 + 头像缓存同步都可能较慢;
|
||||
# 2. 若接口同步等待,会直接卡住后台请求线程,影响整体使用体验;
|
||||
# 3. 这里提交任务后立刻返回,等后台线程慢慢完成刷新。
|
||||
submitted = run_contacts_refresh_in_thread(server)
|
||||
if submitted:
|
||||
return jsonify({"success": True, "message": "通讯录更新任务已提交,请稍后手动刷新查看结果"})
|
||||
return jsonify({"success": True, "message": "通讯录更新任务已在执行中,请稍后再刷新"})
|
||||
except Exception as e:
|
||||
logger.error(f"更新通讯录失败: {e}")
|
||||
return jsonify({"success": False, "message": f"更新通讯录失败: {str(e)}"}), 500
|
||||
|
||||
@@ -964,7 +964,11 @@
|
||||
},
|
||||
updateContacts() {
|
||||
this.$message.info('正在更新通讯录...');
|
||||
axios.post('/contacts/api/update').then(res => { if (res.data.success) { this.$message.success('通讯录更新成功!'); this.refreshContacts(); } else { this.$message.error(res.data.message || '通讯录更新失败'); } }).catch(() => { this.$message.error('通讯录更新请求失败'); });
|
||||
// 通讯录刷新已改成后台异步任务:
|
||||
// 1. 点击后只提交任务,不再阻塞等待全部联系人与头像同步完成;
|
||||
// 2. 因为结果是异步落库,这里不再立刻 refreshContacts,避免用户误以为没生效;
|
||||
// 3. 任务完成后用户手动点“刷新数据”即可看到最新结果。
|
||||
axios.post('/contacts/api/update').then(res => { if (res.data.success) { this.$message.success(res.data.message || '通讯录更新任务已提交'); } else { this.$message.error(res.data.message || '通讯录更新失败'); } }).catch(() => { this.$message.error('通讯录更新请求失败'); });
|
||||
},
|
||||
refreshContacts() { this.loadContactsData(); this.$message.success('联系人数据已刷新'); },
|
||||
handleTabClick() { this.currentPage = 1; },
|
||||
|
||||
@@ -33,6 +33,9 @@ class ContactManager:
|
||||
_avatar_cache_dir: Optional[Path] = None
|
||||
_avatar_manifest_path: Optional[Path] = None
|
||||
_avatar_manifest: Dict[str, Dict[str, str]] = {}
|
||||
_avatar_sync_state_lock = threading.Lock()
|
||||
_avatar_sync_running = False
|
||||
_avatar_sync_pending = False
|
||||
# 定义公共好友列表
|
||||
_PUBLIC_FRIENDS = {
|
||||
'fmessage': '朋友推荐消息',
|
||||
@@ -85,19 +88,61 @@ class ContactManager:
|
||||
"gender": gender}
|
||||
chatroom_members: 所有的群成员昵称信息
|
||||
"""
|
||||
self._contacts = contacts
|
||||
self._friends = friends
|
||||
self._head_images = head_imgs
|
||||
self._group_members = chatroom_members
|
||||
# 通讯录刷新后顺手做一次头像缓存增量同步:
|
||||
# 1. 只处理新增头像或 URL 已变化的联系人;
|
||||
# 2. 不改动业务侧原有头像 URL 存储方式,避免影响其他调用链;
|
||||
# 3. 让后台展示和后续图片渲染都能尽量命中本地缓存。
|
||||
self._sync_avatar_cache()
|
||||
# 这些内存态会被后台头像同步线程读取,因此更新时统一加锁,
|
||||
# 避免“刷新通讯录”和“后台下载头像”并发时出现字典遍历被修改的问题。
|
||||
with self._avatar_cache_lock:
|
||||
self._contacts = contacts
|
||||
self._friends = friends
|
||||
self._head_images = head_imgs
|
||||
self._group_members = chatroom_members
|
||||
# 头像缓存同步改成后台异步触发:
|
||||
# 1. set_contacts 常出现在“刷新通讯录”主链路里,若同步下载头像会明显阻塞系统;
|
||||
# 2. 这里改成只提交后台任务,主线程先把联系人数据更新完立即返回;
|
||||
# 3. 后台线程会基于当前最新 head_images 做增量同步,不会丢掉头像更新。
|
||||
self._schedule_avatar_cache_sync(reason="set_contacts")
|
||||
self._logger.info(f"联系人信息已更新,共 {len(contacts)} 个联系人")
|
||||
# 分类联系人
|
||||
self._classify_contacts()
|
||||
|
||||
def _schedule_avatar_cache_sync(self, reason: str = "unknown") -> None:
|
||||
"""提交头像缓存后台同步任务,避免在主链路里同步下载头像。"""
|
||||
with self._avatar_sync_state_lock:
|
||||
self._avatar_sync_pending = True
|
||||
if self._avatar_sync_running:
|
||||
self._logger.debug(f"头像缓存后台同步已在执行,合并本次请求 reason={reason}")
|
||||
return
|
||||
self._avatar_sync_running = True
|
||||
worker = threading.Thread(
|
||||
target=self._avatar_sync_worker,
|
||||
args=(reason,),
|
||||
daemon=True,
|
||||
name="contact-avatar-sync",
|
||||
)
|
||||
worker.start()
|
||||
self._logger.debug(f"头像缓存后台同步已提交 reason={reason}")
|
||||
|
||||
def _avatar_sync_worker(self, initial_reason: str) -> None:
|
||||
"""串行消费头像缓存同步请求,确保高频更新时只跑最新一轮。"""
|
||||
trigger_reason = initial_reason
|
||||
while True:
|
||||
with self._avatar_sync_state_lock:
|
||||
self._avatar_sync_pending = False
|
||||
try:
|
||||
self._logger.debug(f"头像缓存后台同步开始 reason={trigger_reason}")
|
||||
self._sync_avatar_cache()
|
||||
except Exception as exc:
|
||||
self._logger.error(f"头像缓存后台同步失败 reason={trigger_reason}: {exc}")
|
||||
|
||||
with self._avatar_sync_state_lock:
|
||||
# 如果在本轮执行期间又收到了新的同步请求,就立刻再跑一轮,
|
||||
# 这样可以把多次 set_contacts/update_head_image 合并成少量批处理。
|
||||
if self._avatar_sync_pending:
|
||||
trigger_reason = "pending_update"
|
||||
continue
|
||||
self._avatar_sync_running = False
|
||||
self._logger.debug("头像缓存后台同步结束")
|
||||
break
|
||||
|
||||
def _init_avatar_cache(self) -> None:
|
||||
"""初始化头像缓存目录和 manifest。"""
|
||||
cache_dir = Path(__file__).resolve().parents[2] / "temp" / "contact_avatars"
|
||||
@@ -245,7 +290,9 @@ class ContactManager:
|
||||
|
||||
def _sync_avatar_cache(self) -> None:
|
||||
"""按当前头像 URL 增量同步本地缓存。"""
|
||||
if not self._head_images:
|
||||
with self._avatar_cache_lock:
|
||||
head_images_snapshot = dict(self._head_images)
|
||||
if not head_images_snapshot:
|
||||
return
|
||||
|
||||
manifest_changed = False
|
||||
@@ -259,41 +306,45 @@ class ContactManager:
|
||||
replaced_count = 0
|
||||
removed_contact_count = 0
|
||||
removed_file_count = 0
|
||||
with self._avatar_cache_lock:
|
||||
for wxid, avatar_url in self._head_images.items():
|
||||
remote_url = str(avatar_url or "").strip()
|
||||
if not remote_url:
|
||||
continue
|
||||
manifest_item = self._avatar_manifest.get(wxid, {})
|
||||
old_file_name = str(manifest_item.get("file_name") or "").strip()
|
||||
cached_path = self.get_cached_head_image_path(wxid)
|
||||
# 只有“URL 变了”或“本地文件丢了”才重新下载,避免刷新通讯录时重复打远端。
|
||||
if manifest_item.get("remote_url") == remote_url and cached_path:
|
||||
reuse_count += 1
|
||||
continue
|
||||
downloaded_path = self._download_avatar_to_cache(wxid, remote_url)
|
||||
if not downloaded_path:
|
||||
self._logger.debug(f"头像缓存同步跳过 wxid={wxid} reason=download_failed")
|
||||
continue
|
||||
for wxid, avatar_url in head_images_snapshot.items():
|
||||
remote_url = str(avatar_url or "").strip()
|
||||
if not remote_url:
|
||||
continue
|
||||
with self._avatar_cache_lock:
|
||||
manifest_item = dict(self._avatar_manifest.get(wxid, {}) or {})
|
||||
old_file_name = str(manifest_item.get("file_name") or "").strip()
|
||||
cached_path = self.get_cached_head_image_path(wxid)
|
||||
# 只有“URL 变了”或“本地文件丢了”才重新下载,避免刷新通讯录时重复打远端。
|
||||
if manifest_item.get("remote_url") == remote_url and cached_path:
|
||||
reuse_count += 1
|
||||
continue
|
||||
downloaded_path = self._download_avatar_to_cache(wxid, remote_url)
|
||||
if not downloaded_path:
|
||||
self._logger.debug(f"头像缓存同步跳过 wxid={wxid} reason=download_failed")
|
||||
continue
|
||||
with self._avatar_cache_lock:
|
||||
self._avatar_manifest[wxid] = {
|
||||
"file_name": Path(downloaded_path).name,
|
||||
"remote_url": remote_url,
|
||||
}
|
||||
download_count += 1
|
||||
# 同一联系人头像地址变化后,旧文件已经失去引用,这里立刻删掉旧版本。
|
||||
if old_file_name and old_file_name != Path(downloaded_path).name:
|
||||
if self._delete_avatar_file_by_name(old_file_name):
|
||||
removed_file_count += 1
|
||||
replaced_count += 1
|
||||
self._logger.debug(
|
||||
f"头像缓存已替换 wxid={wxid} old={old_file_name} new={Path(downloaded_path).name}"
|
||||
)
|
||||
else:
|
||||
self._logger.debug(f"头像缓存已下载 wxid={wxid} file={Path(downloaded_path).name}")
|
||||
manifest_changed = True
|
||||
download_count += 1
|
||||
# 同一联系人头像地址变化后,旧文件已经失去引用,这里立刻删掉旧版本。
|
||||
if old_file_name and old_file_name != Path(downloaded_path).name:
|
||||
if self._delete_avatar_file_by_name(old_file_name):
|
||||
removed_file_count += 1
|
||||
replaced_count += 1
|
||||
self._logger.debug(
|
||||
f"头像缓存已替换 wxid={wxid} old={old_file_name} new={Path(downloaded_path).name}"
|
||||
)
|
||||
else:
|
||||
self._logger.debug(f"头像缓存已下载 wxid={wxid} file={Path(downloaded_path).name}")
|
||||
manifest_changed = True
|
||||
|
||||
# 把已经不在通讯录中的头像记录清理掉,避免 manifest 无限增长。
|
||||
removed_wxids = [wxid for wxid in self._avatar_manifest.keys() if wxid not in self._head_images]
|
||||
with self._avatar_cache_lock:
|
||||
# 删除记录时以“当前最新 head_images”为准,而不是旧快照,
|
||||
# 避免同步过程中恰好有新联系人写入时被上一轮误删。
|
||||
active_wxids = set(self._head_images.keys())
|
||||
removed_wxids = [wxid for wxid in self._avatar_manifest.keys() if wxid not in active_wxids]
|
||||
for wxid in removed_wxids:
|
||||
removed_meta = self._avatar_manifest.pop(wxid, None) or {}
|
||||
removed_contact_count += 1
|
||||
@@ -308,7 +359,7 @@ class ContactManager:
|
||||
cleanup_stats = self._cleanup_avatar_cache_files()
|
||||
self._logger.debug(
|
||||
"头像缓存同步完成 "
|
||||
f"total_head_images={len(self._head_images)} "
|
||||
f"total_head_images={len(active_wxids)} "
|
||||
f"manifest_entries={len(self._avatar_manifest)} "
|
||||
f"reuse={reuse_count} "
|
||||
f"downloaded={download_count} "
|
||||
@@ -500,10 +551,13 @@ class ContactManager:
|
||||
Returns:
|
||||
对应的头像,如果不存在这返回""
|
||||
"""
|
||||
self._head_images.update({wxid: head_image})
|
||||
# 群成员头像变化通常意味着微信端已经给了新的资源地址,
|
||||
# 这里即时重拉一次本地缓存,保证通讯录和后续渲染尽快拿到最新头像。
|
||||
self.ensure_head_image_cached(wxid)
|
||||
with self._avatar_cache_lock:
|
||||
self._head_images.update({wxid: head_image})
|
||||
# 单头像更新也改成后台异步:
|
||||
# 1. 入群欢迎、成员变更等实时链路不能被头像下载阻塞;
|
||||
# 2. 这里只更新最新远端 URL,并触发后台增量同步;
|
||||
# 3. 若页面提前访问该头像,/api/avatar 仍会按需补下载,不影响可用性。
|
||||
self._schedule_avatar_cache_sync(reason=f"update_head_image:{wxid}")
|
||||
return True
|
||||
|
||||
def get_group_name(self, roomid: str, wxid: str) -> str:
|
||||
|
||||
Reference in New Issue
Block a user