382 lines
16 KiB
Python
382 lines
16 KiB
Python
import os
|
||
import re
|
||
from pathlib import Path
|
||
from typing import Optional, List, Dict, Any
|
||
from typing import Tuple
|
||
|
||
import aiohttp
|
||
from loguru import logger
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.robot_cmd.robot_command import PermissionStatus, GroupBotManager
|
||
from wechat_ipad import WechatAPIClient
|
||
|
||
|
||
class QL:
|
||
def __init__(self, address: str, id: str, secret: str) -> None:
|
||
"""初始化"""
|
||
self.address = address
|
||
self.id = id
|
||
self.secret = secret
|
||
self.valid = True
|
||
self.auth = None
|
||
self._session: Optional[aiohttp.ClientSession] = None
|
||
|
||
async def login(self) -> bool:
|
||
"""异步登录"""
|
||
url = f"{self.address}/open/auth/token?client_id={self.id}&client_secret={self.secret}"
|
||
try:
|
||
async with aiohttp.ClientSession().get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
|
||
if response.status == 200:
|
||
rjson = await response.json()
|
||
if rjson['code'] == 200:
|
||
self.auth = f"{rjson['data']['token_type']} {rjson['data']['token']}"
|
||
self.valid = True
|
||
return True
|
||
else:
|
||
logger.info(f"登录失败:{rjson['message']}")
|
||
self.valid = False
|
||
return False
|
||
else:
|
||
logger.info(f"登录失败:HTTP {response.status}")
|
||
self.valid = False
|
||
return False
|
||
except Exception as e:
|
||
self.valid = False
|
||
logger.info(f"登录失败:{str(e)}")
|
||
return False
|
||
|
||
async def getEnvs(self) -> list:
|
||
"""异步获取环境变量"""
|
||
# 每次操作前先登录,确保token有效
|
||
if not await self.login():
|
||
return []
|
||
|
||
url = f"{self.address}/open/envs?searchValue="
|
||
headers = {"Authorization": self.auth}
|
||
try:
|
||
async with aiohttp.ClientSession().get(url, headers=headers,
|
||
timeout=aiohttp.ClientTimeout(total=15)) as response:
|
||
if response.status == 200:
|
||
rjson = await response.json()
|
||
if rjson['code'] == 200:
|
||
return rjson['data']
|
||
else:
|
||
logger.info(f"获取环境变量失败:{rjson['message']}")
|
||
return []
|
||
else:
|
||
logger.info(f"获取环境变量失败:HTTP {response.status}")
|
||
return []
|
||
except Exception as e:
|
||
logger.info(f"获取环境变量失败:{str(e)}")
|
||
return []
|
||
|
||
async def deleteEnvs(self, ids: list) -> bool:
|
||
"""异步删除环境变量"""
|
||
# 每次操作前先登录,确保token有效
|
||
if not await self.login():
|
||
return False
|
||
|
||
url = f"{self.address}/open/envs"
|
||
headers = {"Authorization": self.auth, "content-type": "application/json"}
|
||
try:
|
||
async with aiohttp.ClientSession().delete(url, headers=headers, json=ids,
|
||
timeout=aiohttp.ClientTimeout(total=15)) as response:
|
||
if response.status == 200:
|
||
rjson = await response.json()
|
||
if rjson['code'] == 200:
|
||
logger.info(f"删除环境变量成功:{len(ids)}")
|
||
return True
|
||
else:
|
||
logger.info(f"删除环境变量失败:{rjson['message']}")
|
||
return False
|
||
else:
|
||
logger.info(f"删除环境变量失败:HTTP {response.status}")
|
||
return False
|
||
except Exception as e:
|
||
logger.info(f"删除环境变量失败:{str(e)}")
|
||
return False
|
||
|
||
async def addEnvs(self, envs: list) -> bool:
|
||
"""异步添加环境变量"""
|
||
if not await self.login():
|
||
return False
|
||
|
||
url = f"{self.address}/open/envs"
|
||
headers = {"Authorization": self.auth, "content-type": "application/json"}
|
||
try:
|
||
async with aiohttp.ClientSession().post(url, headers=headers, json=envs,
|
||
timeout=aiohttp.ClientTimeout(total=15)) as response:
|
||
if response.status == 200:
|
||
rjson = await response.json()
|
||
if rjson['code'] == 200:
|
||
logger.info(f"添加环境变量成功:{len(envs)}")
|
||
return True
|
||
else:
|
||
logger.info(f"添加环境变量失败:{rjson['message']}")
|
||
return False
|
||
else:
|
||
logger.info(f"添加环境变量失败:HTTP {response.status}")
|
||
return False
|
||
except Exception as e:
|
||
logger.info(f"添加环境变量失败:{str(e)}")
|
||
return False
|
||
|
||
async def updateEnv(self, env: dict) -> bool:
|
||
"""异步更新环境变量"""
|
||
if not await self.login():
|
||
return False
|
||
|
||
url = f"{self.address}/open/envs"
|
||
headers = {"Authorization": self.auth, "content-type": "application/json"}
|
||
try:
|
||
async with aiohttp.ClientSession().put(url, headers=headers, json=env,
|
||
timeout=aiohttp.ClientTimeout(total=15)) as response:
|
||
if response.status == 200:
|
||
rjson = await response.json()
|
||
if rjson['code'] == 200:
|
||
logger.info(f"更新环境变量成功:{env.get('id')}")
|
||
return True
|
||
else:
|
||
logger.info(f"更新环境变量失败:{rjson['message']}")
|
||
return False
|
||
else:
|
||
logger.info(f"更新环境变量失败:HTTP {response.status}")
|
||
return False
|
||
except Exception as e:
|
||
logger.info(f"更新环境变量失败:{str(e)}")
|
||
return False
|
||
|
||
|
||
class JDTokenPlugin(MessagePluginInterface):
|
||
"""京东签到Token设置插件"""
|
||
|
||
# 功能权限常量
|
||
FEATURE_KEY = "JD_TOKEN"
|
||
FEATURE_DESCRIPTION = "🔑 JD_京豆token设置 [设置京东 pt_key=xxx;pt_pin=xxx; 备注名称]"
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "京东签到Token设置"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "提供京东签到Token设置功能,支持添加和更新Token"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "liu.wei"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return "" # 不需要前缀,直接匹配命令
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return self._commands
|
||
|
||
@property
|
||
def feature_key(self) -> Optional[str]:
|
||
return self.FEATURE_KEY
|
||
|
||
@property
|
||
def feature_description(self) -> Optional[str]:
|
||
return self.FEATURE_DESCRIPTION
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
# 注册功能权限
|
||
self.feature = self.register_feature()
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG = logger
|
||
self.LOG.debug(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 保存上下文对象
|
||
self.event_system = context.get("event_system")
|
||
|
||
# 从TOML配置文件加载配置
|
||
self._commands = self._config.get("JD_Token", {}).get("command", ["设置京东"])
|
||
self.command_format = self._config.get("JD_Token", {}).get("command-format", "设置京东 token内容 备注名称")
|
||
self.enable = self._config.get("JD_Token", {}).get("enable", True)
|
||
|
||
# 从TOML配置文件加载青龙面板配置
|
||
ql_host = self._config.get("JD_Token", {}).get("QL_HOST", "http://localhost:5700")
|
||
client_id = self._config.get("JD_Token", {}).get("CLIENT_ID", "")
|
||
client_secret = self._config.get("JD_Token", {}).get("CLIENT_SECRET", "")
|
||
|
||
# 初始化青龙面板连接
|
||
self.ql = QL(ql_host, client_id, client_secret)
|
||
|
||
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.debug(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""检查是否可以处理该消息"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
content = str(message.get("content", "")).strip()
|
||
command = content.split(" ")[0]
|
||
|
||
return command in self._commands
|
||
|
||
@plugin_stats_decorator(plugin_name="京东签到Token设置")
|
||
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
content = str(message.get("content", "")).strip()
|
||
self.LOG.debug(f"插件执行: {self.name}:{content}")
|
||
command = content.split(" ")[0]
|
||
sender = message.get("sender")
|
||
roomid = message.get("roomid", "")
|
||
gbm: GroupBotManager = message.get("gbm")
|
||
bot: WechatAPIClient = message.get("bot")
|
||
|
||
# 先去除内容中"设置京东"之后的所有空格,便于后续处理
|
||
# 保留命令部分,去除后面的所有空格
|
||
parts = content.split(" ", 1) # 只分割一次
|
||
if len(parts) < 2:
|
||
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}"
|
||
, sender)
|
||
return False, "命令格式错误"
|
||
|
||
# 去除token部分的空格(只去除空格,保留分号)
|
||
token_part = parts[1].replace(" ", "")
|
||
|
||
# 检查格式:token部分应该包含 pt_key= 和 pt_pin=,以及最后的备注
|
||
# 备注可能包含在分号后面,所以需要智能分割
|
||
# 查找最后一个分号的位置,最后一个分号后面是备注
|
||
last_semicolon_pos = token_part.rfind(";")
|
||
if last_semicolon_pos == -1 or last_semicolon_pos == len(token_part) - 1:
|
||
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}"
|
||
, sender)
|
||
return False, "命令格式错误"
|
||
|
||
token = token_part[:last_semicolon_pos + 1] # 包含最后一个分号
|
||
remark = token_part[last_semicolon_pos + 1:] # 分号后面的内容
|
||
|
||
if not remark:
|
||
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}"
|
||
, sender)
|
||
return False, "备注不能为空"
|
||
|
||
# 检查权限
|
||
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
|
||
return False, "没有权限"
|
||
|
||
# token已经去除空格,直接验证格式
|
||
|
||
# 确保token格式正确
|
||
if "pt_key=" not in token or "pt_pin=" not in token:
|
||
await bot.send_text_message((roomid if roomid else sender),
|
||
f"❌ Token格式错误!正确格式应为:pt_key=xxx;pt_pin=xxx;",
|
||
sender)
|
||
return False, "Token格式错误"
|
||
|
||
# 标准化token格式
|
||
# 1. 确保pt_key和pt_pin之间有分号
|
||
if "pt_key=" in token and "pt_pin=" in token:
|
||
# 提取pt_key和pt_pin部分
|
||
pt_key_part = re.search(r'pt_key=[^;]*', token)
|
||
pt_pin_part = re.search(r'pt_pin=[^;]*', token)
|
||
|
||
if pt_key_part and pt_pin_part:
|
||
# 重新组合token,确保格式正确
|
||
token = f"{pt_key_part.group(0)};{pt_pin_part.group(0)};"
|
||
|
||
# 确保token以分号结尾
|
||
if not token.endswith(";"):
|
||
token += ";"
|
||
|
||
self.LOG.info(f"处理后的token格式: {token}")
|
||
try:
|
||
# 设置京东Token
|
||
result = await self.set_jd_token(token, remark)
|
||
await bot.send_text_message((roomid if roomid else sender), result, sender)
|
||
|
||
# 发送二维码图片
|
||
qrcode_path = Path(os.path.join(os.path.dirname(os.path.abspath(__file__)), "qrcode.png"))
|
||
if qrcode_path.exists():
|
||
await bot.send_image_message((roomid if roomid else sender), qrcode_path)
|
||
else:
|
||
self.LOG.warning(f"二维码图片不存在: {qrcode_path}")
|
||
|
||
return True, "处理成功"
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"处理京东Token设置请求出错: {e}")
|
||
await bot.send_text_message((roomid if roomid else sender), f"❌处理出错: {str(e)}", sender)
|
||
return False, f"处理出错: {e}"
|
||
|
||
async def set_jd_token(self, token: str, remark: str) -> str:
|
||
"""设置京东Token"""
|
||
if not self.ql.valid:
|
||
return f"❌ 青龙面板连接失败,请检查配置"
|
||
|
||
# 检查是否已存在相同备注的环境变量
|
||
envs = await self.ql.getEnvs()
|
||
if not envs:
|
||
return f"❌ 获取环境变量失败"
|
||
|
||
# 从当前token中提取pt_pin
|
||
pt_pin_match = re.search(r'pt_pin=([^;]*)', token)
|
||
if not pt_pin_match:
|
||
return f"❌ 无法从Token中提取pt_pin信息"
|
||
|
||
current_pt_pin = pt_pin_match.group(1)
|
||
self.LOG.info(f"当前Token的pt_pin: {current_pt_pin}")
|
||
|
||
# 查找是否有相同pt_pin的JD_COOKIE
|
||
existing_env = None
|
||
env_id = None
|
||
for env in envs:
|
||
if env.get('name') == 'JD_COOKIE':
|
||
# 从已存在的环境变量中提取pt_pin
|
||
env_pt_pin_match = re.search(r'pt_pin=([^;]*)', env.get('value', ''))
|
||
if env_pt_pin_match and env_pt_pin_match.group(1) == current_pt_pin:
|
||
existing_env = env
|
||
env_id = env.get('id')
|
||
break
|
||
|
||
result = False
|
||
self.LOG.debug(f"existing_env: {existing_env}")
|
||
if existing_env:
|
||
# 更新已存在的环境变量,保留原有的备注或使用新的备注
|
||
existing_remarks = existing_env.get('remarks', '')
|
||
final_remarks = remark if remark else existing_remarks
|
||
|
||
existing_env['value'] = token
|
||
env_update: dict = {"id": env_id, "value": token, "remarks": final_remarks, "name": "JD_COOKIE"}
|
||
result = await self.ql.updateEnv(env_update)
|
||
if result:
|
||
return f"✅ 已成功更新京东账号 [{final_remarks}] 的Token (pt_pin: {current_pt_pin})"
|
||
else:
|
||
return f"❌ 更新京东账号 [{final_remarks}] 的Token失败"
|
||
else:
|
||
# 添加新的环境变量
|
||
new_env = [{"name": "JD_COOKIE", "value": token, "remarks": remark}]
|
||
result = await self.ql.addEnvs(new_env)
|
||
if result:
|
||
return f"✅ 已成功添加京东账号 [{remark}] 的Token (pt_pin: {current_pt_pin})"
|
||
else:
|
||
return f"❌ 添加京东账号 [{remark}] 的Token失败"
|