Files
WeChatHookBot/utils/member_info_service.py

414 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
群成员信息服务
提供统一的用户信息查询接口,优先从 MemberSync 数据库读取,
避免频繁调用 API 和 Redis 缓存
"""
import aiosqlite
from pathlib import Path
from typing import Optional, Dict, List
from loguru import logger
class MemberInfoService:
    """
    Group member information service (singleton).

    Lookup priority, as stated by the original author:
    1. The MemberSync SQLite database (fastest, most reliable).
    2. Other caches / history, which are left to the caller; this service
       never triggers an API call itself.
    """

    _instance = None

    # Columns every query selects, in this positional order.
    _BASE_FIELDS = ("wxid", "nickname", "avatar_url")
    # Columns that are only selected when the table actually has them.
    _OPTIONAL_FIELDS = ("group_nickname", "signature")
    # Maximum number of bound parameters per IN (...) query. SQLite builds
    # older than 3.32.0 default to a limit of 999 host parameters, so large
    # batches are split to stay well below that.
    _IN_CHUNK_SIZE = 500

    def __new__(cls):
        """Return the single shared instance, creating it on first call."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """Set up the database path and column cache (only once per process)."""
        if self._initialized:
            return
        # Path of the MemberSync plugin database, relative to this file.
        self.db_path = Path(__file__).parent.parent / "plugins" / "MemberSync" / "data" / "member_sync.db"
        self._columns_cache = None
        self._initialized = True
        logger.info(f"[MemberInfoService] 初始化,数据库路径: {self.db_path}")

    @classmethod
    def _select_fields(cls, columns) -> List[str]:
        """Build the SELECT column list: base fields plus optional ones present in ``columns``."""
        return [*cls._BASE_FIELDS, *(name for name in cls._OPTIONAL_FIELDS if name in columns)]

    @classmethod
    def _row_to_member(cls, row, fields: List[str], fill_missing: bool = False) -> Dict:
        """
        Convert one result row into a member dict.

        Args:
            row: Row selected with exactly the columns in ``fields``.
            fields: Column list produced by ``_select_fields``.
            fill_missing: When True, optional columns that were not selected
                are still present in the result with an empty string.

        Returns:
            Dict with ``wxid``, ``nickname``, ``avatar_url`` and the optional
            fields; NULL avatar/optional values become ``""``.
        """
        member = {
            "wxid": row[0],
            "nickname": row[1],
            "avatar_url": row[2] or "",
        }
        for name in cls._OPTIONAL_FIELDS:
            if name in fields:
                member[name] = row[fields.index(name)] or ""
            elif fill_missing:
                member[name] = ""
        return member

    async def _get_table_columns(self, db: "aiosqlite.Connection") -> set:
        """
        Return the column names of the ``group_members`` table.

        The result is cached only once it is non-empty: a failed PRAGMA or a
        table that does not exist yet must not permanently hide the optional
        columns from later queries.
        """
        if self._columns_cache:
            return self._columns_cache
        try:
            cursor = await db.execute("PRAGMA table_info(group_members)")
            rows = await cursor.fetchall()
            # Index 1 of each table_info row is the column name.
            columns = {row[1] for row in rows if row and len(row) > 1}
        except Exception as e:
            logger.warning(f"[MemberInfoService] 读取表结构失败: {e}")
            return set()
        if columns:
            self._columns_cache = columns
        return columns

    async def get_member_info(self, wxid: str) -> Optional[Dict]:
        """
        Look up a member by wxid (nickname, avatar, and optional extras).

        Args:
            wxid: The user's wxid.

        Returns:
            Dict with ``wxid``, ``nickname`` and ``avatar_url``, plus
            ``group_nickname`` / ``signature`` when the table has those
            columns. None when the database file is missing, no row matches,
            or the query fails.
        """
        if not self.db_path.exists():
            logger.debug(f"[MemberInfoService] 数据库不存在: {self.db_path}")
            return None
        try:
            async with aiosqlite.connect(self.db_path) as db:
                fields = self._select_fields(await self._get_table_columns(db))
                cursor = await db.execute(
                    f"SELECT {', '.join(fields)} FROM group_members WHERE wxid = ?",
                    (wxid,)
                )
                row = await cursor.fetchone()
                if not row:
                    logger.debug(f"[MemberInfoService] 数据库未找到: {wxid}")
                    return None
                result = self._row_to_member(row, fields)
                logger.debug(f"[MemberInfoService] 数据库命中: {wxid} -> {result['nickname']}")
                return result
        except Exception as e:
            logger.error(f"[MemberInfoService] 查询失败: {e}")
            return None

    async def get_member_nickname(self, wxid: str) -> Optional[str]:
        """
        Return the member's nickname.

        Args:
            wxid: The user's wxid.

        Returns:
            The nickname, or None when the member is not found.
        """
        info = await self.get_member_info(wxid)
        return info["nickname"] if info else None

    async def get_member_avatar(self, wxid: str) -> Optional[str]:
        """
        Return the member's avatar URL.

        Args:
            wxid: The user's wxid.

        Returns:
            The avatar URL, or None when the member is not found.
        """
        info = await self.get_member_info(wxid)
        return info["avatar_url"] if info else None

    async def get_member_signature(self, wxid: str) -> Optional[str]:
        """Return the member's signature, or None when unavailable."""
        info = await self.get_member_info(wxid)
        return info.get("signature") if info else None

    async def get_chatroom_members(self, chatroom_wxid: str) -> List[Dict]:
        """
        Return all members of one chatroom.

        Args:
            chatroom_wxid: The chatroom ID.

        Returns:
            List of member dicts (``wxid``, ``nickname``, ``avatar_url``,
            ``group_nickname``, ``signature``); empty when the database file
            is missing or the query fails.
        """
        if not self.db_path.exists():
            return []
        try:
            async with aiosqlite.connect(self.db_path) as db:
                fields = self._select_fields(await self._get_table_columns(db))
                cursor = await db.execute(
                    f"SELECT {', '.join(fields)} FROM group_members WHERE chatroom_wxid = ?",
                    (chatroom_wxid,)
                )
                rows = await cursor.fetchall()
                result = [self._row_to_member(row, fields, fill_missing=True) for row in rows]
                logger.debug(f"[MemberInfoService] 获取群 {chatroom_wxid} 成员: {len(result)}")
                return result
        except Exception as e:
            logger.error(f"[MemberInfoService] 获取群成员失败: {e}")
            return []

    async def get_chatroom_member_info(self, chatroom_wxid: str, wxid: str) -> Optional[Dict]:
        """
        Look up a member within a specific chatroom.

        Args:
            chatroom_wxid: The chatroom ID.
            wxid: The user's wxid.

        Returns:
            Dict with ``wxid``, ``nickname`` and ``avatar_url``, plus
            ``group_nickname`` / ``signature`` when the table has those
            columns. None when the database file is missing, an argument is
            empty, no row matches, or the query fails.
        """
        if not self.db_path.exists():
            return None
        if not chatroom_wxid or not wxid:
            return None
        try:
            async with aiosqlite.connect(self.db_path) as db:
                fields = self._select_fields(await self._get_table_columns(db))
                cursor = await db.execute(
                    f"""
                    SELECT {', '.join(fields)}
                    FROM group_members
                    WHERE chatroom_wxid = ? AND wxid = ?
                    """,
                    (chatroom_wxid, wxid)
                )
                row = await cursor.fetchone()
                if row:
                    return self._row_to_member(row, fields)
        except Exception as e:
            logger.error(f"[MemberInfoService] 查询群成员失败: {e}")
        return None

    async def get_chatroom_member_nickname(self, chatroom_wxid: str, wxid: str) -> Optional[str]:
        """Return the member's nickname within the given chatroom, or None."""
        info = await self.get_chatroom_member_info(chatroom_wxid, wxid)
        return info["nickname"] if info else None

    async def get_chatroom_member_avatar(self, chatroom_wxid: str, wxid: str) -> Optional[str]:
        """Return the member's avatar URL within the given chatroom, or None."""
        info = await self.get_chatroom_member_info(chatroom_wxid, wxid)
        return info["avatar_url"] if info else None

    async def get_chatroom_member_signature(self, chatroom_wxid: str, wxid: str) -> Optional[str]:
        """Return the member's signature within the given chatroom, or None."""
        info = await self.get_chatroom_member_info(chatroom_wxid, wxid)
        return info.get("signature") if info else None

    async def get_chatroom_member_wxids(self, chatroom_wxid: str) -> List[str]:
        """
        Return the wxids of all members of one chatroom.

        Args:
            chatroom_wxid: The chatroom ID.

        Returns:
            List of wxids; empty when the database file is missing or the
            query fails.
        """
        if not self.db_path.exists():
            return []
        try:
            async with aiosqlite.connect(self.db_path) as db:
                cursor = await db.execute(
                    "SELECT wxid FROM group_members WHERE chatroom_wxid = ?",
                    (chatroom_wxid,)
                )
                rows = await cursor.fetchall()
                return [row[0] for row in rows]
        except Exception as e:
            logger.error(f"[MemberInfoService] 获取群成员wxid列表失败: {e}")
            return []

    async def get_all_members(self) -> List[Dict]:
        """
        Return every row of ``group_members``, most recently updated first.

        Returns:
            List of member dicts (``wxid``, ``nickname``, ``avatar_url``,
            ``group_nickname``, ``signature``); empty when the database file
            is missing or the query fails.
        """
        if not self.db_path.exists():
            logger.debug(f"[MemberInfoService] 数据库不存在: {self.db_path}")
            return []
        try:
            async with aiosqlite.connect(self.db_path) as db:
                fields = self._select_fields(await self._get_table_columns(db))
                cursor = await db.execute(
                    f"SELECT {', '.join(fields)} FROM group_members ORDER BY updated_at DESC"
                )
                rows = await cursor.fetchall()
                result = [self._row_to_member(row, fields, fill_missing=True) for row in rows]
                logger.debug(f"[MemberInfoService] 获取所有成员: {len(result)}")
                return result
        except Exception as e:
            logger.error(f"[MemberInfoService] 查询所有成员失败: {e}")
            return []

    async def get_members_by_wxids(self, wxids: List[str]) -> Dict[str, Dict]:
        """
        Look up several members in one call.

        Args:
            wxids: The wxids to look up.

        Returns:
            Mapping ``{wxid: {"nickname", "avatar_url", "group_nickname",
            "signature"}}`` for the wxids that were found; empty when the
            database file is missing, ``wxids`` is empty, or the query fails.
        """
        if not self.db_path.exists() or not wxids:
            return {}
        wxid_list = list(wxids)
        try:
            async with aiosqlite.connect(self.db_path) as db:
                fields = self._select_fields(await self._get_table_columns(db))
                result = {}
                # Query in chunks so the number of bound parameters stays
                # below SQLite's per-statement limit.
                for start in range(0, len(wxid_list), self._IN_CHUNK_SIZE):
                    batch = wxid_list[start:start + self._IN_CHUNK_SIZE]
                    placeholders = ",".join("?" * len(batch))
                    cursor = await db.execute(
                        f"SELECT {', '.join(fields)} FROM group_members WHERE wxid IN ({placeholders})",
                        batch
                    )
                    for row in await cursor.fetchall():
                        member = self._row_to_member(row, fields, fill_missing=True)
                        # The wxid is the mapping key, not part of the value.
                        result[member.pop("wxid")] = member
                logger.debug(f"[MemberInfoService] 批量查询: 请求 {len(wxid_list)} 人,命中 {len(result)}")
                return result
        except Exception as e:
            logger.error(f"[MemberInfoService] 批量查询失败: {e}")
            return {}

    async def check_member_exists(self, wxid: str) -> bool:
        """
        Check whether any row exists for the given wxid.

        Args:
            wxid: The user's wxid.

        Returns:
            True when at least one row matches; False when none matches, the
            database file is missing, or the query fails.
        """
        if not self.db_path.exists():
            return False
        try:
            async with aiosqlite.connect(self.db_path) as db:
                cursor = await db.execute(
                    "SELECT 1 FROM group_members WHERE wxid = ? LIMIT 1",
                    (wxid,)
                )
                row = await cursor.fetchone()
                return row is not None
        except Exception as e:
            logger.error(f"[MemberInfoService] 检查成员存在失败: {e}")
            return False
# Module-wide handle to the shared service, filled in lazily by get_member_service().
_service_instance: Optional[MemberInfoService] = None


def get_member_service() -> MemberInfoService:
    """Return the shared MemberInfoService, constructing it on first use."""
    global _service_instance
    if _service_instance is not None:
        return _service_instance
    _service_instance = MemberInfoService()
    return _service_instance
# Convenience functions
async def get_member_info(wxid: str) -> Optional[Dict]:
    """Convenience wrapper: fetch a member's info through the shared service."""
    service = get_member_service()
    return await service.get_member_info(wxid)
async def get_member_nickname(wxid: str) -> Optional[str]:
    """Convenience wrapper: fetch a member's nickname through the shared service."""
    service = get_member_service()
    return await service.get_member_nickname(wxid)
async def get_member_avatar(wxid: str) -> Optional[str]:
    """Convenience wrapper: fetch a member's avatar URL through the shared service."""
    service = get_member_service()
    return await service.get_member_avatar(wxid)
async def get_member_signature(wxid: str) -> Optional[str]:
    """Convenience wrapper: fetch a member's signature through the shared service."""
    service = get_member_service()
    return await service.get_member_signature(wxid)