import urllib.parse from loguru import logger import requests from typing import Dict, Any, List, Optional, Tuple from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from utils.decorator.points_decorator import plugin_points_cost from wechat_ipad import WechatAPIClient class DuanjuSearchPlugin(MessagePluginInterface): """短剧搜索插件""" # 功能权限常量 FEATURE_KEY = "DUANJU_SEARCH" FEATURE_DESCRIPTION = "🎬 短剧搜索功能 [短剧, 短剧搜索, 搜索短剧, 短剧查询]" @property def name(self) -> str: return "短剧搜索" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供短剧搜索功能,支持搜索短剧名称并返回剧名和链接信息" @property def author(self) -> str: return "liu.wei" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.feature = self.register_feature() def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logger self.LOG.debug(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.event_system = context.get("event_system") self._commands = self._config.get("DuanjuSearch", {}).get("command", ["短剧", "短剧搜索"]) self.command_format = self._config.get("DuanjuSearch", {}).get("command-format", "短剧 剧名") self.enable = self._config.get("DuanjuSearch", {}).get("enable", True) self.api_url = self._config.get("DuanjuSearch", {}).get("api_url", "https://api.uuuka.com/api/search") self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: """启动插件""" self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands @plugin_stats_decorator(plugin_name="短剧搜索") @plugin_points_cost(1, "短剧搜索消耗积分", FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") command = content.split(" ")[0] sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") # 检查命令格式 if len(content.split(" ")) == 1: await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}" , sender) return False, "命令格式错误" # 检查权限 if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" # 提取短剧名称 user_drama_name = content[len(command):].strip() try: # 搜索短剧 drama_results = self._search_drama(user_drama_name) if not drama_results: await bot.send_text_message((roomid if roomid else sender), f"❌未找到短剧:{user_drama_name}", sender) return False, "未找到短剧" # 发送搜索结果 await self._send_search_results(bot, drama_results, user_drama_name, roomid or sender) return True, "发送成功" except Exception as e: self.LOG.error(f"处理短剧搜索请求出错: {e}") return False, f"处理出错: {e}" def _search_drama(self, drama_name: str) -> List[Dict[str, Any]]: """搜索短剧信息""" try: # URL编码搜索关键词 encoded_keyword = urllib.parse.quote(drama_name) # 构建API请求URL api_url = f"{self.api_url}?keyword={encoded_keyword}&page=1&limit=3" # 设置请求头 headers = { 'accept': 'application/json' } self.LOG.info(f"请求短剧搜索API: {api_url}") response = requests.get(api_url, headers=headers, timeout=10) if response.status_code != 200: self.LOG.error(f"API 请求失败,状态码: {response.status_code}") return [] json_data = response.json() # 检查API响应是否成功 if not json_data.get("success", False): self.LOG.error(f"API 返回错误: {json_data.get('message', '未知错误')}") return [] # 提取短剧数据 data = json_data.get("data", {}) items = data.get("items", []) # 限制返回前3个结果 results = [] for item in items[:3]: result = { "title": item.get("title", ""), "source_link": item.get("source_link", ""), "update_time": item.get("update_time", ""), "view_count": item.get("view_count", 0) } results.append(result) self.LOG.info(f"找到 {len(results)} 个短剧结果") return results except Exception as e: self.LOG.error(f"搜索短剧出错: {e}") return [] async def _send_search_results(self, bot: WechatAPIClient, drama_results: List[Dict[str, Any]], search_keyword: str, receiver: str) -> bool: """发送搜索结果""" try: if not drama_results: await bot.send_text_message(receiver, f"❌未找到短剧:{search_keyword}") return False # 构建结果消息 message_parts = [f"🎬 短剧搜索结果:{search_keyword}\n"] for i, drama in enumerate(drama_results, 1): title = drama.get("title", "") source_link = drama.get("source_link", "") update_time = drama.get("update_time", "") view_count = drama.get("view_count", 0) # 格式化标题,去除多余信息 clean_title = title.replace("短剧《", "").replace("》免费观看", "").strip() message_parts.append(f"{i}. {clean_title}") if source_link: message_parts.append(f" 🔗 链接:{source_link}") if update_time: message_parts.append(f" 📅 更新时间:{update_time}") if view_count > 0: message_parts.append(f" 👀 观看次数:{view_count}") message_parts.append("") # 空行分隔 # 发送消息 result_message = "\n".join(message_parts) self.LOG.info(f"发送短剧搜索结果:{result_message}") await bot.send_text_message(receiver, result_message) return True except Exception as e: self.LOG.error(f"发送短剧搜索结果出错: {e}") return False