变更项: 1. 重写 fanhao_search 主逻辑,参考 ovnrain/javbus-api 直接抓取 JavBus 详情页并解析字段。 2. 增加 gid/uc 提取与 AJAX 磁力表解析,支持可选磁力返回及字幕优先策略。 3. 修复封面发送链路:改为先下载图片 bytes 再调用 send_image_message。 4. 配置项改为 javbus_base_url/http_proxy,不再需要部署 javbus-api 服务。 5. 增强登录拦截场景处理:详情容器缺失或 login 页时返回明确提示。
479 lines
19 KiB
Python
479 lines
19 KiB
Python
from typing import Any, Dict, List, Optional, Tuple
|
||
import re
|
||
from urllib.parse import urljoin
|
||
|
||
import aiohttp
|
||
from bs4 import BeautifulSoup
|
||
from loguru import logger
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.decorator.points_decorator import plugin_points_cost
|
||
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
|
||
from wechat_ipad import WechatAPIClient
|
||
|
||
|
||
class FanhaoSearchPlugin(MessagePluginInterface):
    """Catalogue-code lookup plugin that scrapes JavBus directly.

    Design notes:
    1. No external javbus-api service is needed; the JavBus detail page is
       fetched and parsed by this class.
    2. The parsing approach follows ovnrain/javbus-api (detail page plus the
       AJAX magnet table).
    3. Group permission checks, points cost, command format and logging style
       of the host project are kept.
    """

    FEATURE_KEY = "FANHAO"
    FEATURE_DESCRIPTION = "🔎 番号查询功能 [番号]"

    # Defaults shared by __init__ and initialize() so the two cannot drift apart.
    _DEFAULT_COMMANDS = ("番号", "番号查询")
    _DEFAULT_COMMAND_FORMAT = "番号 番号编号 例如:番号 SSIS-406"
    _DEFAULT_BASE_URL = "https://www.javbus.com"
    _DEFAULT_TIMEOUT_SECONDS = 15
    _MIN_TIMEOUT_SECONDS = 5

    # Hyphen-less code: optional digit prefix, letters, digits, optional
    # one-letter suffix (e.g. IPZZ108, 300MIUM123, IBW518Z).
    _BARE_CODE_RE = re.compile(r"(\d*[A-Z]+)(\d+[A-Z]?)")
    # Characters accepted in a code before it is placed into a URL path.
    _SAFE_CODE_RE = re.compile(r"[A-Z0-9][A-Z0-9._-]*")
    _SIZE_RE = re.compile(r"(\d+(?:\.\d+)?)\s*(KB|MB|GB|TB)")
    _SIZE_FACTORS = {
        "KB": 1024.0,
        "MB": 1024.0 ** 2,
        "GB": 1024.0 ** 3,
        "TB": 1024.0 ** 4,
    }

    @property
    def name(self) -> str:
        return "番号查询"

    @property
    def version(self) -> str:
        return "3.0.0"

    @property
    def description(self) -> str:
        return "直连JavBus站点解析影片详情与磁力信息。"

    @property
    def author(self) -> str:
        return "ABOT Team"

    @property
    def command_prefix(self) -> Optional[str]:
        return ""

    @property
    def commands(self) -> List[str]:
        return self._commands

    @property
    def feature_key(self) -> Optional[str]:
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        super().__init__()
        # Register the per-group feature switch so it can be toggled per group.
        self.feature = self.register_feature()
        self.enable = True
        self._commands: List[str] = list(self._DEFAULT_COMMANDS)
        self.command_format = self._DEFAULT_COMMAND_FORMAT
        # Site settings: official domain by default, a mirror can be configured.
        self.javbus_base_url = self._DEFAULT_BASE_URL
        self.request_timeout_seconds = self._DEFAULT_TIMEOUT_SECONDS
        self.http_proxy = ""
        # Feature switches: text details only by default; magnet and cover are off.
        self.allow_download_link = False
        self.allow_preview_cover = False
        self.prefer_subtitle_magnet = True

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Load the ``FanhaoSearch`` section of the plugin config.

        Args:
            context: Host-provided context; only ``event_system`` is read.

        Returns:
            Always ``True``.
        """
        self.LOG = logger
        self.LOG.debug(f"正在初始化 {self.name} 插件...")
        self.event_system = context.get("event_system")

        cfg = self._config.get("FanhaoSearch", {}) or {}
        self.enable = bool(cfg.get("enable", True))

        # A single string would turn `command in self._commands` into a
        # substring test, so always store a list.
        commands = cfg.get("command")
        if commands is None:
            commands = self._DEFAULT_COMMANDS
        elif isinstance(commands, str):
            commands = [commands]
        self._commands = list(commands)

        self.command_format = cfg.get("command-format", self._DEFAULT_COMMAND_FORMAT)

        # An empty or null base URL would make every request URL invalid;
        # fall back to the default instead.
        base_url = str(cfg.get("javbus_base_url") or "").strip().rstrip("/")
        self.javbus_base_url = base_url or self._DEFAULT_BASE_URL

        # A non-numeric timeout must not prevent the plugin from loading.
        try:
            timeout_seconds = int(cfg.get("request_timeout_seconds") or self._DEFAULT_TIMEOUT_SECONDS)
        except (TypeError, ValueError):
            self.LOG.warning(
                f"[{self.name}] request_timeout_seconds 配置无效,使用默认值 {self._DEFAULT_TIMEOUT_SECONDS}s"
            )
            timeout_seconds = self._DEFAULT_TIMEOUT_SECONDS
        self.request_timeout_seconds = max(self._MIN_TIMEOUT_SECONDS, timeout_seconds)

        self.http_proxy = str(cfg.get("http_proxy", "") or "").strip()
        self.allow_download_link = bool(cfg.get("allow_download_link", False))
        self.allow_preview_cover = bool(cfg.get("allow_preview_cover", False))
        self.prefer_subtitle_magnet = bool(cfg.get("prefer_subtitle_magnet", True))

        self.LOG.info(
            f"[{self.name}] 初始化完成: enable={self.enable}, commands={self._commands}, "
            f"base_url={self.javbus_base_url}, allow_download_link={self.allow_download_link}, "
            f"allow_preview_cover={self.allow_preview_cover}, timeout={self.request_timeout_seconds}s"
        )
        return True

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.status = PluginStatus.RUNNING
        self.LOG.debug(f"[{self.name}] 插件已启动")
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.status = PluginStatus.STOPPED
        self.LOG.info(f"[{self.name}] 插件已停止")
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return True when the message text starts with a configured command."""
        if not self.enable:
            return False
        content = str(message.get("content", "") or "").strip()
        if not content:
            return False
        # split(None, 1) also accepts tabs and full-width spaces as separator.
        return content.split(None, 1)[0] in self._commands

    @staticmethod
    def _normalize_code(text: str) -> str:
        """Normalize a user-typed catalogue code.

        Rules:
        1. Strip surrounding whitespace and upper-case the text.
        2. If the code already contains a hyphen, leave its structure alone
           (so ``FC2-PPV-123`` is not turned into ``FC-2-PPV-123``).
        3. Otherwise insert a single hyphen between the letter part and the
           number part when the whole code has that shape, e.g.
           ``ipzz108`` -> ``IPZZ-108``. Anything else is returned unchanged.
        """
        code = (text or "").strip().upper()
        if "-" in code:
            return code
        bare = FanhaoSearchPlugin._BARE_CODE_RE.fullmatch(code)
        if bare is None:
            return code
        return f"{bare.group(1)}-{bare.group(2)}"

    def _build_headers(self, referer: str = "") -> Dict[str, str]:
        """Build browser-like request headers, adding ``Referer`` when given."""
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        }
        if referer:
            headers["Referer"] = referer
        return headers

    def _build_proxy(self) -> Optional[str]:
        """Return the configured proxy URL, or None when no proxy is set."""
        return self.http_proxy or None

    async def _http_get_text(
        self,
        url: str,
        *,
        referer: str = "",
        params: Optional[Dict[str, Any]] = None,
    ) -> str:
        """GET ``url`` and return the response body as text.

        Raises:
            RuntimeError: when the response status is not 2xx.
        """
        timeout = aiohttp.ClientTimeout(total=self.request_timeout_seconds)
        proxy = self._build_proxy()
        async with aiohttp.ClientSession(timeout=timeout, headers=self._build_headers(referer)) as session:
            async with session.get(url, params=params, proxy=proxy) as resp:
                # Read the body first so a snippet can be included in the error.
                body = await resp.text(errors="ignore")
                if not 200 <= resp.status < 300:
                    raise RuntimeError(f"请求失败 status={resp.status}, url={url}, body={body[:180]}")
                return body

    async def _http_get_bytes(self, url: str, *, referer: str = "") -> bytes:
        """GET ``url`` and return the raw body; used to download the cover image.

        Raises:
            RuntimeError: when the response status is not 2xx.
        """
        timeout = aiohttp.ClientTimeout(total=self.request_timeout_seconds)
        proxy = self._build_proxy()
        async with aiohttp.ClientSession(timeout=timeout, headers=self._build_headers(referer)) as session:
            async with session.get(url, proxy=proxy) as resp:
                if not 200 <= resp.status < 300:
                    raise RuntimeError(f"图片下载失败 status={resp.status}, url={url}")
                return await resp.read()

    @staticmethod
    def _extract_plain_value_from_info_p(info_p) -> str:
        """Return the text of a detail-page ``<p>`` row without its header label.

        The element with class ``header`` is removed first so the label
        (e.g. "識別碼:") is not part of the value; whitespace runs are then
        collapsed to single spaces. Returns "" when ``info_p`` is None.
        """
        if info_p is None:
            return ""
        # Re-parse a copy so removing the header does not mutate the caller's tree.
        node = BeautifulSoup(str(info_p), "html.parser")
        header = node.find(class_="header")
        if header:
            header.extract()
        text = node.get_text(" ", strip=True)
        return re.sub(r"\s+", " ", text).strip()

    @staticmethod
    def _find_info_p_by_header(info_ps: List[Any], header_keywords: List[str]):
        """Return the first row whose ``.header`` text contains any keyword, else None."""
        for p in info_ps:
            header = p.find(class_="header")
            if not header:
                continue
            header_text = header.get_text(strip=True)
            if any(keyword in header_text for keyword in header_keywords):
                return p
        return None

    def _extract_movie_detail_from_html(self, html: str, movie_id: str) -> Optional[Dict[str, Any]]:
        """Parse a detail page and return its core fields.

        Returns:
            A dict with keys ``id``, ``title``, ``img``, ``date``,
            ``video_length_minutes``, ``publisher_name``, ``stars``, ``gid``
            and ``uc``; or ``None`` when the page has no ``.container .movie``
            element (unknown code, or a login / interception page).
        """
        soup = BeautifulSoup(html, "html.parser")

        # A missing detail container covers both "not found" and "redirected
        # to a login page"; no separate keyword sniffing is needed.
        movie_container = soup.select_one(".container .movie")
        if not movie_container:
            return None

        # Title and cover image.
        title = soup.select_one(".container h3") or soup.select_one("h3")
        title_text = title.get_text(strip=True) if title else ""
        img_node = soup.select_one(".bigImage img")
        img_url = str(img_node.get("src") or "").strip() if img_node else ""
        if img_url:
            # urljoin leaves absolute URLs untouched and resolves every
            # relative form against the site base.
            img_url = urljoin(f"{self.javbus_base_url}/", img_url)

        # Info rows are located by the text of their header label
        # (traditional and simplified Chinese variants).
        info_ps = movie_container.select(".info p")
        date_p = self._find_info_p_by_header(info_ps, ["發行日期", "发行日期"])
        length_p = self._find_info_p_by_header(info_ps, ["長度", "长度"])
        publisher_p = self._find_info_p_by_header(info_ps, ["發行商", "发行商"])
        star_p = self._find_info_p_by_header(info_ps, ["演員", "演员"])

        date_value = self._extract_plain_value_from_info_p(date_p)

        length_minutes = None
        length_match = re.search(r"(\d+)", self._extract_plain_value_from_info_p(length_p))
        if length_match:
            length_minutes = int(length_match.group(1))

        # Prefer the link text for the publisher; fall back to the plain row text.
        publisher_name = ""
        if publisher_p:
            publisher_link = publisher_p.find("a")
            if publisher_link:
                publisher_name = publisher_link.get_text(strip=True)
            if not publisher_name:
                publisher_name = self._extract_plain_value_from_info_p(publisher_p)

        # There may be several performer links; collect the non-empty names.
        stars: List[str] = []
        if star_p:
            names = (star_link.get_text(strip=True) for star_link in star_p.find_all("a"))
            stars = [star_name for star_name in names if star_name]

        # gid / uc are inline script variables required by the AJAX magnet query.
        gid_match = re.search(r"var\s+gid\s*=\s*(\d+)\s*;", html)
        uc_match = re.search(r"var\s+uc\s*=\s*(\d+)\s*;", html)

        return {
            "id": movie_id,
            "title": title_text,
            "img": img_url,
            "date": date_value,
            "video_length_minutes": length_minutes,
            "publisher_name": publisher_name,
            "stars": stars,
            "gid": gid_match.group(1) if gid_match else "",
            "uc": uc_match.group(1) if uc_match else "",
        }

    @staticmethod
    def _parse_size_to_bytes(size_text: str) -> float:
        """Convert a size label such as ``6.57GB`` to bytes (used for sorting).

        Returns 0.0 for empty input or when no KB/MB/GB/TB value is found.
        """
        text = str(size_text or "").strip().upper()
        match = FanhaoSearchPlugin._SIZE_RE.search(text)
        if not match:
            return 0.0
        return float(match.group(1)) * FanhaoSearchPlugin._SIZE_FACTORS[match.group(2)]

    def _parse_magnets_from_html(self, html: str) -> List[Dict[str, Any]]:
        """Parse the magnet table returned by the AJAX endpoint.

        Returns:
            One dict per table row whose first cell links to a
            ``magnet:?xt=urn:btih:`` URI, sorted by size, largest first.
        """
        soup = BeautifulSoup(html, "html.parser")
        magnets: List[Dict[str, Any]] = []
        for tr in soup.select("tr"):
            tds = tr.find_all("td")
            if not tds:
                continue
            first_td = tds[0]
            first_a = first_td.find("a")
            if not first_a:
                continue
            link = str(first_a.get("href") or "").strip()
            if not link.startswith("magnet:?xt=urn:btih:"):
                continue

            # The first cell holds the title plus the "高清" / "字幕" tags.
            tags_text = first_td.get_text(" ", strip=True)
            size_text = tds[1].get_text(" ", strip=True) if len(tds) > 1 else ""
            share_date = tds[2].get_text(" ", strip=True) if len(tds) > 2 else ""

            magnets.append(
                {
                    "link": link,
                    "title": first_a.get_text(" ", strip=True),
                    "size": size_text,
                    "number_size": self._parse_size_to_bytes(size_text),
                    "share_date": share_date,
                    "is_hd": "高清" in tags_text,
                    "has_subtitle": "字幕" in tags_text,
                }
            )

        # Largest first, close to the default behaviour of javbus-api.
        magnets.sort(key=lambda item: float(item.get("number_size") or 0.0), reverse=True)
        return magnets

    async def _fetch_movie_magnets(self, movie_id: str, gid: str, uc: str) -> List[Dict[str, Any]]:
        """Fetch the magnet list through the JavBus AJAX endpoint.

        Returns an empty list without any request when ``gid`` or ``uc`` is empty.
        """
        if not gid or not uc:
            return []
        ajax_url = f"{self.javbus_base_url}/ajax/uncledatoolsbyajax.php"
        referer = f"{self.javbus_base_url}/{movie_id}"
        html = await self._http_get_text(
            ajax_url,
            referer=referer,
            params={"lang": "zh", "gid": gid, "uc": uc},
        )
        return self._parse_magnets_from_html(html)

    def _pick_best_magnet(self, magnets: List[Dict[str, Any]]) -> str:
        """Pick one magnet link: subtitled entries first (if enabled), then largest.

        Returns "" when ``magnets`` is empty.
        """
        if not magnets:
            return ""
        pool = magnets
        if self.prefer_subtitle_magnet:
            subtitle_pool = [item for item in magnets if item.get("has_subtitle")]
            if subtitle_pool:
                pool = subtitle_pool
        # A larger file usually means higher quality. max() keeps the first of
        # equal entries, so a pre-sorted list yields the same pick as before,
        # while an unsorted list no longer yields an arbitrary one.
        best = max(pool, key=lambda item: float(item.get("number_size") or 0.0))
        return str(best.get("link") or "").strip()

    def _format_result_text(self, detail: Dict[str, Any], best_magnet: str) -> str:
        """Format the reply text; optional fields are listed only when present."""
        lines = [
            f"✅ 查询成功:{detail.get('id') or '未知番号'}",
            f"标题:{detail.get('title') or '未提供标题'}",
        ]

        date_value = str(detail.get("date") or "").strip()
        if date_value:
            lines.append(f"发行日期:{date_value}")

        length_minutes = detail.get("video_length_minutes")
        if isinstance(length_minutes, int) and length_minutes > 0:
            lines.append(f"片长:{length_minutes} 分钟")

        publisher_name = str(detail.get("publisher_name") or "").strip()
        if publisher_name:
            lines.append(f"发行商:{publisher_name}")

        stars = detail.get("stars") or []
        if isinstance(stars, list):
            # Filter before deciding, so an all-blank list does not print an
            # empty "女优:" line.
            star_names = [str(s) for s in stars if str(s).strip()]
            if star_names:
                lines.append(f"女优:{', '.join(star_names)}")

        if best_magnet:
            lines.append(f"磁力:{best_magnet}")

        return "\n".join(lines)

    async def _fetch_best_magnet_safely(self, code: str, detail: Dict[str, Any]) -> str:
        """Best-effort magnet lookup: log and return "" on any failure."""
        try:
            magnets = await self._fetch_movie_magnets(
                code,
                str(detail.get("gid") or ""),
                str(detail.get("uc") or ""),
            )
            return self._pick_best_magnet(magnets)
        except Exception as magnet_error:
            self.LOG.warning(f"[{self.name}] 磁力抓取失败: code={code}, error={magnet_error}")
            return ""

    async def _send_cover_safely(self, bot, target, code: str, cover_url: str, referer: str) -> None:
        """Best-effort cover delivery: download the bytes, then send; log on failure."""
        try:
            # send_image_message is given bytes, so the image is downloaded first.
            cover_bytes = await self._http_get_bytes(cover_url, referer=referer)
            await bot.send_image_message(target, cover_bytes)
        except Exception as cover_error:
            self.LOG.warning(f"[{self.name}] 封面发送失败: code={code}, error={cover_error}")

    @plugin_stats_decorator(plugin_name="番号查询")
    @plugin_points_cost(10, "番号查询消耗积分", FEATURE_KEY)
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle one lookup request.

        Args:
            message: Incoming message dict; the keys ``content``, ``sender``,
                ``roomid``, ``gbm`` and ``bot`` are read.

        Returns:
            ``(success, status_text)``.
        """
        content = str(message.get("content", "") or "").strip()
        self.LOG.debug(f"[{self.name}] 插件执行: content={content}")

        sender = message.get("sender")
        room_id = str(message.get("roomid", "") or "").strip()
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")
        target = room_id if room_id else sender

        # Do nothing when the feature is disabled for this group.
        if room_id and gbm and gbm.get_group_permission(room_id, self.feature) == PermissionStatus.DISABLED:
            return False, "没有权限"

        # Same separator rule as can_process: any whitespace run.
        parts = content.split(None, 1)
        raw_code = parts[1].strip() if len(parts) > 1 else ""
        normalized_code = self._normalize_code(raw_code)
        # The code becomes part of the request path, so anything other than
        # letters, digits, '.', '_' and '-' is rejected.
        if not normalized_code or not self._SAFE_CODE_RE.fullmatch(normalized_code):
            await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender)
            return False, "命令格式错误"

        detail_url = f"{self.javbus_base_url}/{normalized_code}"
        self.LOG.info(
            f"[{self.name}] 收到查询: raw={raw_code}, normalized={normalized_code}, detail_url={detail_url}"
        )

        try:
            # 1) Fetch the detail page and parse the basic fields.
            detail_html = await self._http_get_text(detail_url, referer=self.javbus_base_url)
            detail = self._extract_movie_detail_from_html(detail_html, normalized_code)
            if not detail:
                await bot.send_text_message(
                    target,
                    "未找到番号,或当前节点被 JavBus 重定向到登录页(常见于部分地区 IP)。",
                    sender,
                )
                return False, "未找到或被登录拦截"

            # 2) Optionally fetch magnets (needs gid / uc from the detail page).
            best_magnet = ""
            if self.allow_download_link:
                best_magnet = await self._fetch_best_magnet_safely(normalized_code, detail)

            # 3) Send the text first so the main result is always delivered.
            await bot.send_text_message(target, self._format_result_text(detail, best_magnet), sender)

            # 4) Optionally send the cover image.
            if self.allow_preview_cover:
                cover_url = str(detail.get("img") or "").strip()
                if cover_url:
                    await self._send_cover_safely(bot, target, normalized_code, cover_url, detail_url)

            return True, "查询成功"
        except Exception as e:
            # Top-level boundary of the handler: log with traceback and report failure.
            self.LOG.exception(f"[{self.name}] 处理番号查询出错: {e}")
            return False, f"处理出错: {e}"
|
||
|
||
|
||
def get_plugin():
    """Create and return a fresh FanhaoSearchPlugin instance."""
    plugin = FanhaoSearchPlugin()
    return plugin
|
||
|