Files
abot/plugins/fanhao_search/main.py
2025-09-10 16:32:18 +08:00

217 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import threading
from typing import Dict, Any, List, Optional, Tuple

from loguru import logger

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
class FanhaoSearchPlugin(MessagePluginInterface):
    """Message plugin that looks up a catalog code ("番号") in MongoDB.

    A message of the form ``<command> <code>`` (for example ``番号 FNS-109``)
    triggers a lookup of ``<code>`` in each configured collection, in order;
    the first matching document is formatted and sent back to the chat.

    Configuration is read from the ``FanhaoSearch`` section of ``self._config``
    in :meth:`initialize`. The MongoDB connection is opened lazily on the
    first query so that plugin initialisation never blocks on the network.
    """

    FEATURE_KEY = "FANHAO"
    FEATURE_DESCRIPTION = "🔎 番号查询功能 [番号]"

    # Defaults used until ``initialize`` loads the real configuration, so the
    # plugin never raises AttributeError when probed before initialisation.
    _DEFAULT_COMMANDS = ("番号",)
    _DEFAULT_COMMAND_FORMAT = "番号 番号编号 例如:番号 FNS-109"
    _DEFAULT_MONGO_URI = (
        "mongodb+srv://cluster0.8mosa.mongodb.net/Cluster0?retryWrites=true&w=majority"
    )
    _DEFAULT_DB_NAME = "sehuatang"
    _DEFAULT_COLLECTIONS = ("asia_codeless_originate", "asia_mosaic_originate")
    _DEFAULT_SEARCH_FIELDS = ("number", "code", "番号")

    @property
    def name(self) -> str:
        """Human-readable plugin name."""
        return "番号查询"

    @property
    def version(self) -> str:
        """Plugin version string."""
        return "1.0.0"

    @property
    def description(self) -> str:
        """Short description of what the plugin does."""
        return "提供基于MongoDB的番号搜索功能支持两个集合查询"

    @property
    def author(self) -> str:
        """Plugin author."""
        return "liu.wei"

    @property
    def command_prefix(self) -> Optional[str]:
        """Command prefix; empty because commands are matched verbatim."""
        return ""

    @property
    def commands(self) -> List[str]:
        """Command words that trigger this plugin."""
        return self._commands

    @property
    def feature_key(self) -> Optional[str]:
        """Key under which this plugin's feature is registered."""
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        """Description shown for this plugin's feature."""
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        super().__init__()
        # Set every attribute read by the public methods up front; previously
        # they only existed after ``initialize`` had run.
        self.LOG = logger
        self.event_system = None
        self._commands: List[str] = list(self._DEFAULT_COMMANDS)
        self.command_format: str = self._DEFAULT_COMMAND_FORMAT
        self.enable: bool = True
        self.mongo_uri: str = self._DEFAULT_MONGO_URI
        self.mongo_db_name: str = self._DEFAULT_DB_NAME
        self.collections: List[str] = list(self._DEFAULT_COLLECTIONS)
        self.search_fields: List[str] = list(self._DEFAULT_SEARCH_FIELDS)
        self.mongo_client = None
        self.mongo_db = None
        # Queries run in worker threads, so lazy client creation is guarded.
        self._mongo_lock = threading.Lock()
        self.feature = self.register_feature()

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Return *value* as a list, wrapping a scalar and mapping None to []."""
        if value is None:
            return []
        if isinstance(value, (list, tuple, set, frozenset)):
            return list(value)
        # A bare string must not be iterated character by character.
        return [value]

    @staticmethod
    def _split_command(content: str) -> Tuple[str, str]:
        """Split *content* into ``(command, argument)`` on the first whitespace run.

        Splitting on any whitespace (not only ASCII space) also accepts tabs
        and the full-width space produced by CJK input methods.
        """
        pieces = content.strip().split(None, 1)
        if not pieces:
            return "", ""
        argument = pieces[1].strip() if len(pieces) > 1 else ""
        return pieces[0], argument

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Load configuration from the ``FanhaoSearch`` config section.

        Args:
            context: Framework context; only ``event_system`` is read here.

        Returns:
            Always ``True``.
        """
        self.LOG = logger
        self.LOG.info(f"正在初始化 {self.name} 插件...")
        self.event_system = context.get("event_system")
        cfg = self._config.get("FanhaoSearch", {}) or {}
        # Example trigger: "番号 FNS-109"
        self._commands = self._as_list(cfg.get("command", list(self._DEFAULT_COMMANDS)))
        self.command_format = cfg.get("command-format", self._DEFAULT_COMMAND_FORMAT)
        self.enable = cfg.get("enable", True)
        self.mongo_uri = cfg.get("mongo_uri", self._DEFAULT_MONGO_URI)
        self.mongo_db_name = cfg.get("db", self._DEFAULT_DB_NAME)
        self.collections = self._as_list(
            cfg.get("collections", list(self._DEFAULT_COLLECTIONS))
        )
        # Candidate document field names that may hold the code.
        self.search_fields = self._as_list(
            cfg.get("search_fields", list(self._DEFAULT_SEARCH_FIELDS))
        )
        # The connection is opened lazily on the first query so that
        # initialisation does not block.
        self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
        return True

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Close the MongoDB connection (if any) and mark the plugin stopped.

        Closing is best-effort: a failure is logged but never propagated.
        """
        # Detach the client first so a later query reconnects instead of
        # reusing a closed client.
        with self._mongo_lock:
            client = self.mongo_client
            self.mongo_client = None
            self.mongo_db = None
        if client is not None:
            try:
                client.close()
            except Exception as e:
                self.LOG.warning(f"[{self.name}] 关闭 MongoDB 连接失败: {e}")
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return True when the message's first word is one of the commands."""
        if not self.enable:
            return False
        command, _ = self._split_command(str(message.get("content", "")))
        return command in self._commands

    def _ensure_mongo(self):
        """Create the MongoDB client and database handle on first use."""
        if self.mongo_client is not None:
            return
        with self._mongo_lock:
            if self.mongo_client is not None:
                return
            from pymongo import MongoClient

            # Connects with whatever the URI specifies; credentials, if
            # needed, are expected to be part of the configured URI.
            client = MongoClient(self.mongo_uri)
            # Publish the database handle before the client so that a
            # non-None client always implies a usable database handle, and a
            # failure here leaves the plugin able to retry.
            self.mongo_db = client.get_database(self.mongo_db_name)
            self.mongo_client = client

    def _normalize_code(self, text: str) -> str:
        """Return *text* stripped and upper-cased ('' for None/empty)."""
        return (text or "").strip().upper()

    def _query_collections(self, code_upper: str) -> Optional[Dict[str, Any]]:
        """Find the first document whose code field equals *code_upper*.

        Collections are tried in configured order. An error on one collection
        is logged and the next collection is tried.

        Returns:
            The matching document with an added ``_collection`` key naming the
            collection it came from, or ``None`` when nothing matched.
        """
        if not self.search_fields:
            # An empty $or is rejected by MongoDB; nothing can match anyway.
            return None
        self._ensure_mongo()
        # Match when any candidate field equals the code.
        query = {"$or": [{field: code_upper} for field in self.search_fields]}
        for coll_name in self.collections:
            try:
                doc = self.mongo_db.get_collection(coll_name).find_one(query)
            except Exception as e:
                self.LOG.error(f"查询集合 {coll_name} 出错: {e}")
                continue
            if doc:
                doc["_collection"] = coll_name
                return doc
        return None

    def _format_result(self, doc: Dict[str, Any]) -> str:
        """Render a result document as the multi-line reply text."""

        def pick(d: Dict[str, Any], keys: List[str]) -> str:
            """Return the first truthy value among *keys* as a string, else ''."""
            for k in keys:
                v = d.get(k)
                if v:
                    return str(v)
            return ""

        code = pick(doc, self.search_fields)
        title = pick(doc, ["title", "name", "标题"]) or "未提供"
        actress = pick(doc, ["actress", "actors", "performer", "女优", "演员"])  # may be empty
        date_val = pick(doc, ["date", "publish_date", "发行日"])  # e.g. 2025-09-10
        post_time = pick(doc, ["post_time"])  # e.g. 2025-09-10 10:42:04
        magnet = pick(doc, ["magnet"])
        magnet_115 = pick(doc, ["magnet_115"])

        lines = [
            f"✅ 查询成功:{code}",
            f"标题:{title}",
        ]
        if actress:
            lines.append(f"演员:{actress}")
        if date_val and post_time:
            # Closing parenthesis was missing in the emitted text.
            lines.append(f"日期:{date_val}(发帖:{post_time})")
        elif date_val:
            lines.append(f"日期:{date_val}")
        elif post_time:
            lines.append(f"发帖:{post_time}")
        if magnet:
            lines.append(f"磁力:{magnet}")
        if magnet_115:
            # Separator added for consistency with the plain magnet line.
            lines.append(f"115磁力:{magnet_115}")
        return "\n".join(lines)

    @plugin_stats_decorator(plugin_name="番号查询")
    @plugin_points_cost(10, "番号查询消耗积分", FEATURE_KEY)
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle a ``<command> <code>`` message and reply with the lookup result.

        Reads ``content``, ``sender``, ``roomid``, ``gbm`` and ``bot`` from
        *message*. Replies go to the room when ``roomid`` is set, otherwise to
        the sender.

        Returns:
            ``(True, "查询成功")`` on a hit; ``(False, reason)`` when the feature
            is disabled for the group, the command is malformed, nothing was
            found, or an error occurred.
        """
        content = str(message.get("content", "")).strip()
        self.LOG.debug(f"插件执行: {self.name}{content}")
        _, argument = self._split_command(content)
        sender = message.get("sender")
        roomid = message.get("roomid", "")
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")
        target = roomid if roomid else sender

        # Check permission first so a group with the feature disabled gets no
        # reply at all, not even a usage hint.
        if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
            return False, "没有权限"

        user_code = self._normalize_code(argument)
        if not user_code:
            await bot.send_text_message(target, f"❌命令格式错误!\n{self.command_format}", sender)
            return False, "命令格式错误"

        try:
            # PyMongo is synchronous; run the lookup in a worker thread so the
            # event loop keeps serving other messages while it waits.
            loop = asyncio.get_running_loop()
            doc = await loop.run_in_executor(None, self._query_collections, user_code)
            if not doc:
                await bot.send_text_message(target, f"未找到番号:{user_code}", sender)
                return False, "未找到"
            text = self._format_result(doc)
            await bot.send_text_message(target, text, sender)
            return True, "查询成功"
        except Exception as e:
            self.LOG.error(f"处理番号查询出错: {e}")
            return False, f"处理出错: {e}"
def get_plugin():
    """Factory entry point: build and return a new FanhaoSearchPlugin."""
    plugin = FanhaoSearchPlugin()
    return plugin