From fc7ecbac8ac938695b61d0ed12bdde5c216317dc Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 8 Apr 2025 14:30:59 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E8=81=94=E7=B3=BB=E4=BA=BA?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- robot.py | 4 +-- utils/wechat/contact_manager.py | 63 ++++++++++++++++++++++----------- 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/robot.py b/robot.py index 165c50d..479f97f 100644 --- a/robot.py +++ b/robot.py @@ -58,7 +58,7 @@ class Robot(Job): # 初始化联系人管理器并设置联系人 self.contact_manager = ContactManager.get_instance() self.allContacts = self.get_all_contacts() - self.contact_manager.set_contacts(self.allContacts) + self.contact_manager.set_contacts(self.allContacts, self.wcf.get_friends()) self.LOG.info(f"DB+REDIS 连接池开始初始化") # 使用单例模式获取实例 @@ -545,5 +545,5 @@ class Robot(Job): def refresh_contacts(self): """刷新联系人信息""" self.allContacts = self.get_all_contacts() - self.contact_manager.refresh_contacts(self.allContacts) + self.contact_manager.refresh_contacts(self.allContacts, self.wcf.get_friends()) self.LOG.info("联系人信息已刷新") diff --git a/utils/wechat/contact_manager.py b/utils/wechat/contact_manager.py index bcfba39..16f4f15 100644 --- a/utils/wechat/contact_manager.py +++ b/utils/wechat/contact_manager.py @@ -16,12 +16,12 @@ class ContactManager: _official_accounts: Dict[str, str] = {} # 公众号 _initialized = False _logger = logging.getLogger("ContactManager") - + _friends: List[Dict] = [] # 定义公共好友列表 _PUBLIC_FRIENDS = { - 'fmessage': '朋友推荐消息', - 'medianote': '语音记事本', - 'floatbottle': '漂流瓶', + 'fmessage': '朋友推荐消息', + 'medianote': '语音记事本', + 'floatbottle': '漂流瓶', 'mphelper': '公众平台安全助手', 'filehelper': '文件传输助手' } @@ -44,13 +44,23 @@ class ContactManager: cls._instance = ContactManager() return cls._instance - def set_contacts(self, contacts: Dict[str, str]) -> None: + def set_contacts(self, contacts: Dict[str, str], friends: List[Dict]) -> None: """设置联系人字典 Args: contacts: 联系人字典,格式为 {"wxid": "NickName"} + friends: 好友清单 contact = { + "wxid": cnt.get("wxid", ""), + "code": cnt.get("code", ""), + "remark": cnt.get("remark", ""), + "name": cnt.get("name", ""), + "country": cnt.get("country", ""), + "province": cnt.get("province", ""), + "city": cnt.get("city", ""), + "gender": gender} """ self._contacts = contacts + self._friends = friends self._logger.info(f"联系人信息已更新,共 {len(contacts)} 个联系人") # 分类联系人 self._classify_contacts() @@ -61,7 +71,7 @@ class ContactManager: self._personal_contacts = {} self._public_contacts = {} self._official_accounts = {} - + for wxid, nickname in self._contacts.items(): # 判断是否为公共好友 if wxid in self._PUBLIC_FRIENDS: @@ -72,14 +82,17 @@ class ContactManager: # 判断是否为群组(wxid以@chatroom结尾) elif wxid.endswith('@chatroom'): self._group_contacts[wxid] = nickname - # 其他为普通好友和群成员 - else: - self._personal_contacts[wxid] = nickname - + # # 其他为普通好友和群成员 + # else: + # self._personal_contacts[wxid] = nickname + for friend in self._friends: + # 过滤掉非好友 + self._personal_contacts[friend.get("wxid", "")] = friend.get("name", "") + self._logger.info(f"联系人分类完成: {len(self._group_contacts)} 个群组, " - f"{len(self._personal_contacts)} 个个人联系人, " - f"{len(self._public_contacts)} 个公共好友, " - f"{len(self._official_accounts)} 个公众号") + f"{len(self._personal_contacts)} 个个人联系人, " + f"{len(self._public_contacts)} 个公共好友, " + f"{len(self._official_accounts)} 个公众号") def get_contacts(self) -> Dict[str, str]: """获取所有联系人 @@ -96,7 +109,7 @@ class ContactManager: 群组联系人字典,格式为 {"wxid": "GroupName"} """ return self._group_contacts - + def get_personal_contacts(self) -> Dict[str, str]: """获取所有个人联系人 @@ -104,7 +117,7 @@ class ContactManager: 个人联系人字典,格式为 {"wxid": "NickName"} """ return self._personal_contacts - + def get_public_contacts(self) -> Dict[str, str]: """获取所有公共好友 @@ -112,7 +125,7 @@ class ContactManager: 公共好友字典,格式为 {"wxid": "NickName"} """ return self._public_contacts - + def get_official_accounts(self) -> Dict[str, str]: """获取所有公众号 @@ -151,23 +164,33 @@ class ContactManager: self._personal_contacts[wxid] = nickname self._logger.debug(f"已更新联系人: {wxid} -> {nickname}") - def refresh_contacts(self, new_contacts: Dict[str, str]) -> None: + def refresh_contacts(self, new_contacts: Dict[str, str], friends: List[Dict]) -> None: """刷新联系人信息 Args: new_contacts: 新的联系人字典 + friends: 好友清单 contact = { + "wxid": cnt.get("wxid", ""), + "code": cnt.get("code", ""), + "remark": cnt.get("remark", ""), + "name": cnt.get("name", ""), + "country": cnt.get("country", ""), + "province": cnt.get("province", ""), + "city": cnt.get("city", ""), + "gender": gender} """ self._contacts = new_contacts + self._friends = friends self._logger.info(f"联系人信息已刷新,共 {len(new_contacts)} 个联系人") # 重新分类联系人 self._classify_contacts() - + def get_contact_statistics(self) -> Tuple[int, int, int, int, int]: """获取联系人统计信息 Returns: 包含总联系人数、群组数、个人联系人数、公共好友数和公众号数的元组 """ - return (len(self._contacts), len(self._group_contacts), + return (len(self._contacts), len(self._group_contacts), len(self._personal_contacts), len(self._public_contacts), - len(self._official_accounts)) \ No newline at end of file + len(self._official_accounts))