Files
abot/utils/wechat/contact_manager.py

474 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
联系人管理器 - 提供全局访问联系人信息的单例类
"""
import hashlib
import json
import mimetypes
import threading
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse
import requests
from loguru import logger
class ContactManager:
"""联系人管理器单例类"""
_instance = None
_contacts: Dict[str, str] = {}
_group_contacts: Dict[str, str] = {} # 群组联系人
_personal_contacts: Dict[str, str] = {} # 个人联系人
_public_contacts: Dict[str, str] = {} # 公共好友
_official_accounts: Dict[str, str] = {} # 公众号
_head_images: Dict[str, str] = {} # 头像信息
_initialized = False
_logger = logger
_friends: List[str] = []
_group_members: List[Dict] = []
_group_contacts_friends: Dict[str, Dict[str, str]] = {}
_avatar_cache_lock = threading.RLock()
_avatar_cache_dir: Optional[Path] = None
_avatar_manifest_path: Optional[Path] = None
_avatar_manifest: Dict[str, Dict[str, str]] = {}
# 定义公共好友列表
_PUBLIC_FRIENDS = {
'fmessage': '朋友推荐消息',
'medianote': '语音记事本',
'floatbottle': '漂流瓶',
'mphelper': '公众平台安全助手',
'filehelper': '文件传输助手',
'exmail_tool': '腾讯企业邮箱',
'weixin': '微信团队'
}
def __new__(cls):
if cls._instance is None:
cls._instance = super(ContactManager, cls).__new__(cls)
return cls._instance
def __init__(self):
# 确保初始化代码只执行一次
if not ContactManager._initialized:
self._logger.info("初始化联系人管理器")
# 头像缓存采用“本地文件 + manifest 索引”的方式:
# 1. 本地文件用于后台页面与服务端渲染复用,避免每次都走远端头像链接;
# 2. manifest 记录 wxid、头像源地址与本地文件名重启后也能继续命中缓存
# 3. 后续只在头像 URL 变化时重新下载,避免每次刷新通讯录都全量拉取头像。
self._init_avatar_cache()
ContactManager._initialized = True
@classmethod
def get_instance(cls):
"""获取单例实例"""
if cls._instance is None:
cls._instance = ContactManager()
return cls._instance
def set_contacts(self, contacts: Dict[str, str], friends: List[str], head_imgs: Dict[str, str],
chatroom_members: List[dict]) -> None:
"""设置联系人字典
Args:
contacts: 联系人字典,格式为 {"wxid": "NickName"}
head_imgs: 联系人头像字典,格式为 {"wxid": "http://xxxxx"}
friends: 好友清单 contact = {
"wxid": cnt.get("wxid", ""),
"code": cnt.get("code", ""),
"remark": cnt.get("remark", ""),
"name": cnt.get("name", ""),
"country": cnt.get("country", ""),
"province": cnt.get("province", ""),
"city": cnt.get("city", ""),
"gender": gender}
chatroom_members: 所有的群成员昵称信息
"""
self._contacts = contacts
self._friends = friends
self._head_images = head_imgs
self._group_members = chatroom_members
# 通讯录刷新后顺手做一次头像缓存增量同步:
# 1. 只处理新增头像或 URL 已变化的联系人;
# 2. 不改动业务侧原有头像 URL 存储方式,避免影响其他调用链;
# 3. 让后台展示和后续图片渲染都能尽量命中本地缓存。
self._sync_avatar_cache()
self._logger.info(f"联系人信息已更新,共 {len(contacts)} 个联系人")
# 分类联系人
self._classify_contacts()
def _init_avatar_cache(self) -> None:
"""初始化头像缓存目录和 manifest。"""
cache_dir = Path(__file__).resolve().parents[2] / "temp" / "contact_avatars"
cache_dir.mkdir(parents=True, exist_ok=True)
self._avatar_cache_dir = cache_dir
self._avatar_manifest_path = cache_dir / "avatar_manifest.json"
self._load_avatar_manifest()
def _load_avatar_manifest(self) -> None:
"""加载头像缓存索引,只保留本地文件仍存在的记录。"""
if not self._avatar_manifest_path or not self._avatar_manifest_path.exists():
self._avatar_manifest = {}
return
try:
data = json.loads(self._avatar_manifest_path.read_text(encoding="utf-8"))
except Exception as exc:
self._logger.warning(f"加载头像缓存索引失败,将重新建立缓存: {exc}")
self._avatar_manifest = {}
return
normalized: Dict[str, Dict[str, str]] = {}
for wxid, meta in (data or {}).items():
if not isinstance(meta, dict):
continue
file_name = str(meta.get("file_name") or "").strip()
remote_url = str(meta.get("remote_url") or "").strip()
if not file_name or not remote_url:
continue
local_path = self._avatar_cache_dir / file_name if self._avatar_cache_dir else None
if not local_path or not local_path.exists():
continue
normalized[wxid] = {
"file_name": file_name,
"remote_url": remote_url,
}
self._avatar_manifest = normalized
def _save_avatar_manifest(self) -> None:
"""持久化头像缓存索引,保证重启后还能复用已经下载好的头像。"""
if not self._avatar_manifest_path:
return
try:
self._avatar_manifest_path.write_text(
json.dumps(self._avatar_manifest, ensure_ascii=False, indent=2),
encoding="utf-8",
)
except Exception as exc:
self._logger.warning(f"保存头像缓存索引失败: {exc}")
def _guess_avatar_extension(self, avatar_url: str, content_type: str) -> str:
"""推断头像文件扩展名,尽量保留真实图片类型。"""
guessed_from_type = mimetypes.guess_extension((content_type or "").split(";")[0].strip()) or ""
if guessed_from_type in {".jpe", ".jpeg", ".jpg", ".png", ".gif", ".webp", ".bmp"}:
return ".jpg" if guessed_from_type == ".jpe" else guessed_from_type
parsed_path = Path(urlparse(str(avatar_url or "")).path)
suffix = parsed_path.suffix.lower().strip()
if suffix in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}:
return suffix
return ".jpg"
def _build_avatar_file_name(self, wxid: str, avatar_url: str, extension: str) -> str:
"""构造稳定文件名,让同一联系人头像 URL 变化后可以自然生成新文件。"""
wxid_hash = hashlib.sha1(str(wxid).encode("utf-8")).hexdigest()[:16]
url_hash = hashlib.sha1(str(avatar_url).encode("utf-8")).hexdigest()[:16]
return f"{wxid_hash}_{url_hash}{extension}"
def _download_avatar_to_cache(self, wxid: str, avatar_url: str) -> Optional[str]:
"""下载头像到本地缓存目录并返回本地文件路径。"""
avatar_url = str(avatar_url or "").strip()
if not avatar_url or not self._avatar_cache_dir:
return None
try:
response = requests.get(avatar_url, stream=True, timeout=12)
response.raise_for_status()
except Exception as exc:
self._logger.debug(f"下载头像失败 wxid={wxid}: {exc}")
return None
extension = self._guess_avatar_extension(avatar_url, response.headers.get("Content-Type", ""))
file_name = self._build_avatar_file_name(wxid, avatar_url, extension)
target_path = self._avatar_cache_dir / file_name
temp_path = target_path.with_suffix(f"{target_path.suffix}.tmp")
try:
with temp_path.open("wb") as file_obj:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file_obj.write(chunk)
temp_path.replace(target_path)
except Exception as exc:
self._logger.warning(f"写入头像缓存失败 wxid={wxid}: {exc}")
try:
if temp_path.exists():
temp_path.unlink()
except Exception:
pass
return None
finally:
response.close()
return str(target_path)
def _sync_avatar_cache(self) -> None:
"""按当前头像 URL 增量同步本地缓存。"""
if not self._head_images:
return
manifest_changed = False
with self._avatar_cache_lock:
for wxid, avatar_url in self._head_images.items():
remote_url = str(avatar_url or "").strip()
if not remote_url:
continue
manifest_item = self._avatar_manifest.get(wxid, {})
cached_path = self.get_cached_head_image_path(wxid)
# 只有“URL 变了”或“本地文件丢了”才重新下载,避免刷新通讯录时重复打远端。
if manifest_item.get("remote_url") == remote_url and cached_path:
continue
downloaded_path = self._download_avatar_to_cache(wxid, remote_url)
if not downloaded_path:
continue
self._avatar_manifest[wxid] = {
"file_name": Path(downloaded_path).name,
"remote_url": remote_url,
}
manifest_changed = True
# 把已经不在通讯录中的头像记录清理掉,避免 manifest 无限增长。
removed_wxids = [wxid for wxid in self._avatar_manifest.keys() if wxid not in self._head_images]
for wxid in removed_wxids:
self._avatar_manifest.pop(wxid, None)
manifest_changed = True
if manifest_changed:
self._save_avatar_manifest()
def get_cached_head_image_path(self, wxid: str) -> str:
"""返回头像缓存本地路径,若缓存不存在则返回空字符串。"""
with self._avatar_cache_lock:
meta = self._avatar_manifest.get(wxid) or {}
file_name = str(meta.get("file_name") or "").strip()
if not file_name or not self._avatar_cache_dir:
return ""
local_path = self._avatar_cache_dir / file_name
return str(local_path) if local_path.exists() else ""
def ensure_head_image_cached(self, wxid: str) -> str:
"""确保指定联系人的头像已缓存到本地,返回本地路径。"""
remote_url = str(self._head_images.get(wxid) or "").strip()
manifest_item = self._avatar_manifest.get(wxid) or {}
cached_path = self.get_cached_head_image_path(wxid)
# 如果本地已有缓存,且当前头像 URL 没变,就直接复用;
# 如果 URL 已变化,则继续往下重拉新头像,避免页面一直展示旧图。
if cached_path and (not remote_url or manifest_item.get("remote_url") == remote_url):
return cached_path
if not remote_url:
return ""
with self._avatar_cache_lock:
# 双重检查避免高并发下重复下载同一张头像。
manifest_item = self._avatar_manifest.get(wxid) or {}
cached_path = self.get_cached_head_image_path(wxid)
if cached_path and manifest_item.get("remote_url") == remote_url:
return cached_path
downloaded_path = self._download_avatar_to_cache(wxid, remote_url)
if not downloaded_path:
return ""
self._avatar_manifest[wxid] = {
"file_name": Path(downloaded_path).name,
"remote_url": remote_url,
}
self._save_avatar_manifest()
return downloaded_path
def get_head_image_version(self, wxid: str) -> str:
"""返回头像版本号,用于前端拼接缓存 bust 参数。"""
with self._avatar_cache_lock:
remote_url = str(self._head_images.get(wxid) or "").strip()
meta = self._avatar_manifest.get(wxid) or {}
file_name = str(meta.get("file_name") or "").strip()
version_source = f"{remote_url}|{file_name}"
return hashlib.sha1(version_source.encode("utf-8")).hexdigest()[:12] if version_source.strip("|") else ""
def _classify_contacts(self) -> None:
"""将联系人分类为群组、个人联系人、公共好友和公众号"""
self._group_contacts = {}
self._personal_contacts = {}
self._public_contacts = {}
self._official_accounts = {}
for wxid, nickname in self._contacts.items():
# 判断是否为公共好友
if wxid in self._PUBLIC_FRIENDS:
self._public_contacts[wxid] = self._PUBLIC_FRIENDS.get(wxid, nickname)
# 判断是否为公众号wxid以gh_开头
elif wxid.startswith('gh_'):
self._official_accounts[wxid] = nickname
# 判断是否为群组wxid以@chatroom结尾
elif wxid.endswith('@chatroom'):
self._group_contacts[wxid] = nickname
# 确保群ID在字典中存在
if wxid not in self._group_contacts_friends:
self._group_contacts_friends[wxid] = {}
# 获取群成员信息:
for friend in self._group_members:
if friend.get('chatroom_id') == wxid:
self._group_contacts_friends[wxid].update(
{friend.get('wxid'): friend.get('nick_name', friend.get('wxid'))})
else:
# 判断 frinds 在contacts 里面,将在里面的用户分在
if wxid in self._friends:
self._personal_contacts[wxid] = nickname
self._logger.info(f"联系人分类完成: {len(self._group_contacts)} 个群组, "
f"{len(self._personal_contacts)} 个个人联系人, "
f"{len(self._public_contacts)} 个公共好友, "
f"{len(self._official_accounts)} 个公众号")
def get_contacts(self) -> Dict[str, str]:
"""获取所有联系人
Returns:
联系人字典,格式为 {"wxid": "NickName"}
"""
return self._contacts
def get_group_contacts(self) -> Dict[str, str]:
"""获取所有群组联系人
Returns:
群组联系人字典,格式为 {"wxid": "GroupName"}
"""
return self._group_contacts
def get_personal_contacts(self) -> Dict[str, str]:
"""获取所有个人联系人
Returns:
个人联系人字典,格式为 {"wxid": "NickName"}
"""
return self._personal_contacts
def get_public_contacts(self) -> Dict[str, str]:
"""获取所有公共好友
Returns:
公共好友字典,格式为 {"wxid": "NickName"}
"""
return self._public_contacts
def get_official_accounts(self) -> Dict[str, str]:
"""获取所有公众号
Returns:
公众号字典,格式为 {"wxid": "NickName"}
"""
return self._official_accounts
def get_nickname(self, wxid: str) -> str:
"""根据微信ID获取昵称
Args:
wxid: 微信ID
Returns:
对应的昵称如果不存在则返回wxid本身
"""
return self._contacts.get(wxid, wxid)
def get_all_head_images(self) -> Dict[str, str]:
"""返回所有的头像信息
Returns:
头像 {"wxid": "http://xxxxx"}
"""
return self._head_images
def get_head_image(self, wxid: str) -> str:
"""根据微信ID获取头像
Args:
wxid: 微信ID
Returns:
对应的头像,如果不存在这返回""
"""
return self._head_images.get(wxid, "")
def update_head_image(self, wxid: str, head_image: str) -> bool:
"""根据微信ID更新头像
Args:
wxid: 微信ID
head_image:头像地址
Returns:
对应的头像,如果不存在这返回""
"""
self._head_images.update({wxid: head_image})
# 群成员头像变化通常意味着微信端已经给了新的资源地址,
# 这里即时重拉一次本地缓存,保证通讯录和后续渲染尽快拿到最新头像。
self.ensure_head_image_cached(wxid)
return True
def get_group_name(self, roomid: str, wxid: str) -> str:
"""
Args:
roomid: 群ID
wxid: 微信ID
Returns:
对应的昵称如果不存在则返回wxid本身
"""
return self._group_contacts_friends.get(roomid, "").get(wxid, "未知昵称")
def get_group_members(self, roomid: str) -> Dict[str, str]:
"""获取指定群的成员列表
Args:
roomid: 群ID
Returns:
群成员字典,格式为 {"wxid": "NickName"}
"""
return self._group_contacts_friends.get(roomid, {})
def update_group_members(self, roomid: str, wxid: str, nick_name: str) -> bool:
"""更新指定群的成员列表
Args:
roomid: 群ID
Returns:
群成员字典,格式为 {"wxid": "NickName"}
"""
self._group_contacts_friends[roomid].update({wxid: nick_name})
return True
def update_contact(self, wxid: str, nickname: str) -> None:
"""更新单个联系人信息
Args:
wxid: 微信ID
nickname: 昵称
"""
self._contacts[wxid] = nickname
# 更新分类
if wxid in self._PUBLIC_FRIENDS:
self._public_contacts[wxid] = self._PUBLIC_FRIENDS.get(wxid, nickname)
elif wxid.startswith('gh_'):
self._official_accounts[wxid] = nickname
elif wxid.endswith('@chatroom'):
self._group_contacts[wxid] = nickname
# 确保群ID在字典中存在
if wxid not in self._group_contacts_friends:
self._group_contacts_friends[wxid] = {}
# 获取群成员信息:
for friend in self._group_members:
if friend.get('chatroom_id') == wxid:
self._group_contacts_friends[wxid].update(
{friend.get('wxid'): friend.get('nick_name', friend.get('wxid'))})
else:
if wxid in self._friends:
self._personal_contacts[wxid] = nickname
self._logger.debug(f"已更新联系人: {wxid} -> {nickname}")
def get_contact_statistics(self) -> Tuple[int, int, int, int, int]:
"""获取联系人统计信息
Returns:
包含总联系人数、群组数、个人联系人数、公共好友数和公众号数的元组
"""
return (len(self._contacts), len(self._group_contacts),
len(self._personal_contacts), len(self._public_contacts),
len(self._official_accounts))