Files
abot/utils/wechat/member_monitor.py
2026-01-19 16:34:50 +08:00

244 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
from typing import List, Dict, Optional, Tuple, Set
from loguru import logger
from wechat_ipad import WechatAPIClient
from db.connection import DBConnectionManager
from db.contacts_db import ContactsDBOperator
class ChatroomMemberMonitor:
"""
群成员变动监控工具类
"""
def __init__(self, ipad_bot: WechatAPIClient):
self.ipad_bot = ipad_bot
self.logger = logger
# 获取 DBManager 单例
self.db_manager = DBConnectionManager.get_instance()
self.db = ContactsDBOperator(self.db_manager)
async def check_and_handle_changes(self, group_id: str):
"""
检查指定群的成员变动并处理
:param group_id: 群聊ID (xxx@chatroom)
"""
self.logger.info(f"开始检查群 {group_id} 成员变动...")
# 1. 获取最新群成员列表 (API)
try:
api_members = await self.ipad_bot.get_chatroom_member_list(group_id)
if not api_members:
self.logger.warning(f"API获取群 {group_id} 成员列表为空")
return
except Exception as e:
self.logger.error(f"API获取群 {group_id} 成员列表失败: {e}")
return
# 2. 获取缓存中的群成员列表 (DB)
db_members = self.db.get_chatroom_member_list(group_id)
# 3. 对比分析
changes = self._analyze_changes(api_members, db_members)
# 4. 逻辑处理
if changes['has_changes']:
await self._process_changes(group_id, changes, api_members)
# 5. 差异化更新缓存 (只处理变动的数据,避免大批量读写)
# 5.1 删除已经离开的成员
if changes['removed']:
removed_wxids = [m['wxid'] for m in changes['removed']]
self._remove_members_from_db(group_id, removed_wxids)
# 5.2 保存新增的成员使用原始API数据以正确映射字段
if changes['added']:
added_raw = [m.get('raw_data', m) for m in changes['added']]
self.db.save_chatroom_member_simple(group_id, added_raw)
self.logger.info(f"{group_id} 新增 {len(changes['added'])} 名成员缓存")
# 5.3 更新修改的成员同样使用原始API数据
if changes['modified']:
modified_raw = [m['member'].get('raw_data', m['member']) for m in changes['modified']]
self.db.save_chatroom_member_simple(group_id, modified_raw)
self.logger.info(f"{group_id} 更新 {len(modified_raw)} 名成员缓存")
else:
self.logger.info(f"{group_id} 成员无变化")
def _analyze_changes(self, api_members: List[Dict], db_members: List[Dict]) -> Dict:
"""
对比API数据和DB数据找出差异
"""
# 构建 API 成员字典 {wxid: member_info}
# api_member keys: UserName, NickName, DisplayName, ...
api_map = {}
for m in api_members:
wxid = m.get('UserName')
if wxid:
api_map[wxid] = {
'wxid': wxid,
'nick_name': m.get('NickName', ''),
'display_name': m.get('DisplayName', ''),
'raw_data': m # 保存原始数据用于数据库更新
}
# 构建 DB 成员字典 {wxid: member_info}
# db_member keys: wxid, nick_name, display_name, ...
db_map = {}
for m in db_members:
wxid = m.get('wxid')
if wxid:
db_map[wxid] = {
'wxid': wxid,
'nick_name': m.get('nick_name', ''),
'display_name': m.get('display_name', ''),
'status': m.get('status', 1) # 默认为1
}
added = []
removed = []
modified = []
# 检查新增和修改
for wxid, api_m in api_map.items():
if wxid not in db_map:
added.append(api_m)
else:
db_m = db_map[wxid]
# 如果数据库中状态非1已退群但API中有说明重新加入
if db_m['status'] != 1:
added.append(api_m)
else:
# 检查群昵称是否修改
# 逻辑如果API的DisplayName与DB不同或者(API没有DisplayName且NickName与DB不同)
# 简单起见,对比 display_name 和 nick_name
# 注意API返回的 DisplayName 为空字符串时,通常表示未设置群昵称,此时展示的是 NickName
# 对比 display_name (群昵称)
api_disp = api_m['display_name']
db_disp = db_m['display_name']
# 对比 nick_name (微信昵称) - 虽然题目主要关注"用户群昵称修改",但昵称变了也算信息变更
api_nick = api_m['nick_name']
db_nick = db_m['nick_name']
if api_disp != db_disp:
modified.append({
'wxid': wxid,
'type': 'display_name_change',
'old': db_disp,
'new': api_disp,
'member': api_m
})
elif api_nick != db_nick: # 可选如果只关心群昵称可以注释掉这个elif
modified.append({
'wxid': wxid,
'type': 'nick_name_change',
'old': db_nick,
'new': api_nick,
'member': api_m
})
# 检查减少
for wxid, db_m in db_map.items():
if wxid not in api_map:
# 只有当前状态为1在群里的才算减少
if db_m['status'] == 1:
removed.append(db_m)
has_changes = bool(added or removed or modified)
return {
'has_changes': has_changes,
'added': added,
'removed': removed,
'modified': modified
}
async def _process_changes(self, group_id: str, changes: Dict, api_members: List[Dict]):
"""
处理具体的变动逻辑
"""
news = []
# 1. 用户减少
for m in changes['removed']:
name = m.get('display_name') or m.get('nick_name') or m.get('wxid')
self.logger.info(f"[成员减少] 群 {group_id}: 用户 {name} ({m['wxid']}) 已退出或被移除")
news.append(f"❌ 成员减少: {name}")
# 2. 用户增多
for m in changes['added']:
name = m.get('display_name') or m.get('nick_name') or m.get('wxid')
self.logger.info(f"[成员新增] 群 {group_id}: 用户 {name} ({m['wxid']}) 已加入")
news.append(f"🆕 成员新增: {name}")
# 3. 用户群昵称修改
for m in changes['modified']:
wxid = m['wxid']
name = m['member'].get('nick_name')
if m['type'] == 'display_name_change':
self.logger.info(f"[昵称修改] 群 {group_id}: 用户 {name} ({wxid}) 群昵称从 '{m['old']}' 修改为 '{m['new']}'")
news.append(f"📝 昵称修改: {name} ({m['old']} -> {m['new']})")
elif m['type'] == 'nick_name_change':
self.logger.info(f"[昵称修改] 群 {group_id}: 用户 {name} ({wxid}) 微信昵称从 '{m['old']}' 修改为 '{m['new']}'")
# 微信昵称修改通常不通报,除非特别要求
# news.append(f"📝 微信昵称修改: {m['old']} -> {m['new']}")
if news:
msg_content = "📋 群成员变动通知:\n" + "\n".join(news)
try:
await self.ipad_bot.send_text_message(group_id, msg_content)
self.logger.info(f"已发送群成员变动通知到 {group_id}")
except Exception as e:
self.logger.error(f"发送群成员变动通知失败: {e}")
def _remove_members_from_db(self, group_id: str, wxid_list: List[str]):
"""
从数据库中删除指定的群成员(逻辑删除)
"""
if not wxid_list:
return
try:
placeholders = ', '.join(['%s'] * len(wxid_list))
sql = f"""
UPDATE t_chatroom_member
SET status = 2, update_time = CURRENT_TIMESTAMP
WHERE chatroom_id = %s AND wxid IN ({placeholders})
"""
params = [group_id] + wxid_list
self.db.execute_update(sql, tuple(params))
self.logger.info(f"{group_id} 逻辑删除 {len(wxid_list)} 名成员")
except Exception as e:
self.logger.error(f"逻辑删除群 {group_id} 成员失败: {e}")
def parse_mod_contacts_msg(self, msg_content: Dict) -> List[str]:
"""
解析 ModContacts 消息提取受影响的群ID
:param msg_content: 消息内容的字典 (例如 message.txt 中的内容)
:return: 涉及的群ID列表
"""
group_ids = set()
mod_contacts = msg_content.get('ModContacts', [])
if not mod_contacts:
return []
for contact in mod_contacts:
user_name_obj = contact.get('UserName', {})
# 兼容不同的数据结构,有的可能是直接字符串,有的是 {'string': '...'}
user_name = ""
if isinstance(user_name_obj, dict):
user_name = user_name_obj.get('string', '')
elif isinstance(user_name_obj, str):
user_name = user_name_obj
if user_name.endswith('@chatroom'):
group_ids.add(user_name)
return list(group_ids)