Files
2025-12-03 15:48:44 +08:00

356 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
随机发病文学插件
支持指令触发和定时推送
"""
import tomllib
import asyncio
import aiohttp
import random
from pathlib import Path
from loguru import logger
from typing import Optional
from utils.plugin_base import PluginBase
from utils.decorators import on_text_message, schedule
from WechatHook import WechatHookClient
# 可选导入代理支持
try:
from aiohttp_socks import ProxyConnector
PROXY_SUPPORT = True
except ImportError:
PROXY_SUPPORT = False
logger.warning("aiohttp_socks 未安装,代理功能将不可用")
class Fabing(PluginBase):
"""随机发病文学插件"""
description = "随机发病文学 - 指令触发和定时推送"
author = "ShiHao"
version = "1.0.0"
def __init__(self):
super().__init__()
self.config = None
async def async_init(self):
"""异步初始化"""
try:
config_path = Path(__file__).parent / "config.toml"
if not config_path.exists():
logger.error(f"发病文学插件配置文件不存在: {config_path}")
return
with open(config_path, "rb") as f:
self.config = tomllib.load(f)
logger.success("随机发病文学插件已加载")
except Exception as e:
logger.error(f"随机发病文学插件初始化失败: {e}")
self.config = None
async def _fetch_fabing(self, name: str) -> Optional[str]:
"""获取发病文学"""
try:
api_config = self.config["api"]
timeout = aiohttp.ClientTimeout(total=api_config["timeout"])
# 配置代理
connector = None
proxy_config = self.config.get("proxy", {})
if proxy_config.get("enabled", False):
proxy_type = proxy_config.get("type", "socks5").upper()
proxy_host = proxy_config.get("host", "127.0.0.1")
proxy_port = proxy_config.get("port", 7890)
proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}"
if PROXY_SUPPORT:
try:
connector = ProxyConnector.from_url(proxy_url)
except Exception as e:
logger.warning(f"代理配置失败,将直连: {e}")
connector = None
params = {"name": name}
async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
async with session.get(api_config["base_url"], params=params) as resp:
if resp.status != 200:
error_text = await resp.text()
logger.error(f"发病文学 API 错误: {resp.status}, {error_text}")
return None
result = await resp.json()
if result.get("code") != 200:
logger.error(f"发病文学 API 返回错误: {result.get('message')}")
return None
data = result.get("data", {})
saying = data.get("saying", "")
if not saying:
logger.warning("发病文学 API 返回数据为空")
return None
logger.info(f"获取发病文学成功: {name}")
return saying
except Exception as e:
logger.error(f"获取发病文学失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None
async def _get_random_group_member(self, bot: WechatHookClient, group_id: str) -> Optional[str]:
"""从群组中随机抽取一名成员的昵称"""
try:
# 从MessageLogger数据库中获取该群组的所有成员昵称
from plugins.MessageLogger.main import MessageLogger
msg_logger = MessageLogger.get_instance()
if not msg_logger:
logger.warning("MessageLogger实例不存在无法获取群成员")
return None
with msg_logger.get_db_connection() as conn:
with conn.cursor() as cursor:
# 查询该群组最近活跃的成员昵称(去重)
sql = """
SELECT DISTINCT nickname
FROM messages
WHERE group_id = %s
AND nickname != ''
AND nickname IS NOT NULL
ORDER BY create_time DESC
LIMIT 100
"""
cursor.execute(sql, (group_id,))
results = cursor.fetchall()
if not results:
logger.warning(f"群组 {group_id} 没有找到成员昵称")
return None
# 提取昵称列表
nicknames = [row[0] for row in results]
# 随机选择一个昵称
selected_nickname = random.choice(nicknames)
logger.info(f"从群组 {group_id} 随机选择了昵称: {selected_nickname}")
return selected_nickname
except Exception as e:
logger.error(f"获取随机群成员失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None
@on_text_message(priority=70)
async def handle_command(self, bot: WechatHookClient, message: dict):
"""处理指令触发"""
if self.config is None:
return True
content = message.get("Content", "").strip()
from_wxid = message.get("FromWxid", "")
is_group = message.get("IsGroup", False)
# 检查是否是触发指令
keywords = self.config["behavior"]["command_keywords"]
matched = False
name = None
for keyword in keywords:
# 支持 "发病 xxx" 或 "@机器人 发病 xxx"
if content.startswith(keyword + " ") or content.endswith(" " + keyword + " "):
matched = True
# 提取名字
parts = content.split()
for i, part in enumerate(parts):
if part == keyword or part == keyword.lstrip("/"):
if i + 1 < len(parts):
name = parts[i + 1]
break
break
elif content == keyword:
matched = True
name = None # 没有指定名字
break
if not matched:
return True
if not self.config["behavior"]["enabled"]:
return True
# 检查群聊过滤
if is_group:
enabled_groups = self.config["behavior"]["enabled_groups"]
disabled_groups = self.config["behavior"]["disabled_groups"]
if from_wxid in disabled_groups:
return True
if enabled_groups and from_wxid not in enabled_groups:
return True
# 如果没有指定名字,从群成员中随机选择
if not name and is_group:
name = await self._get_random_group_member(bot, from_wxid)
if not name:
await bot.send_text(from_wxid, "❌ 无法获取群成员信息")
return False
elif not name:
await bot.send_text(from_wxid, "❌ 请指定名字\n格式:发病 名字")
return False
logger.info(f"收到发病文学请求: {from_wxid}, name={name}")
try:
saying = await self._fetch_fabing(name)
if not saying:
await bot.send_text(from_wxid, "❌ 获取发病文学失败,请稍后重试")
return False
# 发送发病文学
await bot.send_text(from_wxid, saying)
logger.success(f"已发送发病文学: {name}")
except Exception as e:
logger.error(f"处理发病文学请求失败: {e}")
await bot.send_text(from_wxid, f"❌ 请求失败: {str(e)}")
return False
@schedule('cron', minute=0)
async def scheduled_push(self, bot=None):
"""定时推送发病文学(每小时整点)"""
if not self.config or not self.config["schedule"]["enabled"]:
return
logger.info("开始执行发病文学定时推送任务")
try:
# 获取bot实例
if not bot:
from utils.plugin_manager import PluginManager
bot = PluginManager().bot
if not bot:
logger.error("定时任务无法获取bot实例")
return
# 获取目标群组
enabled_groups = self.config["behavior"]["enabled_groups"]
disabled_groups = self.config["behavior"]["disabled_groups"]
# 如果没有配置enabled_groups跳过
if not enabled_groups:
logger.warning("未配置群组白名单,跳过定时推送")
return
success_count = 0
group_interval = self.config["schedule"]["group_interval"]
for group_id in enabled_groups:
if group_id in disabled_groups:
continue
try:
logger.info(f"向群聊 {group_id} 推送发病文学")
# 从群成员中随机选择一个昵称
name = await self._get_random_group_member(bot, group_id)
if not name:
logger.warning(f"群聊 {group_id} 无法获取群成员昵称")
continue
# 获取发病文学
saying = await self._fetch_fabing(name)
if not saying:
logger.warning(f"群聊 {group_id} 获取发病文学失败")
continue
# 发送发病文学
await bot.send_text(group_id, saying)
success_count += 1
logger.success(f"群聊 {group_id} 推送成功")
# 群聊之间的间隔
await asyncio.sleep(group_interval)
except Exception as e:
logger.error(f"推送到 {group_id} 失败: {e}")
import traceback
logger.error(traceback.format_exc())
logger.info(f"发病文学定时推送完成 - 成功: {success_count}/{len(enabled_groups)}")
except Exception as e:
logger.error(f"发病文学定时推送失败: {e}")
import traceback
logger.error(traceback.format_exc())
def get_llm_tools(self):
"""返回LLM工具定义"""
return [{
"type": "function",
"function": {
"name": "get_fabing",
"description": "获取随机发病文学。当用户要求发病、整活、发疯等内容时调用此工具。",
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "要发病的对象名字"
}
},
"required": ["name"]
}
}
}]
async def execute_llm_tool(self, tool_name: str, arguments: dict, bot: WechatHookClient, from_wxid: str) -> dict:
"""执行LLM工具调用"""
if tool_name != "get_fabing":
return None
try:
logger.info(f"LLM工具调用发病文学: {from_wxid}")
name = arguments.get("name")
if not name:
return {
"success": False,
"message": "缺少名字参数"
}
saying = await self._fetch_fabing(name)
if not saying:
return {
"success": False,
"message": "获取发病文学失败,请稍后重试"
}
# 发送发病文学
await bot.send_text(from_wxid, saying)
return {
"success": True,
"message": f"已发送发病文学",
"no_reply": True # 已发送内容不需要AI再回复
}
except Exception as e:
logger.error(f"LLM工具执行失败: {e}")
return {
"success": False,
"message": f"执行失败: {str(e)}"
}