""" KFC文案插件 支持指令触发和定时推送(每周四每两小时) """ import tomllib import asyncio import aiohttp from pathlib import Path from loguru import logger from typing import Optional from utils.plugin_base import PluginBase from utils.decorators import on_text_message, schedule from WechatHook import WechatHookClient # 可选导入代理支持 try: from aiohttp_socks import ProxyConnector PROXY_SUPPORT = True except ImportError: PROXY_SUPPORT = False logger.warning("aiohttp_socks 未安装,代理功能将不可用") class KFC(PluginBase): """KFC文案插件""" description = "KFC文案 - 指令触发和定时推送" author = "ShiHao" version = "1.0.0" def __init__(self): super().__init__() self.config = None async def async_init(self): """异步初始化""" try: config_path = Path(__file__).parent / "config.toml" if not config_path.exists(): logger.error(f"KFC文案插件配置文件不存在: {config_path}") return with open(config_path, "rb") as f: self.config = tomllib.load(f) logger.success("KFC文案插件已加载") except Exception as e: logger.error(f"KFC文案插件初始化失败: {e}") self.config = None async def _fetch_kfc(self) -> Optional[str]: """获取KFC文案""" try: api_config = self.config["api"] timeout = aiohttp.ClientTimeout(total=api_config["timeout"]) # 配置代理 connector = None proxy_config = self.config.get("proxy", {}) if proxy_config.get("enabled", False): proxy_type = proxy_config.get("type", "socks5").upper() proxy_host = proxy_config.get("host", "127.0.0.1") proxy_port = proxy_config.get("port", 7890) proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}" if PROXY_SUPPORT: try: connector = ProxyConnector.from_url(proxy_url) except Exception as e: logger.warning(f"代理配置失败,将直连: {e}") connector = None async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session: async with session.get(api_config["base_url"]) as resp: if resp.status != 200: error_text = await resp.text() logger.error(f"KFC文案 API 错误: {resp.status}, {error_text}") return None result = await resp.json() if result.get("code") != 200: logger.error(f"KFC文案 API 返回错误: {result.get('message')}") return None data = result.get("data", {}) kfc_text = data.get("kfc", "") if not kfc_text: logger.warning("KFC文案 API 返回数据为空") return None logger.info(f"获取KFC文案成功") return kfc_text except Exception as e: logger.error(f"获取KFC文案失败: {e}") import traceback logger.error(traceback.format_exc()) return None @on_text_message(priority=70) async def handle_command(self, bot: WechatHookClient, message: dict): """处理指令触发""" if self.config is None: return True content = message.get("Content", "").strip() from_wxid = message.get("FromWxid", "") is_group = message.get("IsGroup", False) # 检查是否是触发指令 keywords = self.config["behavior"]["command_keywords"] matched = False for keyword in keywords: if content == keyword or content.endswith(f" {keyword}"): matched = True break if not matched: return True if not self.config["behavior"]["enabled"]: return True # 检查群聊过滤 if is_group: enabled_groups = self.config["behavior"]["enabled_groups"] disabled_groups = self.config["behavior"]["disabled_groups"] if from_wxid in disabled_groups: return True if enabled_groups and from_wxid not in enabled_groups: return True logger.info(f"收到KFC文案请求: {from_wxid}") try: kfc_text = await self._fetch_kfc() if not kfc_text: await bot.send_text(from_wxid, "❌ 获取KFC文案失败,请稍后重试") return False # 发送KFC文案 await bot.send_text(from_wxid, kfc_text) logger.success(f"已发送KFC文案") except Exception as e: logger.error(f"处理KFC文案请求失败: {e}") await bot.send_text(from_wxid, f"❌ 请求失败: {str(e)}") return False # 为每个时间点创建一个定时任务 @schedule('cron', day_of_week=3, hour=0, minute=0) async def scheduled_push_00(self, bot=None): """定时推送KFC文案(周四 00:00)""" await self._do_scheduled_push(bot) @schedule('cron', day_of_week=3, hour=2, minute=0) async def scheduled_push_02(self, bot=None): """定时推送KFC文案(周四 02:00)""" await self._do_scheduled_push(bot) @schedule('cron', day_of_week=3, hour=4, minute=0) async def scheduled_push_04(self, bot=None): """定时推送KFC文案(周四 04:00)""" await self._do_scheduled_push(bot) @schedule('cron', day_of_week=3, hour=6, minute=0) async def scheduled_push_06(self, bot=None): """定时推送KFC文案(周四 06:00)""" await self._do_scheduled_push(bot) @schedule('cron', day_of_week=3, hour=8, minute=0) async def scheduled_push_08(self, bot=None): """定时推送KFC文案(周四 08:00)""" await self._do_scheduled_push(bot) @schedule('cron', day_of_week=3, hour=10, minute=0) async def scheduled_push_10(self, bot=None): """定时推送KFC文案(周四 10:00)""" await self._do_scheduled_push(bot) @schedule('cron', day_of_week=3, hour=12, minute=0) async def scheduled_push_12(self, bot=None): """定时推送KFC文案(周四 12:00)""" await self._do_scheduled_push(bot) @schedule('cron', day_of_week=3, hour=14, minute=0) async def scheduled_push_14(self, bot=None): """定时推送KFC文案(周四 14:00)""" await self._do_scheduled_push(bot) @schedule('cron', day_of_week=3, hour=16, minute=0) async def scheduled_push_16(self, bot=None): """定时推送KFC文案(周四 16:00)""" await self._do_scheduled_push(bot) @schedule('cron', day_of_week=3, hour=18, minute=0) async def scheduled_push_18(self, bot=None): """定时推送KFC文案(周四 18:00)""" await self._do_scheduled_push(bot) @schedule('cron', day_of_week=3, hour=20, minute=0) async def scheduled_push_20(self, bot=None): """定时推送KFC文案(周四 20:00)""" await self._do_scheduled_push(bot) @schedule('cron', day_of_week=3, hour=22, minute=0) async def scheduled_push_22(self, bot=None): """定时推送KFC文案(周四 22:00)""" await self._do_scheduled_push(bot) async def _do_scheduled_push(self, bot=None): """执行定时推送""" if not self.config or not self.config["schedule"]["enabled"]: return logger.info("开始执行KFC文案定时推送任务") try: # 获取bot实例 if not bot: from utils.plugin_manager import PluginManager bot = PluginManager().bot if not bot: logger.error("定时任务:无法获取bot实例") return # 获取KFC文案 kfc_text = await self._fetch_kfc() if not kfc_text: logger.error("定时任务:获取KFC文案失败") return # 获取目标群组 enabled_groups = self.config["behavior"]["enabled_groups"] disabled_groups = self.config["behavior"]["disabled_groups"] # 如果没有配置enabled_groups,获取所有群聊 target_groups = [] if enabled_groups: target_groups = [g for g in enabled_groups if g not in disabled_groups] else: # 从MessageLogger数据库获取所有群聊 try: from plugins.MessageLogger.main import MessageLogger msg_logger = MessageLogger.get_instance() if msg_logger: with msg_logger.get_db_connection() as conn: with conn.cursor() as cursor: sql = """ SELECT DISTINCT group_id FROM messages WHERE is_group = 1 AND group_id IS NOT NULL AND group_id != '' """ cursor.execute(sql) results = cursor.fetchall() target_groups = [row[0] for row in results if row[0] not in disabled_groups] logger.info(f"从数据库获取到 {len(target_groups)} 个群聊") else: logger.warning("MessageLogger实例不存在,无法获取群聊列表") return except Exception as e: logger.error(f"获取群聊列表失败: {e}") return if not target_groups: logger.warning("没有找到目标群聊,跳过定时推送") return success_count = 0 group_interval = self.config["schedule"]["group_interval"] for group_id in target_groups: try: logger.info(f"向群聊 {group_id} 推送KFC文案") # 发送KFC文案 await bot.send_text(group_id, kfc_text) success_count += 1 logger.success(f"群聊 {group_id} 推送成功") # 群聊之间的间隔 await asyncio.sleep(group_interval) except Exception as e: logger.error(f"推送到 {group_id} 失败: {e}") import traceback logger.error(traceback.format_exc()) logger.info(f"KFC文案定时推送完成 - 成功: {success_count}/{len(target_groups)}") except Exception as e: logger.error(f"KFC文案定时推送失败: {e}") import traceback logger.error(traceback.format_exc()) def get_llm_tools(self): """返回LLM工具定义""" return [{ "type": "function", "function": { "name": "get_kfc", "description": "获取KFC疯狂星期四文案。当用户询问KFC、疯狂星期四、肯德基等内容时调用此工具。", "parameters": { "type": "object", "properties": {}, "required": [] } } }] async def execute_llm_tool(self, tool_name: str, arguments: dict, bot: WechatHookClient, from_wxid: str) -> dict: """执行LLM工具调用""" if tool_name != "get_kfc": return None try: logger.info(f"LLM工具调用KFC文案: {from_wxid}") kfc_text = await self._fetch_kfc() if not kfc_text: return { "success": False, "message": "获取KFC文案失败,请稍后重试" } # 发送KFC文案 await bot.send_text(from_wxid, kfc_text) return { "success": True, "message": f"已发送KFC文案", "no_reply": True # 已发送内容,不需要AI再回复 } except Exception as e: logger.error(f"LLM工具执行失败: {e}") return { "success": False, "message": f"执行失败: {str(e)}" }