diff --git a/plugins/fanhao_search/config.toml b/plugins/fanhao_search/config.toml index 73258b0..837f25a 100644 --- a/plugins/fanhao_search/config.toml +++ b/plugins/fanhao_search/config.toml @@ -3,15 +3,26 @@ enable = true command = ["番号", "番号查询"] command-format = """ 🔎番号查询指令: -番号 番号编号 例如:番号 FNS-109 +番号 番号编号 例如:番号 SSIS-406 """ -# MongoDB 连接 -mongo_uri = "mongodb+srv://readonly:cS9NSuiJ1ebHnUL0@cluster0.8mosa.mongodb.net/sehuatang?retryWrites=true&w=majority" -db = "sehuatang" -collections = ["hd_chinese_subtitles", "asia_codeless_originate", "asia_mosaic_originate","4k_video"] +# JavBus API 服务地址(必填) +# 示例:api_prefix = "http://127.0.0.1:8922" +api_prefix = "" -# 尝试匹配的字段名(存在其一即可) -search_fields = ["number"] +# 可选鉴权 token,对应请求头 j-auth-token +auth_token = "" + +# 请求超时(秒) +request_timeout_seconds = 15 + +# 详情路由模板(兼容不同 javbus-api 版本) +movie_detail_paths = ["/api/v1/movies/{id}", "/api/movies/{id}"] + +# 是否返回磁力(默认关闭,避免群里长文本刷屏) +allow_download_link = false + +# 是否发送封面预览图(默认关闭) +allow_preview_cover = false diff --git a/plugins/fanhao_search/main.py b/plugins/fanhao_search/main.py index 19e88ba..99f8b83 100644 --- a/plugins/fanhao_search/main.py +++ b/plugins/fanhao_search/main.py @@ -1,17 +1,26 @@ -from typing import Dict, Any, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple import re +from urllib.parse import quote + +import aiohttp from loguru import logger from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost -from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager +from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from wechat_ipad import WechatAPIClient class FanhaoSearchPlugin(MessagePluginInterface): - """番号查询插件""" + """番号查询插件(JavBus API 在线版)。 + + 重构说明: + 1. 参考 koishi-plugin-javbus 的“在线 API 查询”模式,替换原 Mongo 本地库检索; + 2. 与你当前 ABOT 插件接口对齐:群权限、积分扣费、消息发送、日志结构保持一致; + 3. 为兼容不同 javbus-api 版本,内置 /api/v1 与 /api 两套路由回退,避免单点失效。 + """ FEATURE_KEY = "FANHAO" FEATURE_DESCRIPTION = "🔎 番号查询功能 [番号]" @@ -22,15 +31,15 @@ class FanhaoSearchPlugin(MessagePluginInterface): @property def version(self) -> str: - return "1.0.0" + return "2.0.0" @property def description(self) -> str: - return "提供基于MongoDB的番号搜索功能,支持两个集合查询" + return "基于JavBus API的番号查询,支持详情/磁力/封面返回。" @property def author(self) -> str: - return "liu.wei" + return "ABOT Team" @property def command_prefix(self) -> Optional[str]: @@ -50,237 +59,294 @@ class FanhaoSearchPlugin(MessagePluginInterface): def __init__(self): super().__init__() + # 注册功能权限,确保群管理页可配置开启/关闭“番号查询”。 self.feature = self.register_feature() - self.mongo_client = None - self.mongo_db = None + self.enable = True + self._commands: List[str] = ["番号", "番号查询"] + self.command_format = "番号 番号编号 例如:番号 SSIS-406" + # API 基础配置:api_prefix 为必填,未配置时会显式报错提醒。 + self.api_prefix = "" + self.api_token = "" + self.request_timeout_seconds = 15 + # 参考 koishi-plugin-javbus 的两个关键开关。 + self.allow_download_link = False + self.allow_preview_cover = False + # 详情接口路由模板:默认优先 v1,再回退 v0 风格。 + self.movie_detail_paths: List[str] = ["/api/v1/movies/{id}", "/api/movies/{id}"] def initialize(self, context: Dict[str, Any]) -> bool: + """初始化插件配置。""" self.LOG = logger self.LOG.debug(f"正在初始化 {self.name} 插件...") - self.event_system = context.get("event_system") cfg = self._config.get("FanhaoSearch", {}) - self._commands = cfg.get("command", ["番号"]) # 例:"番号 FNS-109" - self.command_format = cfg.get("command-format", "番号 番号编号 例如:番号 FNS-109") - self.enable = cfg.get("enable", True) + self._commands = cfg.get("command", ["番号", "番号查询"]) + self.command_format = cfg.get("command-format", "番号 番号编号 例如:番号 SSIS-406") + self.enable = bool(cfg.get("enable", True)) - self.mongo_uri = cfg.get( - "mongo_uri", - "mongodb+srv://readonly:cS9NSuiJ1ebHnUL0@cluster0.8mosa.mongodb.net/sehuatang?retryWrites=true&w=majority", - ) - self.mongo_db_name = cfg.get("db", "sehuatang") - self.collections = cfg.get( - "collections", ["hd_chinese_subtitles", "asia_codeless_originate", "asia_mosaic_originate", "4k_video"] - ) - self.search_fields = cfg.get("search_fields", ["number"]) # 可能的字段名 + # 用户部署的 javbus-api 前缀,例如:https://xxx.com 或 http://127.0.0.1:8922 + self.api_prefix = str(cfg.get("api_prefix", "") or "").strip().rstrip("/") + # javbus-api 的可选鉴权 Token,对应请求头 j-auth-token。 + self.api_token = str(cfg.get("auth_token", "") or "").strip() + self.request_timeout_seconds = max(5, int(cfg.get("request_timeout_seconds", 15) or 15)) + self.allow_download_link = bool(cfg.get("allow_download_link", False)) + self.allow_preview_cover = bool(cfg.get("allow_preview_cover", False)) - # 延迟连接,在首次查询时连接,避免初始化阻塞 - self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}") + # 允许外部覆盖路径列表,便于你后续升级 API 版本时无代码切换。 + configured_paths = cfg.get("movie_detail_paths", ["/api/v1/movies/{id}", "/api/movies/{id}"]) + normalized_paths: List[str] = [] + for path in configured_paths: + text = str(path or "").strip() + if text and "{id}" in text: + normalized_paths.append(text) + self.movie_detail_paths = normalized_paths or ["/api/v1/movies/{id}", "/api/movies/{id}"] + + self.LOG.info( + f"[{self.name}] 初始化完成: enable={self.enable}, commands={self._commands}, " + f"api_prefix={'已配置' if self.api_prefix else '未配置'}, allow_download_link={self.allow_download_link}, " + f"allow_preview_cover={self.allow_preview_cover}, timeout={self.request_timeout_seconds}s" + ) return True def start(self) -> bool: - self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING + self.LOG.debug(f"[{self.name}] 插件已启动") return True def stop(self) -> bool: - try: - if self.mongo_client: - self.mongo_client.close() - except Exception: - pass - self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED + self.LOG.info(f"[{self.name}] 插件已停止") return True def can_process(self, message: Dict[str, Any]) -> bool: + """只处理配置命令开头的文本消息。""" if not self.enable: return False - content = str(message.get("content", "")).strip() + content = str(message.get("content", "") or "").strip() + if not content: + return False command = content.split(" ")[0] return command in self._commands - def _redact_mongo_uri(self, uri: str) -> str: - try: - # 隐藏用户名密码,仅保留协议和主机段 - return re.sub(r"(mongodb\+srv://)(.*?@)", r"\\1***@", uri) - except Exception: - return "***" + @staticmethod + def _normalize_code(text: str) -> str: + """标准化番号。 - def _ensure_mongo(self): - if self.mongo_client: - return - from pymongo import MongoClient + 规则说明: + 1. 自动去空格并转大写; + 2. 处理“字母+数字无横杠”场景,如 ipzz108 -> IPZZ-108; + 3. 保留用户已有横杠结构,避免误改合法编号。 + """ + code = (text or "").strip().upper() + return re.sub(r"([A-Z])(\d)", r"\1-\2", code) - self.LOG.info( - f"[{self.name}] 准备连接MongoDB uri={self._redact_mongo_uri(self.mongo_uri)} db={self.mongo_db_name}" - ) - try: - self.mongo_client = MongoClient(self.mongo_uri, serverSelectionTimeoutMS=5000) - # 探活 - self.mongo_client.admin.command("ping") - self.mongo_db = self.mongo_client.get_database(self.mongo_db_name) - # 打印可见的数据库 - try: - dbs = self.mongo_client.list_database_names() - self.LOG.info(f"[{self.name}] 可见数据库={dbs}") - except Exception as e: - self.LOG.warning(f"[{self.name}] 获取数据库列表失败: {e}") - try: - colls = self.mongo_db.list_collection_names() - except Exception as e: - colls = [] - self.LOG.warning(f"[{self.name}] 获取集合列表失败: {e}") - self.LOG.info(f"[{self.name}] MongoDB连接成功,集合={colls}") - # 对配置集合进行计数探测 - for cname in self.collections: + def _build_request_headers(self) -> Dict[str, str]: + """构建请求头。 + + 说明: + - 如果配置了 auth_token,则透传到 j-auth-token,兼容 ovnrain/javbus-api 的鉴权模式; + - 固定 UA 便于服务端日志追踪与风控排查。 + """ + headers = { + "User-Agent": "ABOT-FanhaoSearch/2.0", + "Accept": "application/json", + } + if self.api_token: + headers["j-auth-token"] = self.api_token + return headers + + async def _fetch_movie_detail(self, movie_id: str) -> Optional[Dict[str, Any]]: + """请求影片详情,支持多路由回退。""" + if not self.api_prefix: + return None + + timeout = aiohttp.ClientTimeout(total=self.request_timeout_seconds) + headers = self._build_request_headers() + safe_id = quote(movie_id, safe="") + last_error = "" + + async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session: + for path in self.movie_detail_paths: + url = f"{self.api_prefix}{path.format(id=safe_id)}" try: - c = self.mongo_db.get_collection(cname) - # 尝试快速计数(可能返回估算值,但足够判断可见性) - cnt = c.estimated_document_count() - self.LOG.info(f"[{self.name}] 集合探测 {self.mongo_db_name}.{cname} 文档数≈{cnt}") + async with session.get(url) as resp: + # 非 2xx 先记录,继续尝试下一条路径。 + if resp.status < 200 or resp.status >= 300: + body_preview = (await resp.text())[:160] + self.LOG.warning( + f"[{self.name}] 详情接口返回异常: status={resp.status}, url={url}, body={body_preview}" + ) + continue + data = await resp.json(content_type=None) + # 同时兼容两种返回形态: + # 1) 直接是详情对象;2) 包在 data/result 字段内。 + if isinstance(data, dict): + if isinstance(data.get("data"), dict): + return data.get("data") + if isinstance(data.get("result"), dict): + return data.get("result") + if data.get("id") or data.get("title"): + return data except Exception as e: - self.LOG.warning(f"[{self.name}] 集合探测失败 {self.mongo_db_name}.{cname}: {e}") - except Exception as e: - self.LOG.error(f"[{self.name}] MongoDB连接失败: {e}") - raise - - def _normalize_code(self, text: str) -> str: - # 1. 基础清理:判空、去首尾空格、转大写 - text = (text or "").strip().upper() - # 用户输入 处理后 说明 - # IPzz108 IPZZ-108 目标场景:自动补全了横杠 - # ipzz108 IPZZ-108 全小写自动转大写并补全 - # IPZZ-108 IPZZ-108 已经有横杠,正则不匹配(字母后是横杠不是数字),保持原样 - # ipzz-108 IPZZ-108 小写带横杠,仅转大写,保持原样 - # ipzz108 IPZZ-108 去除前后空格 - # A1 A-1 极短代码也能兼容 - - # 2. 核心逻辑:使用正则查找“字母后面紧跟数字”的情况,并在中间插入横杠 - # r'([A-Z])(\d)' 含义:捕获组1是任意大写字母,捕获组2是任意数字 - # r'\1-\2' 含义:将匹配到的内容替换为“组1 + 横杠 + 组2” - return re.sub(r'([A-Z])(\d)', r'\1-\2', text) - - def _build_queries(self, code_upper: str) -> List[Dict[str, Any]]: - # 精确匹配查询 - or_exact = [{field: code_upper} for field in self.search_fields] - exact_query = {"$or": or_exact} - # 回退:大小写不敏感的等值匹配 - or_regex = [ - {field: {"$regex": f"^{re.escape(code_upper)}$", "$options": "i"}} - for field in self.search_fields - ] - regex_query = {"$or": or_regex} - return [exact_query, regex_query] - - def _query_collections(self, code_upper: str) -> Optional[Dict[str, Any]]: - self._ensure_mongo() - queries = self._build_queries(code_upper) - self.LOG.debug(f"[{self.name}] 标准化番号={code_upper},查询字段={self.search_fields}") - for idx, query in enumerate(queries): - self.LOG.debug(f"[{self.name}] 执行查询({idx + 1}/{len(queries)}): {query}") - for coll_name in self.collections: - try: - coll = self.mongo_db.get_collection(coll_name) - self.LOG.debug(f"[{self.name}] 在集合 {coll_name} 查找…") - doc = coll.find_one(query) - if doc: - doc["_collection"] = coll_name - self.LOG.info(f"[{self.name}] 命中集合 {coll_name}") - return doc - else: - self.LOG.debug(f"[{self.name}] 集合 {coll_name} 未命中") - except Exception as e: - self.LOG.error(f"[{self.name}] 查询集合 {coll_name} 出错: {e}") + last_error = str(e) + self.LOG.warning(f"[{self.name}] 请求详情失败: url={url}, error={e}") continue + + if last_error: + self.LOG.error(f"[{self.name}] 所有详情路由均失败: movie_id={movie_id}, last_error={last_error}") return None - def _format_result(self, doc: Dict[str, Any]) -> str: - def pick(d: Dict[str, Any], keys: List[str]) -> str: - for k in keys: - v = d.get(k) - if v: - return str(v) + @staticmethod + def _extract_star_names(movie: Dict[str, Any]) -> str: + """提取女优名称,兼容数组/字符串两种结构。""" + stars = movie.get("stars") + if isinstance(stars, list): + names = [] + for star in stars: + if isinstance(star, dict) and star.get("name"): + names.append(str(star.get("name"))) + elif isinstance(star, str) and star.strip(): + names.append(star.strip()) + if names: + return ", ".join(names) + if isinstance(movie.get("actress"), str): + return str(movie.get("actress")).strip() + return "" + + @staticmethod + def _pick_best_magnet(magnets: Any) -> str: + """从磁力列表中挑选一条优先磁力链接。 + + 选择策略(参考 koishi-plugin-javbus 并增强健壮性): + 1. 优先有字幕的磁力; + 2. 再按 numberSize / size 字段的“可解析数值”降序; + 3. 返回首个有效 link/hash。 + """ + if not isinstance(magnets, list): return "" - code = pick(doc, self.search_fields) - title = pick(doc, ["title", "name", "标题"]) or "未提供" - actress = pick(doc, ["actress", "actors", "performer", "女优", "演员"]) # 可为空 - date_val = pick(doc, ["date", "publish_date", "发行日"]) # 例如:2025-09-10 - post_time = pick(doc, ["post_time"]) # 例如:2025-09-10 10:42:04 + candidates: List[Dict[str, Any]] = [m for m in magnets if isinstance(m, dict)] + if not candidates: + return "" - magnet = pick(doc, ["magnet"]) # 磁力 - magnet_115 = pick(doc, ["magnet_115"]) # 115专用磁力 + def parse_size_num(item: Dict[str, Any]) -> float: + raw = item.get("numberSize") or item.get("size") or 0 + # numberSize 可能是数字,也可能是字符串,这里统一兜底到 float。 + try: + return float(raw) + except Exception: + # 尝试从诸如 "2.3 GB" 文本中提取首个数字。 + matched = re.search(r"(\d+(?:\.\d+)?)", str(raw)) + return float(matched.group(1)) if matched else 0.0 - lines = [ - f"✅ 查询成功:{code}", - f"标题:{title}", - ] - if actress: - lines.append(f"演员:{actress}") + with_subtitle = [m for m in candidates if bool(m.get("hasSubtitle"))] + pool = with_subtitle if with_subtitle else candidates + pool_sorted = sorted(pool, key=parse_size_num, reverse=True) - if date_val and post_time: - lines.append(f"日期:{date_val}(发帖:{post_time})") - elif date_val: - lines.append(f"日期:{date_val}") - elif post_time: - lines.append(f"发帖:{post_time}") + for item in pool_sorted: + link = str(item.get("link") or "").strip() + if link: + return link + # 某些 API 只回 hash,不回完整 magnet。 + hash_value = str(item.get("hash") or "").strip() + if hash_value: + return f"magnet:?xt=urn:btih:{hash_value}" + return "" - if magnet: - lines.append(f"磁力:{magnet}") - if magnet_115: - lines.append(f"115磁力:{magnet_115}") + def _format_result_text(self, movie: Dict[str, Any], normalized_code: str) -> str: + """拼装文本消息内容。""" + movie_id = str(movie.get("id") or normalized_code or "未知番号").strip() + title = str(movie.get("title") or "未提供标题").strip() + date_value = str(movie.get("date") or movie.get("publishDate") or "").strip() + publisher_name = "" + publisher = movie.get("publisher") + if isinstance(publisher, dict): + publisher_name = str(publisher.get("name") or "").strip() + elif isinstance(publisher, str): + publisher_name = publisher.strip() + star_text = self._extract_star_names(movie) + + lines = [f"✅ 查询成功:{movie_id}", f"标题:{title}"] + if date_value: + lines.append(f"发行日期:{date_value}") + if star_text: + lines.append(f"女优:{star_text}") + if publisher_name: + lines.append(f"发行商:{publisher_name}") + + if self.allow_download_link: + magnet = self._pick_best_magnet(movie.get("magnets")) + if magnet: + lines.append(f"磁力:{magnet}") return "\n".join(lines) @plugin_stats_decorator(plugin_name="番号查询") @plugin_points_cost(10, "番号查询消耗积分", FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - content = str(message.get("content", "")).strip() - self.LOG.debug(f"插件执行: {self.name}:{content}") + """执行番号查询主流程。""" + content = str(message.get("content", "") or "").strip() + self.LOG.debug(f"[{self.name}] 插件执行: content={content}") - command = content.split(" ")[0] sender = message.get("sender") - roomid = message.get("roomid", "") + room_id = str(message.get("roomid", "") or "").strip() gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") + target = room_id if room_id else sender - # 参数检查 - parts = content.split(" ") - if len(parts) < 2: - await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}", - sender) - return False, "命令格式错误" - - if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: + # 群开关关闭时不处理,避免越权。 + if room_id and gbm and gbm.get_group_permission(room_id, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" - raw_code = content[len(command):].strip() - user_code = self._normalize_code(raw_code) - self.LOG.info( - f"[{self.name}] 收到查询 command={command} raw='{raw_code}' normalized='{user_code}' db={self.mongo_db_name} collections={self.collections}" - ) - if not user_code: - await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}", - sender) + parts = content.split(" ", 1) + if len(parts) < 2 or not parts[1].strip(): + await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender) return False, "命令格式错误" + if not self.api_prefix: + await bot.send_text_message( + target, + "❌番号插件未配置 api_prefix,请在 plugins/fanhao_search/config.toml 中补充后重试。", + sender, + ) + return False, "api_prefix未配置" + + raw_code = parts[1].strip() + normalized_code = self._normalize_code(raw_code) + if not normalized_code: + await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender) + return False, "命令格式错误" + + self.LOG.info( + f"[{self.name}] 收到查询: raw={raw_code}, normalized={normalized_code}, api_prefix={self.api_prefix}" + ) + try: - doc = self._query_collections(user_code) - target = roomid if roomid else sender - if not doc: - self.LOG.warning(f"[{self.name}] 未找到番号:{user_code}") - await bot.send_text_message(target, f"未找到番号:{user_code}", sender) + movie = await self._fetch_movie_detail(normalized_code) + if not movie: + await bot.send_text_message(target, f"未找到番号:{normalized_code}", sender) return False, "未找到" - text = self._format_result(doc) - await bot.send_text_message(target, text, sender) + # 先发文本,确保最差场景也能看到核心信息。 + result_text = self._format_result_text(movie, normalized_code) + await bot.send_text_message(target, result_text, sender) + + # 按配置决定是否额外发送封面预览(可能含敏感内容,默认关闭)。 + if self.allow_preview_cover: + cover_url = str(movie.get("img") or movie.get("cover") or "").strip() + if cover_url: + try: + await bot.send_image_message(target, cover_url) + except Exception as image_error: + self.LOG.warning(f"[{self.name}] 封面发送失败: code={normalized_code}, error={image_error}") + return True, "查询成功" except Exception as e: - self.LOG.exception(f"处理番号查询出错: {e}") + self.LOG.exception(f"[{self.name}] 处理番号查询出错: {e}") return False, f"处理出错: {e}" def get_plugin(): + """返回插件实例。""" return FanhaoSearchPlugin()