"""
群昵称变动通知插件
检测群成员群昵称变更并发送卡片通知。
"""
from __future__ import annotations
import asyncio
import base64
import html
import json
from pathlib import Path
from typing import Dict, List, Optional
import aiohttp
import aiosqlite
import tomllib
from loguru import logger
from utils.decorators import on_chatroom_member_nickname_change
from utils.plugin_base import PluginBase
from WechatHook import WechatHookClient
# The HTML renderer is imported from the SignInPlugin package and is treated
# as optional: any failure while importing it just flips the availability
# flag off and logs a warning instead of aborting the module import.
try:
    from plugins.SignInPlugin.html_renderer import HtmlRenderer
except Exception:
    HTML_RENDERER_AVAILABLE = False
    logger.warning("GroupNicknameNotify: HTML 渲染器导入失败,将无法生成图片")
else:
    HTML_RENDERER_AVAILABLE = True
class GroupNicknameNotify(PluginBase):
"""群成员昵称变动通知插件"""
description = "群成员昵称变更时发送卡片通知"
author = "Codex"
version = "1.0.0"
def __init__(self):
    """Assign default settings and derive the plugin's working paths.

    Apart from calling the base-class initializer, this method only sets
    attributes; the values may later be overridden by ``async_init``.
    """
    super().__init__()

    # Paths are all anchored at the directory that contains this file.
    plugin_dir = Path(__file__).parent
    self.temp_dir = plugin_dir / "temp"
    self.templates_dir = plugin_dir / "templates"
    self.images_dir = plugin_dir / "images"
    self.cache_path = plugin_dir / "cache.json"

    # Parsed config.toml content and the group allow/deny lists.
    self.config: Dict = {}
    self.enabled_groups: List[str] = []
    self.disabled_groups: List[str] = []

    # Behaviour / rendering defaults.
    self.preload_cache = False
    self.cleanup_image = True
    self.render_timeout = 12
    self.avatar_timeout = 4

    # MemberSync integration: enabled by default, database path not yet known.
    self.use_member_sync_db = True
    self.member_sync_db_path: Optional[Path] = None

    # Nested string mapping loaded from / saved to ``cache_path``.
    self.cache: Dict[str, Dict[str, str]] = {}

    # Created in ``async_init`` when HTML rendering is enabled and importable.
    self.html_renderer: Optional[HtmlRenderer] = None
async def async_init(self):
    """Asynchronous initialization.

    Reads ``config.toml`` next to this file (when present), applies the
    ``behavior``, ``data`` and ``render`` sections over the defaults,
    creates the working directories, loads the on-disk cache, resolves the
    MemberSync database path and finally sets up the HTML renderer.
    """
    config_file = Path(__file__).parent / "config.toml"
    if config_file.exists():
        with config_file.open("rb") as fh:
            self.config = tomllib.load(fh)

    # [behavior]: which groups are notified and whether to preload the cache.
    behavior_cfg = self.config.get("behavior", {})
    self.enabled_groups = behavior_cfg.get("enabled_groups", [])
    self.disabled_groups = behavior_cfg.get("disabled_groups", [])
    self.preload_cache = bool(behavior_cfg.get("preload_cache", False))

    # [data]: whether the MemberSync database may be consulted.
    data_section = self.config.get("data", {})
    self.use_member_sync_db = bool(data_section.get("use_member_sync_db", True))

    # [render]: image rendering options; timeouts fall back to current values.
    render_cfg = self.config.get("render", {})
    self.cleanup_image = bool(render_cfg.get("cleanup_image", True))
    use_html = bool(render_cfg.get("use_html", True))
    self.render_timeout = int(render_cfg.get("render_timeout", self.render_timeout))
    self.avatar_timeout = int(render_cfg.get("avatar_timeout", self.avatar_timeout))

    for directory in (self.temp_dir, self.templates_dir, self.images_dir):
        directory.mkdir(exist_ok=True)

    self._load_cache()
    self._resolve_member_sync_db_path()

    # The renderer needs both the config switch and a successful import.
    if not (use_html and HTML_RENDERER_AVAILABLE):
        self.html_renderer = None
        logger.warning("GroupNicknameNotify: HTML 渲染不可用")
    else:
        self.html_renderer = HtmlRenderer(
            template_dir=self.templates_dir,
            output_dir=self.temp_dir,
            images_dir=self.images_dir,
            bg_source=render_cfg.get("bg_source", "local"),
            bg_api_url=render_cfg.get("bg_api_url", ""),
        )
        logger.info("GroupNicknameNotify: HTML 渲染已启用")

    logger.success("GroupNicknameNotify 插件初始化完成")
async def on_enable(self, bot=None):
    """Run the base-class enable hook, then optionally refresh the cache.

    ``_refresh_cache(bot)`` is awaited only when ``preload_cache`` is set
    and a truthy ``bot`` was passed in.
    """
    await super().on_enable(bot)
    if not self.preload_cache:
        return
    if not bot:
        return
    await self._refresh_cache(bot)
def _load_cache(self):
if not self.cache_path.exists():
return
try:
raw = self.cache_path.read_text(encoding="utf-8")
data = json.loads(raw) if raw else {}
if isinstance(data, dict):
self.cache = data
except Exception as e:
logger.warning(f"GroupNicknameNotify: 读取缓存失败: {e}")
def _save_cache(self):
try:
self.cache_path.write_text(json.dumps(self.cache, ensure_ascii=False), encoding="utf-8")
except Exception as e:
logger.warning(f"GroupNicknameNotify: 保存缓存失败: {e}")
def _resolve_member_sync_db_path(self):
if not self.use_member_sync_db:
return
plugins_dir = Path(__file__).parent.parent
config_path = plugins_dir / "MemberSync" / "config.toml"
if not config_path.exists():
return
try:
with open(config_path, "rb") as f:
cfg = tomllib.load(f)
db_rel = str(cfg.get("database", {}).get("db_path", "")).strip()
if not db_rel:
return
db_path = plugins_dir / "MemberSync" / db_rel
if db_path.exists():
self.member_sync_db_path = db_path
except Exception as e:
logger.debug(f"GroupNicknameNotify: 解析 MemberSync 配置失败: {e}")
async def _get_member_sync_avatar(self, room_wxid: str, wxid: str) -> str:
if not self.use_member_sync_db or not self.member_sync_db_path:
return ""
try:
async with aiosqlite.connect(self.member_sync_db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"""SELECT avatar_url
FROM group_members
WHERE chatroom_wxid = ? AND wxid = ?
ORDER BY updated_at DESC
LIMIT 1""",
(room_wxid, wxid),
)
row = await cursor.fetchone()
if not row:
return ""
return row["avatar_url"] or ""
except Exception as e:
logger.debug(f"GroupNicknameNotify: 查询 MemberSync 失败: {e}")
return ""
async def _get_member_sync_group_nickname(self, room_wxid: str, wxid: str) -> str:
if not self.use_member_sync_db or not self.member_sync_db_path:
return ""
try:
async with aiosqlite.connect(self.member_sync_db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"""SELECT group_nickname, nickname
FROM group_members
WHERE chatroom_wxid = ? AND wxid = ?
ORDER BY updated_at DESC
LIMIT 1""",
(room_wxid, wxid),
)
row = await cursor.fetchone()
if not row:
return ""
group_nickname = row["group_nickname"] or ""
if group_nickname:
return group_nickname
return row["nickname"] or ""
except Exception as e:
logger.debug(f"GroupNicknameNotify: 查询 MemberSync 群昵称失败: {e}")
return ""
def _should_notify(self, room_wxid: str) -> bool:
if room_wxid in self.disabled_groups:
return False
if not self.enabled_groups:
return True
return room_wxid in self.enabled_groups
def _normalize_display_name(self, member: dict) -> str:
display_name = self._clean_name(member.get("display_name") or "")
nickname = self._clean_name(member.get("nickname") or "")
name = display_name or nickname
if not name:
name = str(member.get("wxid") or "")
return name
def _clean_name(self, name: str) -> str:
if not name:
return ""
return str(name).strip()
def _truncate_text(self, text: str, max_len: int = 14) -> str:
text = text or ""
if len(text) <= max_len:
return text
return text[: max_len - 1] + "…"
def _build_avatar_html(self, avatar_url: str, fallback_text: str) -> str:
if avatar_url:
safe_url = html.escape(avatar_url, quote=True)
return f''
fallback = html.escape((fallback_text or "?")[:1])
return f'