Files

1148 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MemesAPI 插件 - meme-generator-rs API 接入
功能:
- 群聊中通过 @ / 引用消息触发表情
- 头像优先使用 MemberSync 数据库,必要时调用 Hook API 兜底
- 表情配置文件化: plugins/MemesAPI/memes/*.toml
"""
import asyncio
import base64
import json
import re
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import aiohttp
import tomllib
from loguru import logger
from utils.plugin_base import PluginBase
from utils.decorators import on_text_message, on_quote_message
from utils.member_info_service import get_member_service
@dataclass
class MemeConfig:
    """Parsed contents of one meme definition file (``memes/*.toml``)."""

    # Meme key; used as the path segment when POSTing to ``/memes/<key>``.
    key: str
    # Free-form remark read from the config file.
    remark: str
    # Trigger words; compared case-insensitively with the first token of a message.
    triggers: List[str]
    # Number of avatar images the meme takes (the message handler supports 0-2).
    image_count: int
    # Texts sent when the user supplies none.
    default_texts: List[str]
    # Option values sent by default; user parameters may override them.
    default_options: Dict[str, object]
    # Option name -> declared type name (lower-cased), used to coerce user values.
    option_types: Dict[str, str]
    # Option name -> keywords that select the option when given as parameters.
    additional: Dict[str, List[str]]
    # Path of the TOML file this entry was loaded from.
    file_path: Path
class MemesAPI(PluginBase):
    """Meme plugin backed by the meme-generator-rs HTTP API."""
    description = "meme-generator-rs 表情包插件"
    author = "Assistant"
    version = "1.0.0"
def __init__(self):
    """Set built-in defaults; ``async_init`` overrides most of them from config.toml."""
    super().__init__()
    plugin_dir = Path(__file__).parent

    # Runtime state: parsed config, loaded memes, trigger lookup, HTTP session.
    self.config = None
    self._memes: List[MemeConfig] = []
    self._trigger_index: List[Tuple[str, MemeConfig]] = []
    self._last_load_ts = 0.0
    self._session: Optional[aiohttp.ClientSession] = None

    # Working directories located next to this file.
    self.memes_dir = plugin_dir / "memes"
    self.temp_dir = plugin_dir / "temp"

    # API access defaults.
    self.base_url = "http://127.0.0.1:2233"
    self.timeout = 20
    self.upload_mode = "auto"
    self.use_base64_fallback = True

    # Behaviour defaults.
    self.enabled = True
    self.reload_interval = 60
    self.max_at_users = 2

    # GIF delivery defaults.
    self.send_gif_as_file = True
    self.gif_send_mode = "file"
    self.gif_inline_max_kb = 2048
    self.gif_optimize = True
    self.gif_force_image_max_kb = 2048
    self.gif_optimize_max_dim = 360

    # User-facing replies sent on failure.
    self.avatar_missing_tip = "头像获取失败,请稍后重试"
    self.api_failed_tip = "表情服务不可用,请稍后再试"
async def async_init(self):
    """Load ``config.toml`` next to this file, apply its settings and load the memes.

    Any exception raised along the way is logged and leaves the plugin disabled.
    """
    try:
        with open(Path(__file__).parent / "config.toml", "rb") as fh:
            self.config = tomllib.load(fh)
        api = self.config.get("api", {})
        behavior = self.config.get("behavior", {})
        reply = self.config.get("reply", {})

        # [api] section
        self.base_url = api.get("base_url", self.base_url).rstrip("/")
        self.timeout = api.get("timeout", self.timeout)
        self.use_base64_fallback = api.get("use_base64_fallback", True)
        requested_mode = str(api.get("upload_mode", "auto")).strip().lower()
        self.upload_mode = (
            requested_mode if requested_mode in {"auto", "url", "data"} else "auto"
        )

        # [behavior] section
        self.enabled = behavior.get("enabled", True)
        self.reload_interval = behavior.get("reload_interval", 60)
        self.max_at_users = behavior.get("max_at_users", 2)
        self.send_gif_as_file = behavior.get("send_gif_as_file", True)
        self.gif_send_mode = str(behavior.get("gif_send_mode", "")).strip().lower()
        self.gif_inline_max_kb = int(behavior.get("gif_inline_max_kb", 2048))
        self.gif_optimize = bool(behavior.get("gif_optimize", True))
        self.gif_force_image_max_kb = int(behavior.get("gif_force_image_max_kb", 2048))
        self.gif_optimize_max_dim = int(behavior.get("gif_optimize_max_dim", 360))

        # An unset mode falls back to the send_gif_as_file switch; unknown values mean "file".
        send_mode = self.gif_send_mode or ("file" if self.send_gif_as_file else "image")
        self.gif_send_mode = send_mode if send_mode in {"file", "image", "auto"} else "file"

        # Negative limits are clamped to zero.
        for attr in ("gif_inline_max_kb", "gif_force_image_max_kb", "gif_optimize_max_dim"):
            if getattr(self, attr) < 0:
                setattr(self, attr, 0)

        # [reply] section
        self.avatar_missing_tip = reply.get("avatar_missing_tip", self.avatar_missing_tip)
        self.api_failed_tip = reply.get("api_failed_tip", self.api_failed_tip)

        for folder in (self.memes_dir, self.temp_dir):
            folder.mkdir(parents=True, exist_ok=True)
        self._ensure_example_meme()
        await self._load_memes()
        logger.success("[MemesAPI] 插件初始化完成")
    except Exception as e:
        logger.error(f"[MemesAPI] 初始化失败: {e}")
        self.enabled = False
async def on_disable(self):
    """Run the base-class disable hook, then close the shared HTTP session."""
    await super().on_disable()
    await self._close_session()
def _ensure_example_meme(self):
    """Write a sample ``petpet.toml`` when the memes directory holds no config yet.

    Failures are logged as warnings and otherwise ignored.
    """
    try:
        has_any_config = next(self.memes_dir.glob("*.toml"), None) is not None
        sample = self.memes_dir / "petpet.toml"
        if has_any_config or sample.exists():
            return
        sample.write_text(
            """key = "petpet"
remark = "摸头表情"
triggers = ["", "摸摸"]
image_count = 1
default_texts = []
""",
            encoding="utf-8",
        )
        logger.info("[MemesAPI] 已生成示例表情配置: petpet.toml")
    except Exception as e:
        logger.warning(f"[MemesAPI] 生成示例配置失败: {e}")
async def _load_memes(self):
    """Parse every ``*.toml`` in the memes directory and rebuild the trigger index.

    A file without ``key`` is skipped with a warning; a file that fails to parse
    is logged as an error and skipped. The load timestamp is refreshed at the end.
    """
    def listify(raw):
        # A bare string in the TOML is treated as a one-element list.
        return [raw] if isinstance(raw, str) else raw

    loaded: List[MemeConfig] = []
    for toml_path in self.memes_dir.glob("*.toml"):
        try:
            with open(toml_path, "rb") as fh:
                doc = tomllib.load(fh)
            meme_key = str(doc.get("key", "")).strip()
            if not meme_key:
                logger.warning(f"[MemesAPI] 跳过无 key 配置: {toml_path.name}")
                continue
            remark = str(doc.get("remark", "")).strip()
            trigger_words = (str(t).strip() for t in listify(doc.get("triggers", [])))
            triggers = [word for word in trigger_words if word]
            image_count = int(doc.get("image_count", 1))
            default_texts = [str(t) for t in listify(doc.get("default_texts", []))]
            default_options, option_types = self._parse_default_options(
                doc.get("default_options", {}),
                doc.get("option_types", {}),
            )
            # The section may be spelled "Additional" or "additional".
            additional = self._parse_additional(
                doc.get("Additional", doc.get("additional", {}))
            )
            loaded.append(
                MemeConfig(
                    key=meme_key,
                    remark=remark,
                    triggers=triggers,
                    image_count=image_count,
                    default_texts=default_texts,
                    default_options=default_options,
                    option_types=option_types,
                    additional=additional,
                    file_path=toml_path,
                )
            )
        except Exception as e:
            logger.error(f"[MemesAPI] 解析配置失败: {toml_path.name} -> {e}")
    self._memes = loaded
    self._build_trigger_index()
    self._last_load_ts = time.time()
    logger.info(f"[MemesAPI] 已加载表情配置: {len(self._memes)}")
def _build_trigger_index(self):
index: List[Tuple[str, MemeConfig]] = []
for meme in self._memes:
for trigger in meme.triggers:
if trigger:
index.append((trigger.lower(), meme))
index.sort(key=lambda item: len(item[0]), reverse=True)
self._trigger_index = index
def _parse_default_options(
self, default_options: dict, option_types: dict
) -> Tuple[Dict[str, object], Dict[str, str]]:
options: Dict[str, object] = {}
types: Dict[str, str] = {}
if isinstance(option_types, dict):
for key, value in option_types.items():
key_name = str(key).strip()
if not key_name:
continue
types[key_name] = str(value).strip().lower()
if isinstance(default_options, dict):
for key, value in default_options.items():
key_name = str(key).strip()
if not key_name:
continue
# 支持 key 形如 circle[boolean](注意 toml 需用引号)
match = re.match(r"^(?P<name>.+?)\[(?P<type>.+?)\]$", key_name)
if match:
name = match.group("name").strip()
opt_type = match.group("type").strip().lower()
if name:
types.setdefault(name, opt_type)
options[name] = value
else:
options[key_name] = value
return options, types
def _parse_additional(self, additional: dict) -> Dict[str, List[str]]:
result: Dict[str, List[str]] = {}
if not isinstance(additional, dict):
return result
for key, value in additional.items():
name = str(key).strip()
if not name:
continue
if isinstance(value, list):
result[name] = [str(v).strip() for v in value if str(v).strip()]
else:
result[name] = [str(value).strip()]
return result
def _coerce_option_value(self, value: str, opt_type: str):
opt_type = (opt_type or "").lower()
if opt_type == "boolean":
val = value.strip().lower()
if val in {"1", "true", "yes", "y", "on"}:
return True
if val in {"0", "false", "no", "n", "off"}:
return False
return None
if opt_type == "integer":
try:
return int(value)
except Exception:
return None
if opt_type == "float":
try:
return float(value)
except Exception:
return None
if opt_type == "string":
return value
return value
def _render_value(self, value, replacements: Optional[Dict[str, str]] = None):
if not isinstance(value, str):
return value
from datetime import datetime
today = datetime.now().strftime("%Y-%m-%d")
result = value
result = result.replace("{date}", today)
result = result.replace("${date}", today)
if replacements:
for key, val in replacements.items():
token = "{" + key + "}"
token2 = "${" + key + "}"
result = result.replace(token, str(val))
result = result.replace(token2, str(val))
return result
def _build_options_and_texts(
self, meme: MemeConfig, params: List[str], replacements: Optional[Dict[str, str]] = None
) -> Tuple[Dict[str, object], List[str]]:
options: Dict[str, object] = dict(meme.default_options or {})
if not params:
return {k: self._render_value(v, replacements) for k, v in options.items()}, []
# 1) 处理形如 name=value / name:value
used_tokens: set = set()
explicit_keys: set = set()
for token in params:
if "=" in token or ":" in token:
sep = "=" if "=" in token else ":"
name, raw_val = token.split(sep, 1)
name = name.strip()
raw_val = raw_val.strip()
if not name:
continue
opt_type = meme.option_types.get(name, "")
coerced = self._coerce_option_value(raw_val, opt_type) if opt_type else raw_val
if coerced is not None:
options[name] = coerced
explicit_keys.add(name)
used_tokens.add(token)
# 2) 处理 Additional 映射(主要用于 boolean
for token in params:
token = token.strip()
if not token:
continue
for opt_name, values in (meme.additional or {}).items():
if not values:
continue
if token in values:
opt_type = meme.option_types.get(opt_name, "")
if opt_type == "boolean":
if token == values[0]:
options[opt_name] = True
elif len(values) > 1 and token == values[1]:
options[opt_name] = False
explicit_keys.add(opt_name)
else:
options[opt_name] = token
explicit_keys.add(opt_name)
used_tokens.add(token)
text_tokens = [
t for t in params
if t not in used_tokens and "=" not in t and ":" not in t
]
# 3) 仅一个 string 选项时:允许直接传参(如 “戒导 2024-01-01”
string_options = [k for k, t in (meme.option_types or {}).items() if t == "string"]
if len(string_options) == 1:
string_key = string_options[0]
if string_key not in explicit_keys:
if text_tokens:
options[string_key] = " ".join(text_tokens)
text_tokens = []
return {k: self._render_value(v, replacements) for k, v in options.items()}, text_tokens
async def _maybe_reload(self):
if self.reload_interval <= 0:
return
if time.time() - self._last_load_ts >= self.reload_interval:
await self._load_memes()
def _normalize_text(self, text: str) -> str:
if not text:
return ""
text = text.strip()
if not text:
return ""
text = re.sub(r"@[^\s\u2005]+", "", text)
text = re.sub(r"\s+", " ", text)
return text.strip()
def _match_meme(self, text: str) -> Tuple[Optional[MemeConfig], List[str]]:
if not text:
return None, []
tokens = text.split(" ")
if not tokens:
return None, []
first = tokens[0].lower()
for trigger, meme in self._trigger_index:
if trigger and first == trigger:
return meme, tokens[1:]
return None, []
def _extract_string(self, value) -> str:
if isinstance(value, dict):
return value.get("String") or value.get("string") or value.get("value") or ""
if value is None:
return ""
return str(value)
def _extract_msg_source(self, raw: dict) -> str:
for key in ("msgSource", "msgsource", "MsgSource", "msg_source"):
if key in raw:
return self._extract_string(raw.get(key))
return ""
def _extract_xml_payload(self, content: str) -> str:
if not content or not isinstance(content, str):
return ""
text = content.strip()
if not text:
return ""
if text.startswith("<?xml") or text.startswith("<msg"):
return text
xml_start = text.find("<?xml")
if xml_start == -1:
xml_start = text.find("<msg")
if xml_start >= 0:
return text[xml_start:]
return ""
def _parse_at_wxids_from_msgsource(self, msg_source: str) -> List[str]:
if not msg_source:
return []
msg_source = msg_source.strip()
if not msg_source:
return []
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(msg_source)
at_text = root.findtext(".//atuserlist") or root.findtext(".//atUserList") or ""
if not at_text:
return []
parts = [p.strip() for p in re.split(r"[,;]", at_text) if p.strip()]
return [p for p in parts if p and p != "notify@all"]
except Exception:
return []
def _parse_at_names_from_text(self, text: str) -> List[str]:
if not text:
return []
names = re.findall(r"@([^\s\u2005]+)", text)
result = []
for name in names:
name = name.strip()
if name and name not in result:
result.append(name)
return result
async def _resolve_at_names(self, room_wxid: str, at_names: List[str]) -> List[str]:
if not room_wxid or not at_names:
return []
try:
member_service = get_member_service()
members = await member_service.get_chatroom_members(room_wxid)
if not members:
return []
wxids: List[str] = []
for name in at_names:
matched = None
for member in members:
if name == member.get("group_nickname") or name == member.get("nickname"):
matched = member.get("wxid")
break
if matched and matched not in wxids:
wxids.append(matched)
return wxids
except Exception as e:
logger.debug(f"[MemesAPI] @昵称解析失败: {e}")
return []
async def _extract_at_wxids(self, message: dict, room_wxid: str, content: str) -> List[str]:
ats = message.get("Ats", [])
if isinstance(ats, list) and ats:
return [a for a in ats if a and a != "notify@all"]
raw = message.get("_raw", {}) or {}
msg_source = self._extract_msg_source(raw)
wxids = self._parse_at_wxids_from_msgsource(msg_source)
if wxids:
return wxids
# 兜底:用昵称匹配(可能有重名)
at_names = self._parse_at_names_from_text(content)
if at_names:
return await self._resolve_at_names(room_wxid, at_names)
return []
def _extract_quote_sender(self, message: dict) -> str:
sender = message.get("QuoteSender") or ""
if sender:
return sender
content = message.get("Content", "")
xml_payload = self._extract_xml_payload(content)
if not xml_payload:
raw = message.get("_raw", {}) or {}
raw_content = raw.get("content") or raw.get("Content")
xml_payload = self._extract_xml_payload(self._extract_string(raw_content))
if not xml_payload:
return ""
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_payload)
refer = root.find(".//refermsg")
if refer is None:
return ""
# 群引用优先使用 chatusr真实被引用的发送者
chatusr = refer.findtext("chatusr") or ""
if chatusr:
return chatusr
fromusr = (
refer.findtext("fromusr")
or refer.findtext("fromusername")
or refer.findtext("fromuser")
or refer.findtext("fromusrname")
or refer.findtext("username")
or ""
)
return fromusr
except Exception:
return ""
def _extract_quote_text(self, message: dict) -> str:
title = message.get("QuoteTitle") or ""
if title:
return title.strip()
content = message.get("Content", "")
xml_payload = self._extract_xml_payload(content)
if not xml_payload:
raw = message.get("_raw", {}) or {}
raw_content = raw.get("content") or raw.get("Content")
xml_payload = self._extract_xml_payload(self._extract_string(raw_content))
if not xml_payload:
return ""
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_payload)
title = root.findtext(".//appmsg/title") or root.findtext(".//title") or ""
return title.strip()
except Exception:
return ""
async def _resolve_quote_wxid(self, room_wxid: str, quote_sender: str, quote_display: str) -> str:
if quote_sender and not quote_sender.endswith("@chatroom"):
return quote_sender
if not room_wxid or not quote_display:
return ""
try:
member_service = get_member_service()
members = await member_service.get_chatroom_members(room_wxid)
for member in members or []:
if quote_display == member.get("group_nickname") or quote_display == member.get("nickname"):
return member.get("wxid") or ""
except Exception:
pass
return ""
def _extract_quote_displayname(self, message: dict) -> str:
content = message.get("Content", "")
xml_payload = self._extract_xml_payload(content)
if not xml_payload:
raw = message.get("_raw", {}) or {}
raw_content = raw.get("content") or raw.get("Content")
xml_payload = self._extract_xml_payload(self._extract_string(raw_content))
if not xml_payload:
return ""
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_payload)
refer = root.find(".//refermsg")
if refer is None:
return ""
return (
refer.findtext("displayname")
or refer.findtext("nickname")
or ""
)
except Exception:
return ""
async def _get_avatar_url(self, bot, room_wxid: str, wxid: str) -> str:
if not wxid:
return ""
try:
member_service = get_member_service()
avatar = await member_service.get_chatroom_member_avatar(room_wxid, wxid)
if not avatar:
avatar = await member_service.get_member_avatar(wxid)
if avatar:
return avatar
except Exception:
pass
# 兜底:直接拉群成员列表匹配(处理 userName 为 dict 的情况)
try:
members = await bot.get_chatroom_members(room_wxid)
for member in members or []:
user_name = self._extract_string(member.get("userName", ""))
wxid_value = self._extract_string(member.get("wxid", ""))
if wxid and (wxid == user_name or wxid == wxid_value):
avatar = self._extract_string(member.get("bigHeadImgUrl", ""))
if avatar:
return avatar
except Exception:
pass
# 兜底:调用 Hook API
try:
user_info = await bot.get_user_info_in_chatroom(room_wxid, wxid, max_retries=1)
if user_info:
avatar = self._extract_string(user_info.get("bigHeadImgUrl", ""))
if avatar:
return avatar
except Exception:
pass
try:
contact = await bot.get_group_member_contact(room_wxid, wxid)
if contact:
avatar = self._extract_string(contact.get("bigHeadImgUrl", ""))
if avatar:
return avatar
except Exception:
pass
return ""
async def _get_display_name(self, bot, room_wxid: str, wxid: str) -> str:
if not wxid:
return ""
try:
member_service = get_member_service()
info = await member_service.get_chatroom_member_info(room_wxid, wxid)
if info:
group_nickname = info.get("group_nickname") or ""
if group_nickname:
return str(group_nickname)
nickname = info.get("nickname") or ""
if nickname:
return str(nickname)
info = await member_service.get_member_info(wxid)
if info:
nickname = info.get("nickname") or ""
if nickname:
return str(nickname)
except Exception:
pass
# 兜底:调用 Hook API
try:
user_info = await bot.get_user_info_in_chatroom(room_wxid, wxid, max_retries=1)
if user_info:
nickname = self._extract_string(user_info.get("nickName", ""))
if nickname:
return nickname
except Exception:
pass
try:
contact = await bot.get_group_member_contact(room_wxid, wxid)
if contact:
nickname = self._extract_string(contact.get("nickName", ""))
if nickname:
return nickname
except Exception:
pass
return wxid
async def _get_session(self) -> aiohttp.ClientSession:
if self._session and not self._session.closed:
return self._session
timeout = aiohttp.ClientTimeout(total=self.timeout)
self._session = aiohttp.ClientSession(timeout=timeout)
return self._session
async def _close_session(self):
if self._session and not self._session.closed:
await self._session.close()
self._session = None
async def _post_json(self, path: str, payload: dict) -> Optional[dict]:
    """POST ``payload`` as JSON to ``base_url + path`` and return the decoded reply.

    Returns:
        On HTTP 200, the decoded JSON response. On any other status, the decoded
        error body with an added ``"_status"`` key when that body is a JSON object,
        otherwise ``None``. ``None`` is also returned when the request fails or a
        200 response cannot be decoded. All failures are logged as warnings.
    """
    url = f"{self.base_url}{path}"
    session = await self._get_session()
    try:
        async with session.post(url, json=payload) as resp:
            if resp.status == 200:
                try:
                    return await resp.json()
                except Exception as e:
                    logger.warning(f"[MemesAPI] POST {path} 响应解析失败: {type(e).__name__}: {e!r}")
                    return None

            try:
                body = await resp.text()
            except Exception:
                body = ""
            if body:
                # Shorten only the copy that goes to the log. The untouched body is
                # parsed below, so error payloads longer than 300 characters still
                # decode instead of being cut off mid-JSON.
                preview = body.strip().replace("\n", " ")
                if len(preview) > 300:
                    preview = preview[:300] + "..."
                logger.warning(f"[MemesAPI] POST {path} 失败: {resp.status}, body={preview!r}")
            else:
                logger.warning(f"[MemesAPI] POST {path} 失败: {resp.status}")
            try:
                result = json.loads(body) if body else None
            except Exception:
                result = None
            if isinstance(result, dict):
                # Callers inspect error codes in the body; expose the HTTP status too.
                result["_status"] = resp.status
                return result
            return None
    except Exception as e:
        logger.warning(
            f"[MemesAPI] POST {path} 异常: {type(e).__name__}: {e!r}"
        )
        return None
async def _get_bytes(self, path: str) -> Tuple[Optional[bytes], str]:
    """GET ``base_url + path`` and return ``(body, Content-Type)``.

    Returns ``(None, "")`` on a non-200 status or a request error; both are logged.
    """
    target = f"{self.base_url}{path}"
    session = await self._get_session()
    try:
        async with session.get(target) as resp:
            if resp.status == 200:
                payload = await resp.read()
                return payload, resp.headers.get("Content-Type", "")
            logger.warning(f"[MemesAPI] GET {path} 失败: {resp.status}")
            return None, ""
    except Exception as e:
        logger.warning(f"[MemesAPI] GET {path} 异常: {e}")
        return None, ""
async def _upload_image(self, image_url: str) -> Optional[str]:
if not image_url:
return None
if self.upload_mode in {"auto", "url"}:
result = await self._post_json("/image/upload", {"type": "url", "url": image_url})
if result and result.get("image_id"):
return result.get("image_id")
if self.upload_mode == "url" and not self.use_base64_fallback:
return None
if self.upload_mode == "auto" and not self.use_base64_fallback:
return None
# base64 / data 上传
data = await self._download_image_data(image_url)
if not data:
return None
base64_data = base64.b64encode(data).decode()
result = await self._post_json("/image/upload", {"type": "data", "data": base64_data})
if result and result.get("image_id"):
return result.get("image_id")
return None
async def _download_image_data(self, image_url: str) -> Optional[bytes]:
    """Download ``image_url`` and return its bytes; ``None`` on a non-200 status or error (logged)."""
    try:
        session = await self._get_session()
        async with session.get(image_url) as resp:
            if resp.status == 200:
                return await resp.read()
            logger.warning(
                f"[MemesAPI] 下载头像失败: {resp.status}, url={image_url}"
            )
            return None
    except Exception as e:
        logger.warning(
            f"[MemesAPI] 下载头像异常: {type(e).__name__}: {e!r}, url={image_url}"
        )
        return None
async def _generate_meme(
self,
meme_key: str,
images: List[Dict[str, str]],
texts: List[str],
options: Dict[str, object],
) -> Optional[str]:
payload = {
"images": images or [],
"texts": texts,
"options": options or {},
}
result = await self._post_json(f"/memes/{meme_key}", payload)
if result and result.get("image_id"):
return result.get("image_id")
if result and result.get("code") == 551:
data = result.get("data", {}) if isinstance(result, dict) else {}
min_t = data.get("min")
max_t = data.get("max")
try:
min_t = int(min_t)
max_t = int(max_t)
except Exception:
min_t = None
max_t = None
if min_t is not None and max_t is not None:
adjusted = list(texts or [])
if max_t == 0:
adjusted = []
else:
if len(adjusted) > max_t:
adjusted = adjusted[:max_t]
if len(adjusted) < min_t:
adjusted = adjusted + [""] * (min_t - len(adjusted))
if adjusted != list(texts or []):
logger.debug(
f"[MemesAPI] 文本数量不匹配,自动调整: {len(texts)} -> {len(adjusted)}"
)
payload["texts"] = adjusted
retry = await self._post_json(f"/memes/{meme_key}", payload)
if retry and retry.get("image_id"):
return retry.get("image_id")
if result is not None:
logger.debug(f"[MemesAPI] 生成表情失败: key={meme_key}, resp={result}")
return None
def _detect_ext(self, data: bytes, content_type: str) -> str:
"""根据响应头与文件头检测扩展名,尽量保留动图"""
ct = (content_type or "").lower()
if "gif" in ct:
return ".gif"
if "jpeg" in ct or "jpg" in ct:
return ".jpg"
if "webp" in ct:
return ".webp"
if "png" in ct:
return ".png"
if data:
if data.startswith(b"GIF87a") or data.startswith(b"GIF89a"):
return ".gif"
if data.startswith(b"\x89PNG\r\n\x1a\n"):
return ".png"
if data.startswith(b"\xff\xd8\xff"):
return ".jpg"
if data.startswith(b"RIFF") and b"WEBP" in data[8:16]:
return ".webp"
return ".png"
async def _download_meme_file(self, image_id: str) -> Optional[Path]:
data, content_type = await self._get_bytes(f"/image/{image_id}")
if not data:
return None
ext = self._detect_ext(data, content_type)
filename = f"meme_{image_id[:8]}_{uuid.uuid4().hex[:6]}{ext}"
file_path = self.temp_dir / filename
try:
file_path.write_bytes(data)
return file_path
except Exception:
return None
def _maybe_optimize_gif(self, file_path: Path) -> Tuple[Path, List[Path]]:
if not self.gif_optimize:
return file_path, []
if file_path.suffix.lower() != ".gif":
return file_path, []
if self.gif_force_image_max_kb <= 0:
return file_path, []
try:
size_kb = int(file_path.stat().st_size / 1024)
except Exception:
size_kb = 0
if size_kb and size_kb <= self.gif_force_image_max_kb:
return file_path, []
try:
from PIL import Image, ImageSequence
except Exception as e:
logger.warning(
f"[MemesAPI] 未安装 Pillow跳过 GIF 优化: {type(e).__name__}: {e!r}"
)
return file_path, []
try:
im = Image.open(file_path)
except Exception:
return file_path, []
if im.format != "GIF":
return file_path, []
frames = []
durations = []
try:
for frame in ImageSequence.Iterator(im):
f = frame.convert("RGBA")
if self.gif_optimize_max_dim > 0:
max_dim = max(f.size)
if max_dim > self.gif_optimize_max_dim:
scale = self.gif_optimize_max_dim / max_dim
new_size = (
max(1, int(f.size[0] * scale)),
max(1, int(f.size[1] * scale)),
)
f = f.resize(new_size, Image.LANCZOS)
frames.append(f)
durations.append(frame.info.get("duration", 40))
except Exception:
return file_path, []
if not frames:
return file_path, []
candidates: List[Path] = []
for step in (1, 2, 3):
use_frames = frames[::step]
use_durations = durations[::step]
if not use_frames:
continue
out_path = self.temp_dir / f"meme_opt_{uuid.uuid4().hex[:8]}.gif"
try:
use_frames[0].save(
out_path,
save_all=True,
append_images=use_frames[1:],
duration=use_durations,
loop=0,
optimize=True,
disposal=2,
)
candidates.append(out_path)
out_kb = int(out_path.stat().st_size / 1024)
if out_kb and out_kb <= self.gif_force_image_max_kb:
logger.debug(
f"[MemesAPI] GIF 优化成功: {size_kb}KB -> {out_kb}KB"
)
return out_path, candidates
except Exception:
try:
out_path.unlink(missing_ok=True)
except Exception:
pass
# 选体积最小的优化结果(如果有)
best = None
best_kb = size_kb or 0
for path in candidates:
try:
out_kb = int(path.stat().st_size / 1024)
except Exception:
continue
if out_kb and (best is None or out_kb < best_kb):
best = path
best_kb = out_kb
if best:
logger.debug(
f"[MemesAPI] GIF 优化未达目标,使用较小版本: {size_kb}KB -> {best_kb}KB"
)
return best, candidates
return file_path, candidates
def _select_target_wxids(
self,
image_count: int,
sender_wxid: str,
at_wxids: List[str],
quote_wxid: str,
) -> List[str]:
if at_wxids:
if image_count <= 0:
return []
if image_count == 1:
return [at_wxids[0]]
if image_count == 2:
if len(at_wxids) >= 2:
return [at_wxids[0], at_wxids[1]]
return [sender_wxid, at_wxids[0]]
return []
if quote_wxid:
if image_count == 1:
return [quote_wxid]
if image_count == 2:
return [sender_wxid, quote_wxid]
return []
return []
async def _process_message(self, bot, message: dict, content: str) -> bool:
    """Run the whole meme pipeline for one incoming group message.

    Steps: match a trigger in ``content``, pick target users from mentions or the
    quoted message, fetch and upload their avatars, request the meme, download it
    and send it to the group.

    Args:
        bot: Client object used to send replies and query the Hook API.
        message: Incoming message dict (``IsGroup``, ``FromWxid``, ``SenderWxid``, ...).
        content: Text to match against the triggers.

    Returns:
        ``True`` when the message was ignored (plugin disabled, not a group message,
        sent by the bot itself, no trigger matched, or no usable targets); ``False``
        once a meme was attempted, whether it was sent or an error tip was sent
        instead.
        NOTE(review): presumably ``True`` lets later handlers see the message and
        ``False`` stops propagation - confirm against the plugin framework.
    """
    if not self.enabled:
        return True
    if not message.get("IsGroup", False):
        return True
    from_wxid = message.get("FromWxid", "")
    sender_wxid = message.get("SenderWxid", "")
    # Ignore messages sent by the bot account itself.
    if bot and sender_wxid and bot.wxid and sender_wxid == bot.wxid:
        return True
    if not content:
        return True
    logger.debug(f"[MemesAPI] raw_content={content!r}")
    await self._maybe_reload()
    normalized = self._normalize_text(content)
    if not normalized:
        return True
    logger.debug(f"[MemesAPI] normalized={normalized!r}")
    meme, params = self._match_meme(normalized)
    if not meme:
        return True
    logger.debug(f"[MemesAPI] matched meme={meme.key}, params={params}")
    at_wxids = await self._extract_at_wxids(message, from_wxid, content)
    if at_wxids:
        at_wxids = [w for w in at_wxids if w and w != "notify@all"]
        at_wxids = at_wxids[: self.max_at_users]
    quote_sender = self._extract_quote_sender(message)
    quote_display = self._extract_quote_displayname(message)
    quote_wxid = await self._resolve_quote_wxid(from_wxid, quote_sender, quote_display)
    if meme.image_count > 2:
        # Memes needing more than two images are not supported; skip silently.
        return True
    target_wxids = self._select_target_wxids(meme.image_count, sender_wxid, at_wxids, quote_wxid)
    if meme.image_count == 0:
        target_wxids = []
    if meme.image_count > 0 and not target_wxids:
        # Trigger matched but no mention/quote supplies the images; skip silently.
        return True
    avatar_urls: List[str] = []
    display_names: List[str] = []
    for wxid in target_wxids:
        avatar = await self._get_avatar_url(bot, from_wxid, wxid)
        if not avatar:
            await bot.send_text(from_wxid, self.avatar_missing_tip)
            return False
        avatar_urls.append(avatar)
        display_names.append(await self._get_display_name(bot, from_wxid, wxid))
    # Placeholders available to option values and texts: {name}, {name1}, {name2}.
    replacements = {
        "name": display_names[0] if display_names else "",
        "name1": display_names[0] if len(display_names) > 0 else "",
        "name2": display_names[1] if len(display_names) > 1 else "",
    }
    image_items: List[Dict[str, str]] = []
    for idx, avatar_url in enumerate(avatar_urls):
        image_id = await self._upload_image(avatar_url)
        if not image_id:
            await bot.send_text(from_wxid, self.api_failed_tip)
            return False
        name = display_names[idx] if idx < len(display_names) else ""
        if not name:
            name = f"image{idx+1}"
        image_items.append({"name": name, "id": image_id})
    if meme.image_count == 0:
        image_items = []
    options, text_tokens = self._build_options_and_texts(meme, params, replacements)
    logger.debug(f"[MemesAPI] options={options}, text_tokens={text_tokens}")
    # User-supplied words replace the default texts; "|" separates multiple texts.
    texts = meme.default_texts
    if text_tokens:
        joined = " ".join(text_tokens).strip()
        if "|" in joined:
            texts = [t.strip() for t in joined.split("|") if t.strip()]
        else:
            texts = [joined]
    texts = [self._render_value(t, replacements) for t in texts]
    logger.debug(f"[MemesAPI] final texts={texts}")
    meme_image_id = await self._generate_meme(
        meme.key,
        image_items,
        texts,
        options,
    )
    if not meme_image_id:
        await bot.send_text(from_wxid, self.api_failed_tip)
        return False
    file_path = await self._download_meme_file(meme_image_id)
    if not file_path:
        await bot.send_text(from_wxid, self.api_failed_tip)
        return False
    send_path = file_path
    # Every temporary file is removed in the finally block below.
    cleanup_paths = [file_path]
    # GIFs are only re-encoded when they will be sent as an inline image.
    if file_path.suffix.lower() == ".gif" and self.gif_send_mode == "image":
        send_path, extra_paths = self._maybe_optimize_gif(file_path)
        for p in extra_paths:
            if p not in cleanup_paths:
                cleanup_paths.append(p)
    try:
        if send_path.suffix.lower() == ".gif":
            if self.gif_send_mode == "file":
                await bot.send_file(from_wxid, str(send_path))
            elif self.gif_send_mode == "image":
                await bot.send_image(from_wxid, str(send_path))
            else:
                # "auto": send as a file only when larger than gif_inline_max_kb.
                size_kb = 0
                try:
                    size_kb = int(send_path.stat().st_size / 1024)
                except Exception:
                    size_kb = 0
                if size_kb and size_kb > self.gif_inline_max_kb:
                    await bot.send_file(from_wxid, str(send_path))
                else:
                    await bot.send_image(from_wxid, str(send_path))
        else:
            await bot.send_image(from_wxid, str(send_path))
    finally:
        for path in cleanup_paths:
            try:
                path.unlink(missing_ok=True)
            except Exception:
                pass
    return False
@on_text_message(priority=80)
async def handle_text_message(self, bot, message: dict):
    """Run the meme pipeline on the ``Content`` of a text message."""
    return await self._process_message(bot, message, message.get("Content", "") or "")
@on_quote_message(priority=80)
async def handle_quote_message(self, bot, message: dict):
    """Run the meme pipeline on the text typed in a quote message."""
    typed_text = self._extract_quote_text(message)
    return await self._process_message(bot, message, typed_text)
__all__ = ["MemesAPI"]