"""MemesAPI plugin - integration with the meme-generator-rs HTTP API.

Features:
- Trigger memes in group chats via @-mentions or quoted messages.
- Avatars are taken from the MemberSync database first, falling back to the
  Hook API when necessary.
- Memes are configured through files (plugins/MemesAPI/memes/*.toml).
"""

import asyncio
import base64
import json
import re
import time
import tomllib
import uuid
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import aiohttp
from loguru import logger

from utils.plugin_base import PluginBase
from utils.decorators import on_text_message, on_quote_message
from utils.member_info_service import get_member_service

# Matches option keys of the form ``circle[boolean]`` (option name + type).
_TYPED_OPTION_KEY_RE = re.compile(r"^(?P<name>.+?)\[(?P<type>.+?)\]$")

# Contents of the example meme config written when the memes dir is empty.
_EXAMPLE_MEME_TOML = (
    'key = "petpet"\n'
    'remark = "摸头表情"\n'
    'triggers = ["摸", "摸摸"]\n'
    "image_count = 1\n"
    "default_texts = []\n"
)


@dataclass
class MemeConfig:
    """One meme definition loaded from a ``memes/*.toml`` file."""

    key: str  # meme key used in the ``/memes/{key}`` API path
    remark: str  # free-form description
    triggers: List[str]  # words that trigger this meme
    image_count: int  # number of avatar images the meme needs (0-2 supported)
    default_texts: List[str]  # texts used when the user supplies none
    default_options: Dict[str, object]  # option name -> default value
    option_types: Dict[str, str]  # option name -> "boolean"/"integer"/...
    additional: Dict[str, List[str]]  # option name -> keyword aliases
    file_path: Path  # config file this entry was loaded from


class MemesAPI(PluginBase):
    """Meme plugin backed by the meme-generator-rs API."""

    description = "meme-generator-rs 表情包插件"
    author = "Assistant"
    version = "1.0.0"

    def __init__(self):
        """Set defaults; real configuration is loaded in ``async_init``."""
        super().__init__()
        self.config = None
        self.base_url = "http://127.0.0.1:2233"
        self.timeout = 20
        self.use_base64_fallback = True
        self.upload_mode = "auto"
        self.enabled = True
        self.reload_interval = 60
        self.max_at_users = 2
        self.send_gif_as_file = True
        self.gif_send_mode = "file"
        self.gif_inline_max_kb = 2048
        self.gif_optimize = True
        self.gif_force_image_max_kb = 2048
        self.gif_optimize_max_dim = 360
        self.avatar_missing_tip = "头像获取失败,请稍后重试"
        self.api_failed_tip = "表情服务不可用,请稍后再试"
        self.memes_dir = Path(__file__).parent / "memes"
        self.temp_dir = Path(__file__).parent / "temp"
        self._memes: List[MemeConfig] = []
        self._trigger_index: List[Tuple[str, MemeConfig]] = []
        self._last_load_ts = 0.0
        self._session: Optional[aiohttp.ClientSession] = None

    async def async_init(self):
        """Load ``config.toml``, prepare directories and load meme configs.

        Any failure is logged and disables the plugin instead of raising.
        """
        try:
            config_path = Path(__file__).parent / "config.toml"
            with open(config_path, "rb") as f:
                self.config = tomllib.load(f)

            api_config = self.config.get("api", {})
            behavior_config = self.config.get("behavior", {})
            reply_config = self.config.get("reply", {})

            self.base_url = api_config.get("base_url", self.base_url).rstrip("/")
            self.timeout = api_config.get("timeout", self.timeout)
            self.use_base64_fallback = api_config.get("use_base64_fallback", True)
            self.upload_mode = str(api_config.get("upload_mode", "auto")).strip().lower()
            if self.upload_mode not in {"auto", "url", "data"}:
                self.upload_mode = "auto"

            self.enabled = behavior_config.get("enabled", True)
            self.reload_interval = behavior_config.get("reload_interval", 60)
            self.max_at_users = behavior_config.get("max_at_users", 2)
            self.send_gif_as_file = behavior_config.get("send_gif_as_file", True)
            self.gif_send_mode = str(behavior_config.get("gif_send_mode", "")).strip().lower()
            self.gif_inline_max_kb = int(behavior_config.get("gif_inline_max_kb", 2048))
            self.gif_optimize = bool(behavior_config.get("gif_optimize", True))
            self.gif_force_image_max_kb = int(
                behavior_config.get("gif_force_image_max_kb", 2048)
            )
            self.gif_optimize_max_dim = int(behavior_config.get("gif_optimize_max_dim", 360))

            # An unset mode falls back to the legacy ``send_gif_as_file`` flag.
            if not self.gif_send_mode:
                self.gif_send_mode = "file" if self.send_gif_as_file else "image"
            if self.gif_send_mode not in {"file", "image", "auto"}:
                self.gif_send_mode = "file"
            # Negative limits are clamped to 0.
            if self.gif_inline_max_kb < 0:
                self.gif_inline_max_kb = 0
            if self.gif_force_image_max_kb < 0:
                self.gif_force_image_max_kb = 0
            if self.gif_optimize_max_dim < 0:
                self.gif_optimize_max_dim = 0

            self.avatar_missing_tip = reply_config.get(
                "avatar_missing_tip", self.avatar_missing_tip
            )
            self.api_failed_tip = reply_config.get("api_failed_tip", self.api_failed_tip)

            self.memes_dir.mkdir(parents=True, exist_ok=True)
            self.temp_dir.mkdir(parents=True, exist_ok=True)
            self._ensure_example_meme()
            await self._load_memes()
            logger.success("[MemesAPI] 插件初始化完成")
        except Exception as e:
            logger.error(f"[MemesAPI] 初始化失败: {e}")
            self.enabled = False

    async def on_disable(self):
        """Run the base-class hook, then close the shared HTTP session."""
        await super().on_disable()
        await self._close_session()

    def _ensure_example_meme(self):
        """Write an example ``petpet.toml`` if the memes dir has no config."""
        try:
            existing = list(self.memes_dir.glob("*.toml"))
            if existing:
                return
            example_path = self.memes_dir / "petpet.toml"
            if example_path.exists():
                return
            example_path.write_text(_EXAMPLE_MEME_TOML, encoding="utf-8")
            logger.info("[MemesAPI] 已生成示例表情配置: petpet.toml")
        except Exception as e:
            logger.warning(f"[MemesAPI] 生成示例配置失败: {e}")

    async def _load_memes(self):
        """(Re)load every ``*.toml`` meme config and rebuild the trigger index.

        A file that fails to parse is logged and skipped; the others still load.
        """
        memes: List[MemeConfig] = []
        for file_path in self.memes_dir.glob("*.toml"):
            try:
                with open(file_path, "rb") as f:
                    data = tomllib.load(f)

                key = str(data.get("key", "")).strip()
                if not key:
                    logger.warning(f"[MemesAPI] 跳过无 key 配置: {file_path.name}")
                    continue
                remark = str(data.get("remark", "")).strip()

                triggers = data.get("triggers", [])
                if isinstance(triggers, str):
                    triggers = [triggers]
                triggers = [str(t).strip() for t in triggers if str(t).strip()]

                image_count = int(data.get("image_count", 1))

                default_texts = data.get("default_texts", [])
                if isinstance(default_texts, str):
                    default_texts = [default_texts]
                default_texts = [str(t) for t in default_texts]

                default_options, option_types = self._parse_default_options(
                    data.get("default_options", {}),
                    data.get("option_types", {}),
                )
                additional = self._parse_additional(
                    data.get("Additional", data.get("additional", {}))
                )
                memes.append(
                    MemeConfig(
                        key=key,
                        remark=remark,
                        triggers=triggers,
                        image_count=image_count,
                        default_texts=default_texts,
                        default_options=default_options,
                        option_types=option_types,
                        additional=additional,
                        file_path=file_path,
                    )
                )
            except Exception as e:
                logger.error(f"[MemesAPI] 解析配置失败: {file_path.name} -> {e}")

        self._memes = memes
        self._build_trigger_index()
        self._last_load_ts = time.time()
        logger.info(f"[MemesAPI] 已加载表情配置: {len(self._memes)} 个")

    def _build_trigger_index(self):
        """Build the ``(lower-cased trigger, meme)`` list, longest trigger first."""
        index: List[Tuple[str, MemeConfig]] = [
            (trigger.lower(), meme)
            for meme in self._memes
            for trigger in meme.triggers
            if trigger
        ]
        index.sort(key=lambda item: len(item[0]), reverse=True)
        self._trigger_index = index

    def _parse_default_options(
        self, default_options: dict, option_types: dict
    ) -> Tuple[Dict[str, object], Dict[str, str]]:
        """Split the config tables into option defaults and option types.

        Keys of ``default_options`` may carry an inline type, e.g.
        ``"circle[boolean]"`` (the key must be quoted in TOML); an explicit
        entry in ``option_types`` takes precedence over the inline type.

        Returns:
            ``(options, types)`` - name -> default value, name -> type name.
        """
        options: Dict[str, object] = {}
        types: Dict[str, str] = {}

        if isinstance(option_types, dict):
            for key, value in option_types.items():
                key_name = str(key).strip()
                if not key_name:
                    continue
                types[key_name] = str(value).strip().lower()

        if isinstance(default_options, dict):
            for key, value in default_options.items():
                key_name = str(key).strip()
                if not key_name:
                    continue
                match = _TYPED_OPTION_KEY_RE.match(key_name)
                if match:
                    name = match.group("name").strip()
                    opt_type = match.group("type").strip().lower()
                    if name:
                        types.setdefault(name, opt_type)
                        options[name] = value
                else:
                    options[key_name] = value
        return options, types

    def _parse_additional(self, additional: dict) -> Dict[str, List[str]]:
        """Normalise the ``Additional`` table to ``option name -> keywords``."""
        result: Dict[str, List[str]] = {}
        if not isinstance(additional, dict):
            return result
        for key, value in additional.items():
            name = str(key).strip()
            if not name:
                continue
            if isinstance(value, list):
                result[name] = [str(v).strip() for v in value if str(v).strip()]
            else:
                result[name] = [str(value).strip()]
        return result

    def _coerce_option_value(self, value: str, opt_type: str):
        """Convert a raw string to ``opt_type``.

        Returns ``None`` when a boolean/integer/float value cannot be parsed;
        unknown types (and "string") return the value unchanged.
        """
        opt_type = (opt_type or "").lower()
        if opt_type == "boolean":
            val = value.strip().lower()
            if val in {"1", "true", "yes", "y", "on"}:
                return True
            if val in {"0", "false", "no", "n", "off"}:
                return False
            return None
        if opt_type == "integer":
            try:
                return int(value)
            except (TypeError, ValueError):
                return None
        if opt_type == "float":
            try:
                return float(value)
            except (TypeError, ValueError):
                return None
        return value

    def _render_value(self, value, replacements: Optional[Dict[str, str]] = None):
        """Substitute ``{date}`` / ``${date}`` and ``{key}`` / ``${key}`` placeholders.

        Non-string values are returned untouched. The ``${...}`` form is
        replaced before the ``{...}`` form; doing it the other way round would
        leave a stray ``$`` in front of the substituted text.
        """
        if not isinstance(value, str):
            return value
        today = datetime.now().strftime("%Y-%m-%d")
        result = value.replace("${date}", today).replace("{date}", today)
        for key, val in (replacements or {}).items():
            text = str(val)
            result = result.replace("${" + key + "}", text)
            result = result.replace("{" + key + "}", text)
        return result

    def _build_options_and_texts(
        self,
        meme: MemeConfig,
        params: List[str],
        replacements: Optional[Dict[str, str]] = None,
    ) -> Tuple[Dict[str, object], List[str]]:
        """Turn the user's parameters into API options and leftover text tokens.

        Returns:
            ``(options, text_tokens)`` - rendered option values and the tokens
            that were not consumed as options.
        """
        options: Dict[str, object] = dict(meme.default_options or {})
        if not params:
            return {k: self._render_value(v, replacements) for k, v in options.items()}, []

        used_tokens: set = set()
        explicit_keys: set = set()

        # 1) Tokens of the form name=value / name:value.
        for token in params:
            if "=" in token or ":" in token:
                sep = "=" if "=" in token else ":"
                name, raw_val = token.split(sep, 1)
                name = name.strip()
                raw_val = raw_val.strip()
                if not name:
                    continue
                opt_type = meme.option_types.get(name, "")
                coerced = (
                    self._coerce_option_value(raw_val, opt_type) if opt_type else raw_val
                )
                if coerced is not None:
                    options[name] = coerced
                    explicit_keys.add(name)
                used_tokens.add(token)

        # 2) Keyword aliases from the Additional table (mainly for booleans):
        #    the first alias means True, the second (if any) means False.
        for token in params:
            token = token.strip()
            if not token:
                continue
            for opt_name, values in (meme.additional or {}).items():
                if not values or token not in values:
                    continue
                opt_type = meme.option_types.get(opt_name, "")
                if opt_type == "boolean":
                    if token == values[0]:
                        options[opt_name] = True
                    elif len(values) > 1 and token == values[1]:
                        options[opt_name] = False
                else:
                    options[opt_name] = token
                explicit_keys.add(opt_name)
                used_tokens.add(token)

        text_tokens = [
            t for t in params if t not in used_tokens and "=" not in t and ":" not in t
        ]

        # 3) With exactly one string option, bare text may be passed directly
        #    as that option's value (e.g. "<trigger> 2024-01-01").
        string_options = [k for k, t in (meme.option_types or {}).items() if t == "string"]
        if len(string_options) == 1:
            string_key = string_options[0]
            if string_key not in explicit_keys and text_tokens:
                options[string_key] = " ".join(text_tokens)
                text_tokens = []

        return {k: self._render_value(v, replacements) for k, v in options.items()}, text_tokens

    async def _maybe_reload(self):
        """Reload meme configs when ``reload_interval`` seconds have elapsed."""
        if self.reload_interval <= 0:
            return
        if time.time() - self._last_load_ts >= self.reload_interval:
            await self._load_memes()

    def _normalize_text(self, text: str) -> str:
        """Strip ``@mention`` tokens and collapse whitespace runs to one space."""
        if not text:
            return ""
        text = text.strip()
        if not text:
            return ""
        text = re.sub(r"@[^\s\u2005]+", "", text)
        text = re.sub(r"\s+", " ", text)
        return text.strip()

    def _match_meme(self, text: str) -> Tuple[Optional[MemeConfig], List[str]]:
        """Match the first token against the trigger index (case-insensitive).

        Returns:
            ``(meme, remaining tokens)`` or ``(None, [])`` when nothing matches.
        """
        if not text:
            return None, []
        tokens = text.split(" ")
        if not tokens:
            return None, []
        first = tokens[0].lower()
        for trigger, meme in self._trigger_index:
            if trigger and first == trigger:
                return meme, tokens[1:]
        return None, []

    def _extract_string(self, value) -> str:
        """Unwrap a value that is either a plain value or a ``{"String": ...}`` dict."""
        if isinstance(value, dict):
            return value.get("String") or value.get("string") or value.get("value") or ""
        if value is None:
            return ""
        return str(value)

    def _extract_msg_source(self, raw: dict) -> str:
        """Return the message-source field of ``raw`` under any known key spelling."""
        for key in ("msgSource", "msgsource", "MsgSource", "msg_source"):
            if key in raw:
                return self._extract_string(raw.get(key))
        return ""

    def _extract_xml_payload(self, content: str) -> str:
        """Return the part of ``content`` starting at the first ``<``, or ``""``."""
        if not content or not isinstance(content, str):
            return ""
        text = content.strip()
        if not text:
            return ""
        # NOTE(review): this body was reconstructed from a garbled source;
        # confirm the exact prefix / search string against the original file.
        if text.startswith("<"):
            return text
        xml_start = text.find("<")
        if xml_start >= 0:
            return text[xml_start:]
        return ""

    def _parse_at_wxids_from_msgsource(self, msg_source: str) -> List[str]:
        """Read the ``atuserlist`` element of the msgsource XML as a wxid list."""
        if not msg_source:
            return []
        msg_source = msg_source.strip()
        if not msg_source:
            return []
        try:
            root = ET.fromstring(msg_source)
        except ET.ParseError:
            return []
        at_text = root.findtext(".//atuserlist") or root.findtext(".//atUserList") or ""
        if not at_text:
            return []
        # Accept both ASCII and full-width separators.
        parts = [p.strip() for p in re.split(r"[,,;;]", at_text) if p.strip()]
        return [p for p in parts if p != "notify@all"]

    def _parse_at_names_from_text(self, text: str) -> List[str]:
        """Return the distinct ``@name`` mentions in ``text``, in order of appearance."""
        if not text:
            return []
        result: List[str] = []
        for name in re.findall(r"@([^\s\u2005]+)", text):
            name = name.strip()
            if name and name not in result:
                result.append(name)
        return result

    async def _resolve_at_names(self, room_wxid: str, at_names: List[str]) -> List[str]:
        """Map mentioned nicknames to wxids using the chatroom member list.

        Matching is by exact group nickname or nickname, so duplicate names
        resolve to the first matching member.
        """
        if not room_wxid or not at_names:
            return []
        try:
            member_service = get_member_service()
            members = await member_service.get_chatroom_members(room_wxid)
            if not members:
                return []
            wxids: List[str] = []
            for name in at_names:
                matched = None
                for member in members:
                    if name == member.get("group_nickname") or name == member.get("nickname"):
                        matched = member.get("wxid")
                        break
                if matched and matched not in wxids:
                    wxids.append(matched)
            return wxids
        except Exception as e:
            logger.debug(f"[MemesAPI] @昵称解析失败: {e}")
            return []

    async def _extract_at_wxids(self, message: dict, room_wxid: str, content: str) -> List[str]:
        """Return the wxids mentioned by the message.

        Sources are tried in order: the ``Ats`` field, the msgsource XML, and
        finally nickname matching on the text (which may hit duplicate names).
        """
        ats = message.get("Ats", [])
        if isinstance(ats, list) and ats:
            return [a for a in ats if a and a != "notify@all"]

        raw = message.get("_raw", {}) or {}
        msg_source = self._extract_msg_source(raw)
        wxids = self._parse_at_wxids_from_msgsource(msg_source)
        if wxids:
            return wxids

        at_names = self._parse_at_names_from_text(content)
        if at_names:
            return await self._resolve_at_names(room_wxid, at_names)
        return []

    def _parse_quote_xml(self, message: dict) -> Optional[ET.Element]:
        """Parse the XML payload of a quote message.

        Looks at ``Content`` first and then at the raw message content.
        Returns the root element, or ``None`` if no parseable XML is found.
        """
        xml_payload = self._extract_xml_payload(message.get("Content", ""))
        if not xml_payload:
            raw = message.get("_raw", {}) or {}
            raw_content = raw.get("content") or raw.get("Content")
            xml_payload = self._extract_xml_payload(self._extract_string(raw_content))
        if not xml_payload:
            return None
        try:
            return ET.fromstring(xml_payload)
        except ET.ParseError:
            return None

    def _extract_quote_sender(self, message: dict) -> str:
        """Return the sender id of the quoted message, or ``""``."""
        sender = message.get("QuoteSender") or ""
        if sender:
            return sender
        root = self._parse_quote_xml(message)
        if root is None:
            return ""
        refer = root.find(".//refermsg")
        if refer is None:
            return ""
        # ``chatusr`` is preferred over the from-user fields for group quotes.
        chatusr = refer.findtext("chatusr") or ""
        if chatusr:
            return chatusr
        for tag in ("fromusr", "fromusername", "fromuser", "fromusrname", "username"):
            value = refer.findtext(tag)
            if value:
                return value
        return ""

    def _extract_quote_text(self, message: dict) -> str:
        """Return the text the user typed in a quote message (the title)."""
        title = message.get("QuoteTitle") or ""
        if title:
            return title.strip()
        root = self._parse_quote_xml(message)
        if root is None:
            return ""
        title = root.findtext(".//appmsg/title") or root.findtext(".//title") or ""
        return title.strip()

    async def _resolve_quote_wxid(
        self, room_wxid: str, quote_sender: str, quote_display: str
    ) -> str:
        """Resolve the wxid of the quoted user.

        ``quote_sender`` is used directly unless it is empty or a chatroom id;
        otherwise the display name is matched against the member list.
        """
        if quote_sender and not quote_sender.endswith("@chatroom"):
            return quote_sender
        if not room_wxid or not quote_display:
            return ""
        try:
            member_service = get_member_service()
            members = await member_service.get_chatroom_members(room_wxid)
            for member in members or []:
                if quote_display == member.get("group_nickname") or quote_display == member.get(
                    "nickname"
                ):
                    return member.get("wxid") or ""
        except Exception as e:
            # Best effort: an unresolved quote simply does not trigger a meme.
            logger.debug(f"[MemesAPI] 引用昵称解析失败: {e!r}")
        return ""

    def _extract_quote_displayname(self, message: dict) -> str:
        """Return the display name recorded in the quote's ``refermsg`` element."""
        root = self._parse_quote_xml(message)
        if root is None:
            return ""
        refer = root.find(".//refermsg")
        if refer is None:
            return ""
        return refer.findtext("displayname") or refer.findtext("nickname") or ""

    async def _get_avatar_url(self, bot, room_wxid: str, wxid: str) -> str:
        """Return an avatar URL for ``wxid``, or ``""`` if every source fails.

        Sources are tried in order: member service, the bot's chatroom member
        list, ``get_user_info_in_chatroom`` and ``get_group_member_contact``.
        Each step is best-effort; failures are logged at debug level.
        """
        if not wxid:
            return ""

        try:
            member_service = get_member_service()
            avatar = await member_service.get_chatroom_member_avatar(room_wxid, wxid)
            if not avatar:
                avatar = await member_service.get_member_avatar(wxid)
            if avatar:
                return avatar
        except Exception as e:
            logger.debug(f"[MemesAPI] MemberSync 头像查询失败: {e!r}")

        # Fallback: scan the member list directly (fields may be dict-wrapped).
        try:
            members = await bot.get_chatroom_members(room_wxid)
            for member in members or []:
                user_name = self._extract_string(member.get("userName", ""))
                wxid_value = self._extract_string(member.get("wxid", ""))
                if wxid == user_name or wxid == wxid_value:
                    avatar = self._extract_string(member.get("bigHeadImgUrl", ""))
                    if avatar:
                        return avatar
        except Exception as e:
            logger.debug(f"[MemesAPI] 群成员列表头像查询失败: {e!r}")

        # Fallback: Hook API.
        try:
            user_info = await bot.get_user_info_in_chatroom(room_wxid, wxid, max_retries=1)
            if user_info:
                avatar = self._extract_string(user_info.get("bigHeadImgUrl", ""))
                if avatar:
                    return avatar
        except Exception as e:
            logger.debug(f"[MemesAPI] Hook API 头像查询失败: {e!r}")

        try:
            contact = await bot.get_group_member_contact(room_wxid, wxid)
            if contact:
                avatar = self._extract_string(contact.get("bigHeadImgUrl", ""))
                if avatar:
                    return avatar
        except Exception as e:
            logger.debug(f"[MemesAPI] 群成员联系人头像查询失败: {e!r}")

        return ""

    async def _get_display_name(self, bot, room_wxid: str, wxid: str) -> str:
        """Return a display name for ``wxid``; falls back to ``wxid`` itself.

        Preference order: group nickname, nickname (member service), then the
        ``nickName`` field from the bot's Hook API calls.
        """
        if not wxid:
            return ""

        try:
            member_service = get_member_service()
            info = await member_service.get_chatroom_member_info(room_wxid, wxid)
            if info:
                group_nickname = info.get("group_nickname") or ""
                if group_nickname:
                    return str(group_nickname)
                nickname = info.get("nickname") or ""
                if nickname:
                    return str(nickname)
            info = await member_service.get_member_info(wxid)
            if info:
                nickname = info.get("nickname") or ""
                if nickname:
                    return str(nickname)
        except Exception as e:
            logger.debug(f"[MemesAPI] MemberSync 昵称查询失败: {e!r}")

        # Fallback: Hook API.
        try:
            user_info = await bot.get_user_info_in_chatroom(room_wxid, wxid, max_retries=1)
            if user_info:
                nickname = self._extract_string(user_info.get("nickName", ""))
                if nickname:
                    return nickname
        except Exception as e:
            logger.debug(f"[MemesAPI] Hook API 昵称查询失败: {e!r}")

        try:
            contact = await bot.get_group_member_contact(room_wxid, wxid)
            if contact:
                nickname = self._extract_string(contact.get("nickName", ""))
                if nickname:
                    return nickname
        except Exception as e:
            logger.debug(f"[MemesAPI] 群成员联系人昵称查询失败: {e!r}")

        return wxid

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared HTTP session, creating it if missing or closed."""
        if self._session and not self._session.closed:
            return self._session
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        self._session = aiohttp.ClientSession(timeout=timeout)
        return self._session

    async def _close_session(self):
        """Close and drop the shared HTTP session."""
        if self._session and not self._session.closed:
            await self._session.close()
        self._session = None

    async def _post_json(self, path: str, payload: dict) -> Optional[dict]:
        """POST ``payload`` as JSON to ``base_url + path``.

        Returns:
            The decoded JSON on HTTP 200. On any other status, the decoded
            error body with an added ``"_status"`` key if the body is a JSON
            object, else ``None``. ``None`` on network or decoding errors.
        """
        url = f"{self.base_url}{path}"
        session = await self._get_session()
        try:
            async with session.post(url, json=payload) as resp:
                if resp.status != 200:
                    try:
                        raw_body = await resp.text()
                    except Exception:
                        raw_body = ""
                    if raw_body:
                        # Only the logged copy is flattened and truncated; the
                        # raw body must stay intact for JSON parsing below.
                        body = raw_body.strip().replace("\n", " ")
                        if len(body) > 300:
                            body = body[:300] + "..."
                        logger.warning(
                            f"[MemesAPI] POST {path} 失败: {resp.status}, body={body!r}"
                        )
                    else:
                        logger.warning(f"[MemesAPI] POST {path} 失败: {resp.status}")
                    try:
                        result = json.loads(raw_body) if raw_body else None
                    except ValueError:
                        return None
                    if isinstance(result, dict):
                        result["_status"] = resp.status
                        return result
                    return None
                try:
                    return await resp.json()
                except Exception as e:
                    logger.warning(
                        f"[MemesAPI] POST {path} 响应解析失败: {type(e).__name__}: {e!r}"
                    )
                    return None
        except Exception as e:
            logger.warning(f"[MemesAPI] POST {path} 异常: {type(e).__name__}: {e!r}")
            return None

    async def _get_bytes(self, path: str) -> Tuple[Optional[bytes], str]:
        """GET ``base_url + path``; returns ``(body, content type)`` or ``(None, "")``."""
        url = f"{self.base_url}{path}"
        session = await self._get_session()
        try:
            async with session.get(url) as resp:
                if resp.status != 200:
                    logger.warning(f"[MemesAPI] GET {path} 失败: {resp.status}")
                    return None, ""
                data = await resp.read()
                content_type = resp.headers.get("Content-Type", "")
                return data, content_type
        except Exception as e:
            logger.warning(f"[MemesAPI] GET {path} 异常: {e}")
            return None, ""

    async def _upload_image(self, image_url: str) -> Optional[str]:
        """Upload an image to the meme service and return its ``image_id``.

        In "auto"/"url" mode the URL is submitted first; if that fails and
        ``use_base64_fallback`` is set (or in "data" mode), the image is
        downloaded locally and uploaded as base64 data.
        """
        if not image_url:
            return None

        if self.upload_mode in {"auto", "url"}:
            result = await self._post_json("/image/upload", {"type": "url", "url": image_url})
            if result and result.get("image_id"):
                return result.get("image_id")
            if not self.use_base64_fallback:
                return None

        # base64 / data upload
        data = await self._download_image_data(image_url)
        if not data:
            return None
        base64_data = base64.b64encode(data).decode()
        result = await self._post_json("/image/upload", {"type": "data", "data": base64_data})
        if result and result.get("image_id"):
            return result.get("image_id")
        return None

    async def _download_image_data(self, image_url: str) -> Optional[bytes]:
        """Download ``image_url``; returns the bytes or ``None`` on failure."""
        try:
            session = await self._get_session()
            async with session.get(image_url) as resp:
                if resp.status != 200:
                    logger.warning(f"[MemesAPI] 下载头像失败: {resp.status}, url={image_url}")
                    return None
                return await resp.read()
        except Exception as e:
            logger.warning(
                f"[MemesAPI] 下载头像异常: {type(e).__name__}: {e!r}, url={image_url}"
            )
            return None

    async def _generate_meme(
        self,
        meme_key: str,
        images: List[Dict[str, str]],
        texts: List[str],
        options: Dict[str, object],
    ) -> Optional[str]:
        """Ask the service to render a meme; returns the result ``image_id``.

        When the service answers with code 551 and a ``data.min``/``data.max``
        range, the text list is truncated or padded with empty strings to fit
        that range and the request is retried once.
        """
        payload = {
            "images": images or [],
            "texts": texts,
            "options": options or {},
        }
        result = await self._post_json(f"/memes/{meme_key}", payload)
        if result and result.get("image_id"):
            return result.get("image_id")

        if result and result.get("code") == 551:
            data = result.get("data")
            if not isinstance(data, dict):
                data = {}
            try:
                min_t = int(data.get("min"))
                max_t = int(data.get("max"))
            except (TypeError, ValueError):
                min_t = None
                max_t = None
            if min_t is not None and max_t is not None:
                original = list(texts or [])
                adjusted = list(original)
                if max_t == 0:
                    adjusted = []
                else:
                    if len(adjusted) > max_t:
                        adjusted = adjusted[:max_t]
                    if len(adjusted) < min_t:
                        adjusted = adjusted + [""] * (min_t - len(adjusted))
                if adjusted != original:
                    logger.debug(
                        f"[MemesAPI] 文本数量不匹配,自动调整: {len(original)} -> {len(adjusted)}"
                    )
                    payload["texts"] = adjusted
                    retry = await self._post_json(f"/memes/{meme_key}", payload)
                    if retry and retry.get("image_id"):
                        return retry.get("image_id")

        if result is not None:
            logger.debug(f"[MemesAPI] 生成表情失败: key={meme_key}, resp={result}")
        return None

    def _detect_ext(self, data: bytes, content_type: str) -> str:
        """Pick a file extension from the Content-Type, then from magic bytes.

        Defaults to ``.png`` when neither identifies the format.
        """
        ct = (content_type or "").lower()
        if "gif" in ct:
            return ".gif"
        if "jpeg" in ct or "jpg" in ct:
            return ".jpg"
        if "webp" in ct:
            return ".webp"
        if "png" in ct:
            return ".png"
        if data:
            if data.startswith((b"GIF87a", b"GIF89a")):
                return ".gif"
            if data.startswith(b"\x89PNG\r\n\x1a\n"):
                return ".png"
            if data.startswith(b"\xff\xd8\xff"):
                return ".jpg"
            if data.startswith(b"RIFF") and b"WEBP" in data[8:16]:
                return ".webp"
        return ".png"

    async def _download_meme_file(self, image_id: str) -> Optional[Path]:
        """Download a generated image into ``temp_dir``; returns its path or ``None``."""
        data, content_type = await self._get_bytes(f"/image/{image_id}")
        if not data:
            return None
        ext = self._detect_ext(data, content_type)
        filename = f"meme_{image_id[:8]}_{uuid.uuid4().hex[:6]}{ext}"
        file_path = self.temp_dir / filename
        try:
            file_path.write_bytes(data)
            return file_path
        except OSError:
            return None

    def _maybe_optimize_gif(self, file_path: Path) -> Tuple[Path, List[Path]]:
        """Try to shrink a GIF below ``gif_force_image_max_kb``.

        Frames are optionally downscaled to ``gif_optimize_max_dim`` and then
        re-encoded keeping every 1st, 2nd or 3rd frame until the target size is
        met. If the target is never met, the smallest re-encoded file is used,
        but only when it is smaller than the original.

        Returns:
            ``(path to send, temp files created here)``; the caller is
            responsible for deleting the temp files.
        """
        if not self.gif_optimize:
            return file_path, []
        if file_path.suffix.lower() != ".gif":
            return file_path, []
        if self.gif_force_image_max_kb <= 0:
            return file_path, []

        try:
            size_kb = int(file_path.stat().st_size / 1024)
        except OSError:
            size_kb = 0
        if size_kb and size_kb <= self.gif_force_image_max_kb:
            return file_path, []

        try:
            from PIL import Image, ImageSequence
        except Exception as e:
            logger.warning(
                f"[MemesAPI] 未安装 Pillow,跳过 GIF 优化: {type(e).__name__}: {e!r}"
            )
            return file_path, []

        frames = []
        durations = []
        try:
            # The context manager closes the source file once the frames
            # (independent RGBA copies) have been extracted.
            with Image.open(file_path) as im:
                if im.format != "GIF":
                    return file_path, []
                for frame in ImageSequence.Iterator(im):
                    f = frame.convert("RGBA")
                    if self.gif_optimize_max_dim > 0:
                        max_dim = max(f.size)
                        if max_dim > self.gif_optimize_max_dim:
                            scale = self.gif_optimize_max_dim / max_dim
                            new_size = (
                                max(1, int(f.size[0] * scale)),
                                max(1, int(f.size[1] * scale)),
                            )
                            f = f.resize(new_size, Image.LANCZOS)
                    frames.append(f)
                    durations.append(frame.info.get("duration", 40))
        except Exception:
            return file_path, []
        if not frames:
            return file_path, []

        candidates: List[Path] = []
        for step in (1, 2, 3):
            use_frames = frames[::step]
            # Each kept frame absorbs the durations of the frames dropped
            # after it, so the animation keeps its original playback speed.
            use_durations = [
                sum(durations[i:i + step]) for i in range(0, len(durations), step)
            ]
            out_path = self.temp_dir / f"meme_opt_{uuid.uuid4().hex[:8]}.gif"
            try:
                use_frames[0].save(
                    out_path,
                    save_all=True,
                    append_images=use_frames[1:],
                    duration=use_durations,
                    loop=0,
                    optimize=True,
                    disposal=2,
                )
                candidates.append(out_path)
                out_kb = int(out_path.stat().st_size / 1024)
                if out_kb and out_kb <= self.gif_force_image_max_kb:
                    logger.debug(f"[MemesAPI] GIF 优化成功: {size_kb}KB -> {out_kb}KB")
                    return out_path, candidates
            except Exception:
                if out_path in candidates:
                    candidates.remove(out_path)
                try:
                    out_path.unlink(missing_ok=True)
                except OSError:
                    pass

        # Target not reached: fall back to the smallest re-encoded file.
        best = None
        best_kb = 0
        for path in candidates:
            try:
                out_kb = int(path.stat().st_size / 1024)
            except OSError:
                continue
            if out_kb and (best is None or out_kb < best_kb):
                best = path
                best_kb = out_kb
        if best is not None and (not size_kb or best_kb < size_kb):
            logger.debug(
                f"[MemesAPI] GIF 优化未达目标,使用较小版本: {size_kb}KB -> {best_kb}KB"
            )
            return best, candidates
        return file_path, candidates

    def _select_target_wxids(
        self,
        image_count: int,
        sender_wxid: str,
        at_wxids: List[str],
        quote_wxid: str,
    ) -> List[str]:
        """Choose whose avatars to use; @-mentions take priority over a quote.

        For two-image memes with a single target, the sender is the first
        image. Returns ``[]`` when the trigger conditions are not met.
        """
        if at_wxids:
            if image_count <= 0:
                return []
            if image_count == 1:
                return [at_wxids[0]]
            if image_count == 2:
                if len(at_wxids) >= 2:
                    return [at_wxids[0], at_wxids[1]]
                return [sender_wxid, at_wxids[0]]
            return []
        if quote_wxid:
            if image_count == 1:
                return [quote_wxid]
            if image_count == 2:
                return [sender_wxid, quote_wxid]
            return []
        return []

    async def _send_meme(self, bot, to_wxid: str, send_path: Path) -> None:
        """Send the generated file as an image or as a file, per ``gif_send_mode``."""
        if send_path.suffix.lower() != ".gif":
            await bot.send_image(to_wxid, str(send_path))
            return
        if self.gif_send_mode == "file":
            await bot.send_file(to_wxid, str(send_path))
        elif self.gif_send_mode == "image":
            await bot.send_image(to_wxid, str(send_path))
        else:
            # "auto": send inline unless the GIF exceeds the inline size limit.
            try:
                size_kb = int(send_path.stat().st_size / 1024)
            except OSError:
                size_kb = 0
            if size_kb and size_kb > self.gif_inline_max_kb:
                await bot.send_file(to_wxid, str(send_path))
            else:
                await bot.send_image(to_wxid, str(send_path))

    async def _process_message(self, bot, message: dict, content: str) -> bool:
        """Handle one group message.

        Returns:
            ``True`` when the message was not handled here (no trigger,
            unsupported meme, missing targets), ``False`` once this plugin has
            replied, either with the meme or with an error tip.
        """
        if not self.enabled:
            return True
        if not message.get("IsGroup", False):
            return True

        from_wxid = message.get("FromWxid", "")
        sender_wxid = message.get("SenderWxid", "")
        # Ignore the bot's own messages.
        if bot and sender_wxid and bot.wxid and sender_wxid == bot.wxid:
            return True
        if not content:
            return True
        logger.debug(f"[MemesAPI] raw_content={content!r}")

        await self._maybe_reload()
        normalized = self._normalize_text(content)
        if not normalized:
            return True
        logger.debug(f"[MemesAPI] normalized={normalized!r}")

        meme, params = self._match_meme(normalized)
        if not meme:
            return True
        logger.debug(f"[MemesAPI] matched meme={meme.key}, params={params}")

        if meme.image_count > 2:
            # Unsupported meme type: skip silently.
            return True

        target_wxids: List[str] = []
        if meme.image_count > 0:
            at_wxids = await self._extract_at_wxids(message, from_wxid, content)
            if at_wxids:
                at_wxids = [w for w in at_wxids if w and w != "notify@all"]
                at_wxids = at_wxids[: self.max_at_users]
            quote_sender = self._extract_quote_sender(message)
            quote_display = self._extract_quote_displayname(message)
            quote_wxid = await self._resolve_quote_wxid(from_wxid, quote_sender, quote_display)
            target_wxids = self._select_target_wxids(
                meme.image_count, sender_wxid, at_wxids, quote_wxid
            )
            if not target_wxids:
                # Trigger conditions not met: skip silently.
                return True

        avatar_urls: List[str] = []
        display_names: List[str] = []
        for wxid in target_wxids:
            avatar = await self._get_avatar_url(bot, from_wxid, wxid)
            if not avatar:
                await bot.send_text(from_wxid, self.avatar_missing_tip)
                return False
            avatar_urls.append(avatar)
            display_names.append(await self._get_display_name(bot, from_wxid, wxid))

        replacements = {
            "name": display_names[0] if display_names else "",
            "name1": display_names[0] if len(display_names) > 0 else "",
            "name2": display_names[1] if len(display_names) > 1 else "",
        }

        image_items: List[Dict[str, str]] = []
        for idx, avatar_url in enumerate(avatar_urls):
            image_id = await self._upload_image(avatar_url)
            if not image_id:
                await bot.send_text(from_wxid, self.api_failed_tip)
                return False
            name = display_names[idx] if idx < len(display_names) else ""
            if not name:
                name = f"image{idx+1}"
            image_items.append({"name": name, "id": image_id})

        options, text_tokens = self._build_options_and_texts(meme, params, replacements)
        logger.debug(f"[MemesAPI] options={options}, text_tokens={text_tokens}")

        texts = meme.default_texts
        if text_tokens:
            joined = " ".join(text_tokens).strip()
            # "|" separates multiple texts in the user's input.
            if "|" in joined:
                texts = [t.strip() for t in joined.split("|") if t.strip()]
            else:
                texts = [joined]
        texts = [self._render_value(t, replacements) for t in texts]
        logger.debug(f"[MemesAPI] final texts={texts}")

        meme_image_id = await self._generate_meme(meme.key, image_items, texts, options)
        if not meme_image_id:
            await bot.send_text(from_wxid, self.api_failed_tip)
            return False

        file_path = await self._download_meme_file(meme_image_id)
        if not file_path:
            await bot.send_text(from_wxid, self.api_failed_tip)
            return False

        send_path = file_path
        cleanup_paths = [file_path]
        if file_path.suffix.lower() == ".gif" and self.gif_send_mode == "image":
            send_path, extra_paths = self._maybe_optimize_gif(file_path)
            for p in extra_paths:
                if p not in cleanup_paths:
                    cleanup_paths.append(p)

        try:
            await self._send_meme(bot, from_wxid, send_path)
        finally:
            # Temp files are removed whether or not sending succeeded.
            for path in cleanup_paths:
                try:
                    path.unlink(missing_ok=True)
                except OSError:
                    pass
        return False

    @on_text_message(priority=80)
    async def handle_text_message(self, bot, message: dict):
        """Text-message hook: process the message's ``Content``."""
        content = message.get("Content", "") or ""
        return await self._process_message(bot, message, content)

    @on_quote_message(priority=80)
    async def handle_quote_message(self, bot, message: dict):
        """Quote-message hook: process the text typed alongside the quote."""
        content = self._extract_quote_text(message)
        return await self._process_message(bot, message, content)


__all__ = ["MemesAPI"]