import asyncio from typing import List, Dict, Optional, Tuple, Set from loguru import logger from wechat_ipad import WechatAPIClient from db.connection import DBConnectionManager from db.contacts_db import ContactsDBOperator class ChatroomMemberMonitor: """ 群成员变动监控工具类 """ def __init__(self, ipad_bot: WechatAPIClient): self.ipad_bot = ipad_bot self.logger = logger # 获取 DBManager 单例 self.db_manager = DBConnectionManager.get_instance() self.db = ContactsDBOperator(self.db_manager) async def check_and_handle_changes(self, group_id: str): """ 检查指定群的成员变动并处理 :param group_id: 群聊ID (xxx@chatroom) """ self.logger.info(f"开始检查群 {group_id} 成员变动...") # 1. 获取最新群成员列表 (API) try: api_members = await self.ipad_bot.get_chatroom_member_list(group_id) if not api_members: self.logger.warning(f"API获取群 {group_id} 成员列表为空") return except Exception as e: self.logger.error(f"API获取群 {group_id} 成员列表失败: {e}") return # 2. 获取缓存中的群成员列表 (DB) db_members = self.db.get_chatroom_member_list(group_id) # 3. 对比分析 changes = self._analyze_changes(api_members, db_members) # 4. 逻辑处理 if changes['has_changes']: await self._process_changes(group_id, changes, api_members) # 5. 差异化更新缓存 (只处理变动的数据,避免大批量读写) # 5.1 删除已经离开的成员 if changes['removed']: removed_wxids = [m['wxid'] for m in changes['removed']] self._remove_members_from_db(group_id, removed_wxids) # 5.2 保存新增的成员(使用原始API数据以正确映射字段) if changes['added']: added_raw = [m.get('raw_data', m) for m in changes['added']] self.db.save_chatroom_member_simple(group_id, added_raw) self.logger.info(f"群 {group_id} 新增 {len(changes['added'])} 名成员缓存") # 5.3 更新修改的成员(同样使用原始API数据) if changes['modified']: modified_raw = [m['member'].get('raw_data', m['member']) for m in changes['modified']] self.db.save_chatroom_member_simple(group_id, modified_raw) self.logger.info(f"群 {group_id} 更新 {len(modified_raw)} 名成员缓存") else: self.logger.info(f"群 {group_id} 成员无变化") def _analyze_changes(self, api_members: List[Dict], db_members: List[Dict]) -> Dict: """ 对比API数据和DB数据,找出差异 """ # 构建 API 成员字典 {wxid: member_info} # api_member keys: UserName, NickName, DisplayName, ... api_map = {} for m in api_members: wxid = m.get('UserName') if wxid: api_map[wxid] = { 'wxid': wxid, 'nick_name': m.get('NickName', ''), 'display_name': m.get('DisplayName', ''), 'raw_data': m # 保存原始数据用于数据库更新 } # 构建 DB 成员字典 {wxid: member_info} # db_member keys: wxid, nick_name, display_name, ... db_map = {} for m in db_members: wxid = m.get('wxid') if wxid: db_map[wxid] = { 'wxid': wxid, 'nick_name': m.get('nick_name', ''), 'display_name': m.get('display_name', ''), 'status': m.get('status', 1) # 默认为1 } added = [] removed = [] modified = [] # 检查新增和修改 for wxid, api_m in api_map.items(): if wxid not in db_map: added.append(api_m) else: db_m = db_map[wxid] # 如果数据库中状态非1(已退群),但API中有,说明重新加入 if db_m['status'] != 1: added.append(api_m) else: # 检查群昵称是否修改 # 逻辑:如果API的DisplayName与DB不同,或者(API没有DisplayName且NickName与DB不同) # 简单起见,对比 display_name 和 nick_name # 注意:API返回的 DisplayName 为空字符串时,通常表示未设置群昵称,此时展示的是 NickName # 对比 display_name (群昵称) api_disp = api_m['display_name'] db_disp = db_m['display_name'] # 对比 nick_name (微信昵称) - 虽然题目主要关注"用户群昵称修改",但昵称变了也算信息变更 api_nick = api_m['nick_name'] db_nick = db_m['nick_name'] if api_disp != db_disp: modified.append({ 'wxid': wxid, 'type': 'display_name_change', 'old': db_disp, 'new': api_disp, 'member': api_m }) elif api_nick != db_nick: # 可选:如果只关心群昵称,可以注释掉这个elif modified.append({ 'wxid': wxid, 'type': 'nick_name_change', 'old': db_nick, 'new': api_nick, 'member': api_m }) # 检查减少 for wxid, db_m in db_map.items(): if wxid not in api_map: # 只有当前状态为1(在群里)的才算减少 if db_m['status'] == 1: removed.append(db_m) has_changes = bool(added or removed or modified) return { 'has_changes': has_changes, 'added': added, 'removed': removed, 'modified': modified } async def _process_changes(self, group_id: str, changes: Dict, api_members: List[Dict]): """ 处理具体的变动逻辑 """ news = [] # 1. 用户减少 for m in changes['removed']: name = m.get('display_name') or m.get('nick_name') or m.get('wxid') self.logger.info(f"[成员减少] 群 {group_id}: 用户 {name} ({m['wxid']}) 已退出或被移除") news.append(f"❌ 成员减少: {name}") # 2. 用户增多 for m in changes['added']: name = m.get('display_name') or m.get('nick_name') or m.get('wxid') self.logger.info(f"[成员新增] 群 {group_id}: 用户 {name} ({m['wxid']}) 已加入") news.append(f"🆕 成员新增: {name}") # 3. 用户群昵称修改 for m in changes['modified']: wxid = m['wxid'] name = m['member'].get('nick_name') if m['type'] == 'display_name_change': self.logger.info(f"[昵称修改] 群 {group_id}: 用户 {name} ({wxid}) 群昵称从 '{m['old']}' 修改为 '{m['new']}'") news.append(f"📝 昵称修改: {name} ({m['old']} -> {m['new']})") elif m['type'] == 'nick_name_change': self.logger.info(f"[昵称修改] 群 {group_id}: 用户 {name} ({wxid}) 微信昵称从 '{m['old']}' 修改为 '{m['new']}'") # 微信昵称修改通常不通报,除非特别要求 # news.append(f"📝 微信昵称修改: {m['old']} -> {m['new']}") if news: msg_content = "📋 群成员变动通知:\n" + "\n".join(news) try: await self.ipad_bot.send_text_message(group_id, msg_content) self.logger.info(f"已发送群成员变动通知到 {group_id}") except Exception as e: self.logger.error(f"发送群成员变动通知失败: {e}") def _remove_members_from_db(self, group_id: str, wxid_list: List[str]): """ 从数据库中删除指定的群成员(逻辑删除) """ if not wxid_list: return try: placeholders = ', '.join(['%s'] * len(wxid_list)) sql = f""" UPDATE t_chatroom_member SET status = 2, update_time = CURRENT_TIMESTAMP WHERE chatroom_id = %s AND wxid IN ({placeholders}) """ params = [group_id] + wxid_list self.db.execute_update(sql, tuple(params)) self.logger.info(f"群 {group_id} 逻辑删除 {len(wxid_list)} 名成员") except Exception as e: self.logger.error(f"逻辑删除群 {group_id} 成员失败: {e}") def parse_mod_contacts_msg(self, msg_content: Dict) -> List[str]: """ 解析 ModContacts 消息,提取受影响的群ID :param msg_content: 消息内容的字典 (例如 message.txt 中的内容) :return: 涉及的群ID列表 """ group_ids = set() mod_contacts = msg_content.get('ModContacts', []) if not mod_contacts: return [] for contact in mod_contacts: user_name_obj = contact.get('UserName', {}) # 兼容不同的数据结构,有的可能是直接字符串,有的是 {'string': '...'} user_name = "" if isinstance(user_name_obj, dict): user_name = user_name_obj.get('string', '') elif isinstance(user_name_obj, str): user_name = user_name_obj if user_name.endswith('@chatroom'): group_ids.add(user_name) return list(group_ids)