Files
WechatHookBot/plugins/KFC/main.py
2025-12-03 15:48:44 +08:00

354 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
KFC文案插件
支持指令触发和定时推送(每周四每两小时)
"""
import tomllib
import asyncio
import aiohttp
from pathlib import Path
from loguru import logger
from typing import Optional
from utils.plugin_base import PluginBase
from utils.decorators import on_text_message, schedule
from WechatHook import WechatHookClient
# 可选导入代理支持
try:
from aiohttp_socks import ProxyConnector
PROXY_SUPPORT = True
except ImportError:
PROXY_SUPPORT = False
logger.warning("aiohttp_socks 未安装,代理功能将不可用")
class KFC(PluginBase):
"""KFC文案插件"""
description = "KFC文案 - 指令触发和定时推送"
author = "ShiHao"
version = "1.0.0"
def __init__(self):
super().__init__()
self.config = None
async def async_init(self):
"""异步初始化"""
try:
config_path = Path(__file__).parent / "config.toml"
if not config_path.exists():
logger.error(f"KFC文案插件配置文件不存在: {config_path}")
return
with open(config_path, "rb") as f:
self.config = tomllib.load(f)
logger.success("KFC文案插件已加载")
except Exception as e:
logger.error(f"KFC文案插件初始化失败: {e}")
self.config = None
async def _fetch_kfc(self) -> Optional[str]:
"""获取KFC文案"""
try:
api_config = self.config["api"]
timeout = aiohttp.ClientTimeout(total=api_config["timeout"])
# 配置代理
connector = None
proxy_config = self.config.get("proxy", {})
if proxy_config.get("enabled", False):
proxy_type = proxy_config.get("type", "socks5").upper()
proxy_host = proxy_config.get("host", "127.0.0.1")
proxy_port = proxy_config.get("port", 7890)
proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}"
if PROXY_SUPPORT:
try:
connector = ProxyConnector.from_url(proxy_url)
except Exception as e:
logger.warning(f"代理配置失败,将直连: {e}")
connector = None
async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
async with session.get(api_config["base_url"]) as resp:
if resp.status != 200:
error_text = await resp.text()
logger.error(f"KFC文案 API 错误: {resp.status}, {error_text}")
return None
result = await resp.json()
if result.get("code") != 200:
logger.error(f"KFC文案 API 返回错误: {result.get('message')}")
return None
data = result.get("data", {})
kfc_text = data.get("kfc", "")
if not kfc_text:
logger.warning("KFC文案 API 返回数据为空")
return None
logger.info(f"获取KFC文案成功")
return kfc_text
except Exception as e:
logger.error(f"获取KFC文案失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None
@on_text_message(priority=70)
async def handle_command(self, bot: WechatHookClient, message: dict):
"""处理指令触发"""
if self.config is None:
return True
content = message.get("Content", "").strip()
from_wxid = message.get("FromWxid", "")
is_group = message.get("IsGroup", False)
# 检查是否是触发指令
keywords = self.config["behavior"]["command_keywords"]
matched = False
for keyword in keywords:
if content == keyword or content.endswith(f" {keyword}"):
matched = True
break
if not matched:
return True
if not self.config["behavior"]["enabled"]:
return True
# 检查群聊过滤
if is_group:
enabled_groups = self.config["behavior"]["enabled_groups"]
disabled_groups = self.config["behavior"]["disabled_groups"]
if from_wxid in disabled_groups:
return True
if enabled_groups and from_wxid not in enabled_groups:
return True
logger.info(f"收到KFC文案请求: {from_wxid}")
try:
kfc_text = await self._fetch_kfc()
if not kfc_text:
await bot.send_text(from_wxid, "❌ 获取KFC文案失败请稍后重试")
return False
# 发送KFC文案
await bot.send_text(from_wxid, kfc_text)
logger.success(f"已发送KFC文案")
except Exception as e:
logger.error(f"处理KFC文案请求失败: {e}")
await bot.send_text(from_wxid, f"❌ 请求失败: {str(e)}")
return False
# 为每个时间点创建一个定时任务
@schedule('cron', day_of_week=3, hour=0, minute=0)
async def scheduled_push_00(self, bot=None):
"""定时推送KFC文案周四 00:00"""
await self._do_scheduled_push(bot)
@schedule('cron', day_of_week=3, hour=2, minute=0)
async def scheduled_push_02(self, bot=None):
"""定时推送KFC文案周四 02:00"""
await self._do_scheduled_push(bot)
@schedule('cron', day_of_week=3, hour=4, minute=0)
async def scheduled_push_04(self, bot=None):
"""定时推送KFC文案周四 04:00"""
await self._do_scheduled_push(bot)
@schedule('cron', day_of_week=3, hour=6, minute=0)
async def scheduled_push_06(self, bot=None):
"""定时推送KFC文案周四 06:00"""
await self._do_scheduled_push(bot)
@schedule('cron', day_of_week=3, hour=8, minute=0)
async def scheduled_push_08(self, bot=None):
"""定时推送KFC文案周四 08:00"""
await self._do_scheduled_push(bot)
@schedule('cron', day_of_week=3, hour=10, minute=0)
async def scheduled_push_10(self, bot=None):
"""定时推送KFC文案周四 10:00"""
await self._do_scheduled_push(bot)
@schedule('cron', day_of_week=3, hour=12, minute=0)
async def scheduled_push_12(self, bot=None):
"""定时推送KFC文案周四 12:00"""
await self._do_scheduled_push(bot)
@schedule('cron', day_of_week=3, hour=14, minute=0)
async def scheduled_push_14(self, bot=None):
"""定时推送KFC文案周四 14:00"""
await self._do_scheduled_push(bot)
@schedule('cron', day_of_week=3, hour=16, minute=0)
async def scheduled_push_16(self, bot=None):
"""定时推送KFC文案周四 16:00"""
await self._do_scheduled_push(bot)
@schedule('cron', day_of_week=3, hour=18, minute=0)
async def scheduled_push_18(self, bot=None):
"""定时推送KFC文案周四 18:00"""
await self._do_scheduled_push(bot)
@schedule('cron', day_of_week=3, hour=20, minute=0)
async def scheduled_push_20(self, bot=None):
"""定时推送KFC文案周四 20:00"""
await self._do_scheduled_push(bot)
@schedule('cron', day_of_week=3, hour=22, minute=0)
async def scheduled_push_22(self, bot=None):
"""定时推送KFC文案周四 22:00"""
await self._do_scheduled_push(bot)
async def _do_scheduled_push(self, bot=None):
"""执行定时推送"""
if not self.config or not self.config["schedule"]["enabled"]:
return
logger.info("开始执行KFC文案定时推送任务")
try:
# 获取bot实例
if not bot:
from utils.plugin_manager import PluginManager
bot = PluginManager().bot
if not bot:
logger.error("定时任务无法获取bot实例")
return
# 获取KFC文案
kfc_text = await self._fetch_kfc()
if not kfc_text:
logger.error("定时任务获取KFC文案失败")
return
# 获取目标群组
enabled_groups = self.config["behavior"]["enabled_groups"]
disabled_groups = self.config["behavior"]["disabled_groups"]
# 如果没有配置enabled_groups获取所有群聊
target_groups = []
if enabled_groups:
target_groups = [g for g in enabled_groups if g not in disabled_groups]
else:
# 从MessageLogger数据库获取所有群聊
try:
from plugins.MessageLogger.main import MessageLogger
msg_logger = MessageLogger.get_instance()
if msg_logger:
with msg_logger.get_db_connection() as conn:
with conn.cursor() as cursor:
sql = """
SELECT DISTINCT group_id
FROM messages
WHERE is_group = 1
AND group_id IS NOT NULL
AND group_id != ''
"""
cursor.execute(sql)
results = cursor.fetchall()
target_groups = [row[0] for row in results if row[0] not in disabled_groups]
logger.info(f"从数据库获取到 {len(target_groups)} 个群聊")
else:
logger.warning("MessageLogger实例不存在无法获取群聊列表")
return
except Exception as e:
logger.error(f"获取群聊列表失败: {e}")
return
if not target_groups:
logger.warning("没有找到目标群聊,跳过定时推送")
return
success_count = 0
group_interval = self.config["schedule"]["group_interval"]
for group_id in target_groups:
try:
logger.info(f"向群聊 {group_id} 推送KFC文案")
# 发送KFC文案
await bot.send_text(group_id, kfc_text)
success_count += 1
logger.success(f"群聊 {group_id} 推送成功")
# 群聊之间的间隔
await asyncio.sleep(group_interval)
except Exception as e:
logger.error(f"推送到 {group_id} 失败: {e}")
import traceback
logger.error(traceback.format_exc())
logger.info(f"KFC文案定时推送完成 - 成功: {success_count}/{len(target_groups)}")
except Exception as e:
logger.error(f"KFC文案定时推送失败: {e}")
import traceback
logger.error(traceback.format_exc())
def get_llm_tools(self):
"""返回LLM工具定义"""
return [{
"type": "function",
"function": {
"name": "get_kfc",
"description": "获取KFC疯狂星期四文案。当用户询问KFC、疯狂星期四、肯德基等内容时调用此工具。",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
}
}]
async def execute_llm_tool(self, tool_name: str, arguments: dict, bot: WechatHookClient, from_wxid: str) -> dict:
"""执行LLM工具调用"""
if tool_name != "get_kfc":
return None
try:
logger.info(f"LLM工具调用KFC文案: {from_wxid}")
kfc_text = await self._fetch_kfc()
if not kfc_text:
return {
"success": False,
"message": "获取KFC文案失败请稍后重试"
}
# 发送KFC文案
await bot.send_text(from_wxid, kfc_text)
return {
"success": True,
"message": f"已发送KFC文案",
"no_reply": True # 已发送内容不需要AI再回复
}
except Exception as e:
logger.error(f"LLM工具执行失败: {e}")
return {
"success": False,
"message": f"执行失败: {str(e)}"
}