"""
Playlet (short drama) search plugin.

Users send ``/搜索短剧 xxx`` to search for a short drama and receive its
video links bundled as a single chat-record message.
"""

import asyncio
import hashlib
import time
import tomllib
import urllib.parse
import uuid
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Optional
from xml.sax.saxutils import escape

import aiohttp
from loguru import logger

from utils.plugin_base import PluginBase
from utils.decorators import on_text_message

# Text command that triggers a search (followed by a space and the name).
_COMMAND = "/搜索短剧"
# Pause between two attempts of the same API request, in seconds.
_RETRY_DELAY_SECONDS = 1
# URL placed in the appmsg <url> element of the chat-record message.
_RECORD_FALLBACK_URL = (
    "https://support.weixin.qq.com/cgi-bin/mmsupport-bin/readtemplate"
    "?t=page/favorite_record__w_unsupport"
)


class PlayletSearch(PluginBase):
    """Search a short drama by name and send its episode links to the chat."""

    description = "搜索短剧并获取视频链接"
    author = "Assistant"
    version = "1.0.0"

    def __init__(self):
        super().__init__()
        # Filled in by async_init() from config.toml next to this file.
        self.config = None

    async def async_init(self):
        """Load ``config.toml`` from the plugin directory into ``self.config``."""
        config_path = Path(__file__).parent / "config.toml"
        with open(config_path, "rb") as f:
            self.config = tomllib.load(f)
        logger.success("短剧搜索插件已加载")

    # ------------------------------------------------------------------
    # Entry points
    # ------------------------------------------------------------------

    @on_text_message(priority=70)
    async def handle_message(self, bot, message: dict):
        """Handle a text message and run a search when it is the command.

        Args:
            bot: Bot client; ``send_text`` and ``_send_data_async`` are used.
            message: Incoming message dict; the keys ``Content``,
                ``FromWxid`` and ``IsGroup`` are read.

        Returns:
            ``True`` when the message is not handled by this plugin (not the
            command, plugin disabled, or group filtered out); ``False`` once
            the command has been handled.
            NOTE(review): presumably ``True`` lets later handlers run and
            ``False`` stops propagation - confirm against ``on_text_message``.
        """
        content = message.get("Content", "").strip()
        from_wxid = message.get("FromWxid", "")
        is_group = message.get("IsGroup", False)

        # Accept the bare command too: the content is stripped above, so a
        # message without a name can never end with the separating space and
        # would otherwise never reach the usage hint below.
        if content != _COMMAND and not content.startswith(_COMMAND + " "):
            return True

        if not self.config["behavior"]["enabled"]:
            return True
        if is_group and not self._is_group_allowed(from_wxid):
            return True

        keyword = content[len(_COMMAND):].strip()
        if not keyword:
            await bot.send_text(from_wxid, "❌ 请输入短剧名称\n格式:/搜索短剧 短剧名称")
            return False

        logger.info(f"搜索短剧: {keyword}")
        try:
            await self._run_search(bot, from_wxid, keyword)
        except Exception as e:
            # Top-level boundary: report the failure to the chat, never crash
            # the message dispatcher.
            logger.error(f"短剧搜索失败: {e}")
            await bot.send_text(from_wxid, f"❌ 搜索失败: {str(e)}")
        return False

    def get_llm_tools(self) -> List[dict]:
        """Return the LLM tool definitions exposed by this plugin."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "search_playlet",
                    "description": "搜索短剧并获取视频链接",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "keyword": {
                                "type": "string",
                                "description": "短剧名称或关键词",
                            }
                        },
                        "required": ["keyword"],
                    },
                },
            }
        ]

    async def execute_llm_tool(
        self, tool_name: str, arguments: dict, bot, from_wxid: str
    ) -> Optional[dict]:
        """Execute an LLM tool call.

        Args:
            tool_name: Name of the requested tool.
            arguments: Tool arguments; ``keyword`` is read.
            bot: Bot client used to send messages.
            from_wxid: Conversation id; ids ending in ``@chatroom`` are
                treated as groups.

        Returns:
            ``None`` when ``tool_name`` is not ``search_playlet`` so another
            plugin can handle it; otherwise a dict with ``success`` and
            ``message``.
        """
        # Decide ownership first: a disabled or filtered plugin must not
        # answer (with a failure) for tools that belong to other plugins.
        if tool_name != "search_playlet":
            return None

        try:
            if not self.config["behavior"]["enabled"]:
                return {"success": False, "message": "短剧搜索插件未启用"}

            is_group = from_wxid.endswith("@chatroom")
            if is_group and not self._is_group_allowed(from_wxid):
                return {"success": False, "message": "此群聊未启用短剧搜索功能"}

            keyword = str(arguments.get("keyword") or "").strip()
            if not keyword:
                return {"success": False, "message": "缺少短剧名称参数"}

            logger.info(f"LLM工具调用搜索短剧: {keyword}")
            return await self._run_search(bot, from_wxid, keyword)
        except Exception as e:
            logger.error(f"LLM工具执行失败: {e}")
            await bot.send_text(from_wxid, f"❌ 搜索失败: {str(e)}")
            return {"success": False, "message": f"执行失败: {str(e)}"}

    # ------------------------------------------------------------------
    # Shared pipeline
    # ------------------------------------------------------------------

    def _is_group_allowed(self, group_wxid: str) -> bool:
        """Apply the ``disabled_groups`` / ``enabled_groups`` config filters.

        A group is rejected when it is listed in ``disabled_groups``, or when
        ``enabled_groups`` is non-empty and does not contain it.
        """
        behavior = self.config["behavior"]
        if group_wxid in behavior["disabled_groups"]:
            return False
        enabled_groups = behavior["enabled_groups"]
        return not enabled_groups or group_wxid in enabled_groups

    @staticmethod
    async def _fail(bot, from_wxid: str, reason: str) -> dict:
        """Send ``reason`` to the chat as an error and return a failure dict."""
        await bot.send_text(from_wxid, f"❌ {reason}")
        return {"success": False, "message": reason}

    async def _run_search(self, bot, from_wxid: str, keyword: str) -> dict:
        """Search, list episodes, resolve URLs and send the result.

        Shared by the text command and the LLM tool so both behave the same.

        Returns:
            A dict with ``success`` (bool) and ``message`` (str).
        """
        await bot.send_text(from_wxid, f"🔍 正在搜索短剧:{keyword}\n请稍候...")

        # Step 1: find the drama.
        search_result = await self._search_playlet(keyword)
        if not search_result:
            return await self._fail(bot, from_wxid, f"未找到短剧:{keyword}")
        book_id, cover_url = search_result

        # Step 2: list its episodes.
        video_list, detail_cover = await self._get_episode_list(book_id, keyword)
        if not video_list:
            return await self._fail(bot, from_wxid, "获取剧集列表失败")

        # Prefer the cover from the search result; the detail cover may be
        # wrong (per the original author's note).
        final_cover_url = cover_url or detail_cover

        # Cap the number of episodes.
        video_list = video_list[: self.config["behavior"]["max_episodes"]]

        # Step 3: resolve every episode URL concurrently.
        video_urls = await self._get_video_urls(video_list)
        if not video_urls:
            # Do not report success when nothing could be resolved.
            return await self._fail(bot, from_wxid, "未获取到任何视频链接")

        # Step 4: build and send the chat record.
        await self._send_chat_records(
            bot, from_wxid, keyword, video_urls, final_cover_url
        )
        logger.success(f"短剧搜索完成: {keyword}, {len(video_urls)} 集")
        return {
            "success": True,
            "message": f"短剧搜索完成:{keyword},共{len(video_urls)}集",
        }

    # ------------------------------------------------------------------
    # API requests
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_search_result(result: dict, attempt_tag: str) -> Optional[tuple]:
        """Extract ``(book_id, cover_url)`` from a search response.

        Logs a warning and returns ``None`` when the response reports an
        error, has no entries, or the first entry lacks a ``book_id``.
        """
        if result.get("code") != 0:
            logger.warning(f"搜索短剧失败 ({attempt_tag}): {result.get('msg')}")
            return None

        data = result.get("data") or []
        if not data:
            logger.warning(f"搜索短剧无结果 ({attempt_tag})")
            return None

        # Only the first hit is used.
        first_result = data[0]
        book_id = first_result.get("book_id")
        if book_id in (None, ""):
            # Without an id the episode request cannot be built.
            logger.warning(f"搜索短剧无结果 ({attempt_tag})")
            return None

        # The cover is percent-decoded before use.
        cover_url = urllib.parse.unquote(first_result.get("cover") or "")
        logger.info(
            f"找到短剧: {first_result.get('title')}, book_id={book_id}, cover_url={cover_url}"
        )
        return (book_id, cover_url)

    async def _search_playlet(self, keyword: str) -> Optional[tuple]:
        """Search for a drama, retrying on any failure.

        Returns:
            ``(book_id, cover_url)`` for the first hit, or ``None`` after
            ``behavior.max_retries`` (default 15) unsuccessful attempts.
        """
        url = self.config["api"]["base_url"]
        params = {"key": self.config["api"]["api_key"], "keyword": keyword}
        timeout = aiohttp.ClientTimeout(total=self.config["api"]["timeout"])
        max_retries = self.config["behavior"].get("max_retries", 15)

        # One session serves all attempts instead of one session per attempt.
        async with aiohttp.ClientSession(timeout=timeout) as session:
            for attempt in range(1, max_retries + 1):
                attempt_tag = f"尝试{attempt}/{max_retries}"
                try:
                    async with session.post(url, params=params) as resp:
                        if resp.status != 200:
                            logger.warning(
                                f"搜索短剧失败 ({attempt_tag}): HTTP {resp.status}"
                            )
                        else:
                            found = self._parse_search_result(
                                await resp.json(), attempt_tag
                            )
                            if found is not None:
                                return found
                except Exception as e:
                    logger.error(f"搜索短剧异常 ({attempt_tag}): {e}")

                # Sleep outside the response context so the connection is
                # released while waiting.
                if attempt < max_retries:
                    await asyncio.sleep(_RETRY_DELAY_SECONDS)
        return None

    async def _get_episode_list(self, book_id: str, keyword: str) -> tuple:
        """Fetch the episode list of a drama.

        Retries on HTTP errors, empty lists and exceptions. A non-zero API
        ``code`` ends the attempt loop immediately, as in the original
        implementation.
        NOTE(review): the sibling requests retry on a non-zero code; confirm
        whether the early exit here is intentional.

        Returns:
            ``(video_list, detail_cover)``; ``([], "")`` on failure.
        """
        url = self.config["api"]["base_url"]
        params = {
            "key": self.config["api"]["api_key"],
            "book_id": book_id,
            "keyword": keyword,
        }
        timeout = aiohttp.ClientTimeout(total=self.config["api"]["timeout"])
        max_retries = self.config["behavior"].get("max_retries", 15)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            for attempt in range(1, max_retries + 1):
                attempt_tag = f"尝试{attempt}/{max_retries}"
                try:
                    async with session.post(url, params=params) as resp:
                        if resp.status != 200:
                            logger.error(
                                f"获取剧集列表失败 ({attempt_tag}): HTTP {resp.status}"
                            )
                        else:
                            result = await resp.json()
                            if result.get("code") != 0:
                                logger.error(
                                    f"获取剧集列表失败: code={result.get('code')}, msg={result.get('msg')}"
                                )
                                return ([], "")

                            data = result.get("data") or {}
                            video_list = data.get("video_list") or []
                            if video_list:
                                detail = data.get("detail") or {}
                                detail_cover = urllib.parse.unquote(
                                    detail.get("cover") or ""
                                )
                                logger.info(
                                    f"获取到 {len(video_list)} 集, detail_cover={detail_cover}"
                                )
                                return (video_list, detail_cover)
                            logger.warning(f"剧集列表为空 ({attempt_tag})")
                except Exception as e:
                    logger.error(f"获取剧集列表异常 ({attempt_tag}): {e}")

                if attempt < max_retries:
                    await asyncio.sleep(_RETRY_DELAY_SECONDS)
        return ([], "")

    async def _get_video_url(
        self, session: aiohttp.ClientSession, video_id: str
    ) -> Optional[str]:
        """Resolve the playback URL of one episode, retrying on any failure.

        Args:
            session: Shared HTTP session owned by the caller.
            video_id: Episode id sent as the ``video_id`` request parameter.

        Returns:
            The URL found at ``data.video.url``, or ``None`` when every
            attempt failed.
        """
        url = self.config["api"]["base_url"]
        params = {"key": self.config["api"]["api_key"], "video_id": video_id}
        max_retries = self.config["behavior"].get("max_retries", 15)

        for attempt in range(1, max_retries + 1):
            where = f"video_id={video_id}, 尝试{attempt}/{max_retries}"
            try:
                async with session.post(url, params=params) as resp:
                    if resp.status != 200:
                        logger.warning(
                            f"获取视频URL失败 ({where}): HTTP {resp.status}"
                        )
                    else:
                        result = await resp.json()
                        if result.get("code") != 0:
                            logger.warning(
                                f"获取视频URL失败 ({where}): code={result.get('code')}, msg={result.get('msg')}"
                            )
                        else:
                            data = result.get("data") or {}
                            video = data.get("video") or {}
                            video_url = video.get("url")
                            if video_url:
                                return video_url
                            logger.warning(
                                f"获取视频URL失败 ({where}): 返回数据中没有url字段"
                            )
            except Exception as e:
                logger.error(f"获取视频URL异常 ({where}): {e}")

            if attempt < max_retries:
                await asyncio.sleep(_RETRY_DELAY_SECONDS)
        return None

    async def _get_video_urls(self, video_list: List[Dict]) -> List[Dict]:
        """Resolve all episode URLs concurrently.

        At most ``behavior.max_concurrent_videos`` (default 10) requests run
        at once. Episodes whose URL cannot be resolved are dropped.

        Returns:
            ``{"title": ..., "url": ...}`` dicts in the order of
            ``video_list``.
        """
        timeout = aiohttp.ClientTimeout(total=self.config["api"]["timeout"])
        max_concurrent = self.config["behavior"].get("max_concurrent_videos", 10)
        semaphore = asyncio.Semaphore(max_concurrent)

        async with aiohttp.ClientSession(timeout=timeout) as session:

            async def fetch_one(video):
                video_id = video.get("video_id")
                title = video.get("title")
                resolved = None
                # An entry without an id cannot be requested; skip it rather
                # than spend every retry on a request that cannot be built.
                if video_id not in (None, ""):
                    async with semaphore:
                        resolved = await self._get_video_url(session, video_id)
                if resolved:
                    return {"title": title, "url": resolved}
                logger.warning(f"获取视频URL失败: {title} (video_id={video_id})")
                return None

            results = await asyncio.gather(
                *(fetch_one(video) for video in video_list),
                return_exceptions=True,
            )

        valid_results = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"获取视频URL异常: {result}")
            elif isinstance(result, dict) and result:
                valid_results.append(result)
        return valid_results

    # ------------------------------------------------------------------
    # Chat-record message
    # ------------------------------------------------------------------

    @staticmethod
    def _build_record_xml(
        from_wxid: str, playlet_name: str, video_urls: List[Dict], cover_url: str
    ) -> str:
        """Build the ``<recordinfo>`` XML with one ``<dataitem>`` per episode."""
        group_flag = "1" if from_wxid.endswith("@chatroom") else "0"
        summary = f"{playlet_name} 链接合集"
        cover = cover_url or ""
        # Loop-invariant: the same hash is attached to every item.
        sender_hash = hashlib.sha256(from_wxid.encode("utf-8")).hexdigest()

        recordinfo = ET.Element("recordinfo")
        ET.SubElement(recordinfo, "info").text = summary
        ET.SubElement(recordinfo, "isChatRoom").text = group_flag
        datalist = ET.SubElement(recordinfo, "datalist")
        datalist.set("count", str(len(video_urls)))
        ET.SubElement(recordinfo, "desc").text = summary
        ET.SubElement(recordinfo, "fromscene").text = "3"

        for item in video_urls:
            # ElementTree can only serialise str text.
            title = str(item.get("title") or "")
            link = str(item.get("url") or "")
            now = time.time()
            now_ms = int(now * 1000)
            create_time = int(now)

            di = ET.SubElement(datalist, "dataitem")
            di.set("datatype", "5")
            di.set("dataid", uuid.uuid4().hex)

            # Five-digit local id derived from the millisecond clock.
            ET.SubElement(di, "srcMsgLocalid").text = str(now_ms % 90000 + 10000)
            ET.SubElement(di, "sourcetime").text = time.strftime(
                "%Y-%m-%d %H:%M", time.localtime(create_time)
            )
            ET.SubElement(di, "fromnewmsgid").text = str(now_ms)
            ET.SubElement(di, "srcMsgCreateTime").text = str(create_time)
            ET.SubElement(di, "sourcename").text = playlet_name
            ET.SubElement(di, "sourceheadurl").text = cover
            ET.SubElement(di, "datatitle").text = title
            ET.SubElement(di, "datadesc").text = "点击观看"
            ET.SubElement(di, "datafmt").text = "url"
            ET.SubElement(di, "link").text = link
            ET.SubElement(di, "ischatroom").text = group_flag

            weburlitem = ET.SubElement(di, "weburlitem")
            ET.SubElement(weburlitem, "thumburl").text = cover
            # The thumbnail size is attached to the data item itself, as in
            # the original layout.
            ET.SubElement(di, "thumbwidth").text = "200"
            ET.SubElement(di, "thumbheight").text = "200"
            ET.SubElement(weburlitem, "title").text = title
            ET.SubElement(weburlitem, "link").text = link
            ET.SubElement(weburlitem, "desc").text = "点击观看"
            appmsgshareitem = ET.SubElement(weburlitem, "appmsgshareitem")
            ET.SubElement(appmsgshareitem, "itemshowtype").text = "-1"

            dataitemsource = ET.SubElement(di, "dataitemsource")
            ET.SubElement(dataitemsource, "hashusername").text = sender_hash

        return ET.tostring(recordinfo, encoding="unicode")

    @staticmethod
    def _build_appmsg_xml(playlet_name: str, record_xml: str) -> str:
        """Wrap ``record_xml`` in the appmsg envelope of a chat-record message.

        NOTE(review): the element names below were reconstructed. The
        original literals carried no markup at all and ``record_xml`` was
        never embedded. Verify the envelope against the messaging backend.
        """
        # The name is user input, so it is escaped for the XML context.
        safe_name = escape(playlet_name)
        return "".join(
            [
                '<appmsg appid="" sdkver="0">',
                f"<title>{safe_name} 链接合集</title>",
                f"<des>{safe_name}</des>",
                "<type>19</type>",
                f"<url>{escape(_RECORD_FALLBACK_URL)}</url>",
                "<appattach><cdnthumbaeskey></cdnthumbaeskey>"
                "<aeskey></aeskey></appattach>",
                # ElementTree escapes ">" in text and attributes, so the
                # serialised record cannot contain the CDATA terminator.
                f"<recorditem><![CDATA[{record_xml}]]></recorditem>",
                "<percent>0</percent>",
                "</appmsg>",
            ]
        )

    async def _send_chat_records(
        self,
        bot,
        from_wxid: str,
        playlet_name: str,
        video_urls: List[Dict],
        cover_url: str = "",
    ):
        """Send all episode links as one chat-record message.

        Args:
            bot: Bot client used to send the message.
            from_wxid: Target conversation id.
            playlet_name: Drama name shown as title and source name.
            video_urls: ``{"title": ..., "url": ...}`` dicts, one per episode.
            cover_url: Optional cover image used as avatar and thumbnail.
        """
        if not video_urls:
            await bot.send_text(from_wxid, "❌ 未获取到任何视频链接")
            return

        record_xml = self._build_record_xml(
            from_wxid, playlet_name, video_urls, cover_url
        )
        appmsg_xml = self._build_appmsg_xml(playlet_name, record_xml)

        # 11214 is the request type code passed to the bot backend; its exact
        # meaning is not visible in this file.
        await bot._send_data_async(
            11214, {"to_wxid": from_wxid, "content": appmsg_xml}
        )
        logger.success(f"已发送聊天记录,包含 {len(video_urls)} 集视频链接")