Files
abot/plugins/epic_free/main.py
liuwei e57e521900 优化Epic免费游戏推送:对齐官网活动并支持变化推送
变更项:1) 数据源切换为Epic官方freeGamesPromotions接口,不再依赖第三方页面抓取。2) 活动识别升级:区分当前可领与即将免费,仅提取0%折扣活动,按活动窗口过滤脏数据。3) 推送策略从周五固定时点改为小时级检查,并支持 only_on_change 变化去重,避免重复刷屏。4) 新增地区/语言参数(locale、country、allow_countries)与是否包含即将免费配置。5) 增强推送内容:发行商、原价到现价、开始/截止时间、直达链接,信息更贴近官网展示。6) 增加Redis摘要缓存与中文注释,保证活动变化判断稳定可追踪。
2026-04-16 17:59:26 +08:00

412 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import asyncio
import hashlib
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

import requests

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
class EpicFreePlugin(MessagePluginInterface):
    """Scheduled plugin that announces Epic Games Store free-game promotions.

    The plugin handles no chat commands. Its only entry point is the
    scheduled action ``weekly_free_games_push``, which fetches the
    ``freeGamesPromotions`` feed, keeps promotions whose discount setting is
    ``PERCENTAGE`` with a value of 0, renders a text summary and sends it to
    the target groups. A digest of the last pushed promotion set is kept in
    Redis so an unchanged set is not pushed again.
    """

    FEATURE_KEY = "EPIC"
    # NOTE(review): this text still says "sent automatically every Friday"
    # although the schedule below polls hourly. It is a user-facing runtime
    # string, so it is deliberately left unchanged here.
    FEATURE_DESCRIPTION = "📊 EPIC自动播报 [每周五自动发送]"

    _LOGGER = logging.getLogger(__name__)
    _API_URL = "https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions"
    _USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
    )
    _REQUEST_TIMEOUT_SECONDS = 20
    # String spellings that must be read as False when a payload flag arrives
    # as text instead of a real boolean.
    _FALSE_STRINGS = frozenset({"", "0", "false", "no", "off", "n", "f"})

    @property
    def name(self) -> str:
        return "Epic播报"

    @property
    def version(self) -> str:
        return "1.1.0"

    @property
    def description(self) -> str:
        return "将 Epic 免费游戏播报从系统任务迁移到插件任务。"

    @property
    def author(self) -> str:
        return "ABOT Team"

    @property
    def commands(self) -> List[str]:
        # No chat commands: the plugin is driven by the scheduler only.
        return []

    @property
    def feature_key(self) -> Optional[str]:
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        super().__init__()
        self.feature = self.register_feature()
        self.db_manager = None

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Store the database manager taken from ``context`` and report success."""
        # The manager is kept so the Redis digest can be read and written for
        # the "push only when the promotions changed" de-duplication.
        self.db_manager = context.get("db_manager")
        return True

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Always decline: this plugin does not react to chat messages."""
        return False

    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Never handle a message; returns ``(False, None)``."""
        return False, None

    def get_schedule_actions(self) -> List[Dict[str, Any]]:
        """Describe the single scheduled action this plugin offers."""
        return [
            {
                "action_key": "weekly_free_games_push",
                "name": "Epic免费游戏推送",
                "description": "按官网活动信息检查 Epic 免费游戏,变化时推送",
                # Hourly polling replaces the former "Friday at a fixed time"
                # trigger so promotion windows that are still claimable at
                # other times are not missed.
                "trigger_type": "every_seconds",
                "trigger_config": {"seconds": 3600},
                "target_scope": "all_enabled_groups",
                "target_config": {},
                "payload": {
                    "force": False,
                    "only_on_change": True,
                    "include_upcoming": True,
                    "locale": "zh-CN",
                    "country": "CN",
                    "allow_countries": "CN",
                },
                "default_enabled": True,
            }
        ]

    @classmethod
    def _to_bool(cls, value: Any) -> bool:
        """Interpret a payload flag, treating strings such as "false"/"0" as False.

        Non-string values keep plain ``bool(value)`` semantics.
        """
        if isinstance(value, str):
            return value.strip().lower() not in cls._FALSE_STRINGS
        return bool(value)

    async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Run the scheduled push and return a result dict.

        Args:
            action_key: Must be ``"weekly_free_games_push"``; anything else
                yields a failure result.
            context: Reads ``payload`` (``force``, ``only_on_change``,
                ``include_upcoming``, ``locale``, ``country``,
                ``allow_countries``) and ``target_groups``.

        Returns:
            A dict with ``success``, ``summary`` and ``detail`` keys. Fetch and
            send errors are reported in the result instead of being raised.
        """
        if action_key != "weekly_free_games_push":
            return {
                "success": False,
                "summary": f"不支持的动作: {action_key}",
                "detail": {"action_key": action_key},
            }
        if not self.bot:
            return {"success": False, "summary": "bot 未注入", "detail": {}}

        payload = context.get("payload") or {}
        force = self._to_bool(payload.get("force", False))
        only_on_change = self._to_bool(payload.get("only_on_change", True))
        include_upcoming = self._to_bool(payload.get("include_upcoming", True))
        locale = str(payload.get("locale") or "zh-CN").strip() or "zh-CN"
        country = str(payload.get("country") or "CN").strip() or "CN"
        allow_countries = str(payload.get("allow_countries") or country).strip() or country

        target_groups = [
            str(g).strip() for g in (context.get("target_groups") or []) if str(g).strip()
        ]
        if not target_groups:
            # No explicit targets: fall back to every group with the feature enabled.
            target_groups = [
                gid
                for gid in GroupBotManager.get_group_list()
                if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED
            ]
        if not target_groups:
            return {"success": False, "summary": "没有可推送目标群", "detail": {"target_count": 0}}

        try:
            # The fetch uses blocking `requests`; run it in the default
            # executor so the event loop is not stalled for up to the timeout.
            loop = asyncio.get_running_loop()
            games = await loop.run_in_executor(
                None,
                lambda: self._fetch_official_free_games(
                    locale=locale,
                    country=country,
                    allow_countries=allow_countries,
                    include_upcoming=include_upcoming,
                ),
            )
        except Exception as e:
            return {
                "success": False,
                "summary": f"获取 Epic 免费游戏失败: {e}",
                "detail": {"error": str(e)},
            }

        current_count = sum(1 for item in games if item.get("state") == "current")
        upcoming_count = sum(1 for item in games if item.get("state") == "upcoming")
        digest = self._build_games_digest(games)

        if only_on_change and not force:
            last_digest = self._get_last_digest(
                locale=locale, country=country, include_upcoming=include_upcoming
            )
            if last_digest and last_digest == digest:
                return {
                    "success": True,
                    "summary": "Epic 活动无变化,已跳过推送",
                    "detail": {
                        "skipped": True,
                        "reason": "no_change",
                        "current_count": current_count,
                        "upcoming_count": upcoming_count,
                    },
                }

        text = self._render_push_text(
            games=games,
            locale=locale,
            country=country,
            include_upcoming=include_upcoming,
        )

        success_groups = []
        failed_groups = {}
        for gid in target_groups:
            try:
                await self.bot.send_text_message(gid, text)
                success_groups.append(gid)
            except Exception as e:
                # One failing group must not stop delivery to the others.
                failed_groups[gid] = str(e)

        if success_groups:
            # Remember the digest only once at least one group received it.
            self._set_last_digest(
                locale=locale,
                country=country,
                include_upcoming=include_upcoming,
                digest=digest,
            )

        return {
            "success": len(failed_groups) == 0,
            "summary": f"Epic播报完成: 成功{len(success_groups)}群, 失败{len(failed_groups)}",
            "detail": {
                "target_count": len(target_groups),
                "success_groups": success_groups,
                "failed_groups": failed_groups,
                "force": force,
                "only_on_change": only_on_change,
                "include_upcoming": include_upcoming,
                "locale": locale,
                "country": country,
                "current_count": current_count,
                "upcoming_count": upcoming_count,
            },
        }

    @staticmethod
    def _parse_iso_to_utc(text: Optional[str]) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp into an aware UTC datetime.

        Returns ``None`` for empty or unparseable input. A timestamp without
        an offset is taken to be UTC rather than the host's local time.
        """
        if not text:
            return None
        value = str(text).strip()
        # Typical feed format: 2026-04-16T15:00:00.000Z. A trailing "Z" is
        # rewritten to an explicit offset before parsing.
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(value)
        except (TypeError, ValueError):
            return None
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    @staticmethod
    def _fmt_utc(dt: Optional[datetime]) -> str:
        """Format a datetime as ``YYYY-MM-DD HH:MM UTC``; ``"-"`` when missing."""
        if not dt:
            return "-"
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")

    @staticmethod
    def _safe_offer_link(locale: str, element: Dict[str, Any]) -> str:
        """Build a store link for ``element``.

        Preference order: the first ``catalogNs.mappings`` entry's
        ``pageSlug``, then ``productSlug``, then the generic free-games page.
        """
        locale_text = str(locale or "en-US")

        mappings = (element.get("catalogNs") or {}).get("mappings") or []
        if isinstance(mappings, list) and mappings:
            first = mappings[0] if isinstance(mappings[0], dict) else {}
            page_slug = str(first.get("pageSlug") or "").strip()
            if page_slug:
                return f"https://store.epicgames.com/{locale_text}/p/{page_slug}"

        product_slug = str(element.get("productSlug") or "").strip()
        if product_slug:
            # Drop only a trailing "/home" segment; removing the substring
            # anywhere would corrupt slugs that merely contain it.
            product_slug = product_slug.rstrip("/")
            if product_slug.endswith("/home"):
                product_slug = product_slug[: -len("/home")]
            product_slug = product_slug.strip("/")
            if product_slug:
                return f"https://store.epicgames.com/{locale_text}/p/{product_slug}"

        return f"https://store.epicgames.com/{locale_text}/free-games"

    @staticmethod
    def _parse_discount_percent(raw: Any) -> Optional[int]:
        """Convert a raw discount percentage to int; ``None`` if absent/invalid.

        ``0`` must survive the conversion: it is the value that marks a free
        promotion, and an ``x or default`` shortcut would discard it because a
        numeric zero is falsy.
        """
        if raw is None or isinstance(raw, bool):
            return None
        try:
            return int(raw)
        except (TypeError, ValueError):
            return None

    def _is_free_promo(self, promo: Any) -> bool:
        """Return True when ``promo`` is a PERCENTAGE discount with value 0."""
        if not isinstance(promo, dict):
            return False
        discount_setting = promo.get("discountSetting") or {}
        discount_type = str(discount_setting.get("discountType") or "").upper()
        discount_percent = self._parse_discount_percent(discount_setting.get("discountPercentage"))
        return discount_type == "PERCENTAGE" and discount_percent == 0

    def _fetch_official_free_games(
        self,
        locale: str,
        country: str,
        allow_countries: str,
        include_upcoming: bool,
    ) -> List[Dict[str, Any]]:
        """Fetch free promotions from the ``freeGamesPromotions`` endpoint.

        Extraction rules:
        1. Keep only promotions whose discount type is PERCENTAGE and whose
           discount value is 0.
        2. ``promotionalOffers`` entries are "current";
           ``upcomingPromotionalOffers`` entries are "upcoming".
        3. Current items come first; results are ordered by promotion time.

        Raises:
            requests.RequestException: on network failure or HTTP error status.
            ValueError: if the response body is not valid JSON.
        """
        response = requests.get(
            self._API_URL,
            # Let requests encode the query instead of interpolating raw text.
            params={"locale": locale, "country": country, "allowCountries": allow_countries},
            headers={"User-Agent": self._USER_AGENT},
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        payload = response.json() or {}
        if not isinstance(payload, dict):
            payload = {}
        elements = (
            (((payload.get("data") or {}).get("Catalog") or {}).get("searchStore") or {}).get("elements")
            or []
        )
        records = self._extract_free_records(
            elements=elements,
            locale=locale,
            include_upcoming=include_upcoming,
            now_utc=datetime.now(timezone.utc),
        )
        return self._dedupe_and_sort(records)

    def _extract_free_records(
        self,
        elements: List[Any],
        locale: str,
        include_upcoming: bool,
        now_utc: datetime,
    ) -> List[Dict[str, Any]]:
        """Turn raw catalog elements into one record per free promotion window."""
        records: List[Dict[str, Any]] = []
        for element in elements:
            if not isinstance(element, dict):
                continue
            title = str(element.get("title") or "").strip()
            if not title:
                continue

            seller_name = str((element.get("seller") or {}).get("name") or "").strip()
            total_price = (element.get("price") or {}).get("totalPrice") or {}
            fmt_price = total_price.get("fmtPrice") or {}
            original_price_text = str(fmt_price.get("originalPrice") or "").strip()
            discount_price_text = str(fmt_price.get("discountPrice") or "").strip()

            promotions = element.get("promotions") or {}
            promo_groups = [("current", promotions.get("promotionalOffers") or [])]
            if include_upcoming:
                promo_groups.append(("upcoming", promotions.get("upcomingPromotionalOffers") or []))

            for state, buckets in promo_groups:
                for bucket in buckets:
                    if not isinstance(bucket, dict):
                        continue
                    for promo in bucket.get("promotionalOffers") or []:
                        # Only "0% price" promotions count as free; ordinary
                        # discounts must not be mixed in.
                        if not self._is_free_promo(promo):
                            continue
                        start_utc = self._parse_iso_to_utc(promo.get("startDate"))
                        end_utc = self._parse_iso_to_utc(promo.get("endDate"))
                        # An upcoming promotion must start in the future and a
                        # current one must contain "now"; this filters
                        # inconsistent entries that would mislabel the state.
                        if state == "upcoming" and start_utc and start_utc <= now_utc:
                            continue
                        if (
                            state == "current"
                            and start_utc
                            and end_utc
                            and not (start_utc <= now_utc < end_utc)
                        ):
                            continue
                        records.append(
                            {
                                "offer_id": str(element.get("id") or ""),
                                "namespace": str(element.get("namespace") or ""),
                                "title": title,
                                "description": str(element.get("description") or "").strip(),
                                "seller": seller_name,
                                "state": state,
                                "start_utc": start_utc,
                                "end_utc": end_utc,
                                "original_price_text": original_price_text or "-",
                                "discount_price_text": discount_price_text or "0",
                                "link": self._safe_offer_link(locale=locale, element=element),
                            }
                        )
        return records

    def _dedupe_and_sort(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Drop duplicate (offer, state, window) records and order them for display."""
        # The same offer may appear in several buckets for one state, so
        # de-duplicate on the promotion window; the last occurrence wins.
        unique: Dict[Tuple[Any, Any, str, str], Dict[str, Any]] = {}
        for item in records:
            key = (
                item.get("offer_id"),
                item.get("state"),
                self._fmt_utc(item.get("start_utc")),
                self._fmt_utc(item.get("end_utc")),
            )
            unique[key] = item

        # Records without a date sort after dated ones.
        far_future = datetime.max.replace(tzinfo=timezone.utc)
        return sorted(
            unique.values(),
            key=lambda x: (
                0 if x.get("state") == "current" else 1,
                x.get("end_utc") or far_future,
                x.get("start_utc") or far_future,
                x.get("title") or "",
            ),
        )

    def _build_games_digest(self, games: List[Dict[str, Any]]) -> str:
        """Return an MD5 hex digest over the stable fields of ``games``.

        The digest is used only for change detection, not for security.
        """
        norm = [
            {
                "offer_id": item.get("offer_id", ""),
                "state": item.get("state", ""),
                "start": self._fmt_utc(item.get("start_utc")),
                "end": self._fmt_utc(item.get("end_utc")),
                "original_price": item.get("original_price_text", ""),
                "discount_price": item.get("discount_price_text", ""),
            }
            for item in games
        ]
        payload = json.dumps(norm, ensure_ascii=False, sort_keys=True)
        return hashlib.md5(payload.encode("utf-8")).hexdigest()

    def _redis_key_digest(self, locale: str, country: str, include_upcoming: bool) -> str:
        """Build the Redis key holding the last pushed digest for this configuration."""
        return f"abot:epic_free:last_digest:{locale}:{country}:{1 if include_upcoming else 0}"

    def _get_last_digest(self, locale: str, country: str, include_upcoming: bool) -> str:
        """Read the last pushed digest from Redis; ``""`` if unavailable."""
        if not self.db_manager:
            return ""
        try:
            redis_conn = self.db_manager.get_redis_connection()
            value = redis_conn.get(self._redis_key_digest(locale, country, include_upcoming))
        except Exception as exc:
            # Best effort: without the stored digest the push simply proceeds.
            self._LOGGER.warning("Failed to read Epic digest from Redis: %s", exc)
            return ""
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="ignore")
        return str(value or "")

    def _set_last_digest(self, locale: str, country: str, include_upcoming: bool, digest: str):
        """Store ``digest`` in Redis as the last pushed state (best effort)."""
        if not self.db_manager:
            return
        try:
            redis_conn = self.db_manager.get_redis_connection()
            redis_conn.set(self._redis_key_digest(locale, country, include_upcoming), digest)
        except Exception as exc:
            # A failed de-duplication write must not affect the push itself.
            self._LOGGER.warning("Failed to store Epic digest in Redis: %s", exc)

    def _render_push_text(
        self,
        games: List[Dict[str, Any]],
        locale: str,
        country: str,
        include_upcoming: bool,
    ) -> str:
        """Render the announcement text for the given promotion records.

        The "upcoming" section is emitted only when ``include_upcoming`` is
        true; each section shows a placeholder line when it has no items.
        """
        current_games = [item for item in games if item.get("state") == "current"]
        upcoming_games = [item for item in games if item.get("state") == "upcoming"]

        lines = [
            "Epic 官方免费活动播报",
            f"地区: {country} 语言: {locale}",
            f"官方页: https://store.epicgames.com/{locale}/free-games",
            "",
        ]

        if current_games:
            lines.append(f"当前可领取 ({len(current_games)}):")
            for idx, item in enumerate(current_games, 1):
                lines.append(f"{idx}. {item.get('title')}")
                lines.append(f" 发行商: {item.get('seller') or '-'}")
                lines.append(f" 价格: {item.get('original_price_text')} -> {item.get('discount_price_text')}")
                lines.append(f" 截止: {self._fmt_utc(item.get('end_utc'))}")
                lines.append(f" 链接: {item.get('link')}")
        else:
            lines.append("当前可领取: 暂无")

        if include_upcoming:
            lines.append("")
            if upcoming_games:
                lines.append(f"即将免费 ({len(upcoming_games)}):")
                for idx, item in enumerate(upcoming_games, 1):
                    lines.append(f"{idx}. {item.get('title')}")
                    lines.append(f" 开始: {self._fmt_utc(item.get('start_utc'))}")
                    lines.append(f" 结束: {self._fmt_utc(item.get('end_utc'))}")
                    lines.append(f" 链接: {item.get('link')}")
            else:
                lines.append("即将免费: 暂无")

        return "\n".join(lines)