Files
abot/plugins/jd_sign_token/main.py
2026-01-16 13:34:37 +08:00

382 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
from pathlib import Path
from typing import Optional, List, Dict, Any
from typing import Tuple
import aiohttp
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
class QL:
def __init__(self, address: str, id: str, secret: str) -> None:
"""初始化"""
self.address = address
self.id = id
self.secret = secret
self.valid = True
self.auth = None
self._session: Optional[aiohttp.ClientSession] = None
async def login(self) -> bool:
"""异步登录"""
url = f"{self.address}/open/auth/token?client_id={self.id}&client_secret={self.secret}"
try:
async with aiohttp.ClientSession().get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
rjson = await response.json()
if rjson['code'] == 200:
self.auth = f"{rjson['data']['token_type']} {rjson['data']['token']}"
self.valid = True
return True
else:
logger.info(f"登录失败:{rjson['message']}")
self.valid = False
return False
else:
logger.info(f"登录失败HTTP {response.status}")
self.valid = False
return False
except Exception as e:
self.valid = False
logger.info(f"登录失败:{str(e)}")
return False
async def getEnvs(self) -> list:
"""异步获取环境变量"""
# 每次操作前先登录确保token有效
if not await self.login():
return []
url = f"{self.address}/open/envs?searchValue="
headers = {"Authorization": self.auth}
try:
async with aiohttp.ClientSession().get(url, headers=headers,
timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
rjson = await response.json()
if rjson['code'] == 200:
return rjson['data']
else:
logger.info(f"获取环境变量失败:{rjson['message']}")
return []
else:
logger.info(f"获取环境变量失败HTTP {response.status}")
return []
except Exception as e:
logger.info(f"获取环境变量失败:{str(e)}")
return []
async def deleteEnvs(self, ids: list) -> bool:
"""异步删除环境变量"""
# 每次操作前先登录确保token有效
if not await self.login():
return False
url = f"{self.address}/open/envs"
headers = {"Authorization": self.auth, "content-type": "application/json"}
try:
async with aiohttp.ClientSession().delete(url, headers=headers, json=ids,
timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
rjson = await response.json()
if rjson['code'] == 200:
logger.info(f"删除环境变量成功:{len(ids)}")
return True
else:
logger.info(f"删除环境变量失败:{rjson['message']}")
return False
else:
logger.info(f"删除环境变量失败HTTP {response.status}")
return False
except Exception as e:
logger.info(f"删除环境变量失败:{str(e)}")
return False
async def addEnvs(self, envs: list) -> bool:
"""异步添加环境变量"""
if not await self.login():
return False
url = f"{self.address}/open/envs"
headers = {"Authorization": self.auth, "content-type": "application/json"}
try:
async with aiohttp.ClientSession().post(url, headers=headers, json=envs,
timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
rjson = await response.json()
if rjson['code'] == 200:
logger.info(f"添加环境变量成功:{len(envs)}")
return True
else:
logger.info(f"添加环境变量失败:{rjson['message']}")
return False
else:
logger.info(f"添加环境变量失败HTTP {response.status}")
return False
except Exception as e:
logger.info(f"添加环境变量失败:{str(e)}")
return False
async def updateEnv(self, env: dict) -> bool:
"""异步更新环境变量"""
if not await self.login():
return False
url = f"{self.address}/open/envs"
headers = {"Authorization": self.auth, "content-type": "application/json"}
try:
async with aiohttp.ClientSession().put(url, headers=headers, json=env,
timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
rjson = await response.json()
if rjson['code'] == 200:
logger.info(f"更新环境变量成功:{env.get('id')}")
return True
else:
logger.info(f"更新环境变量失败:{rjson['message']}")
return False
else:
logger.info(f"更新环境变量失败HTTP {response.status}")
return False
except Exception as e:
logger.info(f"更新环境变量失败:{str(e)}")
return False
class JDTokenPlugin(MessagePluginInterface):
"""京东签到Token设置插件"""
# 功能权限常量
FEATURE_KEY = "JD_TOKEN"
FEATURE_DESCRIPTION = "🔑 JD_京豆token设置 [设置京东 pt_key=xxx;pt_pin=xxx; 备注名称]"
@property
def name(self) -> str:
return "京东签到Token设置"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供京东签到Token设置功能支持添加和更新Token"
@property
def author(self) -> str:
return "liu.wei"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
# 注册功能权限
self.feature = self.register_feature()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
# 从TOML配置文件加载配置
self._commands = self._config.get("JD_Token", {}).get("command", ["设置京东"])
self.command_format = self._config.get("JD_Token", {}).get("command-format", "设置京东 token内容 备注名称")
self.enable = self._config.get("JD_Token", {}).get("enable", True)
# 从TOML配置文件加载青龙面板配置
ql_host = self._config.get("JD_Token", {}).get("QL_HOST", "http://localhost:5700")
client_id = self._config.get("JD_Token", {}).get("CLIENT_ID", "")
client_secret = self._config.get("JD_Token", {}).get("CLIENT_SECRET", "")
# 初始化青龙面板连接
self.ql = QL(ql_host, client_id, client_secret)
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="京东签到Token设置")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 先去除内容中"设置京东"之后的所有空格,便于后续处理
# 保留命令部分,去除后面的所有空格
parts = content.split(" ", 1) # 只分割一次
if len(parts) < 2:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}"
, sender)
return False, "命令格式错误"
# 去除token部分的空格只去除空格保留分号
token_part = parts[1].replace(" ", "")
# 检查格式token部分应该包含 pt_key= 和 pt_pin=,以及最后的备注
# 备注可能包含在分号后面,所以需要智能分割
# 查找最后一个分号的位置,最后一个分号后面是备注
last_semicolon_pos = token_part.rfind(";")
if last_semicolon_pos == -1 or last_semicolon_pos == len(token_part) - 1:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}"
, sender)
return False, "命令格式错误"
token = token_part[:last_semicolon_pos + 1] # 包含最后一个分号
remark = token_part[last_semicolon_pos + 1:] # 分号后面的内容
if not remark:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}"
, sender)
return False, "备注不能为空"
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
# token已经去除空格直接验证格式
# 确保token格式正确
if "pt_key=" not in token or "pt_pin=" not in token:
await bot.send_text_message((roomid if roomid else sender),
f"❌ Token格式错误正确格式应为pt_key=xxx;pt_pin=xxx;",
sender)
return False, "Token格式错误"
# 标准化token格式
# 1. 确保pt_key和pt_pin之间有分号
if "pt_key=" in token and "pt_pin=" in token:
# 提取pt_key和pt_pin部分
pt_key_part = re.search(r'pt_key=[^;]*', token)
pt_pin_part = re.search(r'pt_pin=[^;]*', token)
if pt_key_part and pt_pin_part:
# 重新组合token确保格式正确
token = f"{pt_key_part.group(0)};{pt_pin_part.group(0)};"
# 确保token以分号结尾
if not token.endswith(";"):
token += ";"
self.LOG.info(f"处理后的token格式: {token}")
try:
# 设置京东Token
result = await self.set_jd_token(token, remark)
await bot.send_text_message((roomid if roomid else sender), result, sender)
# 发送二维码图片
qrcode_path = Path(os.path.join(os.path.dirname(os.path.abspath(__file__)), "qrcode.png"))
if qrcode_path.exists():
await bot.send_image_message((roomid if roomid else sender), qrcode_path)
else:
self.LOG.warning(f"二维码图片不存在: {qrcode_path}")
return True, "处理成功"
except Exception as e:
self.LOG.error(f"处理京东Token设置请求出错: {e}")
await bot.send_text_message((roomid if roomid else sender), f"❌处理出错: {str(e)}", sender)
return False, f"处理出错: {e}"
async def set_jd_token(self, token: str, remark: str) -> str:
"""设置京东Token"""
if not self.ql.valid:
return f"❌ 青龙面板连接失败,请检查配置"
# 检查是否已存在相同备注的环境变量
envs = await self.ql.getEnvs()
if not envs:
return f"❌ 获取环境变量失败"
# 从当前token中提取pt_pin
pt_pin_match = re.search(r'pt_pin=([^;]*)', token)
if not pt_pin_match:
return f"❌ 无法从Token中提取pt_pin信息"
current_pt_pin = pt_pin_match.group(1)
self.LOG.info(f"当前Token的pt_pin: {current_pt_pin}")
# 查找是否有相同pt_pin的JD_COOKIE
existing_env = None
env_id = None
for env in envs:
if env.get('name') == 'JD_COOKIE':
# 从已存在的环境变量中提取pt_pin
env_pt_pin_match = re.search(r'pt_pin=([^;]*)', env.get('value', ''))
if env_pt_pin_match and env_pt_pin_match.group(1) == current_pt_pin:
existing_env = env
env_id = env.get('id')
break
result = False
self.LOG.debug(f"existing_env: {existing_env}")
if existing_env:
# 更新已存在的环境变量,保留原有的备注或使用新的备注
existing_remarks = existing_env.get('remarks', '')
final_remarks = remark if remark else existing_remarks
existing_env['value'] = token
env_update: dict = {"id": env_id, "value": token, "remarks": final_remarks, "name": "JD_COOKIE"}
result = await self.ql.updateEnv(env_update)
if result:
return f"✅ 已成功更新京东账号 [{final_remarks}] 的Token (pt_pin: {current_pt_pin})"
else:
return f"❌ 更新京东账号 [{final_remarks}] 的Token失败"
else:
# 添加新的环境变量
new_env = [{"name": "JD_COOKIE", "value": token, "remarks": remark}]
result = await self.ql.addEnvs(new_env)
if result:
return f"✅ 已成功添加京东账号 [{remark}] 的Token (pt_pin: {current_pt_pin})"
else:
return f"❌ 添加京东账号 [{remark}] 的Token失败"