Files
abot/plugins/duanju_search/main.py
2025-09-11 11:49:11 +08:00

222 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import urllib.parse
from loguru import logger
import requests
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient
class DuanjuSearchPlugin(MessagePluginInterface):
"""短剧搜索插件"""
# 功能权限常量
FEATURE_KEY = "DUANJU_SEARCH"
FEATURE_DESCRIPTION = "🎬 短剧搜索功能 [短剧, 短剧搜索, 搜索短剧, 短剧查询]"
@property
def name(self) -> str:
return "短剧搜索"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供短剧搜索功能,支持搜索短剧名称并返回剧名和链接信息"
@property
def author(self) -> str:
return "liu.wei"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.feature = self.register_feature()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self._commands = self._config.get("DuanjuSearch", {}).get("command", ["短剧", "短剧搜索"])
self.command_format = self._config.get("DuanjuSearch", {}).get("command-format", "短剧 剧名")
self.enable = self._config.get("DuanjuSearch", {}).get("enable", True)
self.api_url = self._config.get("DuanjuSearch", {}).get("api_url", "https://api.uuuka.com/api/search")
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="短剧搜索")
@plugin_points_cost(1, "短剧搜索消耗积分", FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查命令格式
if len(content.split(" ")) == 1:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}"
, sender)
return False, "命令格式错误"
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
# 提取短剧名称
user_drama_name = content[len(command):].strip()
try:
# 搜索短剧
drama_results = self._search_drama(user_drama_name)
if not drama_results:
await bot.send_text_message((roomid if roomid else sender), f"❌未找到短剧:{user_drama_name}", sender)
return False, "未找到短剧"
# 发送搜索结果
await self._send_search_results(bot, drama_results, user_drama_name, roomid or sender)
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理短剧搜索请求出错: {e}")
return False, f"处理出错: {e}"
def _search_drama(self, drama_name: str) -> List[Dict[str, Any]]:
"""搜索短剧信息"""
try:
# URL编码搜索关键词
encoded_keyword = urllib.parse.quote(drama_name)
# 构建API请求URL
api_url = f"{self.api_url}?keyword={encoded_keyword}&page=1&limit=3"
# 设置请求头
headers = {
'accept': 'application/json'
}
self.LOG.info(f"请求短剧搜索API: {api_url}")
response = requests.get(api_url, headers=headers, timeout=10)
if response.status_code != 200:
self.LOG.error(f"API 请求失败,状态码: {response.status_code}")
return []
json_data = response.json()
# 检查API响应是否成功
if not json_data.get("success", False):
self.LOG.error(f"API 返回错误: {json_data.get('message', '未知错误')}")
return []
# 提取短剧数据
data = json_data.get("data", {})
items = data.get("items", [])
# 限制返回前3个结果
results = []
for item in items[:3]:
result = {
"title": item.get("title", ""),
"source_link": item.get("source_link", ""),
"update_time": item.get("update_time", ""),
"view_count": item.get("view_count", 0)
}
results.append(result)
self.LOG.info(f"找到 {len(results)} 个短剧结果")
return results
except Exception as e:
self.LOG.error(f"搜索短剧出错: {e}")
return []
async def _send_search_results(self, bot: WechatAPIClient, drama_results: List[Dict[str, Any]],
search_keyword: str, receiver: str) -> bool:
"""发送搜索结果"""
try:
if not drama_results:
await bot.send_text_message(receiver, f"❌未找到短剧:{search_keyword}")
return False
# 构建结果消息
message_parts = [f"🎬 短剧搜索结果:{search_keyword}\n"]
for i, drama in enumerate(drama_results, 1):
title = drama.get("title", "")
source_link = drama.get("source_link", "")
update_time = drama.get("update_time", "")
view_count = drama.get("view_count", 0)
# 格式化标题,去除多余信息
clean_title = title.replace("短剧《", "").replace("》免费观看", "").strip()
message_parts.append(f"{i}. {clean_title}")
if source_link:
message_parts.append(f" 🔗 链接:{source_link}")
if update_time:
message_parts.append(f" 📅 更新时间:{update_time}")
if view_count > 0:
message_parts.append(f" 👀 观看次数:{view_count}")
message_parts.append("") # 空行分隔
# 发送消息
result_message = "\n".join(message_parts)
self.LOG.info(f"发送短剧搜索结果:{result_message}")
await bot.send_text_message(receiver, result_message)
return True
except Exception as e:
self.LOG.error(f"发送短剧搜索结果出错: {e}")
return False