from typing import Any, Dict, List, Optional, Tuple import re from urllib.parse import quote import aiohttp from loguru import logger from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from wechat_ipad import WechatAPIClient class FanhaoSearchPlugin(MessagePluginInterface): """番号查询插件(JavBus API 在线版)。 重构说明: 1. 参考 koishi-plugin-javbus 的“在线 API 查询”模式,替换原 Mongo 本地库检索; 2. 与你当前 ABOT 插件接口对齐:群权限、积分扣费、消息发送、日志结构保持一致; 3. 为兼容不同 javbus-api 版本,内置 /api/v1 与 /api 两套路由回退,避免单点失效。 """ FEATURE_KEY = "FANHAO" FEATURE_DESCRIPTION = "🔎 番号查询功能 [番号]" @property def name(self) -> str: return "番号查询" @property def version(self) -> str: return "2.0.0" @property def description(self) -> str: return "基于JavBus API的番号查询,支持详情/磁力/封面返回。" @property def author(self) -> str: return "ABOT Team" @property def command_prefix(self) -> Optional[str]: return "" @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() # 注册功能权限,确保群管理页可配置开启/关闭“番号查询”。 self.feature = self.register_feature() self.enable = True self._commands: List[str] = ["番号", "番号查询"] self.command_format = "番号 番号编号 例如:番号 SSIS-406" # API 基础配置:api_prefix 为必填,未配置时会显式报错提醒。 self.api_prefix = "" self.api_token = "" self.request_timeout_seconds = 15 # 参考 koishi-plugin-javbus 的两个关键开关。 self.allow_download_link = False self.allow_preview_cover = False # 详情接口路由模板:默认优先 v1,再回退 v0 风格。 self.movie_detail_paths: List[str] = ["/api/v1/movies/{id}", "/api/movies/{id}"] def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件配置。""" self.LOG = logger self.LOG.debug(f"正在初始化 {self.name} 插件...") self.event_system = context.get("event_system") cfg = self._config.get("FanhaoSearch", {}) self._commands = cfg.get("command", ["番号", "番号查询"]) self.command_format = cfg.get("command-format", "番号 番号编号 例如:番号 SSIS-406") self.enable = bool(cfg.get("enable", True)) # 用户部署的 javbus-api 前缀,例如:https://xxx.com 或 http://127.0.0.1:8922 self.api_prefix = str(cfg.get("api_prefix", "") or "").strip().rstrip("/") # javbus-api 的可选鉴权 Token,对应请求头 j-auth-token。 self.api_token = str(cfg.get("auth_token", "") or "").strip() self.request_timeout_seconds = max(5, int(cfg.get("request_timeout_seconds", 15) or 15)) self.allow_download_link = bool(cfg.get("allow_download_link", False)) self.allow_preview_cover = bool(cfg.get("allow_preview_cover", False)) # 允许外部覆盖路径列表,便于你后续升级 API 版本时无代码切换。 configured_paths = cfg.get("movie_detail_paths", ["/api/v1/movies/{id}", "/api/movies/{id}"]) normalized_paths: List[str] = [] for path in configured_paths: text = str(path or "").strip() if text and "{id}" in text: normalized_paths.append(text) self.movie_detail_paths = normalized_paths or ["/api/v1/movies/{id}", "/api/movies/{id}"] self.LOG.info( f"[{self.name}] 初始化完成: enable={self.enable}, commands={self._commands}, " f"api_prefix={'已配置' if self.api_prefix else '未配置'}, allow_download_link={self.allow_download_link}, " f"allow_preview_cover={self.allow_preview_cover}, timeout={self.request_timeout_seconds}s" ) return True def start(self) -> bool: self.status = PluginStatus.RUNNING self.LOG.debug(f"[{self.name}] 插件已启动") return True def stop(self) -> bool: self.status = PluginStatus.STOPPED self.LOG.info(f"[{self.name}] 插件已停止") return True def can_process(self, message: Dict[str, Any]) -> bool: """只处理配置命令开头的文本消息。""" if not self.enable: return False content = str(message.get("content", "") or "").strip() if not content: return False command = content.split(" ")[0] return command in self._commands @staticmethod def _normalize_code(text: str) -> str: """标准化番号。 规则说明: 1. 自动去空格并转大写; 2. 处理“字母+数字无横杠”场景,如 ipzz108 -> IPZZ-108; 3. 保留用户已有横杠结构,避免误改合法编号。 """ code = (text or "").strip().upper() return re.sub(r"([A-Z])(\d)", r"\1-\2", code) def _build_request_headers(self) -> Dict[str, str]: """构建请求头。 说明: - 如果配置了 auth_token,则透传到 j-auth-token,兼容 ovnrain/javbus-api 的鉴权模式; - 固定 UA 便于服务端日志追踪与风控排查。 """ headers = { "User-Agent": "ABOT-FanhaoSearch/2.0", "Accept": "application/json", } if self.api_token: headers["j-auth-token"] = self.api_token return headers async def _fetch_movie_detail(self, movie_id: str) -> Optional[Dict[str, Any]]: """请求影片详情,支持多路由回退。""" if not self.api_prefix: return None timeout = aiohttp.ClientTimeout(total=self.request_timeout_seconds) headers = self._build_request_headers() safe_id = quote(movie_id, safe="") last_error = "" async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session: for path in self.movie_detail_paths: url = f"{self.api_prefix}{path.format(id=safe_id)}" try: async with session.get(url) as resp: # 非 2xx 先记录,继续尝试下一条路径。 if resp.status < 200 or resp.status >= 300: body_preview = (await resp.text())[:160] self.LOG.warning( f"[{self.name}] 详情接口返回异常: status={resp.status}, url={url}, body={body_preview}" ) continue data = await resp.json(content_type=None) # 同时兼容两种返回形态: # 1) 直接是详情对象;2) 包在 data/result 字段内。 if isinstance(data, dict): if isinstance(data.get("data"), dict): return data.get("data") if isinstance(data.get("result"), dict): return data.get("result") if data.get("id") or data.get("title"): return data except Exception as e: last_error = str(e) self.LOG.warning(f"[{self.name}] 请求详情失败: url={url}, error={e}") continue if last_error: self.LOG.error(f"[{self.name}] 所有详情路由均失败: movie_id={movie_id}, last_error={last_error}") return None @staticmethod def _extract_star_names(movie: Dict[str, Any]) -> str: """提取女优名称,兼容数组/字符串两种结构。""" stars = movie.get("stars") if isinstance(stars, list): names = [] for star in stars: if isinstance(star, dict) and star.get("name"): names.append(str(star.get("name"))) elif isinstance(star, str) and star.strip(): names.append(star.strip()) if names: return ", ".join(names) if isinstance(movie.get("actress"), str): return str(movie.get("actress")).strip() return "" @staticmethod def _pick_best_magnet(magnets: Any) -> str: """从磁力列表中挑选一条优先磁力链接。 选择策略(参考 koishi-plugin-javbus 并增强健壮性): 1. 优先有字幕的磁力; 2. 再按 numberSize / size 字段的“可解析数值”降序; 3. 返回首个有效 link/hash。 """ if not isinstance(magnets, list): return "" candidates: List[Dict[str, Any]] = [m for m in magnets if isinstance(m, dict)] if not candidates: return "" def parse_size_num(item: Dict[str, Any]) -> float: raw = item.get("numberSize") or item.get("size") or 0 # numberSize 可能是数字,也可能是字符串,这里统一兜底到 float。 try: return float(raw) except Exception: # 尝试从诸如 "2.3 GB" 文本中提取首个数字。 matched = re.search(r"(\d+(?:\.\d+)?)", str(raw)) return float(matched.group(1)) if matched else 0.0 with_subtitle = [m for m in candidates if bool(m.get("hasSubtitle"))] pool = with_subtitle if with_subtitle else candidates pool_sorted = sorted(pool, key=parse_size_num, reverse=True) for item in pool_sorted: link = str(item.get("link") or "").strip() if link: return link # 某些 API 只回 hash,不回完整 magnet。 hash_value = str(item.get("hash") or "").strip() if hash_value: return f"magnet:?xt=urn:btih:{hash_value}" return "" def _format_result_text(self, movie: Dict[str, Any], normalized_code: str) -> str: """拼装文本消息内容。""" movie_id = str(movie.get("id") or normalized_code or "未知番号").strip() title = str(movie.get("title") or "未提供标题").strip() date_value = str(movie.get("date") or movie.get("publishDate") or "").strip() publisher_name = "" publisher = movie.get("publisher") if isinstance(publisher, dict): publisher_name = str(publisher.get("name") or "").strip() elif isinstance(publisher, str): publisher_name = publisher.strip() star_text = self._extract_star_names(movie) lines = [f"✅ 查询成功:{movie_id}", f"标题:{title}"] if date_value: lines.append(f"发行日期:{date_value}") if star_text: lines.append(f"女优:{star_text}") if publisher_name: lines.append(f"发行商:{publisher_name}") if self.allow_download_link: magnet = self._pick_best_magnet(movie.get("magnets")) if magnet: lines.append(f"磁力:{magnet}") return "\n".join(lines) @plugin_stats_decorator(plugin_name="番号查询") @plugin_points_cost(10, "番号查询消耗积分", FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """执行番号查询主流程。""" content = str(message.get("content", "") or "").strip() self.LOG.debug(f"[{self.name}] 插件执行: content={content}") sender = message.get("sender") room_id = str(message.get("roomid", "") or "").strip() gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") target = room_id if room_id else sender # 群开关关闭时不处理,避免越权。 if room_id and gbm and gbm.get_group_permission(room_id, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" parts = content.split(" ", 1) if len(parts) < 2 or not parts[1].strip(): await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender) return False, "命令格式错误" if not self.api_prefix: await bot.send_text_message( target, "❌番号插件未配置 api_prefix,请在 plugins/fanhao_search/config.toml 中补充后重试。", sender, ) return False, "api_prefix未配置" raw_code = parts[1].strip() normalized_code = self._normalize_code(raw_code) if not normalized_code: await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender) return False, "命令格式错误" self.LOG.info( f"[{self.name}] 收到查询: raw={raw_code}, normalized={normalized_code}, api_prefix={self.api_prefix}" ) try: movie = await self._fetch_movie_detail(normalized_code) if not movie: await bot.send_text_message(target, f"未找到番号:{normalized_code}", sender) return False, "未找到" # 先发文本,确保最差场景也能看到核心信息。 result_text = self._format_result_text(movie, normalized_code) await bot.send_text_message(target, result_text, sender) # 按配置决定是否额外发送封面预览(可能含敏感内容,默认关闭)。 if self.allow_preview_cover: cover_url = str(movie.get("img") or movie.get("cover") or "").strip() if cover_url: try: await bot.send_image_message(target, cover_url) except Exception as image_error: self.LOG.warning(f"[{self.name}] 封面发送失败: code={normalized_code}, error={image_error}") return True, "查询成功" except Exception as e: self.LOG.exception(f"[{self.name}] 处理番号查询出错: {e}") return False, f"处理出错: {e}" def get_plugin(): """返回插件实例。""" return FanhaoSearchPlugin()