diff --git a/plugins/fanhao_search/config.toml b/plugins/fanhao_search/config.toml index 837f25a..6b4782b 100644 --- a/plugins/fanhao_search/config.toml +++ b/plugins/fanhao_search/config.toml @@ -6,18 +6,15 @@ command-format = """ 番号 番号编号 例如:番号 SSIS-406 """ -# JavBus API 服务地址(必填) -# 示例:api_prefix = "http://127.0.0.1:8922" -api_prefix = "" - -# 可选鉴权 token,对应请求头 j-auth-token -auth_token = "" +# JavBus 站点地址(可替换为你可用的镜像域名) +javbus_base_url = "https://www.javbus.com" # 请求超时(秒) request_timeout_seconds = 15 -# 详情路由模板(兼容不同 javbus-api 版本) -movie_detail_paths = ["/api/v1/movies/{id}", "/api/movies/{id}"] +# 可选代理(常用于绕过地区限制),示例: +# http_proxy = "http://127.0.0.1:7890" +http_proxy = "" # 是否返回磁力(默认关闭,避免群里长文本刷屏) allow_download_link = false @@ -25,4 +22,7 @@ allow_download_link = false # 是否发送封面预览图(默认关闭) allow_preview_cover = false +# 当开启磁力时,是否优先返回“带字幕”磁力 +prefer_subtitle_magnet = true + diff --git a/plugins/fanhao_search/main.py b/plugins/fanhao_search/main.py index 99f8b83..be8791f 100644 --- a/plugins/fanhao_search/main.py +++ b/plugins/fanhao_search/main.py @@ -1,8 +1,9 @@ from typing import Any, Dict, List, Optional, Tuple import re -from urllib.parse import quote +from urllib.parse import urljoin import aiohttp +from bs4 import BeautifulSoup from loguru import logger from base.plugin_common.message_plugin_interface import MessagePluginInterface @@ -14,12 +15,12 @@ from wechat_ipad import WechatAPIClient class FanhaoSearchPlugin(MessagePluginInterface): - """番号查询插件(JavBus API 在线版)。 + """番号查询插件(JavBus 直连解析版)。 - 重构说明: - 1. 参考 koishi-plugin-javbus 的“在线 API 查询”模式,替换原 Mongo 本地库检索; - 2. 与你当前 ABOT 插件接口对齐:群权限、积分扣费、消息发送、日志结构保持一致; - 3. 为兼容不同 javbus-api 版本,内置 /api/v1 与 /api 两套路由回退,避免单点失效。 + 设计说明: + 1. 不再依赖外部 javbus-api 服务,直接访问 JavBus 网页并解析; + 2. 实现思路参考 ovnrain/javbus-api 的解析逻辑(详情页 + AJAX 磁力表); + 3. 保留你现有项目的群权限、积分扣费、命令格式与日志风格。 """ FEATURE_KEY = "FANHAO" @@ -31,11 +32,11 @@ class FanhaoSearchPlugin(MessagePluginInterface): @property def version(self) -> str: - return "2.0.0" + return "3.0.0" @property def description(self) -> str: - return "基于JavBus API的番号查询,支持详情/磁力/封面返回。" + return "直连JavBus站点解析影片详情与磁力信息。" @property def author(self) -> str: @@ -59,20 +60,19 @@ class FanhaoSearchPlugin(MessagePluginInterface): def __init__(self): super().__init__() - # 注册功能权限,确保群管理页可配置开启/关闭“番号查询”。 + # 注册群权限开关,便于后台按群启停。 self.feature = self.register_feature() self.enable = True self._commands: List[str] = ["番号", "番号查询"] self.command_format = "番号 番号编号 例如:番号 SSIS-406" - # API 基础配置:api_prefix 为必填,未配置时会显式报错提醒。 - self.api_prefix = "" - self.api_token = "" + # 站点基础配置:默认使用官方域名,可在配置中改成镜像域名。 + self.javbus_base_url = "https://www.javbus.com" self.request_timeout_seconds = 15 - # 参考 koishi-plugin-javbus 的两个关键开关。 + self.http_proxy = "" + # 功能开关:默认只返回文本详情,磁力和封面均默认关闭。 self.allow_download_link = False self.allow_preview_cover = False - # 详情接口路由模板:默认优先 v1,再回退 v0 风格。 - self.movie_detail_paths: List[str] = ["/api/v1/movies/{id}", "/api/movies/{id}"] + self.prefer_subtitle_magnet = True def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件配置。""" @@ -81,30 +81,19 @@ class FanhaoSearchPlugin(MessagePluginInterface): self.event_system = context.get("event_system") cfg = self._config.get("FanhaoSearch", {}) + self.enable = bool(cfg.get("enable", True)) self._commands = cfg.get("command", ["番号", "番号查询"]) self.command_format = cfg.get("command-format", "番号 番号编号 例如:番号 SSIS-406") - self.enable = bool(cfg.get("enable", True)) - - # 用户部署的 javbus-api 前缀,例如:https://xxx.com 或 http://127.0.0.1:8922 - self.api_prefix = str(cfg.get("api_prefix", "") or "").strip().rstrip("/") - # javbus-api 的可选鉴权 Token,对应请求头 j-auth-token。 - self.api_token = str(cfg.get("auth_token", "") or "").strip() + self.javbus_base_url = str(cfg.get("javbus_base_url", "https://www.javbus.com") or "").strip().rstrip("/") self.request_timeout_seconds = max(5, int(cfg.get("request_timeout_seconds", 15) or 15)) + self.http_proxy = str(cfg.get("http_proxy", "") or "").strip() self.allow_download_link = bool(cfg.get("allow_download_link", False)) self.allow_preview_cover = bool(cfg.get("allow_preview_cover", False)) - - # 允许外部覆盖路径列表,便于你后续升级 API 版本时无代码切换。 - configured_paths = cfg.get("movie_detail_paths", ["/api/v1/movies/{id}", "/api/movies/{id}"]) - normalized_paths: List[str] = [] - for path in configured_paths: - text = str(path or "").strip() - if text and "{id}" in text: - normalized_paths.append(text) - self.movie_detail_paths = normalized_paths or ["/api/v1/movies/{id}", "/api/movies/{id}"] + self.prefer_subtitle_magnet = bool(cfg.get("prefer_subtitle_magnet", True)) self.LOG.info( f"[{self.name}] 初始化完成: enable={self.enable}, commands={self._commands}, " - f"api_prefix={'已配置' if self.api_prefix else '未配置'}, allow_download_link={self.allow_download_link}, " + f"base_url={self.javbus_base_url}, allow_download_link={self.allow_download_link}, " f"allow_preview_cover={self.allow_preview_cover}, timeout={self.request_timeout_seconds}s" ) return True @@ -120,7 +109,7 @@ class FanhaoSearchPlugin(MessagePluginInterface): return True def can_process(self, message: Dict[str, Any]) -> bool: - """只处理配置命令开头的文本消息。""" + """仅处理配置命令开头的文本消息。""" if not self.enable: return False content = str(message.get("content", "") or "").strip() @@ -133,159 +122,282 @@ class FanhaoSearchPlugin(MessagePluginInterface): def _normalize_code(text: str) -> str: """标准化番号。 - 规则说明: - 1. 自动去空格并转大写; - 2. 处理“字母+数字无横杠”场景,如 ipzz108 -> IPZZ-108; - 3. 保留用户已有横杠结构,避免误改合法编号。 + 处理策略: + 1. 去掉前后空白并转大写; + 2. 自动补横杠:如 ipzz108 -> IPZZ-108; + 3. 保留用户已有横杠结构,避免误改。 """ code = (text or "").strip().upper() return re.sub(r"([A-Z])(\d)", r"\1-\2", code) - def _build_request_headers(self) -> Dict[str, str]: - """构建请求头。 - - 说明: - - 如果配置了 auth_token,则透传到 j-auth-token,兼容 ovnrain/javbus-api 的鉴权模式; - - 固定 UA 便于服务端日志追踪与风控排查。 - """ + def _build_headers(self, referer: str = "") -> Dict[str, str]: + """构建请求头,模拟常见浏览器访问。""" headers = { - "User-Agent": "ABOT-FanhaoSearch/2.0", - "Accept": "application/json", + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/124.0.0.0 Safari/537.36" + ), + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", } - if self.api_token: - headers["j-auth-token"] = self.api_token + if referer: + headers["Referer"] = referer return headers - async def _fetch_movie_detail(self, movie_id: str) -> Optional[Dict[str, Any]]: - """请求影片详情,支持多路由回退。""" - if not self.api_prefix: - return None + def _build_proxy(self) -> Optional[str]: + """获取代理配置(未配置返回 None)。""" + return self.http_proxy if self.http_proxy else None + async def _http_get_text( + self, + url: str, + *, + referer: str = "", + params: Optional[Dict[str, Any]] = None, + ) -> str: + """执行 HTTP GET 并返回文本。""" timeout = aiohttp.ClientTimeout(total=self.request_timeout_seconds) - headers = self._build_request_headers() - safe_id = quote(movie_id, safe="") - last_error = "" + proxy = self._build_proxy() + async with aiohttp.ClientSession(timeout=timeout, headers=self._build_headers(referer)) as session: + async with session.get(url, params=params, proxy=proxy) as resp: + body = await resp.text(errors="ignore") + if resp.status < 200 or resp.status >= 300: + raise RuntimeError(f"请求失败 status={resp.status}, url={url}, body={body[:180]}") + return body - async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session: - for path in self.movie_detail_paths: - url = f"{self.api_prefix}{path.format(id=safe_id)}" - try: - async with session.get(url) as resp: - # 非 2xx 先记录,继续尝试下一条路径。 - if resp.status < 200 or resp.status >= 300: - body_preview = (await resp.text())[:160] - self.LOG.warning( - f"[{self.name}] 详情接口返回异常: status={resp.status}, url={url}, body={body_preview}" - ) - continue - data = await resp.json(content_type=None) - # 同时兼容两种返回形态: - # 1) 直接是详情对象;2) 包在 data/result 字段内。 - if isinstance(data, dict): - if isinstance(data.get("data"), dict): - return data.get("data") - if isinstance(data.get("result"), dict): - return data.get("result") - if data.get("id") or data.get("title"): - return data - except Exception as e: - last_error = str(e) - self.LOG.warning(f"[{self.name}] 请求详情失败: url={url}, error={e}") - continue + async def _http_get_bytes(self, url: str, *, referer: str = "") -> bytes: + """执行 HTTP GET 并返回二进制,用于下载封面图。""" + timeout = aiohttp.ClientTimeout(total=self.request_timeout_seconds) + proxy = self._build_proxy() + async with aiohttp.ClientSession(timeout=timeout, headers=self._build_headers(referer)) as session: + async with session.get(url, proxy=proxy) as resp: + if resp.status < 200 or resp.status >= 300: + raise RuntimeError(f"图片下载失败 status={resp.status}, url={url}") + return await resp.read() - if last_error: - self.LOG.error(f"[{self.name}] 所有详情路由均失败: movie_id={movie_id}, last_error={last_error}") + @staticmethod + def _extract_plain_value_from_info_p(info_p) -> str: + """从详情页
节点中提取纯文本值。 + + 规则: + 1. 先去掉 header 标签文本; + 2. 再压缩多余空白; + 3. 保留正文语义,避免把“識別碼:”一起带出来。 + """ + if not info_p: + return "" + node = BeautifulSoup(str(info_p), "html.parser") + header = node.find(class_="header") + if header: + header.extract() + text = node.get_text(" ", strip=True) + return re.sub(r"\s+", " ", text).strip() + + @staticmethod + def _find_info_p_by_header(info_ps: List[Any], header_keywords: List[str]): + """根据 header 关键字定位详情信息行。""" + for p in info_ps: + header = p.find(class_="header") + if not header: + continue + header_text = header.get_text(strip=True) + if any(keyword in header_text for keyword in header_keywords): + return p return None - @staticmethod - def _extract_star_names(movie: Dict[str, Any]) -> str: - """提取女优名称,兼容数组/字符串两种结构。""" - stars = movie.get("stars") - if isinstance(stars, list): - names = [] - for star in stars: - if isinstance(star, dict) and star.get("name"): - names.append(str(star.get("name"))) - elif isinstance(star, str) and star.strip(): - names.append(star.strip()) - if names: - return ", ".join(names) - if isinstance(movie.get("actress"), str): - return str(movie.get("actress")).strip() - return "" + def _extract_movie_detail_from_html(self, html: str, movie_id: str) -> Optional[Dict[str, Any]]: + """解析详情页 HTML,提取核心字段。""" + soup = BeautifulSoup(html, "html.parser") - @staticmethod - def _pick_best_magnet(magnets: Any) -> str: - """从磁力列表中挑选一条优先磁力链接。 + # 若被站点重定向到登录页或拦截页,通常页面包含 login 关键词或缺失详情容器。 + page_text = soup.get_text(" ", strip=True).lower() + if "login" in page_text and "javbus" in page_text and not soup.select_one(".container .movie"): + return None - 选择策略(参考 koishi-plugin-javbus 并增强健壮性): - 1. 优先有字幕的磁力; - 2. 再按 numberSize / size 字段的“可解析数值”降序; - 3. 返回首个有效 link/hash。 - """ - if not isinstance(magnets, list): - return "" + movie_container = soup.select_one(".container .movie") + if not movie_container: + return None - candidates: List[Dict[str, Any]] = [m for m in magnets if isinstance(m, dict)] - if not candidates: - return "" + # 标题与封面图。 + title = (soup.select_one(".container h3") or soup.select_one("h3")) + title_text = title.get_text(strip=True) if title else "" + img_node = soup.select_one(".bigImage img") + img_url = str(img_node.get("src") or "").strip() if img_node else "" + if img_url and img_url.startswith("/"): + img_url = urljoin(self.javbus_base_url, img_url) - def parse_size_num(item: Dict[str, Any]) -> float: - raw = item.get("numberSize") or item.get("size") or 0 - # numberSize 可能是数字,也可能是字符串,这里统一兜底到 float。 - try: - return float(raw) - except Exception: - # 尝试从诸如 "2.3 GB" 文本中提取首个数字。 - matched = re.search(r"(\d+(?:\.\d+)?)", str(raw)) - return float(matched.group(1)) if matched else 0.0 + # 基本信息块:统一从 p.header 结构里定位。 + info_ps = movie_container.select(".info p") + date_p = self._find_info_p_by_header(info_ps, ["發行日期", "发行日期"]) + length_p = self._find_info_p_by_header(info_ps, ["長度", "长度"]) + publisher_p = self._find_info_p_by_header(info_ps, ["發行商", "发行商"]) + star_p = self._find_info_p_by_header(info_ps, ["演員", "演员"]) - with_subtitle = [m for m in candidates if bool(m.get("hasSubtitle"))] - pool = with_subtitle if with_subtitle else candidates - pool_sorted = sorted(pool, key=parse_size_num, reverse=True) + date_value = self._extract_plain_value_from_info_p(date_p) + length_value = self._extract_plain_value_from_info_p(length_p) + length_minutes = None + if length_value: + match = re.search(r"(\d+)", length_value) + if match: + length_minutes = int(match.group(1)) - for item in pool_sorted: - link = str(item.get("link") or "").strip() - if link: - return link - # 某些 API 只回 hash,不回完整 magnet。 - hash_value = str(item.get("hash") or "").strip() - if hash_value: - return f"magnet:?xt=urn:btih:{hash_value}" - return "" - - def _format_result_text(self, movie: Dict[str, Any], normalized_code: str) -> str: - """拼装文本消息内容。""" - movie_id = str(movie.get("id") or normalized_code or "未知番号").strip() - title = str(movie.get("title") or "未提供标题").strip() - date_value = str(movie.get("date") or movie.get("publishDate") or "").strip() publisher_name = "" - publisher = movie.get("publisher") - if isinstance(publisher, dict): - publisher_name = str(publisher.get("name") or "").strip() - elif isinstance(publisher, str): - publisher_name = publisher.strip() - star_text = self._extract_star_names(movie) + if publisher_p: + publisher_link = publisher_p.find("a") + if publisher_link: + publisher_name = publisher_link.get_text(strip=True) + if not publisher_name: + publisher_name = self._extract_plain_value_from_info_p(publisher_p) - lines = [f"✅ 查询成功:{movie_id}", f"标题:{title}"] + # 女优可能有多个链接,拼接输出更可读。 + stars: List[str] = [] + if star_p: + for star_link in star_p.find_all("a"): + star_name = star_link.get_text(strip=True) + if star_name: + stars.append(star_name) + + # 提取 gid / uc,后续用于 AJAX 磁力查询。 + gid_match = re.search(r"var\s+gid\s*=\s*(\d+)\s*;", html) + uc_match = re.search(r"var\s+uc\s*=\s*(\d+)\s*;", html) + gid = gid_match.group(1) if gid_match else "" + uc = uc_match.group(1) if uc_match else "" + + return { + "id": movie_id, + "title": title_text, + "img": img_url, + "date": date_value, + "video_length_minutes": length_minutes, + "publisher_name": publisher_name, + "stars": stars, + "gid": gid, + "uc": uc, + } + + @staticmethod + def _parse_size_to_bytes(size_text: str) -> float: + """把尺寸文本(如 6.57GB)转换为字节数,用于排序。""" + text = str(size_text or "").strip().upper() + if not text: + return 0.0 + match = re.search(r"(\d+(?:\.\d+)?)\s*(KB|MB|GB|TB)", text) + if not match: + return 0.0 + value = float(match.group(1)) + unit = match.group(2) + factor_map = { + "KB": 1024.0, + "MB": 1024.0 ** 2, + "GB": 1024.0 ** 3, + "TB": 1024.0 ** 4, + } + return value * factor_map.get(unit, 1.0) + + def _parse_magnets_from_html(self, html: str) -> List[Dict[str, Any]]: + """解析 AJAX 返回的磁力表格。""" + soup = BeautifulSoup(html, "html.parser") + magnets: List[Dict[str, Any]] = [] + rows = soup.select("tr") + for tr in rows: + first_td = tr.find("td") + if not first_td: + continue + first_a = first_td.find("a") + if not first_a: + continue + link = str(first_a.get("href") or "").strip() + if not link.startswith("magnet:?xt=urn:btih:"): + continue + + title = first_a.get_text(" ", strip=True) + tags_text = first_td.get_text(" ", strip=True) + is_hd = "高清" in tags_text + has_subtitle = "字幕" in tags_text + + tds = tr.find_all("td") + size_text = tds[1].get_text(" ", strip=True) if len(tds) > 1 else "" + share_date = tds[2].get_text(" ", strip=True) if len(tds) > 2 else "" + + magnets.append( + { + "link": link, + "title": title, + "size": size_text, + "number_size": self._parse_size_to_bytes(size_text), + "share_date": share_date, + "is_hd": is_hd, + "has_subtitle": has_subtitle, + } + ) + + # 默认按大小降序,接近 javbus-api 的默认行为。 + magnets.sort(key=lambda item: float(item.get("number_size") or 0.0), reverse=True) + return magnets + + async def _fetch_movie_magnets(self, movie_id: str, gid: str, uc: str) -> List[Dict[str, Any]]: + """通过 JavBus AJAX 接口获取磁力列表。""" + if not gid or not uc: + return [] + ajax_url = f"{self.javbus_base_url}/ajax/uncledatoolsbyajax.php" + referer = f"{self.javbus_base_url}/{movie_id}" + html = await self._http_get_text( + ajax_url, + referer=referer, + params={ + "lang": "zh", + "gid": gid, + "uc": uc, + }, + ) + return self._parse_magnets_from_html(html) + + def _pick_best_magnet(self, magnets: List[Dict[str, Any]]) -> str: + """按策略选出一条最优磁力。""" + if not magnets: + return "" + pool = magnets + if self.prefer_subtitle_magnet: + subtitle_pool = [item for item in magnets if bool(item.get("has_subtitle"))] + if subtitle_pool: + pool = subtitle_pool + # 尺寸越大通常清晰度越高,这里取排序后的第一条。 + best = pool[0] + return str(best.get("link") or "").strip() + + def _format_result_text(self, detail: Dict[str, Any], best_magnet: str) -> str: + """格式化返回给用户的文本。""" + lines = [ + f"✅ 查询成功:{detail.get('id') or '未知番号'}", + f"标题:{detail.get('title') or '未提供标题'}", + ] + + date_value = str(detail.get("date") or "").strip() if date_value: lines.append(f"发行日期:{date_value}") - if star_text: - lines.append(f"女优:{star_text}") + + length_minutes = detail.get("video_length_minutes") + if isinstance(length_minutes, int) and length_minutes > 0: + lines.append(f"片长:{length_minutes} 分钟") + + publisher_name = str(detail.get("publisher_name") or "").strip() if publisher_name: lines.append(f"发行商:{publisher_name}") - if self.allow_download_link: - magnet = self._pick_best_magnet(movie.get("magnets")) - if magnet: - lines.append(f"磁力:{magnet}") + stars = detail.get("stars") or [] + if isinstance(stars, list) and stars: + lines.append(f"女优:{', '.join([str(s) for s in stars if str(s).strip()])}") + + if best_magnet: + lines.append(f"磁力:{best_magnet}") return "\n".join(lines) @plugin_stats_decorator(plugin_name="番号查询") @plugin_points_cost(10, "番号查询消耗积分", FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """执行番号查询主流程。""" + """处理番号查询请求。""" content = str(message.get("content", "") or "").strip() self.LOG.debug(f"[{self.name}] 插件执行: content={content}") @@ -295,7 +407,7 @@ class FanhaoSearchPlugin(MessagePluginInterface): bot: WechatAPIClient = message.get("bot") target = room_id if room_id else sender - # 群开关关闭时不处理,避免越权。 + # 群开关关闭时不处理。 if room_id and gbm and gbm.get_group_permission(room_id, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" @@ -304,42 +416,55 @@ class FanhaoSearchPlugin(MessagePluginInterface): await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender) return False, "命令格式错误" - if not self.api_prefix: - await bot.send_text_message( - target, - "❌番号插件未配置 api_prefix,请在 plugins/fanhao_search/config.toml 中补充后重试。", - sender, - ) - return False, "api_prefix未配置" - raw_code = parts[1].strip() normalized_code = self._normalize_code(raw_code) if not normalized_code: await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender) return False, "命令格式错误" + detail_url = f"{self.javbus_base_url}/{normalized_code}" self.LOG.info( - f"[{self.name}] 收到查询: raw={raw_code}, normalized={normalized_code}, api_prefix={self.api_prefix}" + f"[{self.name}] 收到查询: raw={raw_code}, normalized={normalized_code}, detail_url={detail_url}" ) try: - movie = await self._fetch_movie_detail(normalized_code) - if not movie: - await bot.send_text_message(target, f"未找到番号:{normalized_code}", sender) - return False, "未找到" + # 1) 抓详情页并解析基础信息。 + detail_html = await self._http_get_text(detail_url, referer=self.javbus_base_url) + detail = self._extract_movie_detail_from_html(detail_html, normalized_code) + if not detail: + await bot.send_text_message( + target, + "未找到番号,或当前节点被 JavBus 重定向到登录页(常见于部分地区 IP)。", + sender, + ) + return False, "未找到或被登录拦截" - # 先发文本,确保最差场景也能看到核心信息。 - result_text = self._format_result_text(movie, normalized_code) - await bot.send_text_message(target, result_text, sender) + # 2) 可选抓磁力(需要 gid/uc)。 + best_magnet = "" + if self.allow_download_link: + try: + magnets = await self._fetch_movie_magnets( + normalized_code, + str(detail.get("gid") or ""), + str(detail.get("uc") or ""), + ) + best_magnet = self._pick_best_magnet(magnets) + except Exception as magnet_error: + self.LOG.warning(f"[{self.name}] 磁力抓取失败: code={normalized_code}, error={magnet_error}") - # 按配置决定是否额外发送封面预览(可能含敏感内容,默认关闭)。 + # 3) 先发文本,保证主链路可见。 + text = self._format_result_text(detail, best_magnet) + await bot.send_text_message(target, text, sender) + + # 4) 可选发封面:必须先下载为 bytes,再调用 send_image_message。 if self.allow_preview_cover: - cover_url = str(movie.get("img") or movie.get("cover") or "").strip() + cover_url = str(detail.get("img") or "").strip() if cover_url: try: - await bot.send_image_message(target, cover_url) - except Exception as image_error: - self.LOG.warning(f"[{self.name}] 封面发送失败: code={normalized_code}, error={image_error}") + cover_bytes = await self._http_get_bytes(cover_url, referer=detail_url) + await bot.send_image_message(target, cover_bytes) + except Exception as cover_error: + self.LOG.warning(f"[{self.name}] 封面发送失败: code={normalized_code}, error={cover_error}") return True, "查询成功" except Exception as e: @@ -350,3 +475,4 @@ class FanhaoSearchPlugin(MessagePluginInterface): def get_plugin(): """返回插件实例。""" return FanhaoSearchPlugin() +