580 lines
25 KiB
Python
580 lines
25 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
联系人管理器 - 提供全局访问联系人信息的单例类
|
||
"""
|
||
|
||
import hashlib
|
||
import json
|
||
import mimetypes
|
||
import threading
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple
|
||
from urllib.parse import urlparse
|
||
|
||
import requests
|
||
from loguru import logger
|
||
|
||
|
||
class ContactManager:
|
||
"""联系人管理器单例类"""
|
||
_instance = None
|
||
_contacts: Dict[str, str] = {}
|
||
_group_contacts: Dict[str, str] = {} # 群组联系人
|
||
_personal_contacts: Dict[str, str] = {} # 个人联系人
|
||
_public_contacts: Dict[str, str] = {} # 公共好友
|
||
_official_accounts: Dict[str, str] = {} # 公众号
|
||
_head_images: Dict[str, str] = {} # 头像信息
|
||
_initialized = False
|
||
_logger = logger
|
||
_friends: List[str] = []
|
||
_group_members: List[Dict] = []
|
||
_group_contacts_friends: Dict[str, Dict[str, str]] = {}
|
||
_avatar_cache_lock = threading.RLock()
|
||
_avatar_cache_dir: Optional[Path] = None
|
||
_avatar_manifest_path: Optional[Path] = None
|
||
_avatar_manifest: Dict[str, Dict[str, str]] = {}
|
||
# 定义公共好友列表
|
||
_PUBLIC_FRIENDS = {
|
||
'fmessage': '朋友推荐消息',
|
||
'medianote': '语音记事本',
|
||
'floatbottle': '漂流瓶',
|
||
'mphelper': '公众平台安全助手',
|
||
'filehelper': '文件传输助手',
|
||
'exmail_tool': '腾讯企业邮箱',
|
||
'weixin': '微信团队'
|
||
}
|
||
|
||
def __new__(cls):
|
||
if cls._instance is None:
|
||
cls._instance = super(ContactManager, cls).__new__(cls)
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
# 确保初始化代码只执行一次
|
||
if not ContactManager._initialized:
|
||
self._logger.info("初始化联系人管理器")
|
||
# 头像缓存采用“本地文件 + manifest 索引”的方式:
|
||
# 1. 本地文件用于后台页面与服务端渲染复用,避免每次都走远端头像链接;
|
||
# 2. manifest 记录 wxid、头像源地址与本地文件名,重启后也能继续命中缓存;
|
||
# 3. 后续只在头像 URL 变化时重新下载,避免每次刷新通讯录都全量拉取头像。
|
||
self._init_avatar_cache()
|
||
ContactManager._initialized = True
|
||
|
||
@classmethod
|
||
def get_instance(cls):
|
||
"""获取单例实例"""
|
||
if cls._instance is None:
|
||
cls._instance = ContactManager()
|
||
return cls._instance
|
||
|
||
def set_contacts(self, contacts: Dict[str, str], friends: List[str], head_imgs: Dict[str, str],
|
||
chatroom_members: List[dict]) -> None:
|
||
"""设置联系人字典
|
||
|
||
Args:
|
||
contacts: 联系人字典,格式为 {"wxid": "NickName"}
|
||
head_imgs: 联系人头像字典,格式为 {"wxid": "http://xxxxx"}
|
||
friends: 好友清单 contact = {
|
||
"wxid": cnt.get("wxid", ""),
|
||
"code": cnt.get("code", ""),
|
||
"remark": cnt.get("remark", ""),
|
||
"name": cnt.get("name", ""),
|
||
"country": cnt.get("country", ""),
|
||
"province": cnt.get("province", ""),
|
||
"city": cnt.get("city", ""),
|
||
"gender": gender}
|
||
chatroom_members: 所有的群成员昵称信息
|
||
"""
|
||
self._contacts = contacts
|
||
self._friends = friends
|
||
self._head_images = head_imgs
|
||
self._group_members = chatroom_members
|
||
# 通讯录刷新后顺手做一次头像缓存增量同步:
|
||
# 1. 只处理新增头像或 URL 已变化的联系人;
|
||
# 2. 不改动业务侧原有头像 URL 存储方式,避免影响其他调用链;
|
||
# 3. 让后台展示和后续图片渲染都能尽量命中本地缓存。
|
||
self._sync_avatar_cache()
|
||
self._logger.info(f"联系人信息已更新,共 {len(contacts)} 个联系人")
|
||
# 分类联系人
|
||
self._classify_contacts()
|
||
|
||
def _init_avatar_cache(self) -> None:
|
||
"""初始化头像缓存目录和 manifest。"""
|
||
cache_dir = Path(__file__).resolve().parents[2] / "temp" / "contact_avatars"
|
||
cache_dir.mkdir(parents=True, exist_ok=True)
|
||
self._avatar_cache_dir = cache_dir
|
||
self._avatar_manifest_path = cache_dir / "avatar_manifest.json"
|
||
self._load_avatar_manifest()
|
||
|
||
def _load_avatar_manifest(self) -> None:
|
||
"""加载头像缓存索引,只保留本地文件仍存在的记录。"""
|
||
if not self._avatar_manifest_path or not self._avatar_manifest_path.exists():
|
||
self._avatar_manifest = {}
|
||
return
|
||
try:
|
||
data = json.loads(self._avatar_manifest_path.read_text(encoding="utf-8"))
|
||
except Exception as exc:
|
||
self._logger.warning(f"加载头像缓存索引失败,将重新建立缓存: {exc}")
|
||
self._avatar_manifest = {}
|
||
return
|
||
|
||
normalized: Dict[str, Dict[str, str]] = {}
|
||
for wxid, meta in (data or {}).items():
|
||
if not isinstance(meta, dict):
|
||
continue
|
||
file_name = str(meta.get("file_name") or "").strip()
|
||
remote_url = str(meta.get("remote_url") or "").strip()
|
||
if not file_name or not remote_url:
|
||
continue
|
||
local_path = self._avatar_cache_dir / file_name if self._avatar_cache_dir else None
|
||
if not local_path or not local_path.exists():
|
||
continue
|
||
normalized[wxid] = {
|
||
"file_name": file_name,
|
||
"remote_url": remote_url,
|
||
}
|
||
self._avatar_manifest = normalized
|
||
|
||
def _save_avatar_manifest(self) -> None:
|
||
"""持久化头像缓存索引,保证重启后还能复用已经下载好的头像。"""
|
||
if not self._avatar_manifest_path:
|
||
return
|
||
try:
|
||
self._avatar_manifest_path.write_text(
|
||
json.dumps(self._avatar_manifest, ensure_ascii=False, indent=2),
|
||
encoding="utf-8",
|
||
)
|
||
except Exception as exc:
|
||
self._logger.warning(f"保存头像缓存索引失败: {exc}")
|
||
|
||
def _delete_avatar_file_by_name(self, file_name: str) -> bool:
|
||
"""按文件名删除缓存头像,删除失败时只记日志不打断主流程。"""
|
||
file_name = str(file_name or "").strip()
|
||
if not file_name or not self._avatar_cache_dir:
|
||
return False
|
||
target_path = self._avatar_cache_dir / file_name
|
||
if not target_path.exists():
|
||
return False
|
||
try:
|
||
target_path.unlink()
|
||
return True
|
||
except Exception as exc:
|
||
self._logger.warning(f"删除旧头像缓存失败 file={target_path}: {exc}")
|
||
return False
|
||
|
||
def _cleanup_avatar_cache_files(self) -> Dict[str, int]:
|
||
"""清理缓存目录里的孤儿头像文件与残留临时文件。"""
|
||
if not self._avatar_cache_dir:
|
||
return {"orphan_deleted": 0, "tmp_deleted": 0}
|
||
|
||
# 允许保留 manifest 自身,其余不在当前索引里的文件一律视为孤儿文件。
|
||
referenced_files = {
|
||
str(meta.get("file_name") or "").strip()
|
||
for meta in self._avatar_manifest.values()
|
||
if str(meta.get("file_name") or "").strip()
|
||
}
|
||
manifest_name = self._avatar_manifest_path.name if self._avatar_manifest_path else ""
|
||
cleanup_stats = {"orphan_deleted": 0, "tmp_deleted": 0}
|
||
|
||
for file_path in self._avatar_cache_dir.iterdir():
|
||
if not file_path.is_file():
|
||
continue
|
||
if file_path.name == manifest_name:
|
||
continue
|
||
# `.tmp` 文件说明上次下载未完整落盘,直接清掉,避免长时间堆积。
|
||
if file_path.suffix.lower() == ".tmp":
|
||
if self._delete_avatar_file_by_name(file_path.name):
|
||
cleanup_stats["tmp_deleted"] += 1
|
||
continue
|
||
if file_path.name not in referenced_files:
|
||
if self._delete_avatar_file_by_name(file_path.name):
|
||
cleanup_stats["orphan_deleted"] += 1
|
||
return cleanup_stats
|
||
|
||
def _guess_avatar_extension(self, avatar_url: str, content_type: str) -> str:
|
||
"""推断头像文件扩展名,尽量保留真实图片类型。"""
|
||
guessed_from_type = mimetypes.guess_extension((content_type or "").split(";")[0].strip()) or ""
|
||
if guessed_from_type in {".jpe", ".jpeg", ".jpg", ".png", ".gif", ".webp", ".bmp"}:
|
||
return ".jpg" if guessed_from_type == ".jpe" else guessed_from_type
|
||
parsed_path = Path(urlparse(str(avatar_url or "")).path)
|
||
suffix = parsed_path.suffix.lower().strip()
|
||
if suffix in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}:
|
||
return suffix
|
||
return ".jpg"
|
||
|
||
def _build_avatar_file_name(self, wxid: str, avatar_url: str, extension: str) -> str:
|
||
"""构造稳定文件名,让同一联系人头像 URL 变化后可以自然生成新文件。"""
|
||
wxid_hash = hashlib.sha1(str(wxid).encode("utf-8")).hexdigest()[:16]
|
||
url_hash = hashlib.sha1(str(avatar_url).encode("utf-8")).hexdigest()[:16]
|
||
return f"{wxid_hash}_{url_hash}{extension}"
|
||
|
||
def _download_avatar_to_cache(self, wxid: str, avatar_url: str) -> Optional[str]:
|
||
"""下载头像到本地缓存目录并返回本地文件路径。"""
|
||
avatar_url = str(avatar_url or "").strip()
|
||
if not avatar_url or not self._avatar_cache_dir:
|
||
return None
|
||
|
||
try:
|
||
response = requests.get(avatar_url, stream=True, timeout=12)
|
||
response.raise_for_status()
|
||
except Exception as exc:
|
||
self._logger.debug(f"下载头像失败 wxid={wxid}: {exc}")
|
||
return None
|
||
|
||
extension = self._guess_avatar_extension(avatar_url, response.headers.get("Content-Type", ""))
|
||
file_name = self._build_avatar_file_name(wxid, avatar_url, extension)
|
||
target_path = self._avatar_cache_dir / file_name
|
||
temp_path = target_path.with_suffix(f"{target_path.suffix}.tmp")
|
||
try:
|
||
with temp_path.open("wb") as file_obj:
|
||
for chunk in response.iter_content(chunk_size=8192):
|
||
if chunk:
|
||
file_obj.write(chunk)
|
||
temp_path.replace(target_path)
|
||
except Exception as exc:
|
||
self._logger.warning(f"写入头像缓存失败 wxid={wxid}: {exc}")
|
||
try:
|
||
if temp_path.exists():
|
||
temp_path.unlink()
|
||
except Exception:
|
||
pass
|
||
return None
|
||
finally:
|
||
response.close()
|
||
return str(target_path)
|
||
|
||
def _sync_avatar_cache(self) -> None:
|
||
"""按当前头像 URL 增量同步本地缓存。"""
|
||
if not self._head_images:
|
||
return
|
||
|
||
manifest_changed = False
|
||
# 统计字段用于打 debug 汇总日志,方便观察“初始化第一批头像”时到底发生了什么:
|
||
# 1. reuse_count 表示命中本地缓存、无需下载;
|
||
# 2. download_count 表示本轮真正新增或重拉的头像数量;
|
||
# 3. replaced_count 表示头像 URL 变更导致发生了“新旧替换”;
|
||
# 4. removed_contact_count / removed_file_count 表示联系人消失后同步移除了多少记录和文件。
|
||
reuse_count = 0
|
||
download_count = 0
|
||
replaced_count = 0
|
||
removed_contact_count = 0
|
||
removed_file_count = 0
|
||
with self._avatar_cache_lock:
|
||
for wxid, avatar_url in self._head_images.items():
|
||
remote_url = str(avatar_url or "").strip()
|
||
if not remote_url:
|
||
continue
|
||
manifest_item = self._avatar_manifest.get(wxid, {})
|
||
old_file_name = str(manifest_item.get("file_name") or "").strip()
|
||
cached_path = self.get_cached_head_image_path(wxid)
|
||
# 只有“URL 变了”或“本地文件丢了”才重新下载,避免刷新通讯录时重复打远端。
|
||
if manifest_item.get("remote_url") == remote_url and cached_path:
|
||
reuse_count += 1
|
||
continue
|
||
downloaded_path = self._download_avatar_to_cache(wxid, remote_url)
|
||
if not downloaded_path:
|
||
self._logger.debug(f"头像缓存同步跳过 wxid={wxid} reason=download_failed")
|
||
continue
|
||
self._avatar_manifest[wxid] = {
|
||
"file_name": Path(downloaded_path).name,
|
||
"remote_url": remote_url,
|
||
}
|
||
download_count += 1
|
||
# 同一联系人头像地址变化后,旧文件已经失去引用,这里立刻删掉旧版本。
|
||
if old_file_name and old_file_name != Path(downloaded_path).name:
|
||
if self._delete_avatar_file_by_name(old_file_name):
|
||
removed_file_count += 1
|
||
replaced_count += 1
|
||
self._logger.debug(
|
||
f"头像缓存已替换 wxid={wxid} old={old_file_name} new={Path(downloaded_path).name}"
|
||
)
|
||
else:
|
||
self._logger.debug(f"头像缓存已下载 wxid={wxid} file={Path(downloaded_path).name}")
|
||
manifest_changed = True
|
||
|
||
# 把已经不在通讯录中的头像记录清理掉,避免 manifest 无限增长。
|
||
removed_wxids = [wxid for wxid in self._avatar_manifest.keys() if wxid not in self._head_images]
|
||
for wxid in removed_wxids:
|
||
removed_meta = self._avatar_manifest.pop(wxid, None) or {}
|
||
removed_contact_count += 1
|
||
if self._delete_avatar_file_by_name(str(removed_meta.get("file_name") or "").strip()):
|
||
removed_file_count += 1
|
||
manifest_changed = True
|
||
|
||
if manifest_changed:
|
||
self._save_avatar_manifest()
|
||
# 无论本轮 manifest 是否有变化,都顺手做一次目录对账,
|
||
# 保证历史异常中断留下的孤儿文件也能逐步被回收。
|
||
cleanup_stats = self._cleanup_avatar_cache_files()
|
||
self._logger.debug(
|
||
"头像缓存同步完成 "
|
||
f"total_head_images={len(self._head_images)} "
|
||
f"manifest_entries={len(self._avatar_manifest)} "
|
||
f"reuse={reuse_count} "
|
||
f"downloaded={download_count} "
|
||
f"replaced={replaced_count} "
|
||
f"removed_contacts={removed_contact_count} "
|
||
f"removed_files={removed_file_count} "
|
||
f"orphan_deleted={cleanup_stats.get('orphan_deleted', 0)} "
|
||
f"tmp_deleted={cleanup_stats.get('tmp_deleted', 0)}"
|
||
)
|
||
|
||
def get_cached_head_image_path(self, wxid: str) -> str:
|
||
"""返回头像缓存本地路径,若缓存不存在则返回空字符串。"""
|
||
with self._avatar_cache_lock:
|
||
meta = self._avatar_manifest.get(wxid) or {}
|
||
file_name = str(meta.get("file_name") or "").strip()
|
||
if not file_name or not self._avatar_cache_dir:
|
||
return ""
|
||
local_path = self._avatar_cache_dir / file_name
|
||
return str(local_path) if local_path.exists() else ""
|
||
|
||
def ensure_head_image_cached(self, wxid: str) -> str:
|
||
"""确保指定联系人的头像已缓存到本地,返回本地路径。"""
|
||
remote_url = str(self._head_images.get(wxid) or "").strip()
|
||
manifest_item = self._avatar_manifest.get(wxid) or {}
|
||
cached_path = self.get_cached_head_image_path(wxid)
|
||
# 如果本地已有缓存,且当前头像 URL 没变,就直接复用;
|
||
# 如果 URL 已变化,则继续往下重拉新头像,避免页面一直展示旧图。
|
||
if cached_path and (not remote_url or manifest_item.get("remote_url") == remote_url):
|
||
self._logger.debug(f"头像缓存命中 wxid={wxid} file={Path(cached_path).name}")
|
||
return cached_path
|
||
if not remote_url:
|
||
self._logger.debug(f"头像缓存缺失 wxid={wxid} reason=no_remote_url")
|
||
return ""
|
||
|
||
with self._avatar_cache_lock:
|
||
# 双重检查避免高并发下重复下载同一张头像。
|
||
manifest_item = self._avatar_manifest.get(wxid) or {}
|
||
old_file_name = str(manifest_item.get("file_name") or "").strip()
|
||
cached_path = self.get_cached_head_image_path(wxid)
|
||
if cached_path and manifest_item.get("remote_url") == remote_url:
|
||
self._logger.debug(f"头像缓存命中 wxid={wxid} file={Path(cached_path).name}")
|
||
return cached_path
|
||
downloaded_path = self._download_avatar_to_cache(wxid, remote_url)
|
||
if not downloaded_path:
|
||
self._logger.debug(f"头像缓存补下载失败 wxid={wxid}")
|
||
return ""
|
||
self._avatar_manifest[wxid] = {
|
||
"file_name": Path(downloaded_path).name,
|
||
"remote_url": remote_url,
|
||
}
|
||
# 单头像补下载时同样清掉旧版本,避免访问链路把目录越堆越大。
|
||
removed_file_count = 0
|
||
if old_file_name and old_file_name != Path(downloaded_path).name:
|
||
if self._delete_avatar_file_by_name(old_file_name):
|
||
removed_file_count += 1
|
||
self._save_avatar_manifest()
|
||
cleanup_stats = self._cleanup_avatar_cache_files()
|
||
self._logger.debug(
|
||
"头像缓存补下载完成 "
|
||
f"wxid={wxid} "
|
||
f"file={Path(downloaded_path).name} "
|
||
f"replaced={'Y' if bool(old_file_name and old_file_name != Path(downloaded_path).name) else 'N'} "
|
||
f"removed_files={removed_file_count} "
|
||
f"orphan_deleted={cleanup_stats.get('orphan_deleted', 0)} "
|
||
f"tmp_deleted={cleanup_stats.get('tmp_deleted', 0)}"
|
||
)
|
||
return downloaded_path
|
||
|
||
def get_head_image_version(self, wxid: str) -> str:
|
||
"""返回头像版本号,用于前端拼接缓存 bust 参数。"""
|
||
with self._avatar_cache_lock:
|
||
remote_url = str(self._head_images.get(wxid) or "").strip()
|
||
meta = self._avatar_manifest.get(wxid) or {}
|
||
file_name = str(meta.get("file_name") or "").strip()
|
||
version_source = f"{remote_url}|{file_name}"
|
||
return hashlib.sha1(version_source.encode("utf-8")).hexdigest()[:12] if version_source.strip("|") else ""
|
||
|
||
def _classify_contacts(self) -> None:
|
||
"""将联系人分类为群组、个人联系人、公共好友和公众号"""
|
||
self._group_contacts = {}
|
||
self._personal_contacts = {}
|
||
self._public_contacts = {}
|
||
self._official_accounts = {}
|
||
|
||
for wxid, nickname in self._contacts.items():
|
||
# 判断是否为公共好友
|
||
if wxid in self._PUBLIC_FRIENDS:
|
||
self._public_contacts[wxid] = self._PUBLIC_FRIENDS.get(wxid, nickname)
|
||
# 判断是否为公众号(wxid以gh_开头)
|
||
elif wxid.startswith('gh_'):
|
||
self._official_accounts[wxid] = nickname
|
||
# 判断是否为群组(wxid以@chatroom结尾)
|
||
elif wxid.endswith('@chatroom'):
|
||
self._group_contacts[wxid] = nickname
|
||
# 确保群ID在字典中存在
|
||
if wxid not in self._group_contacts_friends:
|
||
self._group_contacts_friends[wxid] = {}
|
||
# 获取群成员信息:
|
||
for friend in self._group_members:
|
||
if friend.get('chatroom_id') == wxid:
|
||
self._group_contacts_friends[wxid].update(
|
||
{friend.get('wxid'): friend.get('nick_name', friend.get('wxid'))})
|
||
|
||
else:
|
||
# 判断 frinds 在contacts 里面,将在里面的用户分在
|
||
if wxid in self._friends:
|
||
self._personal_contacts[wxid] = nickname
|
||
self._logger.info(f"联系人分类完成: {len(self._group_contacts)} 个群组, "
|
||
f"{len(self._personal_contacts)} 个个人联系人, "
|
||
f"{len(self._public_contacts)} 个公共好友, "
|
||
f"{len(self._official_accounts)} 个公众号")
|
||
|
||
def get_contacts(self) -> Dict[str, str]:
|
||
"""获取所有联系人
|
||
|
||
Returns:
|
||
联系人字典,格式为 {"wxid": "NickName"}
|
||
"""
|
||
return self._contacts
|
||
|
||
def get_group_contacts(self) -> Dict[str, str]:
|
||
"""获取所有群组联系人
|
||
|
||
Returns:
|
||
群组联系人字典,格式为 {"wxid": "GroupName"}
|
||
"""
|
||
return self._group_contacts
|
||
|
||
def get_personal_contacts(self) -> Dict[str, str]:
|
||
"""获取所有个人联系人
|
||
|
||
Returns:
|
||
个人联系人字典,格式为 {"wxid": "NickName"}
|
||
"""
|
||
return self._personal_contacts
|
||
|
||
def get_public_contacts(self) -> Dict[str, str]:
|
||
"""获取所有公共好友
|
||
|
||
Returns:
|
||
公共好友字典,格式为 {"wxid": "NickName"}
|
||
"""
|
||
return self._public_contacts
|
||
|
||
def get_official_accounts(self) -> Dict[str, str]:
|
||
"""获取所有公众号
|
||
|
||
Returns:
|
||
公众号字典,格式为 {"wxid": "NickName"}
|
||
"""
|
||
return self._official_accounts
|
||
|
||
def get_nickname(self, wxid: str) -> str:
|
||
"""根据微信ID获取昵称
|
||
|
||
Args:
|
||
wxid: 微信ID
|
||
|
||
Returns:
|
||
对应的昵称,如果不存在则返回wxid本身
|
||
"""
|
||
return self._contacts.get(wxid, wxid)
|
||
|
||
def get_all_head_images(self) -> Dict[str, str]:
|
||
"""返回所有的头像信息
|
||
|
||
Returns:
|
||
头像 {"wxid": "http://xxxxx"}
|
||
"""
|
||
return self._head_images
|
||
|
||
def get_head_image(self, wxid: str) -> str:
|
||
"""根据微信ID获取头像
|
||
|
||
Args:
|
||
wxid: 微信ID
|
||
|
||
Returns:
|
||
对应的头像,如果不存在这返回""
|
||
"""
|
||
return self._head_images.get(wxid, "")
|
||
|
||
def update_head_image(self, wxid: str, head_image: str) -> bool:
|
||
"""根据微信ID更新头像
|
||
|
||
Args:
|
||
wxid: 微信ID
|
||
head_image:头像地址
|
||
Returns:
|
||
对应的头像,如果不存在这返回""
|
||
"""
|
||
self._head_images.update({wxid: head_image})
|
||
# 群成员头像变化通常意味着微信端已经给了新的资源地址,
|
||
# 这里即时重拉一次本地缓存,保证通讯录和后续渲染尽快拿到最新头像。
|
||
self.ensure_head_image_cached(wxid)
|
||
return True
|
||
|
||
def get_group_name(self, roomid: str, wxid: str) -> str:
|
||
"""
|
||
Args:
|
||
roomid: 群ID
|
||
wxid: 微信ID
|
||
|
||
Returns:
|
||
对应的昵称,如果不存在则返回wxid本身
|
||
"""
|
||
return self._group_contacts_friends.get(roomid, "").get(wxid, "未知昵称")
|
||
|
||
def get_group_members(self, roomid: str) -> Dict[str, str]:
|
||
"""获取指定群的成员列表
|
||
|
||
Args:
|
||
roomid: 群ID
|
||
|
||
Returns:
|
||
群成员字典,格式为 {"wxid": "NickName"}
|
||
"""
|
||
return self._group_contacts_friends.get(roomid, {})
|
||
|
||
def update_group_members(self, roomid: str, wxid: str, nick_name: str) -> bool:
|
||
"""更新指定群的成员列表
|
||
|
||
Args:
|
||
roomid: 群ID
|
||
|
||
Returns:
|
||
群成员字典,格式为 {"wxid": "NickName"}
|
||
"""
|
||
self._group_contacts_friends[roomid].update({wxid: nick_name})
|
||
return True
|
||
|
||
def update_contact(self, wxid: str, nickname: str) -> None:
|
||
"""更新单个联系人信息
|
||
|
||
Args:
|
||
wxid: 微信ID
|
||
nickname: 昵称
|
||
"""
|
||
self._contacts[wxid] = nickname
|
||
# 更新分类
|
||
if wxid in self._PUBLIC_FRIENDS:
|
||
self._public_contacts[wxid] = self._PUBLIC_FRIENDS.get(wxid, nickname)
|
||
elif wxid.startswith('gh_'):
|
||
self._official_accounts[wxid] = nickname
|
||
elif wxid.endswith('@chatroom'):
|
||
self._group_contacts[wxid] = nickname
|
||
# 确保群ID在字典中存在
|
||
if wxid not in self._group_contacts_friends:
|
||
self._group_contacts_friends[wxid] = {}
|
||
# 获取群成员信息:
|
||
for friend in self._group_members:
|
||
if friend.get('chatroom_id') == wxid:
|
||
self._group_contacts_friends[wxid].update(
|
||
{friend.get('wxid'): friend.get('nick_name', friend.get('wxid'))})
|
||
else:
|
||
if wxid in self._friends:
|
||
self._personal_contacts[wxid] = nickname
|
||
self._logger.debug(f"已更新联系人: {wxid} -> {nickname}")
|
||
|
||
def get_contact_statistics(self) -> Tuple[int, int, int, int, int]:
|
||
"""获取联系人统计信息
|
||
|
||
Returns:
|
||
包含总联系人数、群组数、个人联系人数、公共好友数和公众号数的元组
|
||
"""
|
||
return (len(self._contacts), len(self._group_contacts),
|
||
len(self._personal_contacts), len(self._public_contacts),
|
||
len(self._official_accounts))
|