短剧搜索功能

This commit is contained in:
liuwei
2025-09-11 11:49:11 +08:00
parent 1ad8015a8d
commit 3d93661104
3 changed files with 236 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
# 从当前包的main模块导入DuanjuSearchPlugin类
from .main import DuanjuSearchPlugin
# 提供get_plugin函数返回插件实例
def get_plugin():
"""获取插件实例"""
return DuanjuSearchPlugin()

View File

@@ -0,0 +1,8 @@
[DuanjuSearch]
enable = true
command = ["短剧", "短剧搜索", "搜索短剧", "短剧查询"]
command-format = """
🎬短剧搜索指令:
短剧 剧名
"""
api_url = "https://api.uuuka.com/api/search"

View File

@@ -0,0 +1,221 @@
import urllib.parse
from loguru import logger
import requests
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient
class DuanjuSearchPlugin(MessagePluginInterface):
"""短剧搜索插件"""
# 功能权限常量
FEATURE_KEY = "DUANJU_SEARCH"
FEATURE_DESCRIPTION = "🎬 短剧搜索功能 [短剧, 短剧搜索, 搜索短剧, 短剧查询]"
@property
def name(self) -> str:
return "短剧搜索"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供短剧搜索功能,支持搜索短剧名称并返回剧名和链接信息"
@property
def author(self) -> str:
return "liu.wei"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.feature = self.register_feature()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self._commands = self._config.get("DuanjuSearch", {}).get("command", ["短剧", "短剧搜索"])
self.command_format = self._config.get("DuanjuSearch", {}).get("command-format", "短剧 剧名")
self.enable = self._config.get("DuanjuSearch", {}).get("enable", True)
self.api_url = self._config.get("DuanjuSearch", {}).get("api_url", "https://api.uuuka.com/api/search")
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="短剧搜索")
@plugin_points_cost(1, "短剧搜索消耗积分", FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查命令格式
if len(content.split(" ")) == 1:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}"
, sender)
return False, "命令格式错误"
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
# 提取短剧名称
user_drama_name = content[len(command):].strip()
try:
# 搜索短剧
drama_results = self._search_drama(user_drama_name)
if not drama_results:
await bot.send_text_message((roomid if roomid else sender), f"❌未找到短剧:{user_drama_name}", sender)
return False, "未找到短剧"
# 发送搜索结果
await self._send_search_results(bot, drama_results, user_drama_name, roomid or sender)
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理短剧搜索请求出错: {e}")
return False, f"处理出错: {e}"
def _search_drama(self, drama_name: str) -> List[Dict[str, Any]]:
"""搜索短剧信息"""
try:
# URL编码搜索关键词
encoded_keyword = urllib.parse.quote(drama_name)
# 构建API请求URL
api_url = f"{self.api_url}?keyword={encoded_keyword}&page=1&limit=3"
# 设置请求头
headers = {
'accept': 'application/json'
}
self.LOG.info(f"请求短剧搜索API: {api_url}")
response = requests.get(api_url, headers=headers, timeout=10)
if response.status_code != 200:
self.LOG.error(f"API 请求失败,状态码: {response.status_code}")
return []
json_data = response.json()
# 检查API响应是否成功
if not json_data.get("success", False):
self.LOG.error(f"API 返回错误: {json_data.get('message', '未知错误')}")
return []
# 提取短剧数据
data = json_data.get("data", {})
items = data.get("items", [])
# 限制返回前3个结果
results = []
for item in items[:3]:
result = {
"title": item.get("title", ""),
"source_link": item.get("source_link", ""),
"update_time": item.get("update_time", ""),
"view_count": item.get("view_count", 0)
}
results.append(result)
self.LOG.info(f"找到 {len(results)} 个短剧结果")
return results
except Exception as e:
self.LOG.error(f"搜索短剧出错: {e}")
return []
async def _send_search_results(self, bot: WechatAPIClient, drama_results: List[Dict[str, Any]],
search_keyword: str, receiver: str) -> bool:
"""发送搜索结果"""
try:
if not drama_results:
await bot.send_text_message(receiver, f"❌未找到短剧:{search_keyword}")
return False
# 构建结果消息
message_parts = [f"🎬 短剧搜索结果:{search_keyword}\n"]
for i, drama in enumerate(drama_results, 1):
title = drama.get("title", "")
source_link = drama.get("source_link", "")
update_time = drama.get("update_time", "")
view_count = drama.get("view_count", 0)
# 格式化标题,去除多余信息
clean_title = title.replace("短剧《", "").replace("》免费观看", "").strip()
message_parts.append(f"{i}. {clean_title}")
if source_link:
message_parts.append(f" 🔗 链接:{source_link}")
if update_time:
message_parts.append(f" 📅 更新时间:{update_time}")
if view_count > 0:
message_parts.append(f" 👀 观看次数:{view_count}")
message_parts.append("") # 空行分隔
# 发送消息
result_message = "\n".join(message_parts)
self.LOG.info(f"发送短剧搜索结果:{result_message}")
await bot.send_text_message(receiver, result_message)
return True
except Exception as e:
self.LOG.error(f"发送短剧搜索结果出错: {e}")
return False