Files

1023 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
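Group member information sync plugin.

Features:
1. Sync group member information into a local SQLite database on a manual command
2. Store each member's wxid, nickname, group nickname, signature and avatar URL
3. Rows are unique per (chatroom_wxid, wxid), so an existing member is updated in place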
"""
import asyncio
import tomllib
import aiosqlite
from pathlib import Path
from typing import Optional
from loguru import logger
from utils.plugin_base import PluginBase
from utils.decorators import (
on_text_message,
on_system_message,
on_chatroom_member_add,
on_chatroom_member_nickname_change,
)
from utils.operation_lock import OperationLock
class MemberSync(PluginBase):
    """Plugin that syncs group member information into a local database."""

    # Plugin metadata
    description = "同步群成员信息到本地数据库"
    author = "Assistant"
    version = "1.0.0"
def __init__(self):
    """Create the plugin with empty state; ``async_init`` fills in config and database path."""
    super().__init__()
    # Background full-sync task, or None when no full sync is running.
    self._full_sync_task: Optional[asyncio.Task] = None
    # Both are assigned by async_init().
    self.db_path = None
    self.config = None
async def async_init(self):
    """Load ``config.toml``, resolve the database path and initialise the database."""
    plugin_dir = Path(__file__).parent

    # The configuration file sits next to this module.
    with (plugin_dir / "config.toml").open("rb") as config_file:
        self.config = tomllib.load(config_file)

    # The configured database path is joined onto the plugin directory.
    self.db_path = plugin_dir / self.config["database"]["db_path"]

    # Make sure the directory that will hold the database file exists.
    self.db_path.parent.mkdir(parents=True, exist_ok=True)

    # Create the table and any missing columns.
    await self._init_database()
    logger.info(f"群成员信息同步插件已加载,数据库: {self.db_path}")
async def _init_database(self):
    """Prepare the database schema and log the outcome.

    Any exception raised while doing so is logged and swallowed.
    """
    try:
        # Creates the table when it does not exist yet.
        await self._create_table()
        logger.success(f"数据库初始化成功: {self.db_path}")
    except Exception as exc:
        logger.error(f"初始化数据库失败: {exc}")
async def _create_table(self):
    """Create the ``group_members`` table, its index and any missing columns.

    Raises:
        Exception: whatever the database layer raised, re-raised after being
            logged so that the caller does not report a failed setup as a
            success.
    """
    create_table_sql = """
CREATE TABLE IF NOT EXISTS group_members (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chatroom_wxid TEXT NOT NULL,
wxid TEXT NOT NULL,
nickname TEXT NOT NULL,
group_nickname TEXT,
signature TEXT,
avatar_url TEXT,
last_msg_at TEXT,
daily_key TEXT,
daily_count INTEGER DEFAULT 0,
weekly_key TEXT,
weekly_count INTEGER DEFAULT 0,
monthly_key TEXT,
monthly_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(chatroom_wxid, wxid)
)
"""
    # Index used when looking rows up by chatroom.
    create_index_sql = """
CREATE INDEX IF NOT EXISTS idx_chatroom ON group_members(chatroom_wxid)
"""
    # CREATE TABLE IF NOT EXISTS leaves an already existing table untouched,
    # so these columns are checked one by one and added when missing.
    ensured_columns = (
        ("group_nickname", "TEXT"),
        ("signature", "TEXT"),
        ("last_msg_at", "TEXT"),
        ("daily_key", "TEXT"),
        ("daily_count", "INTEGER DEFAULT 0"),
        ("weekly_key", "TEXT"),
        ("weekly_count", "INTEGER DEFAULT 0"),
        ("monthly_key", "TEXT"),
        ("monthly_count", "INTEGER DEFAULT 0"),
    )
    try:
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(create_table_sql)
            await db.execute(create_index_sql)
            for column, col_type in ensured_columns:
                await self._ensure_column(db, column, col_type)
            await db.commit()
        logger.success("数据库表检查/创建完成: group_members")
    except Exception as e:
        logger.error(f"创建数据库表失败: {e}")
        # Propagate so _init_database() logs a failure instead of a success.
        raise
async def _ensure_column(self, db: aiosqlite.Connection, column: str, col_type: str):
    """Add ``column`` to ``group_members`` unless the table already has it.

    Args:
        db: open database connection to run the statements on.
        column: name of the column to ensure.
        col_type: column type (and optional default) used in ``ADD COLUMN``.

    Failures are logged as a warning and not raised.
    """
    try:
        info_cursor = await db.execute("PRAGMA table_info(group_members)")
        present = set()
        for row in await info_cursor.fetchall():
            # The value at index 1 is compared against the column name.
            if row and len(row) > 1:
                present.add(row[1])
        if column in present:
            return
        await db.execute(f"ALTER TABLE group_members ADD COLUMN {column} {col_type}")
    except Exception as exc:
        logger.warning(f"[MemberSync] 列检查失败: {column} -> {exc}")
@on_text_message(priority=50)
async def handle_sync_command(self, bot, message: dict):
    """Handle the manual sync commands received as text messages.

    The "update all" keywords start the full sync as a background task; the
    plain update keywords sync the group the command was sent in.

    Args:
        bot: client used to send replies and query member data.
        message: incoming message; reads ``Content``, ``FromWxid``,
            ``SenderWxid`` and ``IsGroup``.

    Returns:
        None when the plugin is disabled or the text is not a sync command,
        otherwise False (the original code notes this blocks later handling).
    """
    behavior = self.config["behavior"]
    messages = self.config["messages"]

    # Ignore everything while the plugin is disabled.
    if not behavior["enabled"]:
        return

    content = message.get("Content", "").strip()
    from_wxid = message.get("FromWxid", "")
    sender_wxid = message.get("SenderWxid", "")
    is_group = message.get("IsGroup", False)

    # Only react to one of the configured command keywords.
    update_keywords = behavior["update_keywords"]
    update_all_keywords = behavior.get("update_all_keywords", [])
    if content not in update_keywords and content not in update_all_keywords:
        return

    # Only administrators may trigger a sync. In a group the operator is the
    # sender, in a private chat it is the peer itself.
    operator_wxid = sender_wxid if is_group else from_wxid
    if operator_wxid not in behavior["admins"]:
        await self._send_text_safe(bot, from_wxid, messages["permission_denied"])
        return False

    # Full sync runs in the background. The former inline implementation
    # that followed this return was unreachable and has been removed.
    if content in update_all_keywords:
        await self._start_full_sync_task(bot, message)
        return False

    # A single-group sync only makes sense inside a group chat.
    if not is_group:
        await self._send_text_safe(bot, from_wxid, messages["not_in_group"])
        return False

    # Respect the enabled/disabled group lists.
    if not self._should_sync(from_wxid):
        return False

    await self._send_text_safe(bot, from_wxid, messages["update_start"])
    try:
        result = await self._sync_group_members(bot, from_wxid)
        success_msg = messages["update_success"].format(
            total=result["total"],
            added=result["added"],
            updated=result["updated"],
        )
        await self._send_text_safe(bot, from_wxid, success_msg)
    except Exception as e:
        logger.error(f"同步群成员信息失败: {e}")
        error_msg = messages["update_error"].format(error=str(e))
        await self._send_text_safe(bot, from_wxid, error_msg)
    return False
async def _start_full_sync_task(self, bot, message: dict):
"""启动全量同步后台任务。"""
reply_wxid = self._resolve_reply_target(message)
fallback_room = self._resolve_fallback_room(message)
if self._full_sync_task and not self._full_sync_task.done():
await self._send_text_safe(bot, reply_wxid, "全量同步任务正在执行,请稍后再试")
return
await self._send_text_safe(bot, reply_wxid, self.config["messages"]["update_all_start"])
OperationLock.pause("member_sync_all")
self._full_sync_task = asyncio.create_task(
self._run_full_sync(bot, reply_wxid, fallback_room)
)
def _resolve_reply_target(self, message: dict) -> str:
"""解析命令回复目标,优先回群聊,避免误回私聊。"""
room_wxid = str(message.get("RoomWxid", "") or "")
from_wxid = str(message.get("FromWxid", "") or "")
to_wxid = str(message.get("ToWxid", "") or "")
for wxid in (room_wxid, from_wxid, to_wxid):
if wxid.endswith("@chatroom"):
return wxid
return from_wxid or to_wxid
def _resolve_fallback_room(self, message: dict) -> str:
"""解析可作为全量同步兜底的群 ID。"""
room_wxid = str(message.get("RoomWxid", "") or "")
from_wxid = str(message.get("FromWxid", "") or "")
to_wxid = str(message.get("ToWxid", "") or "")
for wxid in (room_wxid, from_wxid, to_wxid):
if wxid.endswith("@chatroom"):
return wxid
return ""
async def _send_text_safe(self, bot, to_wxid: str, content: str) -> bool:
"""安全发送消息,避免空目标导致异常。"""
target = (to_wxid or "").strip()
if not target:
logger.error(f"[MemberSync] 发送消息失败,目标 wxid 为空: {content}")
return False
try:
result = await bot.send_text(target, content)
if not result:
logger.warning(f"[MemberSync] 发送消息失败: to={target}, content={content}")
return bool(result)
except Exception as e:
logger.error(f"[MemberSync] 发送消息异常: to={target}, error={e}")
return False
async def _run_full_sync(self, bot, reply_wxid: str, fallback_room: str):
    """Run the full sync as a background task and report the result to ``reply_wxid``.

    Args:
        bot: client used for API calls and replies.
        reply_wxid: where progress, success and error messages are sent.
        fallback_room: chatroom id handed to the room lookup as a last resort.

    Raises:
        asyncio.CancelledError: re-raised after logging when the task is cancelled.
    """
    try:
        rooms = await self._get_full_sync_room_ids(bot, fallback_room=fallback_room)
        await self._send_text_safe(bot, reply_wxid, f"已获取群聊数: {len(rooms)},开始同步...")
        summary = await self._sync_all_chatrooms(bot, room_ids=rooms)
        template = self.config["messages"]["update_all_success"]
        counters = {key: summary[key] for key in ("groups", "total", "added", "updated")}
        await self._send_text_safe(bot, reply_wxid, template.format(**counters))
    except asyncio.CancelledError:
        logger.warning("[MemberSync] 全量同步任务被取消")
        raise
    except Exception as exc:
        logger.error(f"[MemberSync] 全量同步群成员信息失败: {exc}")
        failure_text = self.config["messages"]["update_all_error"].format(error=str(exc))
        await self._send_text_safe(bot, reply_wxid, failure_text)
    finally:
        # Always undo the pause done when the task was started and clear the handle.
        OperationLock.resume()
        self._full_sync_task = None
def _extract_member_wxid(self, member) -> str:
"""从成员条目中提取 wxid"""
if isinstance(member, str):
return member
if not isinstance(member, dict):
return ""
wxid = member.get("wxid") or member.get("userName") or member.get("UserName")
if isinstance(wxid, dict):
return wxid.get("String", "") or wxid.get("string", "")
return str(wxid) if wxid else ""
def _extract_member_nickname(self, member, default: str = "") -> str:
"""从成员条目中提取昵称"""
if not isinstance(member, dict):
return default
nick_name = member.get("nickname") or member.get("nickName") or member.get("display_name")
if isinstance(nick_name, dict):
value = nick_name.get("string", "") or nick_name.get("String", "") or ""
value = value.strip()
return value or default
if isinstance(nick_name, str):
value = nick_name.strip()
return value or default
return default
def _extract_member_group_nickname(self, member, default: str = "") -> str:
"""从成员条目中提取群昵称(群内备注)"""
if not isinstance(member, dict):
return default
display = member.get("displayName") or member.get("display_name")
if isinstance(display, dict):
value = display.get("string", "") or display.get("String", "") or ""
value = value.strip()
return value or default
if isinstance(display, str):
value = display.strip()
return value or default
return default
def _extract_signature(self, user_info, default: str = "") -> str:
"""从联系人详情中提取个性签名"""
if not isinstance(user_info, dict):
return default
signature = user_info.get("signature") or user_info.get("Signature")
if isinstance(signature, dict):
value = signature.get("string", "") or signature.get("String", "") or ""
value = value.strip()
return value or default
if isinstance(signature, str):
value = signature.strip()
return value or default
return default
async def _sync_member_list(self, bot, room_wxid: str, member_list, request_delay: float = 0.2) -> dict:
"""增量同步指定成员列表"""
results = {"total": 0, "added": 0, "updated": 0}
if not member_list:
return results
seen = set()
for member in member_list:
wxid = self._extract_member_wxid(member)
if not wxid or wxid in seen:
continue
seen.add(wxid)
results["total"] += 1
try:
await asyncio.sleep(request_delay)
user_info = await bot.get_group_member_contact(room_wxid, wxid)
nickname = ""
avatar_url = ""
if user_info:
nick_name = user_info.get("nickName", {})
if isinstance(nick_name, dict):
nickname = nick_name.get("string", "") or nick_name.get("String", "")
else:
nickname = nick_name if isinstance(nick_name, str) else ""
big_head = user_info.get("bigHeadImgUrl", "")
if isinstance(big_head, dict):
avatar_url = big_head.get("string", "")
else:
avatar_url = big_head if isinstance(big_head, str) else ""
signature = self._extract_signature(user_info, "")
else:
signature = ""
if not nickname:
nickname = self._extract_member_nickname(member, "")
group_nickname = self._extract_member_group_nickname(member, "")
if not group_nickname:
group_nickname = nickname or None
is_new = await self._save_member(
room_wxid,
wxid,
nickname,
avatar_url,
group_nickname=group_nickname,
signature=signature or None,
)
if is_new:
results["added"] += 1
else:
results["updated"] += 1
except Exception as e:
logger.error(f"增量同步成员失败: {wxid}, error={e}")
return results
@on_chatroom_member_add(priority=45)
async def handle_member_add_event(self, bot, message: dict):
    """Sync the members listed in a "member added" event into the database.

    Does nothing unless both ``enabled`` and ``auto_sync_new_members`` are
    set, the room passes the group filter and the event lists members.
    """
    behavior = self.config["behavior"]
    if not (behavior.get("enabled", False) and behavior.get("auto_sync_new_members", False)):
        return

    room_wxid = message.get("RoomWxid", "")
    member_list = message.get("MemberList", [])
    if not (room_wxid and self._should_sync(room_wxid) and member_list):
        return

    delay = float(behavior.get("request_delay", 0.2))
    logger.info(f"[MemberSync] 增量同步新成员: room={room_wxid}, count={len(member_list)}")
    await self._sync_member_list(bot, room_wxid, member_list, request_delay=delay)
@on_chatroom_member_nickname_change(priority=40)
async def handle_member_nickname_change(self, bot, message: dict):
    """Store the new in-group nickname of members named in a nickname-change event.

    A member that cannot be processed (malformed entry, database error) is
    logged and skipped, so one bad entry neither blocks the remaining members
    nor raises out of the handler. This matches the per-member error handling
    of the other sync loops in this plugin.

    Args:
        bot: unused here; part of the handler signature.
        message: event payload; reads ``RoomWxid`` and ``MemberList``.

    Returns:
        Always True.
    """
    behavior = self.config["behavior"]
    if not behavior.get("enabled", False):
        return True
    if not behavior.get("auto_sync_nickname_change", True):
        return True

    room_wxid = message.get("RoomWxid", "")
    member_list = message.get("MemberList", [])
    if not room_wxid or not self._should_sync(room_wxid):
        return True
    if not member_list:
        return True

    updated = 0
    for member in member_list:
        # Entries are read with .get(), so anything that is not a dict is skipped.
        if not isinstance(member, dict):
            continue
        wxid = member.get("wxid") or ""
        if not wxid:
            continue
        try:
            group_nickname = (member.get("display_name") or member.get("nickname") or "").strip()
            if not group_nickname:
                continue
            avatar_url = member.get("avatar") or ""
            # Without a plain nickname the in-group nickname is stored for both.
            nickname = (member.get("nickname") or "").strip() or group_nickname
            await self._save_member(
                room_wxid,
                wxid,
                nickname,
                avatar_url,
                group_nickname=group_nickname,
            )
        except Exception as e:
            logger.error(f"[MemberSync] 更新群昵称失败: {wxid}, error={e}")
            continue
        updated += 1

    if updated:
        logger.info(f"[MemberSync] 群昵称已更新: room={room_wxid}, count={updated}")
    return True
def _extract_chatroom_wxid(self, chatroom_entry) -> str:
"""从好友列表条目中提取群聊 wxid"""
if isinstance(chatroom_entry, str):
return chatroom_entry
if not isinstance(chatroom_entry, dict):
return ""
contact = chatroom_entry.get("contact", chatroom_entry)
username = contact.get("userName", "")
if isinstance(username, dict):
return username.get("String", "")
return str(username) if username else ""
async def _get_known_chatrooms_from_db(self) -> list:
"""从本地数据库获取已知群聊列表"""
if not self.db_path:
return []
if not self.db_path.exists():
return []
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT DISTINCT chatroom_wxid FROM group_members WHERE chatroom_wxid != ''"
)
rows = await cursor.fetchall()
return [row[0] for row in rows if row and row[0]]
except Exception as e:
logger.error(f"[MemberSync] 获取已知群聊失败: {e}")
return []
async def _get_full_sync_room_ids(self, bot, fallback_room: str = "") -> list:
"""获取全量同步的群聊列表(带兜底)"""
chatrooms = await bot.get_chatroom_list(force_refresh=True)
room_ids = []
for chatroom in chatrooms:
wxid = self._extract_chatroom_wxid(chatroom)
if wxid and wxid.endswith("@chatroom") and self._should_sync(wxid):
room_ids.append(wxid)
if not room_ids:
room_ids = await self._get_known_chatrooms_from_db()
if not room_ids:
enabled_groups = self.config["behavior"].get("enabled_groups", [])
room_ids = [room for room in enabled_groups if isinstance(room, str)]
if not room_ids and fallback_room and fallback_room.endswith("@chatroom"):
room_ids = [fallback_room]
room_ids = [room for room in room_ids if room.endswith("@chatroom") and self._should_sync(room)]
room_ids = list(dict.fromkeys(room_ids))
if not room_ids:
raise Exception("未获取到任何群聊")
return room_ids
async def _sync_all_chatrooms(self, bot, room_ids: Optional[list] = None, fallback_room: str = "") -> dict:
"""同步机器人加入的所有群成员信息(串行,带延迟)"""
if not self.db_path:
raise Exception("数据库未初始化")
if room_ids is None:
room_ids = await self._get_full_sync_room_ids(bot, fallback_room=fallback_room)
total = 0
added = 0
updated = 0
groups = 0
for room_wxid in room_ids:
logger.info(f"开始同步群成员信息(全量): {room_wxid}")
result = await self._sync_group_members_serial(bot, room_wxid, request_delay=0.2)
groups += 1
total += result["total"]
added += result["added"]
updated += result["updated"]
logger.success(
f"全量同步完成: 群聊数={groups}, 总计={total}, 新增={added}, 更新={updated}"
)
return {"groups": groups, "total": total, "added": added, "updated": updated}
async def _sync_group_members_serial(self, bot, room_wxid: str, request_delay: float = 0.2) -> dict:
"""
同步群成员信息到数据库(串行 + 固定延迟)
Args:
bot: WechatHookClient 实例
room_wxid: 群聊 ID
request_delay: 每次 API 调用延迟(秒)
Returns:
dict: 同步结果统计 {total, added, updated}
"""
wxids, total, basic_map = await self._get_room_members_snapshot(bot, room_wxid)
if not wxids:
raise Exception("无法获取群成员列表")
logger.info(f"开始串行同步群成员信息: {room_wxid}, 成员数: {total}")
results = {"added": 0, "updated": 0}
for wxid in wxids:
try:
await asyncio.sleep(request_delay)
user_info = await bot.get_group_member_contact(room_wxid, wxid)
if user_info:
nick_name = user_info.get("nickName", {})
if isinstance(nick_name, dict):
nickname = nick_name.get("string", "")
else:
nickname = nick_name if isinstance(nick_name, str) else ""
big_head = user_info.get("bigHeadImgUrl", "")
if isinstance(big_head, dict):
avatar_url = big_head.get("string", "")
else:
avatar_url = big_head if isinstance(big_head, str) else ""
signature = self._extract_signature(user_info, "")
else:
basic = basic_map.get(wxid, {})
nickname = basic.get("nickname", "")
avatar_url = basic.get("avatar_url", "")
signature = ""
if not nickname or not avatar_url:
basic = basic_map.get(wxid, {})
nickname = nickname or basic.get("nickname", "")
avatar_url = avatar_url or basic.get("avatar_url", "")
group_nickname = basic_map.get(wxid, {}).get("group_nickname") or ""
if not group_nickname:
group_nickname = nickname or None
is_new = await self._save_member(
room_wxid,
wxid,
nickname,
avatar_url,
group_nickname=group_nickname,
signature=signature or None,
)
if is_new:
results["added"] += 1
else:
results["updated"] += 1
except Exception as e:
logger.error(f"同步成员 {wxid} 失败: {e}")
logger.success(
f"{room_wxid} 同步完成: 总计={total}, 新增={results['added']}, 更新={results['updated']}"
)
return {"total": total, "added": results["added"], "updated": results["updated"]}
async def _sync_group_members(self, bot, room_wxid: str) -> dict:
"""
同步群成员信息到数据库(并发处理)
Args:
bot: WechatHookClient 实例
room_wxid: 群聊 ID
Returns:
dict: 同步结果统计 {total, added, updated}
"""
if not self.db_path:
raise Exception("数据库未初始化")
wxids, total, basic_map = await self._get_room_members_snapshot(bot, room_wxid)
if not wxids:
raise Exception("无法获取群成员列表")
logger.info(f"开始同步群成员信息: {room_wxid}, 成员数: {total}")
# 获取并发数配置
max_concurrency = self.config["behavior"].get("max_concurrency", 10)
semaphore = asyncio.Semaphore(max_concurrency)
# 用于统计的计数器
results = {"added": 0, "updated": 0}
async def process_member(wxid: str):
if not wxid:
return
async with semaphore:
try:
# 添加请求延迟避免API过载
request_delay = self.config["behavior"].get("request_delay", 0.1)
await asyncio.sleep(request_delay)
# 获取成员详细信息(包括头像)
user_info = await bot.get_group_member_contact(room_wxid, wxid)
if user_info:
# 提取昵称
nick_name = user_info.get("nickName", {})
if isinstance(nick_name, dict):
nickname = nick_name.get("string", "")
else:
nickname = nick_name if isinstance(nick_name, str) else ""
# 提取头像 URL
big_head = user_info.get("bigHeadImgUrl", "")
if isinstance(big_head, dict):
avatar_url = big_head.get("string", "")
else:
avatar_url = big_head if isinstance(big_head, str) else ""
signature = self._extract_signature(user_info, "")
else:
# 如果获取详细信息失败,使用基本信息
basic = basic_map.get(wxid, {})
nickname = basic.get("nickname", "")
avatar_url = basic.get("avatar_url", "")
signature = ""
if not nickname or not avatar_url:
basic = basic_map.get(wxid, {})
nickname = nickname or basic.get("nickname", "")
avatar_url = avatar_url or basic.get("avatar_url", "")
group_nickname = basic_map.get(wxid, {}).get("group_nickname") or ""
if not group_nickname:
group_nickname = nickname or None
# 保存到数据库
is_new = await self._save_member(
room_wxid,
wxid,
nickname,
avatar_url,
group_nickname=group_nickname,
signature=signature or None,
)
if is_new:
results["added"] += 1
else:
results["updated"] += 1
logger.debug(f"同步成员: {nickname} ({wxid}), 新增={is_new}")
except Exception as e:
logger.error(f"同步成员 {wxid} 失败: {e}")
# 并发处理所有成员
tasks = [process_member(wxid) for wxid in wxids]
await asyncio.gather(*tasks)
logger.success(f"同步完成: 总计={total}, 新增={results['added']}, 更新={results['updated']}")
return {
"total": total,
"added": results["added"],
"updated": results["updated"]
}
async def _get_room_members_snapshot(self, bot, room_wxid: str) -> tuple:
"""获取群成员 wxid 列表与基础信息映射(尽量全量)"""
basic_map = {}
wxids = []
total = 0
info = await bot.get_chatroom_info(room_wxid)
if info:
new_data = info.get("newChatroomData", {})
for m in new_data.get("chatRoomMember", []) or []:
wxid = m.get("userName", "") or m.get("wxid", "")
if not wxid:
continue
basic_map[wxid] = {
"nickname": m.get("nickName", "") or "",
"avatar_url": m.get("bigHeadImgUrl", "") or "",
"group_nickname": self._extract_member_group_nickname(m, ""),
}
all_list = info.get("allMemberUserNameList") or []
if all_list:
for entry in all_list:
if isinstance(entry, dict):
wxid = entry.get("String", "") or entry.get("string", "")
else:
wxid = str(entry)
if wxid:
wxids.append(wxid)
else:
wxids = list(basic_map.keys())
if wxids:
wxids = list(dict.fromkeys(wxids))
total = int(info.get("allMemberCount") or len(wxids))
if total and len(wxids) < total:
logger.warning(
f"[MemberSync] 群成员数量不完整,尝试补全: room={room_wxid}, all={total}, listed={len(wxids)}"
)
members = await bot.get_chatroom_members(room_wxid)
for m in members:
mwxid = m.get("wxid") or m.get("userName") or ""
if not mwxid:
continue
if mwxid not in basic_map:
basic_map[mwxid] = {
"nickname": m.get("nickname", "") or "",
"avatar_url": m.get("avatar", "") or "",
"group_nickname": self._extract_member_group_nickname(m, ""),
}
wxids.append(mwxid)
if wxids:
wxids = list(dict.fromkeys(wxids))
return wxids, total, basic_map
members = await bot.get_chatroom_members(room_wxid)
for m in members:
wxid = m.get("wxid", "") or m.get("userName", "")
if not wxid:
continue
wxids.append(wxid)
basic_map[wxid] = {
"nickname": m.get("nickname", "") or m.get("nickName", "") or "",
"avatar_url": m.get("avatar", "") or m.get("bigHeadImgUrl", "") or "",
"group_nickname": self._extract_member_group_nickname(m, ""),
}
total = len(wxids)
return wxids, total, basic_map
async def _save_member(
    self,
    chatroom_wxid: str,
    wxid: str,
    nickname: str,
    avatar_url: str,
    group_nickname: Optional[str] = None,
    signature: Optional[str] = None,
) -> bool:
    """Insert a member row or update the existing one.

    When the row already exists, an empty ``nickname`` or ``avatar_url``
    leaves the stored value untouched. Callers that lack one of them (for
    example a nickname-change event without an avatar) therefore no longer
    erase data that an earlier sync stored. ``group_nickname`` and
    ``signature`` are only written when they are not None.

    Args:
        chatroom_wxid: chatroom id.
        wxid: member id.
        nickname: member nickname.
        avatar_url: avatar URL.
        group_nickname: in-group nickname.
        signature: personal signature.

    Returns:
        bool: True when a new row was inserted, False when an existing row
        was updated.

    Raises:
        Exception: database errors are logged and re-raised.
    """
    try:
        async with aiosqlite.connect(self.db_path) as db:
            # Look for an existing row first.
            cursor = await db.execute(
                "SELECT id FROM group_members WHERE chatroom_wxid = ? AND wxid = ?",
                (chatroom_wxid, wxid)
            )
            existing = await cursor.fetchone()
            if existing:
                fields = []
                params = []
                # Empty values mean "unknown" here and must not clobber stored data.
                if nickname:
                    fields.append("nickname = ?")
                    params.append(nickname)
                if avatar_url:
                    fields.append("avatar_url = ?")
                    params.append(avatar_url)
                if group_nickname is not None:
                    fields.append("group_nickname = ?")
                    params.append(group_nickname)
                if signature is not None:
                    fields.append("signature = ?")
                    params.append(signature)
                fields.append("updated_at = CURRENT_TIMESTAMP")
                sql = (
                    "UPDATE group_members SET "
                    + ", ".join(fields)
                    + " WHERE chatroom_wxid = ? AND wxid = ?"
                )
                params.extend([chatroom_wxid, wxid])
                await db.execute(sql, tuple(params))
                await db.commit()
                return False

            # nickname is declared NOT NULL, so a missing one is stored as "".
            await db.execute(
                """INSERT INTO group_members
(chatroom_wxid, wxid, nickname, group_nickname, signature, avatar_url)
VALUES (?, ?, ?, ?, ?, ?)""",
                (chatroom_wxid, wxid, nickname or "", group_nickname, signature, avatar_url)
            )
            await db.commit()
            return True
    except Exception as e:
        logger.error(f"保存成员信息失败: wxid={wxid}, error={e}")
        raise
def _should_sync(self, room_wxid: str) -> bool:
"""判断是否应该同步该群"""
enabled_groups = self.config["behavior"]["enabled_groups"]
disabled_groups = self.config["behavior"]["disabled_groups"]
# 如果在禁用列表中,不同步
if room_wxid in disabled_groups:
return False
# 如果启用列表为空,对所有群生效
if not enabled_groups:
return True
# 否则只对启用列表中的群生效
return room_wxid in enabled_groups
@on_system_message(priority=50)
async def handle_member_join(self, bot, message: dict):
    """Detect "member joined" system messages and sync the new members in the background.

    Args:
        bot: client handed to the background sync.
        message: system message; reads ``Content``, ``FromWxid`` and ``IsGroup``.

    Returns:
        Always True.
    """
    logger.debug("[MemberSync] 收到系统消息")
    behavior = self.config["behavior"]

    # Automatic syncing must be switched on ...
    if not behavior.get("auto_sync_new_members", False):
        logger.debug("[MemberSync] 自动同步未启用")
        return True
    # ... and so must the plugin itself.
    if not behavior["enabled"]:
        logger.debug("[MemberSync] 插件未启用")
        return True

    content = message.get("Content", "")
    from_wxid = message.get("FromWxid", "")
    is_group = message.get("IsGroup", False)
    logger.debug(f"[MemberSync] 消息内容: {content}, 群聊: {is_group}")

    # Only group messages are of interest.
    if not is_group:
        logger.debug("[MemberSync] 不是群聊消息")
        return True
    # Respect the enabled/disabled group lists.
    if not self._should_sync(from_wxid):
        logger.debug(f"[MemberSync] 群聊 {from_wxid} 不在同步列表中")
        return True

    # Recognise a join notification by its wording.
    join_keywords = ["加入群聊", "加入了群聊", "invited", "邀请", "通过扫描"]
    if not any(keyword in content for keyword in join_keywords):
        logger.debug("[MemberSync] 不是成员加入消息")
        return True

    logger.info(f"[MemberSync] 检测到新成员加入: {from_wxid}, 消息: {content}")

    # Run the sync in the background so message handling is not blocked.
    task = asyncio.create_task(self._sync_new_members(bot, from_wxid, content))
    # The event loop keeps only weak references to tasks, so hold a strong
    # one until the task is done; otherwise it may be garbage-collected
    # before it finishes.
    pending = getattr(self, "_background_tasks", None)
    if pending is None:
        pending = set()
        self._background_tasks = pending
    pending.add(task)
    task.add_done_callback(pending.discard)
    return True
async def _sync_new_members(self, bot, room_wxid: str, content: str):
    """Store every member of ``room_wxid`` that has no row for this room yet.

    Rows are unique per (chatroom, member), so "already stored" is decided
    per chatroom: a member known from another group is still added to this
    one. The stored wxids of the room are read once up front instead of
    opening a database connection for each member.

    Args:
        bot: client used to fetch the member list and contact details.
        room_wxid: chatroom that reported a new member.
        content: text of the triggering system message (not used here).

    All errors are logged; nothing is raised.
    """
    try:
        # Wait before fetching, as in the original implementation, so the
        # member list has a chance to include the newcomer.
        await asyncio.sleep(2)

        wxids, _, basic_map = await self._get_room_members_snapshot(bot, room_wxid)
        if not wxids:
            logger.warning(f"无法获取群成员列表: {room_wxid}")
            return
        logger.info(f"开始同步新成员信息: {room_wxid}")

        # Members of this room that are already in the database.
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                "SELECT wxid FROM group_members WHERE chatroom_wxid = ?",
                (room_wxid,)
            )
            known = {row[0] for row in await cursor.fetchall()}

        new_count = 0
        request_delay = float(self.config["behavior"].get("request_delay", 0.2))
        for wxid in wxids:
            if not wxid or wxid in known:
                continue
            try:
                # Delay each request to avoid overloading the API.
                await asyncio.sleep(request_delay)
                user_info = await bot.get_group_member_contact(room_wxid, wxid)
                if user_info:
                    nick_name = user_info.get("nickName", {})
                    if isinstance(nick_name, dict):
                        nickname = nick_name.get("string", "")
                    else:
                        nickname = nick_name if isinstance(nick_name, str) else ""
                    big_head = user_info.get("bigHeadImgUrl", "")
                    if isinstance(big_head, dict):
                        avatar_url = big_head.get("string", "")
                    else:
                        avatar_url = big_head if isinstance(big_head, str) else ""
                    signature = self._extract_signature(user_info, "")
                else:
                    basic = basic_map.get(wxid, {})
                    nickname = basic.get("nickname", "")
                    avatar_url = basic.get("avatar_url", "")
                    signature = ""
                if not nickname or not avatar_url:
                    basic = basic_map.get(wxid, {})
                    nickname = nickname or basic.get("nickname", "")
                    avatar_url = avatar_url or basic.get("avatar_url", "")
                # Without an in-group nickname the plain nickname is stored instead.
                group_nickname = basic_map.get(wxid, {}).get("group_nickname") or ""
                if not group_nickname:
                    group_nickname = nickname or None
                await self._save_member(
                    room_wxid,
                    wxid,
                    nickname,
                    avatar_url,
                    group_nickname=group_nickname,
                    signature=signature or None,
                )
                new_count += 1
                logger.info(f"自动同步新成员: {nickname} ({wxid})")
            except Exception as e:
                logger.error(f"同步新成员 {wxid} 失败: {e}")

        if new_count > 0:
            logger.success(f"自动同步完成: 新增 {new_count} 个成员")
    except Exception as e:
        logger.error(f"自动同步新成员失败: {e}")
async def cleanup(self):
    """Cancel a still-running full-sync task, wait for it to stop, then log the cleanup."""
    # Hold the task in a local: _run_full_sync() resets the attribute to None
    # in its ``finally`` block while we are waiting for it.
    task = self._full_sync_task
    if task and not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            logger.info("[MemberSync] 全量同步后台任务已取消")
    logger.info("MemberSync 插件已清理")