Files
abot/utils/wechat/contact_manager.py

291 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
联系人管理器 - 提供全局访问联系人信息的单例类
"""
from loguru import logger
from typing import Dict, Optional, List, Tuple
from utils.json_converter import json_to_object
class ContactManager:
"""联系人管理器单例类"""
_instance = None
_contacts: Dict[str, str] = {}
_group_contacts: Dict[str, str] = {} # 群组联系人
_personal_contacts: Dict[str, str] = {} # 个人联系人
_public_contacts: Dict[str, str] = {} # 公共好友
_official_accounts: Dict[str, str] = {} # 公众号
_head_images: Dict[str, str] = {} # 头像信息
_initialized = False
_logger = logger
_friends: List[str] = []
_group_members: List[Dict] = []
_group_contacts_friends: Dict[str, Dict[str, str]] = {}
# 定义公共好友列表
_PUBLIC_FRIENDS = {
'fmessage': '朋友推荐消息',
'medianote': '语音记事本',
'floatbottle': '漂流瓶',
'mphelper': '公众平台安全助手',
'filehelper': '文件传输助手',
'exmail_tool': '腾讯企业邮箱',
'weixin': '微信团队'
}
def __new__(cls):
if cls._instance is None:
cls._instance = super(ContactManager, cls).__new__(cls)
return cls._instance
def __init__(self):
# 确保初始化代码只执行一次
if not ContactManager._initialized:
self._logger.info("初始化联系人管理器")
ContactManager._initialized = True
@classmethod
def get_instance(cls):
"""获取单例实例"""
if cls._instance is None:
cls._instance = ContactManager()
return cls._instance
def set_contacts(self, contacts: Dict[str, str], friends: List[str], head_imgs: Dict[str, str],
chatroom_members: List[dict]) -> None:
"""设置联系人字典
Args:
contacts: 联系人字典,格式为 {"wxid": "NickName"}
head_imgs: 联系人头像字典,格式为 {"wxid": "http://xxxxx"}
friends: 好友清单 contact = {
"wxid": cnt.get("wxid", ""),
"code": cnt.get("code", ""),
"remark": cnt.get("remark", ""),
"name": cnt.get("name", ""),
"country": cnt.get("country", ""),
"province": cnt.get("province", ""),
"city": cnt.get("city", ""),
"gender": gender}
chatroom_members: 所有的群成员昵称信息
"""
self._contacts = contacts
self._friends = friends
self._head_images = head_imgs
self._group_members = chatroom_members
self._logger.info(f"联系人信息已更新,共 {len(contacts)} 个联系人")
# 分类联系人
self._classify_contacts()
def _classify_contacts(self) -> None:
"""将联系人分类为群组、个人联系人、公共好友和公众号"""
self._group_contacts = {}
self._personal_contacts = {}
self._public_contacts = {}
self._official_accounts = {}
for wxid, nickname in self._contacts.items():
# 判断是否为公共好友
if wxid in self._PUBLIC_FRIENDS:
self._public_contacts[wxid] = self._PUBLIC_FRIENDS.get(wxid, nickname)
# 判断是否为公众号wxid以gh_开头
elif wxid.startswith('gh_'):
self._official_accounts[wxid] = nickname
# 判断是否为群组wxid以@chatroom结尾
elif wxid.endswith('@chatroom'):
self._group_contacts[wxid] = nickname
# 确保群ID在字典中存在
if wxid not in self._group_contacts_friends:
self._group_contacts_friends[wxid] = {}
# 获取群成员信息:
for friend in self._group_members:
if friend.get('chatroom_id') == wxid:
self._group_contacts_friends[wxid].update(
{friend.get('wxid'): friend.get('nick_name', friend.get('wxid'))})
else:
# 判断 frinds 在contacts 里面,将在里面的用户分在
if wxid in self._friends:
self._personal_contacts[wxid] = nickname
self._logger.info(f"联系人分类完成: {len(self._group_contacts)} 个群组, "
f"{len(self._personal_contacts)} 个个人联系人, "
f"{len(self._public_contacts)} 个公共好友, "
f"{len(self._official_accounts)} 个公众号")
def get_contacts(self) -> Dict[str, str]:
"""获取所有联系人
Returns:
联系人字典,格式为 {"wxid": "NickName"}
"""
return self._contacts
def get_group_contacts(self) -> Dict[str, str]:
"""获取所有群组联系人
Returns:
群组联系人字典,格式为 {"wxid": "GroupName"}
"""
return self._group_contacts
def get_personal_contacts(self) -> Dict[str, str]:
"""获取所有个人联系人
Returns:
个人联系人字典,格式为 {"wxid": "NickName"}
"""
return self._personal_contacts
def get_public_contacts(self) -> Dict[str, str]:
"""获取所有公共好友
Returns:
公共好友字典,格式为 {"wxid": "NickName"}
"""
return self._public_contacts
def get_official_accounts(self) -> Dict[str, str]:
"""获取所有公众号
Returns:
公众号字典,格式为 {"wxid": "NickName"}
"""
return self._official_accounts
def get_nickname(self, wxid: str) -> str:
"""根据微信ID获取昵称
Args:
wxid: 微信ID
Returns:
对应的昵称如果不存在则返回wxid本身
"""
return self._contacts.get(wxid, wxid)
def get_all_head_images(self) -> Dict[str, str]:
"""返回所有的头像信息
Returns:
头像 {"wxid": "http://xxxxx"}
"""
return self._head_images
def get_head_image(self, wxid: str) -> str:
"""根据微信ID获取头像
Args:
wxid: 微信ID
Returns:
对应的头像,如果不存在这返回""
"""
return self._head_images.get(wxid, "")
def update_head_image(self, wxid: str, head_image: str) -> bool:
"""根据微信ID更新头像
Args:
wxid: 微信ID
head_image:头像地址
Returns:
对应的头像,如果不存在这返回""
"""
self._head_images.update({wxid: head_image})
return True
def get_group_name(self, roomid: str, wxid: str) -> str:
"""
Args:
roomid: 群ID
wxid: 微信ID
Returns:
对应的昵称如果不存在则返回wxid本身
"""
return self._group_contacts_friends.get(roomid, "").get(wxid, "未知昵称")
def get_group_members(self, roomid: str) -> Dict[str, str]:
"""获取指定群的成员列表
Args:
roomid: 群ID
Returns:
群成员字典,格式为 {"wxid": "NickName"}
"""
return self._group_contacts_friends.get(roomid, {})
def update_group_members(self, roomid: str, wxid: str, nick_name: str) -> bool:
"""更新指定群的成员列表
Args:
roomid: 群ID
Returns:
群成员字典,格式为 {"wxid": "NickName"}
"""
self._group_contacts_friends[roomid].update({wxid: nick_name})
return True
def update_contact(self, wxid: str, nickname: str) -> None:
"""更新单个联系人信息
Args:
wxid: 微信ID
nickname: 昵称
"""
self._contacts[wxid] = nickname
# 更新分类
if wxid in self._PUBLIC_FRIENDS:
self._public_contacts[wxid] = self._PUBLIC_FRIENDS.get(wxid, nickname)
elif wxid.startswith('gh_'):
self._official_accounts[wxid] = nickname
elif wxid.endswith('@chatroom'):
self._group_contacts[wxid] = nickname
# 确保群ID在字典中存在
if wxid not in self._group_contacts_friends:
self._group_contacts_friends[wxid] = {}
# 获取群成员信息:
for friend in self._group_members:
if friend.get('chatroom_id') == wxid:
self._group_contacts_friends[wxid].update(
{friend.get('wxid'): friend.get('nick_name', friend.get('wxid'))})
else:
if wxid in self._friends:
self._personal_contacts[wxid] = nickname
self._logger.debug(f"已更新联系人: {wxid} -> {nickname}")
def refresh_contacts(self, new_contacts: Dict[str, str], friends: List[str]) -> None:
"""刷新联系人信息
Args:
new_contacts: 新的联系人字典
friends: 联系人信息,格式为 ["wxid","wxid2"]
"""
self._contacts = new_contacts
self._friends = friends
# friends: 好友清单 contact = {
# "wxid": cnt.get("wxid", ""),
# "code": cnt.get("code", ""),
# "remark": cnt.get("remark", ""),
# "name": cnt.get("name", ""),
# "country": cnt.get("country", ""),
# "province": cnt.get("province", ""),
# "city": cnt.get("city", ""),
# "gender": gender}
self._logger.info(f"联系人信息已刷新,共 {len(new_contacts)} 个联系人")
self._classify_contacts()
def get_contact_statistics(self) -> Tuple[int, int, int, int, int]:
"""获取联系人统计信息
Returns:
包含总联系人数、群组数、个人联系人数、公共好友数和公众号数的元组
"""
return (len(self._contacts), len(self._group_contacts),
len(self._personal_contacts), len(self._public_contacts),
len(self._official_accounts))