diff --git a/plugins/jd_sign_token/main.py b/plugins/jd_sign_token/main.py index fd59652..33f12e0 100644 --- a/plugins/jd_sign_token/main.py +++ b/plugins/jd_sign_token/main.py @@ -1,7 +1,5 @@ from loguru import logger -import requests import re -from json import dumps as jsonDumps from typing import Dict, Any, List, Optional, Tuple from base.plugin_common.message_plugin_interface import MessagePluginInterface @@ -10,133 +8,142 @@ from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from wechat_ipad import WechatAPIClient +import aiohttp +import asyncio +from typing import Optional, List, Dict, Any + class QL: def __init__(self, address: str, id: str, secret: str) -> None: - """ - 初始化 - """ + """初始化""" self.address = address self.id = id self.secret = secret self.valid = True self.auth = None + self._session: Optional[aiohttp.ClientSession] = None - def log(self, content: str) -> None: - """ - 日志 - """ - print(content) - - def login(self) -> bool: - """ - 登录 - """ + async def login(self) -> bool: + """异步登录""" url = f"{self.address}/open/auth/token?client_id={self.id}&client_secret={self.secret}" try: - # 添加超时参数 - rjson = requests.get(url, timeout=(5, 15)).json() - if (rjson['code'] == 200): - self.auth = f"{rjson['data']['token_type']} {rjson['data']['token']}" - self.valid = True - return True - else: - self.log(f"登录失败:{rjson['message']}") - self.valid = False - return False + async with aiohttp.ClientSession().get(url, timeout=aiohttp.ClientTimeout(total=15)) as response: + if response.status == 200: + rjson = await response.json() + if rjson['code'] == 200: + self.auth = f"{rjson['data']['token_type']} {rjson['data']['token']}" + self.valid = True + return True + else: + logger.info(f"登录失败:{rjson['message']}") + self.valid = False + return False + else: + logger.info(f"登录失败:HTTP {response.status}") + self.valid = False + return False except Exception as e: self.valid = False - self.log(f"登录失败:{str(e)}") + logger.info(f"登录失败:{str(e)}") return False - def getEnvs(self) -> list: - """ - 获取环境变量 - """ + async def getEnvs(self) -> list: + """异步获取环境变量""" # 每次操作前先登录,确保token有效 - if not self.login(): + if not await self.login(): return [] - + url = f"{self.address}/open/envs?searchValue=" headers = {"Authorization": self.auth} try: - # 添加超时参数 - rjson = requests.get(url, headers=headers, timeout=(5, 15)).json() - if (rjson['code'] == 200): - return rjson['data'] - else: - self.log(f"获取环境变量失败:{rjson['message']}") - return [] + async with aiohttp.ClientSession().get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as response: + if response.status == 200: + rjson = await response.json() + if rjson['code'] == 200: + return rjson['data'] + else: + logger.info(f"获取环境变量失败:{rjson['message']}") + return [] + else: + logger.info(f"获取环境变量失败:HTTP {response.status}") + return [] except Exception as e: - self.log(f"获取环境变量失败:{str(e)}") + logger.info(f"获取环境变量失败:{str(e)}") return [] - def deleteEnvs(self, ids: list) -> bool: - """ - 删除环境变量 - """ + async def deleteEnvs(self, ids: list) -> bool: + """异步删除环境变量""" # 每次操作前先登录,确保token有效 - if not self.login(): - return False - - url = f"{self.address}/open/envs" - headers = {"Authorization": self.auth, "content-type": "application/json"} - try: - rjson = requests.delete(url, headers=headers, data=jsonDumps(ids)).json() - if (rjson['code'] == 200): - self.log(f"删除环境变量成功:{len(ids)}") - return True - else: - self.log(f"删除环境变量失败:{rjson['message']}") - return False - except Exception as e: - self.log(f"删除环境变量失败:{str(e)}") + if not await self.login(): return False - def addEnvs(self, envs: list) -> bool: - """ - 新建环境变量 - """ - # 每次操作前先登录,确保token有效 - if not self.login(): - return False - url = f"{self.address}/open/envs" headers = {"Authorization": self.auth, "content-type": "application/json"} try: - # 添加超时参数 - rjson = requests.post(url, headers=headers, data=jsonDumps(envs), timeout=(5, 15)).json() - if (rjson['code'] == 200): - self.log(f"新建环境变量成功:{len(envs)}") - return True - else: - self.log(f"新建环境变量失败:{rjson['message']}") - return False + async with aiohttp.ClientSession().delete(url, headers=headers, json=ids, + timeout=aiohttp.ClientTimeout(total=15)) as response: + if response.status == 200: + rjson = await response.json() + if rjson['code'] == 200: + logger.info(f"删除环境变量成功:{len(ids)}") + return True + else: + logger.info(f"删除环境变量失败:{rjson['message']}") + return False + else: + logger.info(f"删除环境变量失败:HTTP {response.status}") + return False except Exception as e: - self.log(f"新建环境变量失败:{str(e)}") + logger.info(f"删除环境变量失败:{str(e)}") return False - def updateEnv(self, env: dict) -> bool: - """ - 更新环境变量 - """ - # 每次操作前先登录,确保token有效 - if not self.login(): + async def addEnvs(self, envs: list) -> bool: + """异步添加环境变量""" + if not await self.login(): return False - + url = f"{self.address}/open/envs" headers = {"Authorization": self.auth, "content-type": "application/json"} try: - # 添加超时参数 - rjson = requests.put(url, headers=headers, data=jsonDumps(env), timeout=(5, 15)).json() - if (rjson['code'] == 200): - self.log(f"更新环境变量成功") - return True - else: - self.log(f"更新环境变量失败:{rjson['message']}") - return False + async with aiohttp.ClientSession().post(url, headers=headers, json=envs, + timeout=aiohttp.ClientTimeout(total=15)) as response: + if response.status == 200: + rjson = await response.json() + if rjson['code'] == 200: + logger.info(f"添加环境变量成功:{len(envs)}") + return True + else: + logger.info(f"添加环境变量失败:{rjson['message']}") + return False + else: + logger.info(f"添加环境变量失败:HTTP {response.status}") + return False except Exception as e: - self.log(f"更新环境变量失败:{str(e)}") + logger.info(f"添加环境变量失败:{str(e)}") + return False + + async def updateEnv(self, env: dict) -> bool: + """异步更新环境变量""" + if not await self.login(): + return False + + url = f"{self.address}/open/envs" + headers = {"Authorization": self.auth, "content-type": "application/json"} + try: + async with aiohttp.ClientSession().put(url, headers=headers, json=env, timeout=aiohttp.ClientTimeout(total=15)) as response: + if response.status == 200: + rjson = await response.json() + if rjson['code'] == 200: + logger.info(f"更新环境变量成功:{env.get('id')}") + return True + else: + logger.info(f"更新环境变量失败:{rjson['message']}") + return False + else: + logger.info(f"更新环境变量失败:HTTP {response.status}") + return False + except Exception as e: + logger.info(f"更新环境变量失败:{str(e)}") return False @@ -243,32 +250,32 @@ class JDTokenPlugin(MessagePluginInterface): # 提取token和备注 token = match.group(1) remark = match.group(2) - + # 清理token中的空格 token = token.replace(" ", "") - + # 确保token格式正确 if "pt_key=" not in token or "pt_pin=" not in token: await bot.send_text_message((roomid if roomid else sender), f"❌ Token格式错误!正确格式应为:pt_key=xxx;pt_pin=xxx;", sender) return False, "Token格式错误" - + # 标准化token格式 # 1. 确保pt_key和pt_pin之间有分号 if "pt_key=" in token and "pt_pin=" in token: # 提取pt_key和pt_pin部分 pt_key_part = re.search(r'pt_key=[^;]*', token) pt_pin_part = re.search(r'pt_pin=[^;]*', token) - + if pt_key_part and pt_pin_part: # 重新组合token,确保格式正确 token = f"{pt_key_part.group(0)};{pt_pin_part.group(0)};" - + # 确保token以分号结尾 if not token.endswith(";"): token += ";" - + self.LOG.info(f"处理后的token格式: {token}") try: # 设置京东Token @@ -290,12 +297,12 @@ class JDTokenPlugin(MessagePluginInterface): envs = self.ql.getEnvs() if not envs: return f"❌ 获取环境变量失败" - + # 从当前token中提取pt_pin pt_pin_match = re.search(r'pt_pin=([^;]*)', token) if not pt_pin_match: return f"❌ 无法从Token中提取pt_pin信息" - + current_pt_pin = pt_pin_match.group(1) self.LOG.info(f"当前Token的pt_pin: {current_pt_pin}") diff --git a/utils/markdown_to_image.py b/utils/markdown_to_image.py index 24d3cb5..9957acb 100644 --- a/utils/markdown_to_image.py +++ b/utils/markdown_to_image.py @@ -1,4 +1,5 @@ import subprocess +import time import markdown from playwright.async_api import async_playwright @@ -235,7 +236,8 @@ async def convert_md_str_to_image(md_content, output_image): """ 将 Markdown 字符串转换为图片(异步)。 """ - temp_html = 'temp_output.html' + timestamp = int(time.time()) + temp_html = f"temp_output_{timestamp}.html" md_str_to_html(md_content, temp_html) await html_to_image(temp_html, output_image) os.remove(temp_html)