""" 群成员信息同步插件 功能: 1. 手动触发同步群成员信息到本地 SQLite 数据库 2. 保存 wxid、用户名、头像 URL 3. 通过 wxid 唯一标识,支持更新已存在的成员信息 """ import asyncio import tomllib import aiosqlite from pathlib import Path from typing import Optional from loguru import logger from utils.plugin_base import PluginBase from utils.decorators import ( on_text_message, on_system_message, on_chatroom_member_add, on_chatroom_member_nickname_change, ) from utils.operation_lock import OperationLock class MemberSync(PluginBase): """群成员信息同步插件""" # 插件元数据 description = "同步群成员信息到本地数据库" author = "Assistant" version = "1.0.0" def __init__(self): super().__init__() self.config = None self.db_path = None self._full_sync_task: Optional[asyncio.Task] = None async def async_init(self): """插件异步初始化""" # 读取配置 config_path = Path(__file__).parent / "config.toml" with open(config_path, "rb") as f: self.config = tomllib.load(f) # 设置数据库路径 db_relative_path = self.config["database"]["db_path"] self.db_path = Path(__file__).parent / db_relative_path # 确保数据目录存在 self.db_path.parent.mkdir(parents=True, exist_ok=True) # 初始化数据库 await self._init_database() logger.info(f"群成员信息同步插件已加载,数据库: {self.db_path}") async def _init_database(self): """初始化数据库和表结构""" try: # 创建表(如果不存在) await self._create_table() logger.success(f"数据库初始化成功: {self.db_path}") except Exception as e: logger.error(f"初始化数据库失败: {e}") async def _create_table(self): """创建群成员信息表""" create_table_sql = """ CREATE TABLE IF NOT EXISTS group_members ( id INTEGER PRIMARY KEY AUTOINCREMENT, chatroom_wxid TEXT NOT NULL, wxid TEXT NOT NULL, nickname TEXT NOT NULL, group_nickname TEXT, signature TEXT, avatar_url TEXT, last_msg_at TEXT, daily_key TEXT, daily_count INTEGER DEFAULT 0, weekly_key TEXT, weekly_count INTEGER DEFAULT 0, monthly_key TEXT, monthly_count INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(chatroom_wxid, wxid) ) """ # 创建索引 create_index_sql = """ CREATE INDEX IF NOT EXISTS idx_chatroom ON group_members(chatroom_wxid) """ try: async with aiosqlite.connect(self.db_path) as db: await db.execute(create_table_sql) await db.execute(create_index_sql) await self._ensure_column(db, "group_nickname", "TEXT") await self._ensure_column(db, "signature", "TEXT") await self._ensure_column(db, "last_msg_at", "TEXT") await self._ensure_column(db, "daily_key", "TEXT") await self._ensure_column(db, "daily_count", "INTEGER DEFAULT 0") await self._ensure_column(db, "weekly_key", "TEXT") await self._ensure_column(db, "weekly_count", "INTEGER DEFAULT 0") await self._ensure_column(db, "monthly_key", "TEXT") await self._ensure_column(db, "monthly_count", "INTEGER DEFAULT 0") await db.commit() logger.success("数据库表检查/创建完成: group_members") except Exception as e: logger.error(f"创建数据库表失败: {e}") async def _ensure_column(self, db: aiosqlite.Connection, column: str, col_type: str): """确保指定列存在""" try: cursor = await db.execute("PRAGMA table_info(group_members)") rows = await cursor.fetchall() columns = {row[1] for row in rows if row and len(row) > 1} if column not in columns: await db.execute(f"ALTER TABLE group_members ADD COLUMN {column} {col_type}") except Exception as e: logger.warning(f"[MemberSync] 列检查失败: {column} -> {e}") @on_text_message(priority=50) async def handle_sync_command(self, bot, message: dict): """处理同步命令""" # 检查是否启用 if not self.config["behavior"]["enabled"]: return content = message.get("Content", "").strip() from_wxid = message.get("FromWxid", "") sender_wxid = message.get("SenderWxid", "") is_group = message.get("IsGroup", False) # 检查是否是同步命令 update_keywords = self.config["behavior"]["update_keywords"] update_all_keywords = self.config["behavior"].get("update_all_keywords", []) if content not in update_keywords and content not in update_all_keywords: return # 检查权限(只有管理员可以触发) admins = self.config["behavior"]["admins"] operator_wxid = sender_wxid if is_group else from_wxid if operator_wxid not in admins: await self._send_text_safe(bot, from_wxid, self.config["messages"]["permission_denied"]) return False # 全量同步命令 if content in update_all_keywords: await self._start_full_sync_task(bot, message) return False # 已切换为后台任务执行,这里不再走同步分支 return False OperationLock.pause("member_sync_all") try: room_ids = await self._get_full_sync_room_ids(bot, fallback_room=from_wxid) await bot.send_text(from_wxid, f"已获取群聊数: {len(room_ids)},开始同步...") result = await self._sync_all_chatrooms(bot, room_ids=room_ids) success_msg = self.config["messages"]["update_all_success"].format( groups=result["groups"], total=result["total"], added=result["added"], updated=result["updated"] ) await bot.send_text(from_wxid, success_msg) except Exception as e: logger.error(f"全量同步群成员信息失败: {e}") error_msg = self.config["messages"]["update_all_error"].format(error=str(e)) await bot.send_text(from_wxid, error_msg) finally: OperationLock.resume() return False # 单群同步必须在群聊中使用 if not is_group: await self._send_text_safe(bot, from_wxid, self.config["messages"]["not_in_group"]) return False # 检查群聊过滤 if not self._should_sync(from_wxid): return False # 发送开始消息 await self._send_text_safe(bot, from_wxid, self.config["messages"]["update_start"]) # 执行同步 try: result = await self._sync_group_members(bot, from_wxid) # 发送结果消息 success_msg = self.config["messages"]["update_success"].format( total=result["total"], added=result["added"], updated=result["updated"] ) await self._send_text_safe(bot, from_wxid, success_msg) except Exception as e: logger.error(f"同步群成员信息失败: {e}") error_msg = self.config["messages"]["update_error"].format(error=str(e)) await self._send_text_safe(bot, from_wxid, error_msg) return False # 阻止后续处理 async def _start_full_sync_task(self, bot, message: dict): """启动全量同步后台任务。""" reply_wxid = self._resolve_reply_target(message) fallback_room = self._resolve_fallback_room(message) if self._full_sync_task and not self._full_sync_task.done(): await self._send_text_safe(bot, reply_wxid, "全量同步任务正在执行,请稍后再试") return await self._send_text_safe(bot, reply_wxid, self.config["messages"]["update_all_start"]) OperationLock.pause("member_sync_all") self._full_sync_task = asyncio.create_task( self._run_full_sync(bot, reply_wxid, fallback_room) ) def _resolve_reply_target(self, message: dict) -> str: """解析命令回复目标,优先回群聊,避免误回私聊。""" room_wxid = str(message.get("RoomWxid", "") or "") from_wxid = str(message.get("FromWxid", "") or "") to_wxid = str(message.get("ToWxid", "") or "") for wxid in (room_wxid, from_wxid, to_wxid): if wxid.endswith("@chatroom"): return wxid return from_wxid or to_wxid def _resolve_fallback_room(self, message: dict) -> str: """解析可作为全量同步兜底的群 ID。""" room_wxid = str(message.get("RoomWxid", "") or "") from_wxid = str(message.get("FromWxid", "") or "") to_wxid = str(message.get("ToWxid", "") or "") for wxid in (room_wxid, from_wxid, to_wxid): if wxid.endswith("@chatroom"): return wxid return "" async def _send_text_safe(self, bot, to_wxid: str, content: str) -> bool: """安全发送消息,避免空目标导致异常。""" target = (to_wxid or "").strip() if not target: logger.error(f"[MemberSync] 发送消息失败,目标 wxid 为空: {content}") return False try: result = await bot.send_text(target, content) if not result: logger.warning(f"[MemberSync] 发送消息失败: to={target}, content={content}") return bool(result) except Exception as e: logger.error(f"[MemberSync] 发送消息异常: to={target}, error={e}") return False async def _run_full_sync(self, bot, reply_wxid: str, fallback_room: str): """后台执行全量同步,避免主消息协程超时。""" try: room_ids = await self._get_full_sync_room_ids(bot, fallback_room=fallback_room) await self._send_text_safe(bot, reply_wxid, f"已获取群聊数: {len(room_ids)},开始同步...") result = await self._sync_all_chatrooms(bot, room_ids=room_ids) success_msg = self.config["messages"]["update_all_success"].format( groups=result["groups"], total=result["total"], added=result["added"], updated=result["updated"] ) await self._send_text_safe(bot, reply_wxid, success_msg) except asyncio.CancelledError: logger.warning("[MemberSync] 全量同步任务被取消") raise except Exception as e: logger.error(f"[MemberSync] 全量同步群成员信息失败: {e}") error_msg = self.config["messages"]["update_all_error"].format(error=str(e)) await self._send_text_safe(bot, reply_wxid, error_msg) finally: OperationLock.resume() self._full_sync_task = None def _extract_member_wxid(self, member) -> str: """从成员条目中提取 wxid""" if isinstance(member, str): return member if not isinstance(member, dict): return "" wxid = member.get("wxid") or member.get("userName") or member.get("UserName") if isinstance(wxid, dict): return wxid.get("String", "") or wxid.get("string", "") return str(wxid) if wxid else "" def _extract_member_nickname(self, member, default: str = "") -> str: """从成员条目中提取昵称""" if not isinstance(member, dict): return default nick_name = member.get("nickname") or member.get("nickName") or member.get("display_name") if isinstance(nick_name, dict): value = nick_name.get("string", "") or nick_name.get("String", "") or "" value = value.strip() return value or default if isinstance(nick_name, str): value = nick_name.strip() return value or default return default def _extract_member_group_nickname(self, member, default: str = "") -> str: """从成员条目中提取群昵称(群内备注)""" if not isinstance(member, dict): return default display = member.get("displayName") or member.get("display_name") if isinstance(display, dict): value = display.get("string", "") or display.get("String", "") or "" value = value.strip() return value or default if isinstance(display, str): value = display.strip() return value or default return default def _extract_signature(self, user_info, default: str = "") -> str: """从联系人详情中提取个性签名""" if not isinstance(user_info, dict): return default signature = user_info.get("signature") or user_info.get("Signature") if isinstance(signature, dict): value = signature.get("string", "") or signature.get("String", "") or "" value = value.strip() return value or default if isinstance(signature, str): value = signature.strip() return value or default return default async def _sync_member_list(self, bot, room_wxid: str, member_list, request_delay: float = 0.2) -> dict: """增量同步指定成员列表""" results = {"total": 0, "added": 0, "updated": 0} if not member_list: return results seen = set() for member in member_list: wxid = self._extract_member_wxid(member) if not wxid or wxid in seen: continue seen.add(wxid) results["total"] += 1 try: await asyncio.sleep(request_delay) user_info = await bot.get_group_member_contact(room_wxid, wxid) nickname = "" avatar_url = "" if user_info: nick_name = user_info.get("nickName", {}) if isinstance(nick_name, dict): nickname = nick_name.get("string", "") or nick_name.get("String", "") else: nickname = nick_name if isinstance(nick_name, str) else "" big_head = user_info.get("bigHeadImgUrl", "") if isinstance(big_head, dict): avatar_url = big_head.get("string", "") else: avatar_url = big_head if isinstance(big_head, str) else "" signature = self._extract_signature(user_info, "") else: signature = "" if not nickname: nickname = self._extract_member_nickname(member, "") group_nickname = self._extract_member_group_nickname(member, "") if not group_nickname: group_nickname = nickname or None is_new = await self._save_member( room_wxid, wxid, nickname, avatar_url, group_nickname=group_nickname, signature=signature or None, ) if is_new: results["added"] += 1 else: results["updated"] += 1 except Exception as e: logger.error(f"增量同步成员失败: {wxid}, error={e}") return results @on_chatroom_member_add(priority=45) async def handle_member_add_event(self, bot, message: dict): """处理群成员新增事件(增量更新数据库)""" if not self.config["behavior"].get("enabled", False): return if not self.config["behavior"].get("auto_sync_new_members", False): return room_wxid = message.get("RoomWxid", "") member_list = message.get("MemberList", []) if not room_wxid or not self._should_sync(room_wxid): return if not member_list: return request_delay = float(self.config["behavior"].get("request_delay", 0.2)) logger.info(f"[MemberSync] 增量同步新成员: room={room_wxid}, count={len(member_list)}") await self._sync_member_list(bot, room_wxid, member_list, request_delay=request_delay) @on_chatroom_member_nickname_change(priority=40) async def handle_member_nickname_change(self, bot, message: dict): """处理群成员昵称变更事件(更新群昵称字段)""" if not self.config["behavior"].get("enabled", False): return True if not self.config["behavior"].get("auto_sync_nickname_change", True): return True room_wxid = message.get("RoomWxid", "") member_list = message.get("MemberList", []) if not room_wxid or not self._should_sync(room_wxid): return True if not member_list: return True updated = 0 for member in member_list: wxid = member.get("wxid") or "" if not wxid: continue group_nickname = (member.get("display_name") or member.get("nickname") or "").strip() if not group_nickname: continue avatar_url = member.get("avatar") or "" nickname = (member.get("nickname") or "").strip() or group_nickname await self._save_member( room_wxid, wxid, nickname, avatar_url, group_nickname=group_nickname, ) updated += 1 if updated: logger.info(f"[MemberSync] 群昵称已更新: room={room_wxid}, count={updated}") return True def _extract_chatroom_wxid(self, chatroom_entry) -> str: """从好友列表条目中提取群聊 wxid""" if isinstance(chatroom_entry, str): return chatroom_entry if not isinstance(chatroom_entry, dict): return "" contact = chatroom_entry.get("contact", chatroom_entry) username = contact.get("userName", "") if isinstance(username, dict): return username.get("String", "") return str(username) if username else "" async def _get_known_chatrooms_from_db(self) -> list: """从本地数据库获取已知群聊列表""" if not self.db_path: return [] if not self.db_path.exists(): return [] try: async with aiosqlite.connect(self.db_path) as db: cursor = await db.execute( "SELECT DISTINCT chatroom_wxid FROM group_members WHERE chatroom_wxid != ''" ) rows = await cursor.fetchall() return [row[0] for row in rows if row and row[0]] except Exception as e: logger.error(f"[MemberSync] 获取已知群聊失败: {e}") return [] async def _get_full_sync_room_ids(self, bot, fallback_room: str = "") -> list: """获取全量同步的群聊列表(带兜底)""" chatrooms = await bot.get_chatroom_list(force_refresh=True) room_ids = [] for chatroom in chatrooms: wxid = self._extract_chatroom_wxid(chatroom) if wxid and wxid.endswith("@chatroom") and self._should_sync(wxid): room_ids.append(wxid) if not room_ids: room_ids = await self._get_known_chatrooms_from_db() if not room_ids: enabled_groups = self.config["behavior"].get("enabled_groups", []) room_ids = [room for room in enabled_groups if isinstance(room, str)] if not room_ids and fallback_room and fallback_room.endswith("@chatroom"): room_ids = [fallback_room] room_ids = [room for room in room_ids if room.endswith("@chatroom") and self._should_sync(room)] room_ids = list(dict.fromkeys(room_ids)) if not room_ids: raise Exception("未获取到任何群聊") return room_ids async def _sync_all_chatrooms(self, bot, room_ids: Optional[list] = None, fallback_room: str = "") -> dict: """同步机器人加入的所有群成员信息(串行,带延迟)""" if not self.db_path: raise Exception("数据库未初始化") if room_ids is None: room_ids = await self._get_full_sync_room_ids(bot, fallback_room=fallback_room) total = 0 added = 0 updated = 0 groups = 0 for room_wxid in room_ids: logger.info(f"开始同步群成员信息(全量): {room_wxid}") result = await self._sync_group_members_serial(bot, room_wxid, request_delay=0.2) groups += 1 total += result["total"] added += result["added"] updated += result["updated"] logger.success( f"全量同步完成: 群聊数={groups}, 总计={total}, 新增={added}, 更新={updated}" ) return {"groups": groups, "total": total, "added": added, "updated": updated} async def _sync_group_members_serial(self, bot, room_wxid: str, request_delay: float = 0.2) -> dict: """ 同步群成员信息到数据库(串行 + 固定延迟) Args: bot: WechatHookClient 实例 room_wxid: 群聊 ID request_delay: 每次 API 调用延迟(秒) Returns: dict: 同步结果统计 {total, added, updated} """ wxids, total, basic_map = await self._get_room_members_snapshot(bot, room_wxid) if not wxids: raise Exception("无法获取群成员列表") logger.info(f"开始串行同步群成员信息: {room_wxid}, 成员数: {total}") results = {"added": 0, "updated": 0} for wxid in wxids: try: await asyncio.sleep(request_delay) user_info = await bot.get_group_member_contact(room_wxid, wxid) if user_info: nick_name = user_info.get("nickName", {}) if isinstance(nick_name, dict): nickname = nick_name.get("string", "") else: nickname = nick_name if isinstance(nick_name, str) else "" big_head = user_info.get("bigHeadImgUrl", "") if isinstance(big_head, dict): avatar_url = big_head.get("string", "") else: avatar_url = big_head if isinstance(big_head, str) else "" signature = self._extract_signature(user_info, "") else: basic = basic_map.get(wxid, {}) nickname = basic.get("nickname", "") avatar_url = basic.get("avatar_url", "") signature = "" if not nickname or not avatar_url: basic = basic_map.get(wxid, {}) nickname = nickname or basic.get("nickname", "") avatar_url = avatar_url or basic.get("avatar_url", "") group_nickname = basic_map.get(wxid, {}).get("group_nickname") or "" if not group_nickname: group_nickname = nickname or None is_new = await self._save_member( room_wxid, wxid, nickname, avatar_url, group_nickname=group_nickname, signature=signature or None, ) if is_new: results["added"] += 1 else: results["updated"] += 1 except Exception as e: logger.error(f"同步成员 {wxid} 失败: {e}") logger.success( f"群 {room_wxid} 同步完成: 总计={total}, 新增={results['added']}, 更新={results['updated']}" ) return {"total": total, "added": results["added"], "updated": results["updated"]} async def _sync_group_members(self, bot, room_wxid: str) -> dict: """ 同步群成员信息到数据库(并发处理) Args: bot: WechatHookClient 实例 room_wxid: 群聊 ID Returns: dict: 同步结果统计 {total, added, updated} """ if not self.db_path: raise Exception("数据库未初始化") wxids, total, basic_map = await self._get_room_members_snapshot(bot, room_wxid) if not wxids: raise Exception("无法获取群成员列表") logger.info(f"开始同步群成员信息: {room_wxid}, 成员数: {total}") # 获取并发数配置 max_concurrency = self.config["behavior"].get("max_concurrency", 10) semaphore = asyncio.Semaphore(max_concurrency) # 用于统计的计数器 results = {"added": 0, "updated": 0} async def process_member(wxid: str): if not wxid: return async with semaphore: try: # 添加请求延迟,避免API过载 request_delay = self.config["behavior"].get("request_delay", 0.1) await asyncio.sleep(request_delay) # 获取成员详细信息(包括头像) user_info = await bot.get_group_member_contact(room_wxid, wxid) if user_info: # 提取昵称 nick_name = user_info.get("nickName", {}) if isinstance(nick_name, dict): nickname = nick_name.get("string", "") else: nickname = nick_name if isinstance(nick_name, str) else "" # 提取头像 URL big_head = user_info.get("bigHeadImgUrl", "") if isinstance(big_head, dict): avatar_url = big_head.get("string", "") else: avatar_url = big_head if isinstance(big_head, str) else "" signature = self._extract_signature(user_info, "") else: # 如果获取详细信息失败,使用基本信息 basic = basic_map.get(wxid, {}) nickname = basic.get("nickname", "") avatar_url = basic.get("avatar_url", "") signature = "" if not nickname or not avatar_url: basic = basic_map.get(wxid, {}) nickname = nickname or basic.get("nickname", "") avatar_url = avatar_url or basic.get("avatar_url", "") group_nickname = basic_map.get(wxid, {}).get("group_nickname") or "" if not group_nickname: group_nickname = nickname or None # 保存到数据库 is_new = await self._save_member( room_wxid, wxid, nickname, avatar_url, group_nickname=group_nickname, signature=signature or None, ) if is_new: results["added"] += 1 else: results["updated"] += 1 logger.debug(f"同步成员: {nickname} ({wxid}), 新增={is_new}") except Exception as e: logger.error(f"同步成员 {wxid} 失败: {e}") # 并发处理所有成员 tasks = [process_member(wxid) for wxid in wxids] await asyncio.gather(*tasks) logger.success(f"同步完成: 总计={total}, 新增={results['added']}, 更新={results['updated']}") return { "total": total, "added": results["added"], "updated": results["updated"] } async def _get_room_members_snapshot(self, bot, room_wxid: str) -> tuple: """获取群成员 wxid 列表与基础信息映射(尽量全量)""" basic_map = {} wxids = [] total = 0 info = await bot.get_chatroom_info(room_wxid) if info: new_data = info.get("newChatroomData", {}) for m in new_data.get("chatRoomMember", []) or []: wxid = m.get("userName", "") or m.get("wxid", "") if not wxid: continue basic_map[wxid] = { "nickname": m.get("nickName", "") or "", "avatar_url": m.get("bigHeadImgUrl", "") or "", "group_nickname": self._extract_member_group_nickname(m, ""), } all_list = info.get("allMemberUserNameList") or [] if all_list: for entry in all_list: if isinstance(entry, dict): wxid = entry.get("String", "") or entry.get("string", "") else: wxid = str(entry) if wxid: wxids.append(wxid) else: wxids = list(basic_map.keys()) if wxids: wxids = list(dict.fromkeys(wxids)) total = int(info.get("allMemberCount") or len(wxids)) if total and len(wxids) < total: logger.warning( f"[MemberSync] 群成员数量不完整,尝试补全: room={room_wxid}, all={total}, listed={len(wxids)}" ) members = await bot.get_chatroom_members(room_wxid) for m in members: mwxid = m.get("wxid") or m.get("userName") or "" if not mwxid: continue if mwxid not in basic_map: basic_map[mwxid] = { "nickname": m.get("nickname", "") or "", "avatar_url": m.get("avatar", "") or "", "group_nickname": self._extract_member_group_nickname(m, ""), } wxids.append(mwxid) if wxids: wxids = list(dict.fromkeys(wxids)) return wxids, total, basic_map members = await bot.get_chatroom_members(room_wxid) for m in members: wxid = m.get("wxid", "") or m.get("userName", "") if not wxid: continue wxids.append(wxid) basic_map[wxid] = { "nickname": m.get("nickname", "") or m.get("nickName", "") or "", "avatar_url": m.get("avatar", "") or m.get("bigHeadImgUrl", "") or "", "group_nickname": self._extract_member_group_nickname(m, ""), } total = len(wxids) return wxids, total, basic_map async def _save_member( self, chatroom_wxid: str, wxid: str, nickname: str, avatar_url: str, group_nickname: Optional[str] = None, signature: Optional[str] = None, ) -> bool: """ 保存或更新成员信息到数据库 Args: chatroom_wxid: 群聊 ID wxid: 微信 ID nickname: 昵称 avatar_url: 头像 URL group_nickname: 群昵称(群内备注) signature: 个性签名 Returns: bool: True=新增, False=更新 """ try: async with aiosqlite.connect(self.db_path) as db: # 先检查是否存在 cursor = await db.execute( "SELECT id FROM group_members WHERE chatroom_wxid = ? AND wxid = ?", (chatroom_wxid, wxid) ) existing = await cursor.fetchone() if existing: # 更新 fields = ["nickname = ?", "avatar_url = ?"] params = [nickname, avatar_url] if group_nickname is not None: fields.append("group_nickname = ?") params.append(group_nickname) if signature is not None: fields.append("signature = ?") params.append(signature) fields.append("updated_at = CURRENT_TIMESTAMP") sql = ( "UPDATE group_members SET " + ", ".join(fields) + " WHERE chatroom_wxid = ? AND wxid = ?" ) params.extend([chatroom_wxid, wxid]) await db.execute(sql, tuple(params)) await db.commit() return False else: # 插入 await db.execute( """INSERT INTO group_members (chatroom_wxid, wxid, nickname, group_nickname, signature, avatar_url) VALUES (?, ?, ?, ?, ?, ?)""", (chatroom_wxid, wxid, nickname, group_nickname, signature, avatar_url) ) await db.commit() return True except Exception as e: logger.error(f"保存成员信息失败: wxid={wxid}, error={e}") raise def _should_sync(self, room_wxid: str) -> bool: """判断是否应该同步该群""" enabled_groups = self.config["behavior"]["enabled_groups"] disabled_groups = self.config["behavior"]["disabled_groups"] # 如果在禁用列表中,不同步 if room_wxid in disabled_groups: return False # 如果启用列表为空,对所有群生效 if not enabled_groups: return True # 否则只对启用列表中的群生效 return room_wxid in enabled_groups @on_system_message(priority=50) async def handle_member_join(self, bot, message: dict): """处理新成员加入事件""" logger.debug(f"[MemberSync] 收到系统消息") # 检查是否启用自动同步 if not self.config["behavior"].get("auto_sync_new_members", False): logger.debug(f"[MemberSync] 自动同步未启用") return True # 检查是否启用插件 if not self.config["behavior"]["enabled"]: logger.debug(f"[MemberSync] 插件未启用") return True content = message.get("Content", "") from_wxid = message.get("FromWxid", "") is_group = message.get("IsGroup", False) logger.debug(f"[MemberSync] 消息内容: {content}, 群聊: {is_group}") # 必须是群聊消息 if not is_group: logger.debug(f"[MemberSync] 不是群聊消息") return True # 检查群聊过滤 if not self._should_sync(from_wxid): logger.debug(f"[MemberSync] 群聊 {from_wxid} 不在同步列表中") return True # 检查是否是成员加入消息 join_keywords = ["加入群聊", "加入了群聊", "invited", "邀请", "通过扫描"] if not any(keyword in content for keyword in join_keywords): logger.debug(f"[MemberSync] 不是成员加入消息") return True logger.info(f"[MemberSync] 检测到新成员加入: {from_wxid}, 消息: {content}") # 异步执行同步(不阻塞消息处理) asyncio.create_task(self._sync_new_members(bot, from_wxid, content)) return True async def _sync_new_members(self, bot, room_wxid: str, content: str): """同步新加入的成员信息""" try: # 等待一小段时间,确保微信服务器已更新成员列表 await asyncio.sleep(2) # 获取最新的群成员列表(尽量全量) wxids, _, basic_map = await self._get_room_members_snapshot(bot, room_wxid) if not wxids: logger.warning(f"无法获取群成员列表: {room_wxid}") return logger.info(f"开始同步新成员信息: {room_wxid}") # 遍历所有成员,检查数据库中是否存在 new_count = 0 request_delay = float(self.config["behavior"].get("request_delay", 0.2)) for wxid in wxids: if not wxid: continue # 检查数据库中是否已存在 async with aiosqlite.connect(self.db_path) as db: cursor = await db.execute( "SELECT id FROM group_members WHERE wxid = ?", (wxid,) ) existing = await cursor.fetchone() # 如果不存在,则同步该成员 if not existing: try: # 添加延迟避免API过载 await asyncio.sleep(request_delay) # 获取成员详细信息 user_info = await bot.get_group_member_contact(room_wxid, wxid) if user_info: # 提取昵称 nick_name = user_info.get("nickName", {}) if isinstance(nick_name, dict): nickname = nick_name.get("string", "") else: nickname = nick_name if isinstance(nick_name, str) else "" # 提取头像 URL big_head = user_info.get("bigHeadImgUrl", "") if isinstance(big_head, dict): avatar_url = big_head.get("string", "") else: avatar_url = big_head if isinstance(big_head, str) else "" signature = self._extract_signature(user_info, "") else: basic = basic_map.get(wxid, {}) nickname = basic.get("nickname", "") avatar_url = basic.get("avatar_url", "") signature = "" if not nickname or not avatar_url: basic = basic_map.get(wxid, {}) nickname = nickname or basic.get("nickname", "") avatar_url = avatar_url or basic.get("avatar_url", "") # 保存到数据库 group_nickname = basic_map.get(wxid, {}).get("group_nickname") or "" if not group_nickname: group_nickname = nickname or None await self._save_member( room_wxid, wxid, nickname, avatar_url, group_nickname=group_nickname, signature=signature or None, ) new_count += 1 logger.info(f"自动同步新成员: {nickname} ({wxid})") except Exception as e: logger.error(f"同步新成员 {wxid} 失败: {e}") if new_count > 0: logger.success(f"自动同步完成: 新增 {new_count} 个成员") except Exception as e: logger.error(f"自动同步新成员失败: {e}") async def cleanup(self): if self._full_sync_task and not self._full_sync_task.done(): self._full_sync_task.cancel() try: await self._full_sync_task except asyncio.CancelledError: logger.info("[MemberSync] 全量同步后台任务已取消") """插件清理""" logger.info("MemberSync 插件已清理")