Files
abot/plugins/fanhao_search/main.py
2026-04-24 16:37:24 +08:00

514 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Any, Dict, List, Optional, Tuple
import re
from urllib.parse import urljoin
import io
import aiohttp
from bs4 import BeautifulSoup
from loguru import logger
from PIL import Image, ImageFilter
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from wechat_ipad import WechatAPIClient
class FanhaoSearchPlugin(MessagePluginInterface):
"""番号查询插件JavBus 直连解析版)。
设计说明:
1. 不再依赖外部 javbus-api 服务,直接访问 JavBus 网页并解析;
2. 实现思路参考 ovnrain/javbus-api 的解析逻辑(详情页 + AJAX 磁力表);
3. 保留你现有项目的群权限、积分扣费、命令格式与日志风格。
"""
FEATURE_KEY = "FANHAO"
FEATURE_DESCRIPTION = "🔎 番号查询功能 [番号]"
@property
def name(self) -> str:
return "番号查询"
@property
def version(self) -> str:
return "3.0.0"
@property
def description(self) -> str:
return "直连JavBus站点解析影片详情与磁力信息。"
@property
def author(self) -> str:
return "ABOT Team"
@property
def command_prefix(self) -> Optional[str]:
return ""
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
# 注册群权限开关,便于后台按群启停。
self.feature = self.register_feature()
self.enable = True
self._commands: List[str] = ["番号", "番号查询"]
self.command_format = "番号 番号编号 例如:番号 SSIS-406"
# 站点基础配置:默认使用官方域名,可在配置中改成镜像域名。
self.javbus_base_url = "https://www.javbus.com"
self.request_timeout_seconds = 15
self.http_proxy = ""
# 功能开关:默认只返回文本详情,磁力和封面均默认关闭。
self.allow_download_link = False
self.allow_preview_cover = False
self.prefer_subtitle_magnet = True
# 封面图降敏配置:默认开启全图模糊,降低群内图片内容风险。
self.cover_blur_enable = True
self.cover_blur_radius = 18
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件配置。"""
self.LOG = logger
self.LOG.debug(f"正在初始化 {self.name} 插件...")
self.event_system = context.get("event_system")
cfg = self._config.get("FanhaoSearch", {})
self.enable = bool(cfg.get("enable", True))
self._commands = cfg.get("command", ["番号", "番号查询"])
self.command_format = cfg.get("command-format", "番号 番号编号 例如:番号 SSIS-406")
self.javbus_base_url = str(cfg.get("javbus_base_url", "https://www.javbus.com") or "").strip().rstrip("/")
self.request_timeout_seconds = max(5, int(cfg.get("request_timeout_seconds", 15) or 15))
self.http_proxy = str(cfg.get("http_proxy", "") or "").strip()
self.allow_download_link = bool(cfg.get("allow_download_link", False))
self.allow_preview_cover = bool(cfg.get("allow_preview_cover", False))
self.prefer_subtitle_magnet = bool(cfg.get("prefer_subtitle_magnet", True))
self.cover_blur_enable = bool(cfg.get("cover_blur_enable", True))
self.cover_blur_radius = max(1, int(cfg.get("cover_blur_radius", 18) or 18))
self.LOG.info(
f"[{self.name}] 初始化完成: enable={self.enable}, commands={self._commands}, "
f"base_url={self.javbus_base_url}, allow_download_link={self.allow_download_link}, "
f"allow_preview_cover={self.allow_preview_cover}, cover_blur_enable={self.cover_blur_enable}, "
f"cover_blur_radius={self.cover_blur_radius}, timeout={self.request_timeout_seconds}s"
)
return True
def start(self) -> bool:
self.status = PluginStatus.RUNNING
self.LOG.debug(f"[{self.name}] 插件已启动")
return True
def stop(self) -> bool:
self.status = PluginStatus.STOPPED
self.LOG.info(f"[{self.name}] 插件已停止")
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""仅处理配置命令开头的文本消息。"""
if not self.enable:
return False
content = str(message.get("content", "") or "").strip()
if not content:
return False
command = content.split(" ")[0]
return command in self._commands
@staticmethod
def _normalize_code(text: str) -> str:
"""标准化番号。
处理策略:
1. 去掉前后空白并转大写;
2. 自动补横杠:如 ipzz108 -> IPZZ-108
3. 保留用户已有横杠结构,避免误改。
"""
code = (text or "").strip().upper()
return re.sub(r"([A-Z])(\d)", r"\1-\2", code)
def _build_headers(self, referer: str = "") -> Dict[str, str]:
"""构建请求头,模拟常见浏览器访问。"""
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}
if referer:
headers["Referer"] = referer
return headers
def _build_proxy(self) -> Optional[str]:
"""获取代理配置(未配置返回 None"""
return self.http_proxy if self.http_proxy else None
async def _http_get_text(
self,
url: str,
*,
referer: str = "",
params: Optional[Dict[str, Any]] = None,
) -> str:
"""执行 HTTP GET 并返回文本。"""
timeout = aiohttp.ClientTimeout(total=self.request_timeout_seconds)
proxy = self._build_proxy()
async with aiohttp.ClientSession(timeout=timeout, headers=self._build_headers(referer)) as session:
async with session.get(url, params=params, proxy=proxy) as resp:
body = await resp.text(errors="ignore")
if resp.status < 200 or resp.status >= 300:
raise RuntimeError(f"请求失败 status={resp.status}, url={url}, body={body[:180]}")
return body
async def _http_get_bytes(self, url: str, *, referer: str = "") -> bytes:
"""执行 HTTP GET 并返回二进制,用于下载封面图。"""
timeout = aiohttp.ClientTimeout(total=self.request_timeout_seconds)
proxy = self._build_proxy()
async with aiohttp.ClientSession(timeout=timeout, headers=self._build_headers(referer)) as session:
async with session.get(url, proxy=proxy) as resp:
if resp.status < 200 or resp.status >= 300:
raise RuntimeError(f"图片下载失败 status={resp.status}, url={url}")
return await resp.read()
def _apply_full_blur_to_image_bytes(self, image_bytes: bytes) -> bytes:
"""对图片执行全图高斯模糊。
设计说明:
1. 这里使用 Pillow 的 GaussianBlur 对整图处理,不做局部区域判断;
2. 输出统一转为 JPEG避免源图格式差异导致发送失败
3. 若处理异常则回退原图,保证主流程可用性。
"""
if not image_bytes:
return image_bytes
if not self.cover_blur_enable:
return image_bytes
try:
with Image.open(io.BytesIO(image_bytes)) as image:
# 转为 RGB 统一颜色空间,避免 RGBA 保存 JPEG 报错。
if image.mode in ("RGBA", "P"):
image = image.convert("RGB")
# 全图高斯模糊,半径可在配置中调整。
blurred = image.filter(ImageFilter.GaussianBlur(radius=float(self.cover_blur_radius)))
output = io.BytesIO()
blurred.save(output, format="JPEG", quality=88)
return output.getvalue()
except Exception as e:
self.LOG.warning(f"[{self.name}] 封面全图模糊失败,回退原图: error={e}")
return image_bytes
@staticmethod
def _extract_plain_value_from_info_p(info_p) -> str:
"""从详情页 <p> 节点中提取纯文本值。
规则:
1. 先去掉 header 标签文本;
2. 再压缩多余空白;
3. 保留正文语义,避免把“識別碼:”一起带出来。
"""
if not info_p:
return ""
node = BeautifulSoup(str(info_p), "html.parser")
header = node.find(class_="header")
if header:
header.extract()
text = node.get_text(" ", strip=True)
return re.sub(r"\s+", " ", text).strip()
@staticmethod
def _find_info_p_by_header(info_ps: List[Any], header_keywords: List[str]):
"""根据 header 关键字定位详情信息行。"""
for p in info_ps:
header = p.find(class_="header")
if not header:
continue
header_text = header.get_text(strip=True)
if any(keyword in header_text for keyword in header_keywords):
return p
return None
def _extract_movie_detail_from_html(self, html: str, movie_id: str) -> Optional[Dict[str, Any]]:
"""解析详情页 HTML提取核心字段。"""
soup = BeautifulSoup(html, "html.parser")
# 若被站点重定向到登录页或拦截页,通常页面包含 login 关键词或缺失详情容器。
page_text = soup.get_text(" ", strip=True).lower()
if "login" in page_text and "javbus" in page_text and not soup.select_one(".container .movie"):
return None
movie_container = soup.select_one(".container .movie")
if not movie_container:
return None
# 标题与封面图。
title = (soup.select_one(".container h3") or soup.select_one("h3"))
title_text = title.get_text(strip=True) if title else ""
img_node = soup.select_one(".bigImage img")
img_url = str(img_node.get("src") or "").strip() if img_node else ""
if img_url and img_url.startswith("/"):
img_url = urljoin(self.javbus_base_url, img_url)
# 基本信息块:统一从 p.header 结构里定位。
info_ps = movie_container.select(".info p")
date_p = self._find_info_p_by_header(info_ps, ["發行日期", "发行日期"])
length_p = self._find_info_p_by_header(info_ps, ["長度", "长度"])
publisher_p = self._find_info_p_by_header(info_ps, ["發行商", "发行商"])
star_p = self._find_info_p_by_header(info_ps, ["演員", "演员"])
date_value = self._extract_plain_value_from_info_p(date_p)
length_value = self._extract_plain_value_from_info_p(length_p)
length_minutes = None
if length_value:
match = re.search(r"(\d+)", length_value)
if match:
length_minutes = int(match.group(1))
publisher_name = ""
if publisher_p:
publisher_link = publisher_p.find("a")
if publisher_link:
publisher_name = publisher_link.get_text(strip=True)
if not publisher_name:
publisher_name = self._extract_plain_value_from_info_p(publisher_p)
# 女优可能有多个链接,拼接输出更可读。
stars: List[str] = []
if star_p:
for star_link in star_p.find_all("a"):
star_name = star_link.get_text(strip=True)
if star_name:
stars.append(star_name)
# 提取 gid / uc后续用于 AJAX 磁力查询。
gid_match = re.search(r"var\s+gid\s*=\s*(\d+)\s*;", html)
uc_match = re.search(r"var\s+uc\s*=\s*(\d+)\s*;", html)
gid = gid_match.group(1) if gid_match else ""
uc = uc_match.group(1) if uc_match else ""
return {
"id": movie_id,
"title": title_text,
"img": img_url,
"date": date_value,
"video_length_minutes": length_minutes,
"publisher_name": publisher_name,
"stars": stars,
"gid": gid,
"uc": uc,
}
@staticmethod
def _parse_size_to_bytes(size_text: str) -> float:
"""把尺寸文本(如 6.57GB)转换为字节数,用于排序。"""
text = str(size_text or "").strip().upper()
if not text:
return 0.0
match = re.search(r"(\d+(?:\.\d+)?)\s*(KB|MB|GB|TB)", text)
if not match:
return 0.0
value = float(match.group(1))
unit = match.group(2)
factor_map = {
"KB": 1024.0,
"MB": 1024.0 ** 2,
"GB": 1024.0 ** 3,
"TB": 1024.0 ** 4,
}
return value * factor_map.get(unit, 1.0)
def _parse_magnets_from_html(self, html: str) -> List[Dict[str, Any]]:
"""解析 AJAX 返回的磁力表格。"""
soup = BeautifulSoup(html, "html.parser")
magnets: List[Dict[str, Any]] = []
rows = soup.select("tr")
for tr in rows:
first_td = tr.find("td")
if not first_td:
continue
first_a = first_td.find("a")
if not first_a:
continue
link = str(first_a.get("href") or "").strip()
if not link.startswith("magnet:?xt=urn:btih:"):
continue
title = first_a.get_text(" ", strip=True)
tags_text = first_td.get_text(" ", strip=True)
is_hd = "高清" in tags_text
has_subtitle = "字幕" in tags_text
tds = tr.find_all("td")
size_text = tds[1].get_text(" ", strip=True) if len(tds) > 1 else ""
share_date = tds[2].get_text(" ", strip=True) if len(tds) > 2 else ""
magnets.append(
{
"link": link,
"title": title,
"size": size_text,
"number_size": self._parse_size_to_bytes(size_text),
"share_date": share_date,
"is_hd": is_hd,
"has_subtitle": has_subtitle,
}
)
# 默认按大小降序,接近 javbus-api 的默认行为。
magnets.sort(key=lambda item: float(item.get("number_size") or 0.0), reverse=True)
return magnets
async def _fetch_movie_magnets(self, movie_id: str, gid: str, uc: str) -> List[Dict[str, Any]]:
"""通过 JavBus AJAX 接口获取磁力列表。"""
if not gid or not uc:
return []
ajax_url = f"{self.javbus_base_url}/ajax/uncledatoolsbyajax.php"
referer = f"{self.javbus_base_url}/{movie_id}"
html = await self._http_get_text(
ajax_url,
referer=referer,
params={
"lang": "zh",
"gid": gid,
"uc": uc,
},
)
return self._parse_magnets_from_html(html)
def _pick_best_magnet(self, magnets: List[Dict[str, Any]]) -> str:
"""按策略选出一条最优磁力。"""
if not magnets:
return ""
pool = magnets
if self.prefer_subtitle_magnet:
subtitle_pool = [item for item in magnets if bool(item.get("has_subtitle"))]
if subtitle_pool:
pool = subtitle_pool
# 尺寸越大通常清晰度越高,这里取排序后的第一条。
best = pool[0]
return str(best.get("link") or "").strip()
def _format_result_text(self, detail: Dict[str, Any], best_magnet: str) -> str:
"""格式化返回给用户的文本。"""
lines = [
f"✅ 查询成功:{detail.get('id') or '未知番号'}",
f"标题:{detail.get('title') or '未提供标题'}",
]
date_value = str(detail.get("date") or "").strip()
if date_value:
lines.append(f"发行日期:{date_value}")
length_minutes = detail.get("video_length_minutes")
if isinstance(length_minutes, int) and length_minutes > 0:
lines.append(f"片长:{length_minutes} 分钟")
publisher_name = str(detail.get("publisher_name") or "").strip()
if publisher_name:
lines.append(f"发行商:{publisher_name}")
stars = detail.get("stars") or []
if isinstance(stars, list) and stars:
lines.append(f"女优:{', '.join([str(s) for s in stars if str(s).strip()])}")
if best_magnet:
lines.append(f"磁力:{best_magnet}")
return "\n".join(lines)
@plugin_stats_decorator(plugin_name="番号查询")
@plugin_points_cost(100, "番号查询消耗积分", FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理番号查询请求。"""
content = str(message.get("content", "") or "").strip()
self.LOG.debug(f"[{self.name}] 插件执行: content={content}")
sender = message.get("sender")
room_id = str(message.get("roomid", "") or "").strip()
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
target = room_id if room_id else sender
# 群开关关闭时不处理。
if room_id and gbm and gbm.get_group_permission(room_id, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
parts = content.split(" ", 1)
if len(parts) < 2 or not parts[1].strip():
await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender)
return False, "命令格式错误"
raw_code = parts[1].strip()
normalized_code = self._normalize_code(raw_code)
if not normalized_code:
await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender)
return False, "命令格式错误"
detail_url = f"{self.javbus_base_url}/{normalized_code}"
self.LOG.info(
f"[{self.name}] 收到查询: raw={raw_code}, normalized={normalized_code}, detail_url={detail_url}"
)
try:
# 1) 抓详情页并解析基础信息。
detail_html = await self._http_get_text(detail_url, referer=self.javbus_base_url)
detail = self._extract_movie_detail_from_html(detail_html, normalized_code)
if not detail:
await bot.send_text_message(
target,
"未找到番号,或当前节点被 JavBus 重定向到登录页(常见于部分地区 IP",
sender,
)
return False, "未找到或被登录拦截"
# 2) 可选抓磁力(需要 gid/uc
best_magnet = ""
if self.allow_download_link:
try:
magnets = await self._fetch_movie_magnets(
normalized_code,
str(detail.get("gid") or ""),
str(detail.get("uc") or ""),
)
best_magnet = self._pick_best_magnet(magnets)
except Exception as magnet_error:
self.LOG.warning(f"[{self.name}] 磁力抓取失败: code={normalized_code}, error={magnet_error}")
# 3) 先发文本,保证主链路可见。
text = self._format_result_text(detail, best_magnet)
await bot.send_text_message(target, text, sender)
# 4) 可选发封面:必须先下载为 bytes再调用 send_image_message。
if self.allow_preview_cover:
cover_url = str(detail.get("img") or "").strip()
if cover_url:
try:
cover_bytes = await self._http_get_bytes(cover_url, referer=detail_url)
# 发送前执行全图模糊,避免直接发送原始封面。
cover_bytes = self._apply_full_blur_to_image_bytes(cover_bytes)
await bot.send_image_message(target, cover_bytes)
except Exception as cover_error:
self.LOG.warning(f"[{self.name}] 封面发送失败: code={normalized_code}, error={cover_error}")
return True, "查询成功"
except Exception as e:
self.LOG.exception(f"[{self.name}] 处理番号查询出错: {e}")
return False, f"处理出错: {e}"
def get_plugin():
"""返回插件实例。"""
return FanhaoSearchPlugin()