""" 群成员信息服务 提供统一的用户信息查询接口,优先从 MemberSync 数据库读取, 避免频繁调用 API 和 Redis 缓存 """ import aiosqlite from pathlib import Path from typing import Optional, Dict, List from loguru import logger class MemberInfoService: """ 群成员信息服务(单例模式) 优先级: 1. MemberSync SQLite 数据库(最快、最可靠) 2. 其他缓存/历史(由调用方处理,本服务不触发 API) """ _instance = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return # MemberSync 数据库路径 self.db_path = Path(__file__).parent.parent / "plugins" / "MemberSync" / "data" / "member_sync.db" self._columns_cache = None self._initialized = True logger.info(f"[MemberInfoService] 初始化,数据库路径: {self.db_path}") async def _get_table_columns(self, db: aiosqlite.Connection) -> set: """获取 group_members 表的列信息(带缓存)""" if self._columns_cache is not None: return self._columns_cache try: cursor = await db.execute("PRAGMA table_info(group_members)") rows = await cursor.fetchall() self._columns_cache = {row[1] for row in rows if row and len(row) > 1} except Exception: self._columns_cache = set() return self._columns_cache async def get_member_info(self, wxid: str) -> Optional[Dict]: """ 获取成员信息(昵称 + 头像 + 个性签名) Args: wxid: 用户 wxid Returns: 成员信息字典 {"wxid": str, "nickname": str, "avatar_url": str} 如果不存在返回 None """ if not self.db_path.exists(): logger.debug(f"[MemberInfoService] 数据库不存在: {self.db_path}") return None try: async with aiosqlite.connect(self.db_path) as db: columns = await self._get_table_columns(db) fields = ["wxid", "nickname", "avatar_url"] if "signature" in columns: fields.append("signature") if "group_nickname" in columns: fields.append("group_nickname") cursor = await db.execute( f"SELECT {', '.join(fields)} FROM group_members WHERE wxid = ?", (wxid,) ) row = await cursor.fetchone() if row: result = { "wxid": row[0], "nickname": row[1], "avatar_url": row[2] or "" } if "signature" in columns: result["signature"] = row[fields.index("signature")] or "" if "group_nickname" in columns: result["group_nickname"] = row[fields.index("group_nickname")] or "" logger.debug(f"[MemberInfoService] 数据库命中: {wxid} -> {result['nickname']}") return result else: logger.debug(f"[MemberInfoService] 数据库未找到: {wxid}") return None except Exception as e: logger.error(f"[MemberInfoService] 查询失败: {e}") return None async def get_member_nickname(self, wxid: str) -> Optional[str]: """ 快速获取成员昵称 Args: wxid: 用户 wxid Returns: 昵称,如果不存在返回 None """ info = await self.get_member_info(wxid) return info["nickname"] if info else None async def get_member_avatar(self, wxid: str) -> Optional[str]: """ 快速获取成员头像 URL Args: wxid: 用户 wxid Returns: 头像 URL,如果不存在返回 None """ info = await self.get_member_info(wxid) return info["avatar_url"] if info else None async def get_member_signature(self, wxid: str) -> Optional[str]: """快速获取成员个性签名""" info = await self.get_member_info(wxid) return info.get("signature") if info else None async def get_chatroom_members(self, chatroom_wxid: str) -> List[Dict]: """ 获取指定群聊的所有成员 Args: chatroom_wxid: 群聊 ID Returns: 成员信息列表 """ if not self.db_path.exists(): return [] try: async with aiosqlite.connect(self.db_path) as db: columns = await self._get_table_columns(db) fields = ["wxid", "nickname", "avatar_url"] if "group_nickname" in columns: fields.append("group_nickname") if "signature" in columns: fields.append("signature") cursor = await db.execute( f"SELECT {', '.join(fields)} FROM group_members WHERE chatroom_wxid = ?", (chatroom_wxid,) ) rows = await cursor.fetchall() result = [ { "wxid": row[0], "nickname": row[1], "avatar_url": row[2] or "", "group_nickname": row[fields.index("group_nickname")] or "" if "group_nickname" in columns else "", "signature": row[fields.index("signature")] or "" if "signature" in columns else "", } for row in rows ] logger.debug(f"[MemberInfoService] 获取群 {chatroom_wxid} 成员: {len(result)} 人") return result except Exception as e: logger.error(f"[MemberInfoService] 获取群成员失败: {e}") return [] async def get_chatroom_member_info(self, chatroom_wxid: str, wxid: str) -> Optional[Dict]: """ 获取指定群聊中的成员信息 Args: chatroom_wxid: 群聊 ID wxid: 用户 wxid Returns: 成员信息字典 {"wxid": str, "nickname": str, "avatar_url": str} 如果不存在返回 None """ if not self.db_path.exists(): return None if not chatroom_wxid or not wxid: return None try: async with aiosqlite.connect(self.db_path) as db: columns = await self._get_table_columns(db) fields = ["wxid", "nickname", "avatar_url"] if "group_nickname" in columns: fields.append("group_nickname") if "signature" in columns: fields.append("signature") cursor = await db.execute( f""" SELECT {', '.join(fields)} FROM group_members WHERE chatroom_wxid = ? AND wxid = ? """, (chatroom_wxid, wxid) ) row = await cursor.fetchone() if row: result = { "wxid": row[0], "nickname": row[1], "avatar_url": row[2] or "", } if "group_nickname" in columns: result["group_nickname"] = row[fields.index("group_nickname")] or "" if "signature" in columns: result["signature"] = row[fields.index("signature")] or "" return result except Exception as e: logger.error(f"[MemberInfoService] 查询群成员失败: {e}") return None async def get_chatroom_member_nickname(self, chatroom_wxid: str, wxid: str) -> Optional[str]: """获取指定群聊中的成员昵称""" info = await self.get_chatroom_member_info(chatroom_wxid, wxid) return info["nickname"] if info else None async def get_chatroom_member_avatar(self, chatroom_wxid: str, wxid: str) -> Optional[str]: """获取指定群聊中的成员头像 URL""" info = await self.get_chatroom_member_info(chatroom_wxid, wxid) return info["avatar_url"] if info else None async def get_chatroom_member_signature(self, chatroom_wxid: str, wxid: str) -> Optional[str]: """获取指定群聊中的成员个性签名""" info = await self.get_chatroom_member_info(chatroom_wxid, wxid) return info.get("signature") if info else None async def get_chatroom_member_wxids(self, chatroom_wxid: str) -> List[str]: """ 获取指定群聊的所有成员 wxid 列表 Args: chatroom_wxid: 群聊 ID Returns: wxid 列表 """ if not self.db_path.exists(): return [] try: async with aiosqlite.connect(self.db_path) as db: cursor = await db.execute( "SELECT wxid FROM group_members WHERE chatroom_wxid = ?", (chatroom_wxid,) ) rows = await cursor.fetchall() return [row[0] for row in rows] except Exception as e: logger.error(f"[MemberInfoService] 获取群成员wxid列表失败: {e}") return [] async def get_all_members(self) -> List[Dict]: """ 获取所有成员信息 Returns: 成员信息列表 """ if not self.db_path.exists(): logger.debug(f"[MemberInfoService] 数据库不存在: {self.db_path}") return [] try: async with aiosqlite.connect(self.db_path) as db: columns = await self._get_table_columns(db) fields = ["wxid", "nickname", "avatar_url"] if "group_nickname" in columns: fields.append("group_nickname") if "signature" in columns: fields.append("signature") cursor = await db.execute( f"SELECT {', '.join(fields)} FROM group_members ORDER BY updated_at DESC" ) rows = await cursor.fetchall() result = [ { "wxid": row[0], "nickname": row[1], "avatar_url": row[2] or "", "group_nickname": row[fields.index("group_nickname")] or "" if "group_nickname" in columns else "", "signature": row[fields.index("signature")] or "" if "signature" in columns else "", } for row in rows ] logger.debug(f"[MemberInfoService] 获取所有成员: {len(result)} 人") return result except Exception as e: logger.error(f"[MemberInfoService] 查询所有成员失败: {e}") return [] async def get_members_by_wxids(self, wxids: List[str]) -> Dict[str, Dict]: """ 批量获取成员信息 Args: wxids: wxid 列表 Returns: {wxid: {"nickname": str, "avatar_url": str}} 字典 """ if not self.db_path.exists() or not wxids: return {} try: async with aiosqlite.connect(self.db_path) as db: columns = await self._get_table_columns(db) fields = ["wxid", "nickname", "avatar_url"] if "group_nickname" in columns: fields.append("group_nickname") if "signature" in columns: fields.append("signature") # 构建 IN 查询 placeholders = ",".join("?" * len(wxids)) cursor = await db.execute( f"SELECT {', '.join(fields)} FROM group_members WHERE wxid IN ({placeholders})", wxids ) rows = await cursor.fetchall() result = { row[0]: { "nickname": row[1], "avatar_url": row[2] or "", "group_nickname": row[fields.index("group_nickname")] or "" if "group_nickname" in columns else "", "signature": row[fields.index("signature")] or "" if "signature" in columns else "", } for row in rows } logger.debug(f"[MemberInfoService] 批量查询: 请求 {len(wxids)} 人,命中 {len(result)} 人") return result except Exception as e: logger.error(f"[MemberInfoService] 批量查询失败: {e}") return {} async def check_member_exists(self, wxid: str) -> bool: """ 检查成员是否存在于数据库 Args: wxid: 用户 wxid Returns: 是否存在 """ if not self.db_path.exists(): return False try: async with aiosqlite.connect(self.db_path) as db: cursor = await db.execute( "SELECT 1 FROM group_members WHERE wxid = ? LIMIT 1", (wxid,) ) row = await cursor.fetchone() return row is not None except Exception as e: logger.error(f"[MemberInfoService] 检查成员存在失败: {e}") return False # 全局单例 _service_instance: Optional[MemberInfoService] = None def get_member_service() -> MemberInfoService: """获取全局单例""" global _service_instance if _service_instance is None: _service_instance = MemberInfoService() return _service_instance # 便捷函数 async def get_member_info(wxid: str) -> Optional[Dict]: """便捷函数:获取成员信息""" return await get_member_service().get_member_info(wxid) async def get_member_nickname(wxid: str) -> Optional[str]: """便捷函数:获取成员昵称""" return await get_member_service().get_member_nickname(wxid) async def get_member_avatar(wxid: str) -> Optional[str]: """便捷函数:获取成员头像""" return await get_member_service().get_member_avatar(wxid) async def get_member_signature(wxid: str) -> Optional[str]: """便捷函数:获取成员个性签名""" return await get_member_service().get_member_signature(wxid)