244 lines
8.3 KiB
Python
244 lines
8.3 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
联系人管理器 - 提供全局访问联系人信息的单例类
|
||
"""
|
||
|
||
from loguru import logger
|
||
from typing import Dict, Optional, List, Tuple
|
||
|
||
|
||
from utils.json_converter import json_to_object
|
||
|
||
|
||
class ContactManager:
|
||
"""联系人管理器单例类"""
|
||
_instance = None
|
||
_contacts: Dict[str, str] = {}
|
||
_group_contacts: Dict[str, str] = {} # 群组联系人
|
||
_personal_contacts: Dict[str, str] = {} # 个人联系人
|
||
_public_contacts: Dict[str, str] = {} # 公共好友
|
||
_official_accounts: Dict[str, str] = {} # 公众号
|
||
_head_images: Dict[str, str] = {} # 头像信息
|
||
_initialized = False
|
||
_logger = logger
|
||
_friends: List[Dict] = []
|
||
_group_contacts_friends: Dict[str, Dict[str, str]] = {}
|
||
# 定义公共好友列表
|
||
_PUBLIC_FRIENDS = {
|
||
'fmessage': '朋友推荐消息',
|
||
'medianote': '语音记事本',
|
||
'floatbottle': '漂流瓶',
|
||
'mphelper': '公众平台安全助手',
|
||
'filehelper': '文件传输助手',
|
||
'exmail_tool': '腾讯企业邮箱',
|
||
'weixin': '微信团队'
|
||
}
|
||
|
||
def __new__(cls):
|
||
if cls._instance is None:
|
||
cls._instance = super(ContactManager, cls).__new__(cls)
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
# 确保初始化代码只执行一次
|
||
if not ContactManager._initialized:
|
||
self._logger.info("初始化联系人管理器")
|
||
ContactManager._initialized = True
|
||
|
||
@classmethod
|
||
def get_instance(cls):
|
||
"""获取单例实例"""
|
||
if cls._instance is None:
|
||
cls._instance = ContactManager()
|
||
return cls._instance
|
||
|
||
def set_contacts(self, contacts: Dict[str, str]) -> None:
|
||
"""设置联系人字典
|
||
|
||
Args:
|
||
contacts: 联系人字典,格式为 {"wxid": "NickName"}
|
||
head_imgs: 联系人头像字典,格式为 {"wxid": "http://xxxxx"}
|
||
friends: 好友清单 contact = {
|
||
"wxid": cnt.get("wxid", ""),
|
||
"code": cnt.get("code", ""),
|
||
"remark": cnt.get("remark", ""),
|
||
"name": cnt.get("name", ""),
|
||
"country": cnt.get("country", ""),
|
||
"province": cnt.get("province", ""),
|
||
"city": cnt.get("city", ""),
|
||
"gender": gender}
|
||
"""
|
||
self._contacts = contacts
|
||
self._friends = contacts
|
||
self._logger.info(f"联系人信息已更新,共 {len(contacts)} 个联系人")
|
||
# 分类联系人
|
||
self._classify_contacts()
|
||
|
||
def _classify_contacts(self) -> None:
|
||
"""将联系人分类为群组、个人联系人、公共好友和公众号"""
|
||
self._group_contacts = {}
|
||
self._personal_contacts = {}
|
||
self._public_contacts = {}
|
||
self._official_accounts = {}
|
||
|
||
for wxid, nickname in self._contacts.items():
|
||
# 判断是否为公共好友
|
||
if wxid in self._PUBLIC_FRIENDS:
|
||
self._public_contacts[wxid] = self._PUBLIC_FRIENDS.get(wxid, nickname)
|
||
# 判断是否为公众号(wxid以gh_开头)
|
||
elif wxid.startswith('gh_'):
|
||
self._official_accounts[wxid] = nickname
|
||
# 判断是否为群组(wxid以@chatroom结尾)
|
||
elif wxid.endswith('@chatroom'):
|
||
self._group_contacts[wxid] = nickname
|
||
|
||
self._logger.info(f"联系人分类完成: {len(self._group_contacts)} 个群组, "
|
||
f"{len(self._personal_contacts)} 个个人联系人, "
|
||
f"{len(self._public_contacts)} 个公共好友, "
|
||
f"{len(self._official_accounts)} 个公众号")
|
||
|
||
def get_contacts(self) -> Dict[str, str]:
|
||
"""获取所有联系人
|
||
|
||
Returns:
|
||
联系人字典,格式为 {"wxid": "NickName"}
|
||
"""
|
||
return self._contacts
|
||
|
||
def get_group_contacts(self) -> Dict[str, str]:
|
||
"""获取所有群组联系人
|
||
|
||
Returns:
|
||
群组联系人字典,格式为 {"wxid": "GroupName"}
|
||
"""
|
||
return self._group_contacts
|
||
|
||
def get_personal_contacts(self) -> Dict[str, str]:
|
||
"""获取所有个人联系人
|
||
|
||
Returns:
|
||
个人联系人字典,格式为 {"wxid": "NickName"}
|
||
"""
|
||
return self._personal_contacts
|
||
|
||
def get_public_contacts(self) -> Dict[str, str]:
|
||
"""获取所有公共好友
|
||
|
||
Returns:
|
||
公共好友字典,格式为 {"wxid": "NickName"}
|
||
"""
|
||
return self._public_contacts
|
||
|
||
def get_official_accounts(self) -> Dict[str, str]:
|
||
"""获取所有公众号
|
||
|
||
Returns:
|
||
公众号字典,格式为 {"wxid": "NickName"}
|
||
"""
|
||
return self._official_accounts
|
||
|
||
def get_nickname(self, wxid: str) -> str:
|
||
"""根据微信ID获取昵称
|
||
|
||
Args:
|
||
wxid: 微信ID
|
||
|
||
Returns:
|
||
对应的昵称,如果不存在则返回wxid本身
|
||
"""
|
||
return self._contacts.get(wxid, wxid)
|
||
|
||
def get_all_head_images(self) -> Dict[str, str]:
|
||
"""返回所有的头像信息
|
||
|
||
Returns:
|
||
头像 {"wxid": "http://xxxxx"}
|
||
"""
|
||
return self._head_images
|
||
|
||
def get_head_image(self, wxid: str) -> str:
|
||
"""根据微信ID获取头像
|
||
|
||
Args:
|
||
wxid: 微信ID
|
||
|
||
Returns:
|
||
对应的头像,如果不存在这返回""
|
||
"""
|
||
return self._head_images.get(wxid, "")
|
||
|
||
def get_group_name(self, roomid: str, wxid: str) -> str:
|
||
"""
|
||
Args:
|
||
roomid: 群ID
|
||
wxid: 微信ID
|
||
|
||
Returns:
|
||
对应的昵称,如果不存在则返回wxid本身
|
||
"""
|
||
return self._group_contacts_friends.get(roomid, "").get(wxid, "未知昵称")
|
||
|
||
def get_group_members(self, roomid: str) -> Dict[str, str]:
|
||
"""获取指定群的成员列表
|
||
|
||
Args:
|
||
roomid: 群ID
|
||
|
||
Returns:
|
||
群成员字典,格式为 {"wxid": "NickName"}
|
||
"""
|
||
return self._group_contacts_friends.get(roomid, {})
|
||
|
||
def update_contact(self, wxid: str, nickname: str) -> None:
|
||
"""更新单个联系人信息
|
||
|
||
Args:
|
||
wxid: 微信ID
|
||
nickname: 昵称
|
||
"""
|
||
self._contacts[wxid] = nickname
|
||
# 更新分类
|
||
if wxid in self._PUBLIC_FRIENDS:
|
||
self._public_contacts[wxid] = self._PUBLIC_FRIENDS.get(wxid, nickname)
|
||
elif wxid.startswith('gh_'):
|
||
self._official_accounts[wxid] = nickname
|
||
elif wxid.endswith('@chatroom'):
|
||
self._group_contacts[wxid] = nickname
|
||
# 需要获取群成员昵称信息; 从数据库里面提取。
|
||
# self._group_contacts_friends[wxid] = {}
|
||
else:
|
||
self._personal_contacts[wxid] = nickname
|
||
self._logger.debug(f"已更新联系人: {wxid} -> {nickname}")
|
||
|
||
def refresh_contacts(self, new_contacts: Dict[str, str]) -> None:
|
||
"""刷新联系人信息
|
||
|
||
Args:
|
||
new_contacts: 新的联系人字典
|
||
head_imgs: 联系人头像字典,格式为 {"wxid": "http://xxxxx"}
|
||
wcf :wcf
|
||
|
||
"""
|
||
self._contacts = new_contacts
|
||
# friends: 好友清单 contact = {
|
||
# "wxid": cnt.get("wxid", ""),
|
||
# "code": cnt.get("code", ""),
|
||
# "remark": cnt.get("remark", ""),
|
||
# "name": cnt.get("name", ""),
|
||
# "country": cnt.get("country", ""),
|
||
# "province": cnt.get("province", ""),
|
||
# "city": cnt.get("city", ""),
|
||
# "gender": gender}
|
||
self._logger.info(f"联系人信息已刷新,共 {len(new_contacts)} 个联系人")
|
||
self._classify_contacts()
|
||
|
||
def get_contact_statistics(self) -> Tuple[int, int, int, int, int]:
|
||
"""获取联系人统计信息
|
||
|
||
Returns:
|
||
包含总联系人数、群组数、个人联系人数、公共好友数和公众号数的元组
|
||
"""
|
||
return (len(self._contacts), len(self._group_contacts),
|
||
len(self._personal_contacts), len(self._public_contacts),
|
||
len(self._official_accounts))
|