Files
abot/utils/wechat/member_monitor.py
2026-01-19 15:37:43 +08:00

233 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
from typing import List, Dict, Optional, Tuple, Set
from loguru import logger
from wechat_ipad import WechatAPIClient
from db.connection import DBConnectionManager
from db.contacts_db import ContactsDBOperator
class ChatroomMemberMonitor:
"""
群成员变动监控工具类
"""
def __init__(self, ipad_bot: WechatAPIClient):
self.ipad_bot = ipad_bot
self.logger = logger
# 获取 DBManager 单例
self.db_manager = DBConnectionManager.get_instance()
self.db = ContactsDBOperator(self.db_manager)
async def check_and_handle_changes(self, group_id: str):
"""
检查指定群的成员变动并处理
:param group_id: 群聊ID (xxx@chatroom)
"""
self.logger.info(f"开始检查群 {group_id} 成员变动...")
# 1. 获取最新群成员列表 (API)
try:
api_members = await self.ipad_bot.get_chatroom_member_list(group_id)
if not api_members:
self.logger.warning(f"API获取群 {group_id} 成员列表为空")
return
except Exception as e:
self.logger.error(f"API获取群 {group_id} 成员列表失败: {e}")
return
# 2. 获取缓存中的群成员列表 (DB)
db_members = self.db.get_chatroom_member_list(group_id)
# 3. 对比分析
changes = self._analyze_changes(api_members, db_members)
# 4. 逻辑处理
if changes['has_changes']:
await self._process_changes(group_id, changes, api_members)
# 5. 更新缓存 (将最新的列表保存到数据库)
# 注意:这里我们使用 save_chatroom_member_simple 更新所有成员
# 为了保持一致性,可以选择先删除旧的再插入,或者依赖 save_chatroom_member_simple 的 ON DUPLICATE KEY UPDATE
# 考虑到有人离开,单纯 update 可能不够(离开的人还在库里),所以应该处理删除逻辑
# save_chatroom_member_simple 主要是插入/更新,不会删除不存在的人
# 所以需要从数据库中删除已经离开的成员
if changes['removed']:
removed_wxids = [m['wxid'] for m in changes['removed']]
self._remove_members_from_db(group_id, removed_wxids)
# 保存/更新现有的成员
if api_members:
self.db.save_chatroom_member_simple(group_id, api_members)
self.logger.info(f"{group_id} 成员缓存已更新")
else:
self.logger.info(f"{group_id} 成员无变化")
def _analyze_changes(self, api_members: List[Dict], db_members: List[Dict]) -> Dict:
"""
对比API数据和DB数据找出差异
"""
# 构建 API 成员字典 {wxid: member_info}
# api_member keys: UserName, NickName, DisplayName, ...
api_map = {}
for m in api_members:
wxid = m.get('UserName')
if wxid:
api_map[wxid] = {
'wxid': wxid,
'nick_name': m.get('NickName', ''),
'display_name': m.get('DisplayName', ''),
# 可以在这里添加更多需要对比的字段
}
# 构建 DB 成员字典 {wxid: member_info}
# db_member keys: wxid, nick_name, display_name, ...
db_map = {}
for m in db_members:
wxid = m.get('wxid')
if wxid:
db_map[wxid] = {
'wxid': wxid,
'nick_name': m.get('nick_name', ''),
'display_name': m.get('display_name', ''),
}
added = []
removed = []
modified = []
# 检查新增和修改
for wxid, api_m in api_map.items():
if wxid not in db_map:
added.append(api_m)
else:
db_m = db_map[wxid]
# 检查群昵称是否修改
# 逻辑如果API的DisplayName与DB不同或者(API没有DisplayName且NickName与DB不同)
# 简单起见,对比 display_name 和 nick_name
# 注意API返回的 DisplayName 为空字符串时,通常表示未设置群昵称,此时展示的是 NickName
# 对比 display_name (群昵称)
api_disp = api_m['display_name']
db_disp = db_m['display_name']
# 对比 nick_name (微信昵称) - 虽然题目主要关注"用户群昵称修改",但昵称变了也算信息变更
api_nick = api_m['nick_name']
db_nick = db_m['nick_name']
if api_disp != db_disp:
modified.append({
'wxid': wxid,
'type': 'display_name_change',
'old': db_disp,
'new': api_disp,
'member': api_m
})
elif api_nick != db_nick: # 可选如果只关心群昵称可以注释掉这个elif
modified.append({
'wxid': wxid,
'type': 'nick_name_change',
'old': db_nick,
'new': api_nick,
'member': api_m
})
# 检查减少
for wxid, db_m in db_map.items():
if wxid not in api_map:
removed.append(db_m)
has_changes = bool(added or removed or modified)
return {
'has_changes': has_changes,
'added': added,
'removed': removed,
'modified': modified
}
async def _process_changes(self, group_id: str, changes: Dict, api_members: List[Dict]):
"""
处理具体的变动逻辑
"""
news = []
# 1. 用户减少
for m in changes['removed']:
name = m.get('display_name') or m.get('nick_name') or m.get('wxid')
self.logger.info(f"[成员减少] 群 {group_id}: 用户 {name} ({m['wxid']}) 已退出或被移除")
news.append(f"❌ 成员减少: {name}")
# 2. 用户增多
for m in changes['added']:
name = m.get('display_name') or m.get('nick_name') or m.get('wxid')
self.logger.info(f"[成员新增] 群 {group_id}: 用户 {name} ({m['wxid']}) 已加入")
news.append(f"🆕 成员新增: {name}")
# 3. 用户群昵称修改
for m in changes['modified']:
wxid = m['wxid']
name = m['member'].get('nick_name')
if m['type'] == 'display_name_change':
self.logger.info(f"[昵称修改] 群 {group_id}: 用户 {name} ({wxid}) 群昵称从 '{m['old']}' 修改为 '{m['new']}'")
news.append(f"📝 昵称修改: {name} ({m['old']} -> {m['new']})")
elif m['type'] == 'nick_name_change':
self.logger.info(f"[昵称修改] 群 {group_id}: 用户 {name} ({wxid}) 微信昵称从 '{m['old']}' 修改为 '{m['new']}'")
# 微信昵称修改通常不通报,除非特别要求
# news.append(f"📝 微信昵称修改: {m['old']} -> {m['new']}")
if news:
msg_content = "📋 群成员变动通知:\n" + "\n".join(news)
try:
await self.ipad_bot.send_text_message(group_id, msg_content)
self.logger.info(f"已发送群成员变动通知到 {group_id}")
except Exception as e:
self.logger.error(f"发送群成员变动通知失败: {e}")
def _remove_members_from_db(self, group_id: str, wxid_list: List[str]):
"""
从数据库中删除指定的群成员
"""
if not wxid_list:
return
try:
# 构造 SQL 删除语句
# 注意ContactsDB 类可能没有直接删除特定成员的方法,这里我们需要手动执行 SQL 或者扩展 ContactsDB
# 查看 contacts_db.py 发现有 execute_update 方法,但通常建议封装在 DB 类中
# 这里直接使用 execute_update 如果权限允许,或者我们假设 db 有这个能力
placeholders = ', '.join(['%s'] * len(wxid_list))
sql = f"DELETE FROM t_chatroom_member WHERE chatroom_id = %s AND wxid IN ({placeholders})"
params = [group_id] + wxid_list
self.db.execute_update(sql, tuple(params))
self.logger.info(f"从数据库移除 {len(wxid_list)} 个成员: {wxid_list}")
except Exception as e:
self.logger.error(f"从数据库移除成员失败: {e}")
def parse_mod_contacts_msg(self, msg_content: Dict) -> List[str]:
"""
解析 ModContacts 消息提取受影响的群ID
:param msg_content: 消息内容的字典 (例如 message.txt 中的内容)
:return: 涉及的群ID列表
"""
group_ids = set()
mod_contacts = msg_content.get('ModContacts', [])
if not mod_contacts:
return []
for contact in mod_contacts:
user_name_obj = contact.get('UserName', {})
# 兼容不同的数据结构,有的可能是直接字符串,有的是 {'string': '...'}
user_name = ""
if isinstance(user_name_obj, dict):
user_name = user_name_obj.get('string', '')
elif isinstance(user_name_obj, str):
user_name = user_name_obj
if user_name.endswith('@chatroom'):
group_ids.add(user_name)
return list(group_ids)