Files
abot/plugins/jd_sign_token/main.py
2025-06-04 09:02:31 +08:00

343 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from loguru import logger
import re
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
import aiohttp
import asyncio
from typing import Optional, List, Dict, Any
class QL:
def __init__(self, address: str, id: str, secret: str) -> None:
"""初始化"""
self.address = address
self.id = id
self.secret = secret
self.valid = True
self.auth = None
self._session: Optional[aiohttp.ClientSession] = None
async def login(self) -> bool:
"""异步登录"""
url = f"{self.address}/open/auth/token?client_id={self.id}&client_secret={self.secret}"
try:
async with aiohttp.ClientSession().get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
rjson = await response.json()
if rjson['code'] == 200:
self.auth = f"{rjson['data']['token_type']} {rjson['data']['token']}"
self.valid = True
return True
else:
logger.info(f"登录失败:{rjson['message']}")
self.valid = False
return False
else:
logger.info(f"登录失败HTTP {response.status}")
self.valid = False
return False
except Exception as e:
self.valid = False
logger.info(f"登录失败:{str(e)}")
return False
async def getEnvs(self) -> list:
"""异步获取环境变量"""
# 每次操作前先登录确保token有效
if not await self.login():
return []
url = f"{self.address}/open/envs?searchValue="
headers = {"Authorization": self.auth}
try:
async with aiohttp.ClientSession().get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
rjson = await response.json()
if rjson['code'] == 200:
return rjson['data']
else:
logger.info(f"获取环境变量失败:{rjson['message']}")
return []
else:
logger.info(f"获取环境变量失败HTTP {response.status}")
return []
except Exception as e:
logger.info(f"获取环境变量失败:{str(e)}")
return []
async def deleteEnvs(self, ids: list) -> bool:
"""异步删除环境变量"""
# 每次操作前先登录确保token有效
if not await self.login():
return False
url = f"{self.address}/open/envs"
headers = {"Authorization": self.auth, "content-type": "application/json"}
try:
async with aiohttp.ClientSession().delete(url, headers=headers, json=ids,
timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
rjson = await response.json()
if rjson['code'] == 200:
logger.info(f"删除环境变量成功:{len(ids)}")
return True
else:
logger.info(f"删除环境变量失败:{rjson['message']}")
return False
else:
logger.info(f"删除环境变量失败HTTP {response.status}")
return False
except Exception as e:
logger.info(f"删除环境变量失败:{str(e)}")
return False
async def addEnvs(self, envs: list) -> bool:
"""异步添加环境变量"""
if not await self.login():
return False
url = f"{self.address}/open/envs"
headers = {"Authorization": self.auth, "content-type": "application/json"}
try:
async with aiohttp.ClientSession().post(url, headers=headers, json=envs,
timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
rjson = await response.json()
if rjson['code'] == 200:
logger.info(f"添加环境变量成功:{len(envs)}")
return True
else:
logger.info(f"添加环境变量失败:{rjson['message']}")
return False
else:
logger.info(f"添加环境变量失败HTTP {response.status}")
return False
except Exception as e:
logger.info(f"添加环境变量失败:{str(e)}")
return False
async def updateEnv(self, env: dict) -> bool:
"""异步更新环境变量"""
if not await self.login():
return False
url = f"{self.address}/open/envs"
headers = {"Authorization": self.auth, "content-type": "application/json"}
try:
async with aiohttp.ClientSession().put(url, headers=headers, json=env, timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
rjson = await response.json()
if rjson['code'] == 200:
logger.info(f"更新环境变量成功:{env.get('id')}")
return True
else:
logger.info(f"更新环境变量失败:{rjson['message']}")
return False
else:
logger.info(f"更新环境变量失败HTTP {response.status}")
return False
except Exception as e:
logger.info(f"更新环境变量失败:{str(e)}")
return False
class JDTokenPlugin(MessagePluginInterface):
"""京东签到Token设置插件"""
@property
def name(self) -> str:
return "京东签到Token设置"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供京东签到Token设置功能支持添加和更新Token"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
# 从TOML配置文件加载配置
self._commands = self._config.get("JD_Token", {}).get("command", ["设置京东"])
self.command_format = self._config.get("JD_Token", {}).get("command-format", "设置京东 token内容 备注名称")
self.enable = self._config.get("JD_Token", {}).get("enable", True)
# 从TOML配置文件加载青龙面板配置
ql_host = self._config.get("JD_Token", {}).get("QL_HOST", "http://localhost:5700")
client_id = self._config.get("JD_Token", {}).get("CLIENT_ID", "")
client_secret = self._config.get("JD_Token", {}).get("CLIENT_SECRET", "")
# 初始化青龙面板连接
self.ql = QL(ql_host, client_id, client_secret)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="京东签到Token设置")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查命令格式 - 修改正则表达式使用非贪婪匹配捕获token部分
pattern = r'^设置京东\s+(.+?)\s+([^\s].+)$'
match = re.match(pattern, content)
if not match:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}"
, sender)
return False, "命令格式错误"
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.JD_TOKEN) == PermissionStatus.DISABLED:
return False, "没有权限"
# 提取token和备注
token = match.group(1)
remark = match.group(2)
# 清理token中的空格
token = token.replace(" ", "")
# 确保token格式正确
if "pt_key=" not in token or "pt_pin=" not in token:
await bot.send_text_message((roomid if roomid else sender),
f"❌ Token格式错误正确格式应为pt_key=xxx;pt_pin=xxx;",
sender)
return False, "Token格式错误"
# 标准化token格式
# 1. 确保pt_key和pt_pin之间有分号
if "pt_key=" in token and "pt_pin=" in token:
# 提取pt_key和pt_pin部分
pt_key_part = re.search(r'pt_key=[^;]*', token)
pt_pin_part = re.search(r'pt_pin=[^;]*', token)
if pt_key_part and pt_pin_part:
# 重新组合token确保格式正确
token = f"{pt_key_part.group(0)};{pt_pin_part.group(0)};"
# 确保token以分号结尾
if not token.endswith(";"):
token += ";"
self.LOG.info(f"处理后的token格式: {token}")
try:
# 设置京东Token
result = self.set_jd_token(token, remark)
await bot.send_text_message((roomid if roomid else sender), result, sender)
return True, "处理成功"
except Exception as e:
self.LOG.error(f"处理京东Token设置请求出错: {e}")
await bot.send_text_message((roomid if roomid else sender), f"❌处理出错: {str(e)}", sender)
return False, f"处理出错: {e}"
def set_jd_token(self, token: str, remark: str) -> str:
"""设置京东Token"""
if not self.ql.valid:
return f"❌ 青龙面板连接失败,请检查配置"
# 检查是否已存在相同备注的环境变量
envs = self.ql.getEnvs()
if not envs:
return f"❌ 获取环境变量失败"
# 从当前token中提取pt_pin
pt_pin_match = re.search(r'pt_pin=([^;]*)', token)
if not pt_pin_match:
return f"❌ 无法从Token中提取pt_pin信息"
current_pt_pin = pt_pin_match.group(1)
self.LOG.info(f"当前Token的pt_pin: {current_pt_pin}")
# 查找是否有相同pt_pin的JD_COOKIE
existing_env = None
env_id = None
for env in envs:
if env.get('name') == 'JD_COOKIE':
# 从已存在的环境变量中提取pt_pin
env_pt_pin_match = re.search(r'pt_pin=([^;]*)', env.get('value', ''))
if env_pt_pin_match and env_pt_pin_match.group(1) == current_pt_pin:
existing_env = env
env_id = env.get('id')
break
result = False
self.LOG.debug(f"existing_env: {existing_env}")
if existing_env:
# 更新已存在的环境变量,保留原有的备注或使用新的备注
existing_remarks = existing_env.get('remarks', '')
final_remarks = remark if remark else existing_remarks
existing_env['value'] = token
env_update: dict = {"id": env_id, "value": token, "remarks": final_remarks, "name": "JD_COOKIE"}
result = self.ql.updateEnv(env_update)
if result:
return f"✅ 已成功更新京东账号 [{final_remarks}] 的Token (pt_pin: {current_pt_pin})"
else:
return f"❌ 更新京东账号 [{final_remarks}] 的Token失败"
else:
# 添加新的环境变量
new_env = [{"name": "JD_COOKIE", "value": token, "remarks": remark}]
result = self.ql.addEnvs(new_env)
if result:
return f"✅ 已成功添加京东账号 [{remark}] 的Token (pt_pin: {current_pt_pin})"
else:
return f"❌ 添加京东账号 [{remark}] 的Token失败"