调整联系人管理功能

This commit is contained in:
liuwei
2025-04-08 14:30:59 +08:00
parent 2d775c227e
commit fc7ecbac8a
2 changed files with 45 additions and 22 deletions

View File

@@ -58,7 +58,7 @@ class Robot(Job):
# 初始化联系人管理器并设置联系人
self.contact_manager = ContactManager.get_instance()
self.allContacts = self.get_all_contacts()
self.contact_manager.set_contacts(self.allContacts)
self.contact_manager.set_contacts(self.allContacts, self.wcf.get_friends())
self.LOG.info(f"DB+REDIS 连接池开始初始化")
# 使用单例模式获取实例
@@ -545,5 +545,5 @@ class Robot(Job):
def refresh_contacts(self):
"""刷新联系人信息"""
self.allContacts = self.get_all_contacts()
self.contact_manager.refresh_contacts(self.allContacts)
self.contact_manager.refresh_contacts(self.allContacts, self.wcf.get_friends())
self.LOG.info("联系人信息已刷新")

View File

@@ -16,12 +16,12 @@ class ContactManager:
_official_accounts: Dict[str, str] = {} # 公众号
_initialized = False
_logger = logging.getLogger("ContactManager")
_friends: List[Dict] = []
# 定义公共好友列表
_PUBLIC_FRIENDS = {
'fmessage': '朋友推荐消息',
'medianote': '语音记事本',
'floatbottle': '漂流瓶',
'fmessage': '朋友推荐消息',
'medianote': '语音记事本',
'floatbottle': '漂流瓶',
'mphelper': '公众平台安全助手',
'filehelper': '文件传输助手'
}
@@ -44,13 +44,23 @@ class ContactManager:
cls._instance = ContactManager()
return cls._instance
def set_contacts(self, contacts: Dict[str, str]) -> None:
def set_contacts(self, contacts: Dict[str, str], friends: List[Dict]) -> None:
"""设置联系人字典
Args:
contacts: 联系人字典,格式为 {"wxid": "NickName"}
friends: 好友清单 contact = {
"wxid": cnt.get("wxid", ""),
"code": cnt.get("code", ""),
"remark": cnt.get("remark", ""),
"name": cnt.get("name", ""),
"country": cnt.get("country", ""),
"province": cnt.get("province", ""),
"city": cnt.get("city", ""),
"gender": gender}
"""
self._contacts = contacts
self._friends = friends
self._logger.info(f"联系人信息已更新,共 {len(contacts)} 个联系人")
# 分类联系人
self._classify_contacts()
@@ -61,7 +71,7 @@ class ContactManager:
self._personal_contacts = {}
self._public_contacts = {}
self._official_accounts = {}
for wxid, nickname in self._contacts.items():
# 判断是否为公共好友
if wxid in self._PUBLIC_FRIENDS:
@@ -72,14 +82,17 @@ class ContactManager:
# 判断是否为群组wxid以@chatroom结尾
elif wxid.endswith('@chatroom'):
self._group_contacts[wxid] = nickname
# 其他为普通好友和群成员
else:
self._personal_contacts[wxid] = nickname
# # 其他为普通好友和群成员
# else:
# self._personal_contacts[wxid] = nickname
for friend in self._friends:
# 过滤掉非好友
self._personal_contacts[friend.get("wxid", "")] = friend.get("name", "")
self._logger.info(f"联系人分类完成: {len(self._group_contacts)} 个群组, "
f"{len(self._personal_contacts)} 个个人联系人, "
f"{len(self._public_contacts)} 个公共好友, "
f"{len(self._official_accounts)} 个公众号")
f"{len(self._personal_contacts)} 个个人联系人, "
f"{len(self._public_contacts)} 个公共好友, "
f"{len(self._official_accounts)} 个公众号")
def get_contacts(self) -> Dict[str, str]:
"""获取所有联系人
@@ -96,7 +109,7 @@ class ContactManager:
群组联系人字典,格式为 {"wxid": "GroupName"}
"""
return self._group_contacts
def get_personal_contacts(self) -> Dict[str, str]:
"""获取所有个人联系人
@@ -104,7 +117,7 @@ class ContactManager:
个人联系人字典,格式为 {"wxid": "NickName"}
"""
return self._personal_contacts
def get_public_contacts(self) -> Dict[str, str]:
"""获取所有公共好友
@@ -112,7 +125,7 @@ class ContactManager:
公共好友字典,格式为 {"wxid": "NickName"}
"""
return self._public_contacts
def get_official_accounts(self) -> Dict[str, str]:
"""获取所有公众号
@@ -151,23 +164,33 @@ class ContactManager:
self._personal_contacts[wxid] = nickname
self._logger.debug(f"已更新联系人: {wxid} -> {nickname}")
def refresh_contacts(self, new_contacts: Dict[str, str]) -> None:
def refresh_contacts(self, new_contacts: Dict[str, str], friends: List[Dict]) -> None:
"""刷新联系人信息
Args:
new_contacts: 新的联系人字典
friends: 好友清单 contact = {
"wxid": cnt.get("wxid", ""),
"code": cnt.get("code", ""),
"remark": cnt.get("remark", ""),
"name": cnt.get("name", ""),
"country": cnt.get("country", ""),
"province": cnt.get("province", ""),
"city": cnt.get("city", ""),
"gender": gender}
"""
self._contacts = new_contacts
self._friends = friends
self._logger.info(f"联系人信息已刷新,共 {len(new_contacts)} 个联系人")
# 重新分类联系人
self._classify_contacts()
def get_contact_statistics(self) -> Tuple[int, int, int, int, int]:
"""获取联系人统计信息
Returns:
包含总联系人数、群组数、个人联系人数、公共好友数和公众号数的元组
"""
return (len(self._contacts), len(self._group_contacts),
return (len(self._contacts), len(self._group_contacts),
len(self._personal_contacts), len(self._public_contacts),
len(self._official_accounts))
len(self._official_accounts))