244 lines
10 KiB
Python
244 lines
10 KiB
Python
import asyncio
|
||
from typing import List, Dict, Optional, Tuple, Set
|
||
from loguru import logger
|
||
|
||
from wechat_ipad import WechatAPIClient
|
||
from db.connection import DBConnectionManager
|
||
from db.contacts_db import ContactsDBOperator
|
||
|
||
|
||
class ChatroomMemberMonitor:
|
||
"""
|
||
群成员变动监控工具类
|
||
"""
|
||
def __init__(self, ipad_bot: WechatAPIClient):
|
||
self.ipad_bot = ipad_bot
|
||
self.logger = logger
|
||
# 获取 DBManager 单例
|
||
self.db_manager = DBConnectionManager.get_instance()
|
||
self.db = ContactsDBOperator(self.db_manager)
|
||
|
||
async def check_and_handle_changes(self, group_id: str):
|
||
"""
|
||
检查指定群的成员变动并处理
|
||
:param group_id: 群聊ID (xxx@chatroom)
|
||
"""
|
||
self.logger.info(f"开始检查群 {group_id} 成员变动...")
|
||
|
||
# 1. 获取最新群成员列表 (API)
|
||
try:
|
||
api_members = await self.ipad_bot.get_chatroom_member_list(group_id)
|
||
if not api_members:
|
||
self.logger.warning(f"API获取群 {group_id} 成员列表为空")
|
||
return
|
||
except Exception as e:
|
||
self.logger.error(f"API获取群 {group_id} 成员列表失败: {e}")
|
||
return
|
||
|
||
# 2. 获取缓存中的群成员列表 (DB)
|
||
db_members = self.db.get_chatroom_member_list(group_id)
|
||
|
||
# 3. 对比分析
|
||
changes = self._analyze_changes(api_members, db_members)
|
||
|
||
# 4. 逻辑处理
|
||
if changes['has_changes']:
|
||
await self._process_changes(group_id, changes, api_members)
|
||
|
||
# 5. 差异化更新缓存 (只处理变动的数据,避免大批量读写)
|
||
|
||
# 5.1 删除已经离开的成员
|
||
if changes['removed']:
|
||
removed_wxids = [m['wxid'] for m in changes['removed']]
|
||
self._remove_members_from_db(group_id, removed_wxids)
|
||
|
||
# 5.2 保存新增的成员(使用原始API数据以正确映射字段)
|
||
if changes['added']:
|
||
added_raw = [m.get('raw_data', m) for m in changes['added']]
|
||
self.db.save_chatroom_member_simple(group_id, added_raw)
|
||
self.logger.info(f"群 {group_id} 新增 {len(changes['added'])} 名成员缓存")
|
||
|
||
# 5.3 更新修改的成员(同样使用原始API数据)
|
||
if changes['modified']:
|
||
modified_raw = [m['member'].get('raw_data', m['member']) for m in changes['modified']]
|
||
self.db.save_chatroom_member_simple(group_id, modified_raw)
|
||
self.logger.info(f"群 {group_id} 更新 {len(modified_raw)} 名成员缓存")
|
||
|
||
else:
|
||
self.logger.info(f"群 {group_id} 成员无变化")
|
||
|
||
def _analyze_changes(self, api_members: List[Dict], db_members: List[Dict]) -> Dict:
|
||
"""
|
||
对比API数据和DB数据,找出差异
|
||
"""
|
||
# 构建 API 成员字典 {wxid: member_info}
|
||
# api_member keys: UserName, NickName, DisplayName, ...
|
||
api_map = {}
|
||
for m in api_members:
|
||
wxid = m.get('UserName')
|
||
if wxid:
|
||
api_map[wxid] = {
|
||
'wxid': wxid,
|
||
'nick_name': m.get('NickName', ''),
|
||
'display_name': m.get('DisplayName', ''),
|
||
'raw_data': m # 保存原始数据用于数据库更新
|
||
}
|
||
|
||
# 构建 DB 成员字典 {wxid: member_info}
|
||
# db_member keys: wxid, nick_name, display_name, ...
|
||
db_map = {}
|
||
for m in db_members:
|
||
wxid = m.get('wxid')
|
||
if wxid:
|
||
db_map[wxid] = {
|
||
'wxid': wxid,
|
||
'nick_name': m.get('nick_name', ''),
|
||
'display_name': m.get('display_name', ''),
|
||
'status': m.get('status', 1) # 默认为1
|
||
}
|
||
|
||
added = []
|
||
removed = []
|
||
modified = []
|
||
|
||
# 检查新增和修改
|
||
for wxid, api_m in api_map.items():
|
||
if wxid not in db_map:
|
||
added.append(api_m)
|
||
else:
|
||
db_m = db_map[wxid]
|
||
|
||
# 如果数据库中状态非1(已退群),但API中有,说明重新加入
|
||
if db_m['status'] != 1:
|
||
added.append(api_m)
|
||
else:
|
||
# 检查群昵称是否修改
|
||
# 逻辑:如果API的DisplayName与DB不同,或者(API没有DisplayName且NickName与DB不同)
|
||
# 简单起见,对比 display_name 和 nick_name
|
||
# 注意:API返回的 DisplayName 为空字符串时,通常表示未设置群昵称,此时展示的是 NickName
|
||
|
||
# 对比 display_name (群昵称)
|
||
api_disp = api_m['display_name']
|
||
db_disp = db_m['display_name']
|
||
|
||
# 对比 nick_name (微信昵称) - 虽然题目主要关注"用户群昵称修改",但昵称变了也算信息变更
|
||
api_nick = api_m['nick_name']
|
||
db_nick = db_m['nick_name']
|
||
|
||
if api_disp != db_disp:
|
||
modified.append({
|
||
'wxid': wxid,
|
||
'type': 'display_name_change',
|
||
'old': db_disp,
|
||
'new': api_disp,
|
||
'member': api_m
|
||
})
|
||
elif api_nick != db_nick: # 可选:如果只关心群昵称,可以注释掉这个elif
|
||
modified.append({
|
||
'wxid': wxid,
|
||
'type': 'nick_name_change',
|
||
'old': db_nick,
|
||
'new': api_nick,
|
||
'member': api_m
|
||
})
|
||
|
||
# 检查减少
|
||
for wxid, db_m in db_map.items():
|
||
if wxid not in api_map:
|
||
# 只有当前状态为1(在群里)的才算减少
|
||
if db_m['status'] == 1:
|
||
removed.append(db_m)
|
||
|
||
has_changes = bool(added or removed or modified)
|
||
|
||
return {
|
||
'has_changes': has_changes,
|
||
'added': added,
|
||
'removed': removed,
|
||
'modified': modified
|
||
}
|
||
|
||
async def _process_changes(self, group_id: str, changes: Dict, api_members: List[Dict]):
|
||
"""
|
||
处理具体的变动逻辑
|
||
"""
|
||
news = []
|
||
|
||
# 1. 用户减少
|
||
for m in changes['removed']:
|
||
name = m.get('display_name') or m.get('nick_name') or m.get('wxid')
|
||
self.logger.info(f"[成员减少] 群 {group_id}: 用户 {name} ({m['wxid']}) 已退出或被移除")
|
||
news.append(f"❌ 成员减少: {name}")
|
||
|
||
# 2. 用户增多
|
||
for m in changes['added']:
|
||
name = m.get('display_name') or m.get('nick_name') or m.get('wxid')
|
||
self.logger.info(f"[成员新增] 群 {group_id}: 用户 {name} ({m['wxid']}) 已加入")
|
||
news.append(f"🆕 成员新增: {name}")
|
||
|
||
# 3. 用户群昵称修改
|
||
for m in changes['modified']:
|
||
wxid = m['wxid']
|
||
name = m['member'].get('nick_name')
|
||
if m['type'] == 'display_name_change':
|
||
self.logger.info(f"[昵称修改] 群 {group_id}: 用户 {name} ({wxid}) 群昵称从 '{m['old']}' 修改为 '{m['new']}'")
|
||
news.append(f"📝 昵称修改: {name} ({m['old']} -> {m['new']})")
|
||
elif m['type'] == 'nick_name_change':
|
||
self.logger.info(f"[昵称修改] 群 {group_id}: 用户 {name} ({wxid}) 微信昵称从 '{m['old']}' 修改为 '{m['new']}'")
|
||
# 微信昵称修改通常不通报,除非特别要求
|
||
# news.append(f"📝 微信昵称修改: {m['old']} -> {m['new']}")
|
||
|
||
if news:
|
||
msg_content = "📋 群成员变动通知:\n" + "\n".join(news)
|
||
try:
|
||
await self.ipad_bot.send_text_message(group_id, msg_content)
|
||
self.logger.info(f"已发送群成员变动通知到 {group_id}")
|
||
except Exception as e:
|
||
self.logger.error(f"发送群成员变动通知失败: {e}")
|
||
|
||
def _remove_members_from_db(self, group_id: str, wxid_list: List[str]):
|
||
"""
|
||
从数据库中删除指定的群成员(逻辑删除)
|
||
"""
|
||
if not wxid_list:
|
||
return
|
||
|
||
try:
|
||
placeholders = ', '.join(['%s'] * len(wxid_list))
|
||
sql = f"""
|
||
UPDATE t_chatroom_member
|
||
SET status = 2, update_time = CURRENT_TIMESTAMP
|
||
WHERE chatroom_id = %s AND wxid IN ({placeholders})
|
||
"""
|
||
params = [group_id] + wxid_list
|
||
|
||
self.db.execute_update(sql, tuple(params))
|
||
self.logger.info(f"群 {group_id} 逻辑删除 {len(wxid_list)} 名成员")
|
||
except Exception as e:
|
||
self.logger.error(f"逻辑删除群 {group_id} 成员失败: {e}")
|
||
|
||
def parse_mod_contacts_msg(self, msg_content: Dict) -> List[str]:
|
||
"""
|
||
解析 ModContacts 消息,提取受影响的群ID
|
||
:param msg_content: 消息内容的字典 (例如 message.txt 中的内容)
|
||
:return: 涉及的群ID列表
|
||
"""
|
||
group_ids = set()
|
||
mod_contacts = msg_content.get('ModContacts', [])
|
||
if not mod_contacts:
|
||
return []
|
||
|
||
for contact in mod_contacts:
|
||
user_name_obj = contact.get('UserName', {})
|
||
# 兼容不同的数据结构,有的可能是直接字符串,有的是 {'string': '...'}
|
||
user_name = ""
|
||
if isinstance(user_name_obj, dict):
|
||
user_name = user_name_obj.get('string', '')
|
||
elif isinstance(user_name_obj, str):
|
||
user_name = user_name_obj
|
||
|
||
if user_name.endswith('@chatroom'):
|
||
group_ids.add(user_name)
|
||
|
||
return list(group_ids)
|