from typing import Any, Dict, List, Optional, Tuple
import re
from urllib.parse import quote, urljoin

import aiohttp
from bs4 import BeautifulSoup
from loguru import logger

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from wechat_ipad import WechatAPIClient


class _HttpStatusError(RuntimeError):
    """RuntimeError raised for non-2xx responses; keeps the status code.

    Subclassing ``RuntimeError`` keeps existing ``except RuntimeError`` /
    ``except Exception`` handlers working while letting callers branch on
    ``status`` (for example to treat 404 as "not found").
    """

    def __init__(self, message: str, status: int) -> None:
        super().__init__(message)
        self.status = status


class FanhaoSearchPlugin(MessagePluginInterface):
    """Movie-code lookup plugin (parses JavBus pages directly).

    Design notes:
    1. No external javbus-api service is used; the JavBus web page is
       fetched and parsed directly.
    2. The parsing approach follows ovnrain/javbus-api (detail page plus the
       AJAX magnet table).
    3. Group permission, points cost, command format and logging style of
       the host project are kept.
    """

    FEATURE_KEY = "FANHAO"
    FEATURE_DESCRIPTION = "🔎 番号查询功能 [番号]"

    # Defaults shared by __init__ and initialize so they cannot drift apart.
    _DEFAULT_COMMANDS = ("番号", "番号查询")
    _DEFAULT_COMMAND_FORMAT = "番号 番号编号 例如:番号 SSIS-406"
    _DEFAULT_BASE_URL = "https://www.javbus.com"
    _DEFAULT_TIMEOUT_SECONDS = 15

    @property
    def name(self) -> str:
        return "番号查询"

    @property
    def version(self) -> str:
        return "3.0.0"

    @property
    def description(self) -> str:
        return "直连JavBus站点解析影片详情与磁力信息。"

    @property
    def author(self) -> str:
        return "ABOT Team"

    @property
    def command_prefix(self) -> Optional[str]:
        return ""

    @property
    def commands(self) -> List[str]:
        return self._commands

    @property
    def feature_key(self) -> Optional[str]:
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        """Set built-in defaults; ``initialize`` later overrides them from config."""
        super().__init__()
        # Register the per-group permission switch so it can be toggled per group.
        self.feature = self.register_feature()
        self.enable = True
        self._commands: List[str] = list(self._DEFAULT_COMMANDS)
        self.command_format = self._DEFAULT_COMMAND_FORMAT

        # Site settings: official domain by default, a mirror can be configured.
        self.javbus_base_url = self._DEFAULT_BASE_URL
        self.request_timeout_seconds = self._DEFAULT_TIMEOUT_SECONDS
        self.http_proxy = ""

        # Feature switches: text details only by default; magnet and cover are off.
        self.allow_download_link = False
        self.allow_preview_cover = False
        self.prefer_subtitle_magnet = True

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Load plugin settings from the ``FanhaoSearch`` config section.

        Args:
            context: Host-provided context; only ``event_system`` is read here.

        Returns:
            Always ``True``.
        """
        self.LOG = logger
        self.LOG.debug(f"正在初始化 {self.name} 插件...")
        self.event_system = context.get("event_system")

        # ``or {}`` guards against a section that is present but null.
        cfg = self._config.get("FanhaoSearch", {}) or {}
        self.enable = bool(cfg.get("enable", True))

        # A bare string must become a one-element list; otherwise
        # ``command in self._commands`` would be a substring test.
        raw_commands = cfg.get("command", list(self._DEFAULT_COMMANDS))
        if isinstance(raw_commands, str):
            raw_commands = [raw_commands]
        self._commands = [str(item) for item in (raw_commands or [])]

        self.command_format = cfg.get("command-format", self._DEFAULT_COMMAND_FORMAT)

        # Fall back to the default when the configured URL is empty, so request
        # URLs never end up without a host.
        base_url = str(cfg.get("javbus_base_url", self._DEFAULT_BASE_URL) or "").strip().rstrip("/")
        self.javbus_base_url = base_url or self._DEFAULT_BASE_URL

        # A malformed timeout must not prevent the plugin from loading.
        try:
            timeout_value = int(
                cfg.get("request_timeout_seconds", self._DEFAULT_TIMEOUT_SECONDS)
                or self._DEFAULT_TIMEOUT_SECONDS
            )
        except (TypeError, ValueError):
            timeout_value = self._DEFAULT_TIMEOUT_SECONDS
        self.request_timeout_seconds = max(5, timeout_value)

        self.http_proxy = str(cfg.get("http_proxy", "") or "").strip()
        self.allow_download_link = bool(cfg.get("allow_download_link", False))
        self.allow_preview_cover = bool(cfg.get("allow_preview_cover", False))
        self.prefer_subtitle_magnet = bool(cfg.get("prefer_subtitle_magnet", True))

        self.LOG.info(
            f"[{self.name}] 初始化完成: enable={self.enable}, commands={self._commands}, "
            f"base_url={self.javbus_base_url}, allow_download_link={self.allow_download_link}, "
            f"allow_preview_cover={self.allow_preview_cover}, timeout={self.request_timeout_seconds}s"
        )
        return True

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.status = PluginStatus.RUNNING
        self.LOG.debug(f"[{self.name}] 插件已启动")
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.status = PluginStatus.STOPPED
        self.LOG.info(f"[{self.name}] 插件已停止")
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return True when the message text starts with a configured command."""
        if not self.enable:
            return False
        content = str(message.get("content", "") or "").strip()
        if not content:
            return False
        # Split on any whitespace so a tab or full-width space after the
        # command word is accepted as well as an ASCII space.
        command = content.split(None, 1)[0]
        return command in self._commands

    @staticmethod
    def _normalize_code(text: str) -> str:
        """Normalize a movie code.

        Strategy:
        1. Strip surrounding whitespace and upper-case the text.
        2. If the code already contains a hyphen, keep it as typed so prefixes
           such as ``T28-123`` are not rewritten.
        3. Otherwise insert a hyphen at each letter-to-digit boundary,
           e.g. ``ipzz108`` -> ``IPZZ-108``.
        """
        code = (text or "").strip().upper()
        if "-" in code:
            return code
        return re.sub(r"([A-Z])(\d)", r"\1-\2", code)

    def _build_headers(self, referer: str = "") -> Dict[str, str]:
        """Build browser-like request headers, optionally with a Referer."""
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        }
        if referer:
            headers["Referer"] = referer
        return headers

    def _build_proxy(self) -> Optional[str]:
        """Return the configured proxy URL, or None when no proxy is set."""
        return self.http_proxy if self.http_proxy else None

    async def _http_get_text(
        self,
        url: str,
        *,
        referer: str = "",
        params: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Perform an HTTP GET and return the response body as text.

        Raises:
            RuntimeError: For any non-2xx status. The concrete type is
                ``_HttpStatusError``, which also exposes ``status``.
        """
        timeout = aiohttp.ClientTimeout(total=self.request_timeout_seconds)
        proxy = self._build_proxy()
        async with aiohttp.ClientSession(timeout=timeout, headers=self._build_headers(referer)) as session:
            async with session.get(url, params=params, proxy=proxy) as resp:
                body = await resp.text(errors="ignore")
                if not 200 <= resp.status < 300:
                    raise _HttpStatusError(
                        f"请求失败 status={resp.status}, url={url}, body={body[:180]}",
                        resp.status,
                    )
                return body

    async def _http_get_bytes(self, url: str, *, referer: str = "") -> bytes:
        """Perform an HTTP GET and return raw bytes (used for the cover image).

        Raises:
            RuntimeError: For any non-2xx status. The concrete type is
                ``_HttpStatusError``, which also exposes ``status``.
        """
        timeout = aiohttp.ClientTimeout(total=self.request_timeout_seconds)
        proxy = self._build_proxy()
        async with aiohttp.ClientSession(timeout=timeout, headers=self._build_headers(referer)) as session:
            async with session.get(url, proxy=proxy) as resp:
                if not 200 <= resp.status < 300:
                    raise _HttpStatusError(
                        f"图片下载失败 status={resp.status}, url={url}",
                        resp.status,
                    )
                return await resp.read()

    @staticmethod
    def _extract_plain_value_from_info_p(info_p) -> str:
        """Extract the plain-text value from a detail-page ``<p>`` node.

        Rules:
        1. Drop the text of the ``header`` element first.
        2. Collapse redundant whitespace.
        3. Keep only the value, so the label is not returned with it.
        """
        if not info_p:
            return ""
        # Re-parse a string copy so removing the header does not mutate the
        # caller's tree.
        node = BeautifulSoup(str(info_p), "html.parser")
        header = node.find(class_="header")
        if header:
            header.extract()
        text = node.get_text(" ", strip=True)
        return re.sub(r"\s+", " ", text).strip()

    @staticmethod
    def _find_info_p_by_header(info_ps: List[Any], header_keywords: List[str]):
        """Return the first info row whose header text contains any keyword, else None."""
        for p in info_ps:
            header = p.find(class_="header")
            if not header:
                continue
            header_text = header.get_text(strip=True)
            if any(keyword in header_text for keyword in header_keywords):
                return p
        return None

    def _extract_movie_detail_from_html(self, html: str, movie_id: str) -> Optional[Dict[str, Any]]:
        """Parse the detail-page HTML and extract the core fields.

        Returns:
            A dict with keys ``id``, ``title``, ``img``, ``date``,
            ``video_length_minutes``, ``publisher_name``, ``stars``, ``gid``
            and ``uc``; or ``None`` when the page has no
            ``.container .movie`` element.
        """
        soup = BeautifulSoup(html, "html.parser")

        # A missing detail container covers both "unknown code" and a
        # login/interstitial page, so no separate login-keyword scan is needed.
        movie_container = soup.select_one(".container .movie")
        if movie_container is None:
            return None

        # Title and cover image.
        title = soup.select_one(".container h3") or soup.select_one("h3")
        title_text = title.get_text(strip=True) if title else ""
        img_node = soup.select_one(".bigImage img")
        img_url = str(img_node.get("src") or "").strip() if img_node else ""
        if img_url and img_url.startswith("/"):
            img_url = urljoin(self.javbus_base_url, img_url)

        # Basic info block: rows are located by their header text.
        info_ps = movie_container.select(".info p")
        date_p = self._find_info_p_by_header(info_ps, ["發行日期", "发行日期"])
        length_p = self._find_info_p_by_header(info_ps, ["長度", "长度"])
        publisher_p = self._find_info_p_by_header(info_ps, ["發行商", "发行商"])
        star_p = self._find_info_p_by_header(info_ps, ["演員", "演员"])

        date_value = self._extract_plain_value_from_info_p(date_p)
        length_value = self._extract_plain_value_from_info_p(length_p)
        length_minutes = None
        if length_value:
            match = re.search(r"(\d+)", length_value)
            if match:
                length_minutes = int(match.group(1))

        publisher_name = ""
        if publisher_p:
            publisher_link = publisher_p.find("a")
            if publisher_link:
                publisher_name = publisher_link.get_text(strip=True)
            if not publisher_name:
                publisher_name = self._extract_plain_value_from_info_p(publisher_p)

        # There may be several performer links; collect every non-empty name.
        stars: List[str] = []
        if star_p:
            for star_link in star_p.find_all("a"):
                star_name = star_link.get_text(strip=True)
                if star_name:
                    stars.append(star_name)

        # gid / uc are embedded as JS variables and are needed for the AJAX
        # magnet query.
        gid_match = re.search(r"var\s+gid\s*=\s*(\d+)\s*;", html)
        uc_match = re.search(r"var\s+uc\s*=\s*(\d+)\s*;", html)
        gid = gid_match.group(1) if gid_match else ""
        uc = uc_match.group(1) if uc_match else ""

        return {
            "id": movie_id,
            "title": title_text,
            "img": img_url,
            "date": date_value,
            "video_length_minutes": length_minutes,
            "publisher_name": publisher_name,
            "stars": stars,
            "gid": gid,
            "uc": uc,
        }

    @staticmethod
    def _parse_size_to_bytes(size_text: str) -> float:
        """Convert a size string such as ``6.57GB`` to bytes (used for sorting).

        Returns ``0.0`` when the text is empty or has no KB/MB/GB/TB figure.
        """
        text = str(size_text or "").strip().upper()
        if not text:
            return 0.0
        match = re.search(r"(\d+(?:\.\d+)?)\s*(KB|MB|GB|TB)", text)
        if not match:
            return 0.0
        value = float(match.group(1))
        unit = match.group(2)
        factor_map = {
            "KB": 1024.0,
            "MB": 1024.0 ** 2,
            "GB": 1024.0 ** 3,
            "TB": 1024.0 ** 4,
        }
        return value * factor_map.get(unit, 1.0)

    def _parse_magnets_from_html(self, html: str) -> List[Dict[str, Any]]:
        """Parse the magnet table returned by the AJAX endpoint.

        Returns:
            A list of magnet dicts sorted by ``number_size`` descending.
        """
        soup = BeautifulSoup(html, "html.parser")
        magnets: List[Dict[str, Any]] = []
        for tr in soup.select("tr"):
            first_td = tr.find("td")
            if not first_td:
                continue
            first_a = first_td.find("a")
            if not first_a:
                continue
            link = str(first_a.get("href") or "").strip()
            # Skip rows whose first link is not a BitTorrent magnet URI.
            if not link.startswith("magnet:?xt=urn:btih:"):
                continue

            title = first_a.get_text(" ", strip=True)
            tags_text = first_td.get_text(" ", strip=True)
            is_hd = "高清" in tags_text
            has_subtitle = "字幕" in tags_text

            tds = tr.find_all("td")
            size_text = tds[1].get_text(" ", strip=True) if len(tds) > 1 else ""
            share_date = tds[2].get_text(" ", strip=True) if len(tds) > 2 else ""

            magnets.append(
                {
                    "link": link,
                    "title": title,
                    "size": size_text,
                    "number_size": self._parse_size_to_bytes(size_text),
                    "share_date": share_date,
                    "is_hd": is_hd,
                    "has_subtitle": has_subtitle,
                }
            )

        # Sort by size descending, close to the default behavior of javbus-api.
        magnets.sort(key=lambda item: float(item.get("number_size") or 0.0), reverse=True)
        return magnets

    async def _fetch_movie_magnets(self, movie_id: str, gid: str, uc: str) -> List[Dict[str, Any]]:
        """Fetch the magnet list through the JavBus AJAX endpoint.

        Returns an empty list when ``gid`` or ``uc`` is missing.
        """
        if not gid or not uc:
            return []

        ajax_url = f"{self.javbus_base_url}/ajax/uncledatoolsbyajax.php"
        # Percent-encode the id so user-supplied characters cannot produce an
        # invalid Referer header value.
        referer = f"{self.javbus_base_url}/{quote(movie_id, safe='')}"
        html = await self._http_get_text(
            ajax_url,
            referer=referer,
            params={
                "lang": "zh",
                "gid": gid,
                "uc": uc,
            },
        )
        return self._parse_magnets_from_html(html)

    def _pick_best_magnet(self, magnets: List[Dict[str, Any]]) -> str:
        """Pick one magnet link: the first entry, preferring subtitled ones if enabled."""
        if not magnets:
            return ""

        pool = magnets
        if self.prefer_subtitle_magnet:
            subtitle_pool = [item for item in magnets if bool(item.get("has_subtitle"))]
            if subtitle_pool:
                pool = subtitle_pool

        # The list arrives sorted by size descending, so the first entry is the largest.
        best = pool[0]
        return str(best.get("link") or "").strip()

    def _format_result_text(self, detail: Dict[str, Any], best_magnet: str) -> str:
        """Format the reply text sent to the user; empty fields are omitted."""
        lines = [
            f"✅ 查询成功:{detail.get('id') or '未知番号'}",
            f"标题:{detail.get('title') or '未提供标题'}",
        ]

        date_value = str(detail.get("date") or "").strip()
        if date_value:
            lines.append(f"发行日期:{date_value}")

        length_minutes = detail.get("video_length_minutes")
        if isinstance(length_minutes, int) and length_minutes > 0:
            lines.append(f"片长:{length_minutes} 分钟")

        publisher_name = str(detail.get("publisher_name") or "").strip()
        if publisher_name:
            lines.append(f"发行商:{publisher_name}")

        stars = detail.get("stars") or []
        if isinstance(stars, list):
            # Filter before testing for emptiness so a list of blank names does
            # not produce a label with no value.
            star_names = [str(s) for s in stars if str(s).strip()]
            if star_names:
                joined_names = ", ".join(star_names)
                lines.append(f"女优:{joined_names}")

        if best_magnet:
            lines.append(f"磁力:{best_magnet}")

        return "\n".join(lines)

    @plugin_stats_decorator(plugin_name="番号查询")
    @plugin_points_cost(10, "番号查询消耗积分", FEATURE_KEY)
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle a movie-code lookup request.

        Reads ``content``, ``sender``, ``roomid``, ``gbm`` and ``bot`` from
        ``message``, fetches and parses the detail page, and replies with text
        (plus an optional magnet link and cover image, per configuration).

        Returns:
            ``(success, status_text)``.
        """
        content = str(message.get("content", "") or "").strip()
        self.LOG.debug(f"[{self.name}] 插件执行: content={content}")

        sender = message.get("sender")
        room_id = str(message.get("roomid", "") or "").strip()
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")
        target = room_id if room_id else sender

        # Do nothing when the feature is disabled for this group.
        if room_id and gbm and gbm.get_group_permission(room_id, self.feature) == PermissionStatus.DISABLED:
            return False, "没有权限"

        # Split on any whitespace, consistent with can_process.
        parts = content.split(None, 1)
        if len(parts) < 2 or not parts[1].strip():
            await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender)
            return False, "命令格式错误"

        raw_code = parts[1].strip()
        normalized_code = self._normalize_code(raw_code)
        if not normalized_code:
            await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender)
            return False, "命令格式错误"

        # The code is untrusted chat input: percent-encode it as one path
        # segment so '/', '?', '#', spaces etc. cannot change the request path.
        detail_url = f"{self.javbus_base_url}/{quote(normalized_code, safe='')}"
        self.LOG.info(
            f"[{self.name}] 收到查询: raw={raw_code}, normalized={normalized_code}, detail_url={detail_url}"
        )

        try:
            # 1) Fetch the detail page and parse the basic information.
            try:
                detail_html = await self._http_get_text(detail_url, referer=self.javbus_base_url)
            except _HttpStatusError as http_error:
                # HTTP 404 on the detail page is treated as "not found" so the
                # user gets the same reply as for an unparseable page; other
                # statuses go to the generic handler below.
                if http_error.status != 404:
                    raise
                detail = None
            else:
                detail = self._extract_movie_detail_from_html(detail_html, normalized_code)

            if not detail:
                await bot.send_text_message(
                    target,
                    "未找到番号,或当前节点被 JavBus 重定向到登录页(常见于部分地区 IP)。",
                    sender,
                )
                return False, "未找到或被登录拦截"

            # 2) Optionally fetch magnets (requires gid/uc); best-effort only.
            best_magnet = ""
            if self.allow_download_link:
                try:
                    magnets = await self._fetch_movie_magnets(
                        normalized_code,
                        str(detail.get("gid") or ""),
                        str(detail.get("uc") or ""),
                    )
                    best_magnet = self._pick_best_magnet(magnets)
                except Exception as magnet_error:
                    self.LOG.warning(f"[{self.name}] 磁力抓取失败: code={normalized_code}, error={magnet_error}")

            # 3) Send the text first so the main result is always visible.
            text = self._format_result_text(detail, best_magnet)
            await bot.send_text_message(target, text, sender)

            # 4) Optionally send the cover: download to bytes first, then call
            #    send_image_message. A cover failure does not fail the lookup.
            if self.allow_preview_cover:
                cover_url = str(detail.get("img") or "").strip()
                if cover_url:
                    try:
                        cover_bytes = await self._http_get_bytes(cover_url, referer=detail_url)
                        await bot.send_image_message(target, cover_bytes)
                    except Exception as cover_error:
                        self.LOG.warning(f"[{self.name}] 封面发送失败: code={normalized_code}, error={cover_error}")

            return True, "查询成功"
        except Exception as e:
            self.LOG.exception(f"[{self.name}] 处理番号查询出错: {e}")
            return False, f"处理出错: {e}"


def get_plugin():
    """Return a new plugin instance."""
    return FanhaoSearchPlugin()