# -*- coding: utf-8 -*- """ 联系人管理器 - 提供全局访问联系人信息的单例类 """ from loguru import logger from typing import Dict, Optional, List, Tuple from utils.json_converter import json_to_object class ContactManager: """联系人管理器单例类""" _instance = None _contacts: Dict[str, str] = {} _group_contacts: Dict[str, str] = {} # 群组联系人 _personal_contacts: Dict[str, str] = {} # 个人联系人 _public_contacts: Dict[str, str] = {} # 公共好友 _official_accounts: Dict[str, str] = {} # 公众号 _head_images: Dict[str, str] = {} # 头像信息 _initialized = False _logger = logger _friends: List[str] = [] _group_members: List[Dict] = [] _group_contacts_friends: Dict[str, Dict[str, str]] = {} # 定义公共好友列表 _PUBLIC_FRIENDS = { 'fmessage': '朋友推荐消息', 'medianote': '语音记事本', 'floatbottle': '漂流瓶', 'mphelper': '公众平台安全助手', 'filehelper': '文件传输助手', 'exmail_tool': '腾讯企业邮箱', 'weixin': '微信团队' } def __new__(cls): if cls._instance is None: cls._instance = super(ContactManager, cls).__new__(cls) return cls._instance def __init__(self): # 确保初始化代码只执行一次 if not ContactManager._initialized: self._logger.info("初始化联系人管理器") ContactManager._initialized = True @classmethod def get_instance(cls): """获取单例实例""" if cls._instance is None: cls._instance = ContactManager() return cls._instance def set_contacts(self, contacts: Dict[str, str], friends: List[str], head_imgs: Dict[str, str], chatroom_members: List[dict]) -> None: """设置联系人字典 Args: contacts: 联系人字典,格式为 {"wxid": "NickName"} head_imgs: 联系人头像字典,格式为 {"wxid": "http://xxxxx"} friends: 好友清单 contact = { "wxid": cnt.get("wxid", ""), "code": cnt.get("code", ""), "remark": cnt.get("remark", ""), "name": cnt.get("name", ""), "country": cnt.get("country", ""), "province": cnt.get("province", ""), "city": cnt.get("city", ""), "gender": gender} chatroom_members: 所有的群成员昵称信息 """ self._contacts = contacts self._friends = friends self._head_images = head_imgs self._group_members = chatroom_members self._logger.info(f"联系人信息已更新,共 {len(contacts)} 个联系人") # 分类联系人 self._classify_contacts() def _classify_contacts(self) -> None: """将联系人分类为群组、个人联系人、公共好友和公众号""" self._group_contacts = {} self._personal_contacts = {} self._public_contacts = {} self._official_accounts = {} for wxid, nickname in self._contacts.items(): # 判断是否为公共好友 if wxid in self._PUBLIC_FRIENDS: self._public_contacts[wxid] = self._PUBLIC_FRIENDS.get(wxid, nickname) # 判断是否为公众号(wxid以gh_开头) elif wxid.startswith('gh_'): self._official_accounts[wxid] = nickname # 判断是否为群组(wxid以@chatroom结尾) elif wxid.endswith('@chatroom'): self._group_contacts[wxid] = nickname # 确保群ID在字典中存在 if wxid not in self._group_contacts_friends: self._group_contacts_friends[wxid] = {} # 获取群成员信息: for friend in self._group_members: if friend.get('chatroom_id') == wxid: self._group_contacts_friends[wxid].update( {friend.get('wxid'): friend.get('nick_name', friend.get('wxid'))}) else: # 判断 frinds 在contacts 里面,将在里面的用户分在 if wxid in self._friends: self._personal_contacts[wxid] = nickname self._logger.info(f"联系人分类完成: {len(self._group_contacts)} 个群组, " f"{len(self._personal_contacts)} 个个人联系人, " f"{len(self._public_contacts)} 个公共好友, " f"{len(self._official_accounts)} 个公众号") def get_contacts(self) -> Dict[str, str]: """获取所有联系人 Returns: 联系人字典,格式为 {"wxid": "NickName"} """ return self._contacts def get_group_contacts(self) -> Dict[str, str]: """获取所有群组联系人 Returns: 群组联系人字典,格式为 {"wxid": "GroupName"} """ return self._group_contacts def get_personal_contacts(self) -> Dict[str, str]: """获取所有个人联系人 Returns: 个人联系人字典,格式为 {"wxid": "NickName"} """ return self._personal_contacts def get_public_contacts(self) -> Dict[str, str]: """获取所有公共好友 Returns: 公共好友字典,格式为 {"wxid": "NickName"} """ return self._public_contacts def get_official_accounts(self) -> Dict[str, str]: """获取所有公众号 Returns: 公众号字典,格式为 {"wxid": "NickName"} """ return self._official_accounts def get_nickname(self, wxid: str) -> str: """根据微信ID获取昵称 Args: wxid: 微信ID Returns: 对应的昵称,如果不存在则返回wxid本身 """ return self._contacts.get(wxid, wxid) def get_all_head_images(self) -> Dict[str, str]: """返回所有的头像信息 Returns: 头像 {"wxid": "http://xxxxx"} """ return self._head_images def get_head_image(self, wxid: str) -> str: """根据微信ID获取头像 Args: wxid: 微信ID Returns: 对应的头像,如果不存在这返回"" """ return self._head_images.get(wxid, "") def update_head_image(self, wxid: str, head_image: str) -> bool: """根据微信ID更新头像 Args: wxid: 微信ID head_image:头像地址 Returns: 对应的头像,如果不存在这返回"" """ self._head_images.update({wxid: head_image}) return True def get_group_name(self, roomid: str, wxid: str) -> str: """ Args: roomid: 群ID wxid: 微信ID Returns: 对应的昵称,如果不存在则返回wxid本身 """ return self._group_contacts_friends.get(roomid, "").get(wxid, "未知昵称") def get_group_members(self, roomid: str) -> Dict[str, str]: """获取指定群的成员列表 Args: roomid: 群ID Returns: 群成员字典,格式为 {"wxid": "NickName"} """ return self._group_contacts_friends.get(roomid, {}) def update_group_members(self, roomid: str, wxid: str, nick_name: str) -> bool: """更新指定群的成员列表 Args: roomid: 群ID Returns: 群成员字典,格式为 {"wxid": "NickName"} """ self._group_contacts_friends[roomid].update({wxid: nick_name}) return True def update_contact(self, wxid: str, nickname: str) -> None: """更新单个联系人信息 Args: wxid: 微信ID nickname: 昵称 """ self._contacts[wxid] = nickname # 更新分类 if wxid in self._PUBLIC_FRIENDS: self._public_contacts[wxid] = self._PUBLIC_FRIENDS.get(wxid, nickname) elif wxid.startswith('gh_'): self._official_accounts[wxid] = nickname elif wxid.endswith('@chatroom'): self._group_contacts[wxid] = nickname # 确保群ID在字典中存在 if wxid not in self._group_contacts_friends: self._group_contacts_friends[wxid] = {} # 获取群成员信息: for friend in self._group_members: if friend.get('chatroom_id') == wxid: self._group_contacts_friends[wxid].update( {friend.get('wxid'): friend.get('nick_name', friend.get('wxid'))}) else: if wxid in self._friends: self._personal_contacts[wxid] = nickname self._logger.debug(f"已更新联系人: {wxid} -> {nickname}") def get_contact_statistics(self) -> Tuple[int, int, int, int, int]: """获取联系人统计信息 Returns: 包含总联系人数、群组数、个人联系人数、公共好友数和公众号数的元组 """ return (len(self._contacts), len(self._group_contacts), len(self._personal_contacts), len(self._public_contacts), len(self._official_accounts))