# Files
# 2025-12-03 15:48:44 +08:00
#
# 535 lines
# 20 KiB
# Python
# Raw Permalink Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Epic免费游戏插件
定时推送和指令查询Epic免费游戏信息
"""
import tomllib
import asyncio
import aiohttp
from pathlib import Path
from datetime import datetime
from loguru import logger
from typing import List, Dict, Optional
from utils.plugin_base import PluginBase
from utils.decorators import on_text_message, schedule
from WechatHook import WechatHookClient
# Optional proxy support: aiohttp_socks is only needed when a proxy is
# configured, so its absence downgrades the feature instead of failing import.
try:
    from aiohttp_socks import ProxyConnector
except ImportError:
    PROXY_SUPPORT = False
    logger.warning("aiohttp_socks 未安装,代理功能将不可用")
else:
    PROXY_SUPPORT = True
class EpicFreeGames(PluginBase):
    """Epic free games plugin.

    Fetches the Epic free-game list from a configured API, optionally has an
    AI endpoint translate/polish each description, and delivers the result as
    a chat-record message. Delivery is triggered by a text command, by a
    daily cron job, or by an LLM tool call.
    """

    description = "Epic免费游戏 - 定时推送和指令查询"
    author = "ShiHao"
    version = "1.0.0"

    # Title used for the chat-record card sent to users and groups.
    _RECORD_TITLE = "Epic免费游戏"

    def __init__(self):
        super().__init__()
        # Parsed config.toml; stays None until async_init succeeds.
        self.config = None
        # Strong reference to the deferred scheduling task (see async_init).
        self._schedule_task = None

    async def async_init(self):
        """Load ``config.toml`` next to this file and schedule the cron setup.

        On any failure the plugin is left disabled (``self.config`` is None).
        """
        try:
            config_path = Path(__file__).parent / "config.toml"
            if not config_path.exists():
                logger.error(f"Epic免费游戏插件配置文件不存在: {config_path}")
                return
            with open(config_path, "rb") as f:
                self.config = tomllib.load(f)
            logger.success("Epic免费游戏插件已加载")
            # Defer cron registration so the scheduler has time to start.
            # The task is stored on the instance because the event loop only
            # keeps weak references to tasks; an unreferenced task may be
            # garbage-collected before it finishes.
            self._schedule_task = asyncio.create_task(self._delayed_setup_schedule())
        except Exception as e:
            logger.error(f"Epic免费游戏插件初始化失败: {e}")
            self.config = None

    async def _delayed_setup_schedule(self):
        """Wait one second, then register the cron job.

        Errors are logged here because nothing awaits this background task.
        """
        await asyncio.sleep(1)
        try:
            self._setup_schedule()
        except Exception as e:
            logger.exception(f"Epic定时任务设置失败: {e}")

    @staticmethod
    def _build_proxy_connector(proxy_config: Dict):
        """Build an aiohttp connector from a ``[proxy]`` config table.

        Args:
            proxy_config: Mapping with optional ``enabled``, ``type``,
                ``host`` and ``port`` keys.

        Returns:
            A ``ProxyConnector``, or None for a direct connection (proxy
            disabled, aiohttp_socks missing, or the proxy URL was rejected).
        """
        if not proxy_config.get("enabled", False):
            return None
        if not PROXY_SUPPORT:
            logger.warning("代理已启用但 aiohttp_socks 未安装,将直连")
            return None
        proxy_type = proxy_config.get("type", "socks5").upper()
        proxy_host = proxy_config.get("host", "127.0.0.1")
        proxy_port = proxy_config.get("port", 7890)
        proxy_url = f"{proxy_type}://{proxy_host}:{proxy_port}"
        try:
            return ProxyConnector.from_url(proxy_url)
        except Exception as e:
            logger.warning(f"代理配置失败,将直连: {e}")
            return None

    async def _fetch_epic_games(self) -> Optional[List[Dict]]:
        """Fetch the free-game list from the configured API.

        Returns:
            The non-empty ``data`` list of the API response, or None when the
            config is missing, the request fails, the API reports an error,
            or the list is empty.
        """
        if not self.config:
            logger.error("Epic免费游戏插件配置未加载")
            return None
        try:
            api_config = self.config["api"]
            timeout = aiohttp.ClientTimeout(total=api_config["timeout"])
            connector = self._build_proxy_connector(self.config.get("proxy", {}))
            async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
                async with session.get(api_config["base_url"]) as resp:
                    if resp.status != 200:
                        error_text = await resp.text()
                        logger.error(f"Epic API 错误: {resp.status}, {error_text}")
                        return None
                    result = await resp.json()
                    if result.get("code") != 200:
                        logger.error(f"Epic API 返回错误: {result.get('message')}")
                        return None
                    data = result.get("data", [])
                    if not data:
                        logger.warning("Epic API 返回数据为空")
                        return None
                    logger.info(f"获取到 {len(data)} 个Epic免费游戏")
                    return data
        except Exception as e:
            logger.exception(f"获取Epic免费游戏失败: {e}")
            return None

    async def _translate_description(self, description: str) -> str:
        """Translate and polish a game description through the AIChat API.

        The endpoint, key, model and proxy come from the sibling AIChat
        plugin's ``config.toml``; the prompt comes from this plugin's config.

        Args:
            description: Original description text.

        Returns:
            The AI output, or ``description`` unchanged when AI is disabled,
            the text is empty, or anything goes wrong.
        """
        try:
            if not self.config["ai"]["enabled"]:
                return description
            # Nothing to translate; avoid a pointless API round trip.
            if not description or not description.strip():
                return description
            aichat_config_path = Path(__file__).parent.parent / "AIChat" / "config.toml"
            if not aichat_config_path.exists():
                logger.warning("AIChat配置文件不存在跳过AI润色")
                return description
            with open(aichat_config_path, "rb") as f:
                aichat_config = tomllib.load(f)
            api_config = aichat_config["api"]
            prompt = self.config["ai"]["prompt"]
            payload = {
                "model": api_config["model"],
                "messages": [
                    {"role": "user", "content": f"{prompt}\n\n{description}"}
                ],
                "max_tokens": 8000,
                "temperature": 0.7,
            }
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_config['api_key']}",
            }
            timeout = aiohttp.ClientTimeout(total=api_config["timeout"])
            connector = self._build_proxy_connector(aichat_config.get("proxy", {}))
            async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
                async with session.post(api_config["url"], json=payload, headers=headers) as resp:
                    if resp.status != 200:
                        error_text = await resp.text()
                        logger.error(f"AI API 错误: {resp.status}, {error_text}")
                        return description
                    result = await resp.json()
                    if "choices" not in result or not result["choices"]:
                        logger.error("AI API 返回格式错误")
                        return description
                    translated = result["choices"][0]["message"]["content"].strip()
                    if not translated:
                        # An empty reply is worse than the untranslated text.
                        return description
                    logger.info(f"AI翻译成功: {translated[:50]}...")
                    return translated
        except Exception as e:
            logger.error(f"AI翻译失败: {e}")
            return description

    async def _prepare_game_items(self, games: List[Dict]) -> List[Dict]:
        """Normalise raw API entries and translate their descriptions.

        Args:
            games: Raw game dicts as returned by ``_fetch_epic_games``.

        Returns:
            One dict per game with the keys ``title``, ``translated_desc``,
            ``free_start``, ``free_end``, ``link``, ``cover`` and
            ``original_price_desc``. Missing or null fields become "" (title
            falls back to "未知游戏").
        """
        game_items = []
        for game in games:
            title = game.get("title") or "未知游戏"
            logger.info(f"正在翻译游戏简介: {title}")
            translated_desc = await self._translate_description(game.get("description") or "")
            game_items.append({
                "title": title,
                "translated_desc": translated_desc,
                "free_start": game.get("free_start") or "",
                "free_end": game.get("free_end") or "",
                "link": game.get("link") or "",
                "cover": game.get("cover") or "",
                "original_price_desc": game.get("original_price_desc") or "",
            })
        return game_items

    async def _send_games_info(self, bot: WechatHookClient, to_wxid: str, games: List[Dict]) -> bool:
        """Translate and send all games to one recipient as a chat record.

        Args:
            bot: Client used to send the message.
            to_wxid: Recipient wxid (user or ``...@chatroom``).
            games: Raw game dicts from the API.

        Returns:
            True when the message was handed to the bot without error,
            False when there was nothing to send or sending failed.
        """
        if not games:
            return False
        try:
            game_items = await self._prepare_game_items(games)
            sent = await self._send_chat_records_with_links(
                bot, to_wxid, self._RECORD_TITLE, game_items
            )
            if sent:
                logger.success(f"已发送 {len(games)} 个游戏信息")
            return sent
        except Exception as e:
            logger.exception(f"发送游戏信息失败: {e}")
            return False

    @staticmethod
    def _build_data_items(game_items: List[Dict]) -> List[Dict]:
        """Expand each game into a text entry followed by a link-card entry."""
        data_items = []
        for game in game_items:
            # Joined line by line so the message carries no indentation
            # whitespace from the source file.
            text_content = "\n".join((
                f"🎮游戏名称:{game['title']}",
                f"⏰活动时间:{game['free_start']} - {game['free_end']}",
                f"💰原价:{game['original_price_desc']}",
                f"📖简介:{game['translated_desc']}",
                "↓点击下方卡片直达↓",
            ))
            data_items.append({"type": "text", "content": text_content})
            data_items.append({
                "type": "link",
                "title": game["title"],
                "desc": game["translated_desc"],
                "url": game["link"],
                "cover": game["cover"],
            })
        return data_items

    @staticmethod
    def _build_record_xml(from_wxid: str, title: str, data_items: List[Dict]) -> str:
        """Serialise ``data_items`` into the ``<recordinfo>`` XML document."""
        import hashlib
        import time
        import uuid
        import xml.etree.ElementTree as ET

        is_group_flag = "1" if from_wxid.endswith("@chatroom") else "0"
        sender_hash = hashlib.sha256(from_wxid.encode("utf-8")).hexdigest()
        # One clock reading for the whole record keeps ids and timestamps
        # consistent and strictly increasing across items.
        now = time.time()
        now_ms = int(now * 1000)
        now_s = int(now)
        total = len(data_items)

        recordinfo = ET.Element("recordinfo")
        ET.SubElement(recordinfo, "info").text = title
        ET.SubElement(recordinfo, "isChatRoom").text = is_group_flag
        datalist = ET.SubElement(recordinfo, "datalist")
        datalist.set("count", str(total))
        ET.SubElement(recordinfo, "desc").text = title
        ET.SubElement(recordinfo, "fromscene").text = "3"

        for i, item in enumerate(data_items):
            di = ET.SubElement(datalist, "dataitem")
            di.set("dataid", uuid.uuid4().hex)
            create_time = now_s - total + i
            ET.SubElement(di, "srcMsgLocalid").text = str((now_ms % 90000) + 10000)
            ET.SubElement(di, "sourcetime").text = time.strftime(
                "%Y-%m-%d %H:%M", time.localtime(create_time)
            )
            ET.SubElement(di, "fromnewmsgid").text = str(now_ms + i)
            ET.SubElement(di, "srcMsgCreateTime").text = str(create_time)
            ET.SubElement(di, "sourcename").text = "Epic助手"
            ET.SubElement(di, "sourceheadurl").text = ""
            if item["type"] == "text":
                # Plain text entry.
                di.set("datatype", "1")
                ET.SubElement(di, "datatitle").text = item["content"]
                ET.SubElement(di, "datadesc").text = item["content"]
                ET.SubElement(di, "datafmt").text = "text"
            else:
                # Link card entry.
                di.set("datatype", "5")
                ET.SubElement(di, "datatitle").text = item["title"]
                ET.SubElement(di, "datadesc").text = "点击查看"
                ET.SubElement(di, "datafmt").text = "url"
                ET.SubElement(di, "link").text = item["url"]
                weburlitem = ET.SubElement(di, "weburlitem")
                ET.SubElement(weburlitem, "thumburl").text = item["cover"]
                ET.SubElement(di, "thumbwidth").text = "200"
                ET.SubElement(di, "thumbheight").text = "200"
                ET.SubElement(weburlitem, "title").text = item["title"]
                ET.SubElement(weburlitem, "link").text = item["url"]
                ET.SubElement(weburlitem, "desc").text = item["desc"]
                appmsgshareitem = ET.SubElement(weburlitem, "appmsgshareitem")
                ET.SubElement(appmsgshareitem, "itemshowtype").text = "-1"
            ET.SubElement(di, "ischatroom").text = is_group_flag
            dataitemsource = ET.SubElement(di, "dataitemsource")
            ET.SubElement(dataitemsource, "hashusername").text = sender_hash
        return ET.tostring(recordinfo, encoding="unicode")

    async def _send_chat_records_with_links(self, bot: WechatHookClient, from_wxid: str, title: str, game_items: List[Dict]) -> bool:
        """Send the games as one chat-record message (text + link cards).

        Args:
            bot: Client used to send the message.
            from_wxid: Recipient wxid; a ``@chatroom`` suffix marks a group.
            title: Title and description of the record card.
            game_items: Prepared items from ``_prepare_game_items``.

        Returns:
            True when the payload was handed to the bot without an exception,
            False otherwise (the error is logged).
        """
        try:
            from xml.sax.saxutils import escape

            data_items = self._build_data_items(game_items)
            record_xml = self._build_record_xml(from_wxid, title, data_items)
            # The outer envelope is assembled by hand, so the title must be
            # escaped; record_xml is already escaped by ElementTree.
            safe_title = escape(title)
            appmsg_xml = "".join([
                "<appmsg appid=\"\" sdkver=\"0\">",
                f"<title>{safe_title}</title>",
                f"<des>{safe_title}</des>",
                "<type>19</type>",
                "<url>https://support.weixin.qq.com/cgi-bin/mmsupport-bin/readtemplate?t=page/favorite_record__w_unsupport</url>",
                "<appattach><cdnthumbaeskey></cdnthumbaeskey><aeskey></aeskey></appattach>",
                f"<recorditem><![CDATA[{record_xml}]]></recorditem>",
                "<percent>0</percent>",
                "</appmsg>",
            ])
            await bot._send_data_async(11214, {"to_wxid": from_wxid, "content": appmsg_xml})
            logger.success(f"已发送聊天记录: {title},包含 {len(game_items)} 个游戏")
            return True
        except Exception as e:
            logger.exception(f"发送聊天记录失败: {e}")
            return False

    def _is_group_allowed(self, group_id: str) -> bool:
        """Apply the group blacklist, then the whitelist if one is set."""
        behavior = self.config["behavior"]
        if group_id in behavior["disabled_groups"]:
            return False
        enabled_groups = behavior["enabled_groups"]
        return not enabled_groups or group_id in enabled_groups

    @on_text_message(priority=70)
    async def handle_command(self, bot: WechatHookClient, message: dict):
        """Answer a text message that matches one of the command keywords.

        A message matches when it equals a keyword or ends with
        ``" <keyword>"``.

        Returns:
            True when the message is not handled here (no config, no keyword
            match, plugin disabled, or group filtered out); False once the
            query has been processed. Presumably True lets other handlers
            run - confirm against the ``on_text_message`` contract.
        """
        if self.config is None:
            return True
        content = (message.get("Content") or "").strip()
        from_wxid = message.get("FromWxid", "")
        is_group = message.get("IsGroup", False)

        behavior = self.config["behavior"]
        matched = any(
            content == keyword or content.endswith(f" {keyword}")
            for keyword in behavior["command_keywords"]
        )
        if not matched:
            return True
        if not behavior["enabled"]:
            return True
        if is_group and not self._is_group_allowed(from_wxid):
            return True

        logger.info(f"收到Epic免费游戏查询请求: {from_wxid}")
        await bot.send_text(from_wxid, "🎮 正在获取Epic免费游戏信息请稍候...")
        try:
            games = await self._fetch_epic_games()
            if not games:
                await bot.send_text(from_wxid, "❌ 获取Epic免费游戏失败请稍后重试")
                return False
            # Sends every returned game (currently free and upcoming).
            if await self._send_games_info(bot, from_wxid, games):
                logger.success(f"已发送 {len(games)} 个Epic免费游戏信息")
            else:
                await bot.send_text(from_wxid, "❌ 发送Epic免费游戏信息失败请稍后重试")
        except Exception as e:
            logger.error(f"处理Epic免费游戏查询失败: {e}")
            await bot.send_text(from_wxid, f"❌ 查询失败: {str(e)}")
        return False

    def _setup_schedule(self):
        """Register the daily cron job from the ``[schedule]`` config."""
        if not self.config or not self.config["schedule"]["enabled"]:
            return
        from utils.decorators import scheduler

        hour = self.config["schedule"]["hour"]
        minute = self.config["schedule"]["minute"]
        # replace_existing makes repeated registration safe.
        scheduler.add_job(
            self.scheduled_push,
            'cron',
            hour=hour,
            minute=minute,
            id='epic_scheduled_push',
            replace_existing=True,
        )
        logger.info(f"Epic定时任务已设置: 每天 {hour:02d}:{minute:02d}")

    async def scheduled_push(self, bot=None):
        """Push the free-game list to every whitelisted group.

        Args:
            bot: Client to send with; when omitted it is taken from
                ``PluginManager().bot``.
        """
        if not self.config or not self.config["schedule"]["enabled"]:
            return
        logger.info("开始执行Epic免费游戏定时推送任务")
        try:
            games = await self._fetch_epic_games()
            if not games:
                logger.error("定时任务获取Epic免费游戏失败")
                return
            if not bot:
                from utils.plugin_manager import PluginManager
                bot = PluginManager().bot
            if not bot:
                logger.error("定时任务无法获取bot实例")
                return

            behavior = self.config["behavior"]
            enabled_groups = behavior["enabled_groups"]
            disabled_groups = behavior["disabled_groups"]
            # Without a whitelist there is no source for "all groups" yet,
            # so the push is skipped rather than guessed.
            if not enabled_groups:
                logger.warning("未配置群组白名单,跳过定时推送")
                return
            target_groups = [g for g in enabled_groups if g not in disabled_groups]
            if not target_groups:
                logger.warning("没有可推送的目标群组,跳过定时推送")
                return

            group_interval = self.config["schedule"]["group_interval"]
            # Translate once and reuse for every group: the content is
            # identical and each translation is a paid, slow API call.
            game_items = await self._prepare_game_items(games)

            success_count = 0
            for index, group_id in enumerate(target_groups):
                if index:
                    # Pause between groups only, not after the last one.
                    await asyncio.sleep(group_interval)
                try:
                    logger.info(f"向群聊 {group_id} 推送Epic免费游戏")
                    sent = await self._send_chat_records_with_links(
                        bot, group_id, self._RECORD_TITLE, game_items
                    )
                    if sent:
                        success_count += 1
                        logger.success(f"群聊 {group_id} 推送成功")
                    else:
                        logger.error(f"推送到 {group_id} 失败")
                except Exception as e:
                    logger.exception(f"推送到 {group_id} 失败: {e}")
            logger.info(f"Epic免费游戏定时推送完成 - 成功: {success_count}/{len(target_groups)}")
        except Exception as e:
            logger.exception(f"Epic免费游戏定时推送失败: {e}")

    def get_llm_tools(self):
        """Return the LLM tool definition exposed by this plugin."""
        return [{
            "type": "function",
            "function": {
                "name": "get_epic_free_games",
                "description": "获取Epic商店当前免费游戏信息。当用户询问Epic免费游戏、Epic喜加一等内容时调用此工具。",
                "parameters": {
                    "type": "object",
                    "properties": {},
                    "required": [],
                },
            },
        }]

    async def execute_llm_tool(self, tool_name: str, arguments: dict, bot: WechatHookClient, from_wxid: str) -> Optional[dict]:
        """Run the ``get_epic_free_games`` tool for an LLM caller.

        Args:
            tool_name: Requested tool; anything else is ignored.
            arguments: Tool arguments (unused, the tool takes none).
            bot: Client used to send the result.
            from_wxid: Recipient of the game information.

        Returns:
            None when ``tool_name`` is not this plugin's tool; otherwise a
            dict with ``success`` and ``message``, plus ``no_reply`` on
            success because the games were already sent directly.
        """
        if tool_name != "get_epic_free_games":
            return None
        try:
            logger.info(f"LLM工具调用Epic免费游戏: {from_wxid}")
            games = await self._fetch_epic_games()
            if not games:
                return {
                    "success": False,
                    "message": "获取Epic免费游戏失败请稍后重试",
                }
            if not await self._send_games_info(bot, from_wxid, games):
                return {
                    "success": False,
                    "message": "发送Epic免费游戏信息失败请稍后重试",
                }
            return {
                "success": True,
                "message": f"已获取并发送 {len(games)} 个Epic免费游戏信息",
                "no_reply": True,  # already delivered, the AI need not reply
            }
        except Exception as e:
            logger.error(f"LLM工具执行失败: {e}")
            return {
                "success": False,
                "message": f"执行失败: {str(e)}",
            }