diff --git a/douyin_parser/__init__.py b/douyin_parser/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/douyin_parser/main.py b/douyin_parser/main.py deleted file mode 100644 index 5945ac0..0000000 --- a/douyin_parser/main.py +++ /dev/null @@ -1,229 +0,0 @@ -import logging -import os -import re -import time -import tomllib -import traceback -import requests -from typing import Dict, Any - -from wcferry import WxMsg, Wcf - -from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus - - -class DouyinParserError(Exception): - """抖音解析器自定义异常基类""" - pass - - -class DouyinParser: - description = "抖音无水印解析插件" - author = "姜不吃先生" - version = "1.0.2" - - def __init__(self, wcf: Wcf, gbm: GroupBotManager): - self.url_pattern = re.compile(r'https?://v\.douyin\.com/\w+/?') - self.LOG = logging.getLogger(__name__) - self.wcf = wcf - self.gbm = gbm - with open("douyin_parser/config.toml", "rb") as f: - plugin_config = tomllib.load(f) - - config = plugin_config["Douyin"] - - self.enable = config.get("enable", True) - self.http_proxy = config.get("http_proxy", None) - self.LOG.info("[抖音] 插件初始化完成,代理设置: %s", self.http_proxy) - - def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]: - if not data: - return data - data[ - 'cover'] = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg" - - return data - - def _clean_url(self, url: str) -> str: - cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '') - self.LOG.debug("[抖音] 清理后的URL: %s", cleaned_url) - return cleaned_url - - def _get_real_video_url(self, video_url: str) -> str: - """获取真实视频链接""" - max_retries = 3 # 最大重试次数 - retry_delay = 2 # 重试延迟秒数 - max_redirects = 10 # 最大重定向次数,防止死循环 - proxies = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None - redirect_history = [] - - for retry in range(max_retries): - try: - self.LOG.info("[抖音] 开始获取真实视频链接: %s (第%d次尝试)", video_url, retry + 1) - headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', - 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', - 'Range': 'bytes=0-' - } - - # 默认使用 allow_redirects=True 获取历史记录 - response = requests.get(video_url, headers=headers, proxies=proxies, allow_redirects=True, timeout=60) - - if response.history: - redirect_history = [resp.url for resp in response.history] - real_url = response.url - else: - # response.history 为空,手动解析重定向 - current_url = video_url - for _ in range(max_redirects): # 限制最大重定向次数 - resp = requests.get(current_url, headers=headers, proxies=proxies, allow_redirects=False, - timeout=60) - new_url = resp.headers.get('Location') - - if not new_url: - break # 没有新的 Location,停止 - - if not new_url.startswith("http"): - from urllib.parse import urljoin - new_url = urljoin(current_url, new_url) # 处理相对路径重定向 - - if new_url in redirect_history: - self.LOG.info("[抖音] 检测到循环重定向: %s", new_url) - break # 避免死循环 - - redirect_history.append(new_url) - self.LOG.info("[抖音] 发现重定向: %s -> %s", current_url, new_url) - current_url = new_url - - real_url = current_url - - if redirect_history: - self.LOG.info("[抖音] 重定向历史: %s", redirect_history) - - if real_url != video_url and ('v3-' in real_url.lower() or 'douyinvod.com' in real_url.lower()): - self.LOG.info("[抖音] 成功获取真实链接: %s", real_url) - return real_url - else: - self.LOG.info("[抖音] 未能获取到符合预期的视频链接,准备重试") - if retry < max_retries - 1: - time.sleep(retry_delay) - continue - return video_url - - except Exception as e: - self.LOG.error("[抖音] 获取真实链接失败: %s (第%d次尝试)", str(e), retry + 1) - if retry < max_retries - 1: - time.sleep(retry_delay) - continue - return video_url - - self.LOG.error("[抖音] 获取真实链接失败,已达到最大重试次数") - return video_url - - def _parse_douyin(self, url: str) -> Dict[str, Any]: - try: - api_url = "http://zj.v.api.aa1.cn/api/douyinjx/" - clean_url = self._clean_url(url) - params = {'text': clean_url, 'type': 'json'} - - self.LOG.info("[抖音] 请求API: %s, 参数: %s", api_url, repr(params)) - proxy = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None - response = requests.get(api_url, params=params, timeout=30, proxies=proxy) - - if response.status_code != 200: - raise DouyinParserError(f"API请求失败,状态码: {response.status_code}") - - data = response.json() - self.LOG.info("[抖音] API响应数据: %s", data) - - if data.get("code") == 200: - result = data.get("data", {}) - self.LOG.info("[抖音] API响应数据result: %s", result) - if result.get('video'): - result['video'] = self._get_real_video_url(result['video']) - return self._clean_response_data(result) - else: - raise DouyinParserError(data.get("message", "未知错误")) - except Exception as e: - self.LOG.error("[抖音] 解析过程发生未知错误: %s\n%s", str(e), traceback.format_exc()) - raise DouyinParserError(f"未知错误: {str(e)}") - - def handle_douyin_links(self, message: WxMsg): - if not self.enable: - return - - # 如果触发了指令,但是没有权限,则返回权限不足 - if self.gbm.get_group_permission(message.roomid, Feature.DOUYIN_PARSER) == PermissionStatus.DISABLED: - return - - try: - match = self.url_pattern.search(message.content) - if not match: - return - - original_url = self._clean_url(match.group(0)) - self.LOG.info("发现抖音链接: %s", original_url) - self.LOG.info("检测到抖音分享链接,正在解析无水印视频...") - # self.wcf.send_text(f"检测到抖音分享链接,正在解析无水印视频...", - # (message.roomid if message.from_group() else message.sender), message.sender) - video_info = self._parse_douyin(original_url) - if not video_info: - raise DouyinParserError("无法获取视频信息") - - video_url = video_info.get('video', '') - title = video_info.get('title', '无标题') - author = video_info.get('name', '未知作者') - cover = video_info.get('cover', '') - - if not video_url: - raise DouyinParserError("无法获取视频地址") - - self.wcf.send_rich_text("BOT-PC直接查看", "gh_11", title[:30], f"PC直接查看-{title[:20]} - {author[:10]}", video_url, - cover, - message.roomid) - # self.LOG.info(f"video_url: {video_url}, title: {title}, author: {author}, cover: {cover}") - # mp4_path = self.download_stream(video_url, "douyin_parser/down_load_dir/douyin.mp4") - # self.LOG.info(f"发送抖音视频:{mp4_path}") - # self.wcf.send_file(mp4_path, message.roomid) - except Exception as e: - self.LOG.error("[抖音] 解析过程发生未知错误: %s\n%s", str(e), traceback.format_exc()) - raise DouyinParserError(f"未知错误: {str(e)}") - return - # - # def download_stream(self, url, save_path): - # """ - # 从指定URL读取视频流并保存到本地 - # :param url: 视频流的URL - # :param save_path: 本地保存路径(包含文件名,例如 "video.mp4") - # """ - # try: - # # 发送GET请求,启用流式传输 - # response = requests.get(url, stream=True) - # - # # 检查请求是否成功 - # response.raise_for_status() # 如果状态码不是200,将抛出异常 - # - # # 确保保存路径的目录存在 - # os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True) - # - # # 检查是否是视频流(可选,根据Content-Type判断) - # content_type = response.headers.get("Content-Type", "").lower() - # if "video" not in content_type and "application/octet-stream" not in content_type: - # print(f"警告: 返回的可能不是视频流,Content-Type: {content_type}") - # print("响应内容预览:", response.text[:100]) # 打印前100字符查看 - # return - # - # # 以二进制写入模式保存流数据 - # with open(save_path, "wb") as file: - # for chunk in response.iter_content(chunk_size=1024): # 分块读取,每块1KB - # if chunk: # 过滤空块 - # file.write(chunk) - # print(f"视频已下载到: {save_path}") - # return os.path.abspath(save_path) - # except requests.RequestException as e: - # print(f"请求失败: {e}") - # except IOError as e: - # print(f"文件写入失败: {e}") - # except Exception as e: - # print(f"发生未知错误: {e}") diff --git a/plugins/douyin_parser/__init__.py b/plugins/douyin_parser/__init__.py new file mode 100644 index 0000000..36bcb0b --- /dev/null +++ b/plugins/douyin_parser/__init__.py @@ -0,0 +1,7 @@ +# 从当前包的main模块导入DouyinParserPlugin类 +from .main import DouyinParserPlugin + +# 提供get_plugin函数,返回插件实例 +def get_plugin(): + """获取插件实例""" + return DouyinParserPlugin() \ No newline at end of file diff --git a/douyin_parser/config.toml b/plugins/douyin_parser/config.toml similarity index 72% rename from douyin_parser/config.toml rename to plugins/douyin_parser/config.toml index a2b8e6e..7102377 100644 --- a/douyin_parser/config.toml +++ b/plugins/douyin_parser/config.toml @@ -1,6 +1,9 @@ [Douyin] enable = true +# 发送模式: card(发送卡片) 或 file(下载并发送文件) +download_mode = "card" + # Http代理设置(用于获取真实链接发送卡片,如果家里有ipv6,可以设置为空) # 格式: http://用户名:密码@代理地址:代理端口 # 例如:http://127.0.0.1:7890 diff --git a/plugins/douyin_parser/main.py b/plugins/douyin_parser/main.py new file mode 100644 index 0000000..352eb63 --- /dev/null +++ b/plugins/douyin_parser/main.py @@ -0,0 +1,323 @@ +import logging +import os +import re +import time +import traceback +import requests +from typing import Dict, Any, List, Optional, Tuple + +from wcferry import Wcf, WxMsg + +from plugin_common.message_plugin_interface import MessagePluginInterface +from plugin_common.plugin_interface import PluginStatus +from plugins.stats_collector.decorators import plugin_stats_decorator +from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager + + +class DouyinParserError(Exception): + """抖音解析器自定义异常基类""" + pass + + +class DouyinParserPlugin(MessagePluginInterface): + """抖音无水印解析插件""" + + @property + def name(self) -> str: + return "抖音解析" + + @property + def version(self) -> str: + return "1.0.0" + + @property + def description(self) -> str: + return "提供抖音链接无水印解析功能,支持视频下载和分享" + + @property + def author(self) -> str: + return "姜不吃先生" + + @property + def command_prefix(self) -> Optional[str]: + return "" # 不需要前缀,直接匹配命令 + + @property + def commands(self) -> List[str]: + return [] # 不使用命令触发,而是通过消息内容匹配 + + def __init__(self): + super().__init__() + self.url_pattern = re.compile(r'https?://v\.douyin\.com/\w+/?') + # 修改为使用插件目录下的down_load_dir文件夹 + self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir") + # 确保下载目录存在 + if not os.path.exists(self.download_dir): + os.makedirs(self.download_dir, exist_ok=True) + + def initialize(self, context: Dict[str, Any]) -> bool: + """初始化插件""" + self.LOG = logging.getLogger(f"Plugin.{self.name}") + self.LOG.info(f"正在初始化 {self.name} 插件...") + + # 保存上下文对象 + self.wcf = context.get("wcf") + self.event_system = context.get("event_system") + self.message_util = context.get("message_util") + self.gbm = context.get("gbm") + + # 从配置中获取参数 + douyin_config = self._config.get("Douyin", {}) + self.enable = douyin_config.get("enable", True) + self.http_proxy = douyin_config.get("http_proxy", "") + self.download_mode = douyin_config.get("download_mode", "card") # card或file + + self.LOG.info(f"[{self.name}] 插件初始化完成,代理设置: {self.http_proxy}") + return True + + def start(self) -> bool: + """启动插件""" + self.LOG.info(f"[{self.name}] 插件已启动") + self.status = PluginStatus.RUNNING + return True + + def stop(self) -> bool: + """停止插件""" + self.LOG.info(f"[{self.name}] 插件已停止") + self.status = PluginStatus.STOPPED + return True + + def can_process(self, message: Dict[str, Any]) -> bool: + """检查是否可以处理该消息""" + if not self.enable: + return False + + content = str(message.get("content", "")).strip() + match = self.url_pattern.search(content) + return match is not None + + @plugin_stats_decorator(plugin_name="抖音解析") + def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """处理消息""" + content = str(message.get("content", "")).strip() + self.LOG.info(f"插件执行: {self.name}:{content}") + sender = message.get("sender") + roomid = message.get("roomid", "") + wcf: Wcf = message.get("wcf") + gbm: GroupBotManager = message.get("gbm") + + # 检查权限 + if roomid and gbm.get_group_permission(roomid, Feature.DOUYIN_PARSER) == PermissionStatus.DISABLED: + return False, "没有权限" + + try: + match = self.url_pattern.search(content) + if not match: + return False, "未找到抖音链接" + + original_url = self._clean_url(match.group(0)) + self.LOG.info(f"发现抖音链接: {original_url}") + + # 解析抖音视频 + video_info = self._parse_douyin(original_url) + if not video_info: + wcf.send_text(f"❌无法解析抖音视频信息", + (roomid if roomid else sender), sender) + return True, "解析失败" + + video_url = video_info.get('video', '') + title = video_info.get('title', '无标题') + author = video_info.get('name', '未知作者') + cover = video_info.get('cover', '') + + if not video_url: + wcf.send_text(f"❌无法获取视频地址", + (roomid if roomid else sender), sender) + return True, "获取视频地址失败" + + # 根据模式选择发送方式 + if self.download_mode == "file": + # 下载并发送文件 + mp4_path = self._download_stream(video_url, os.path.join(self.download_dir, "douyin.mp4")) + if mp4_path: + wcf.send_file(mp4_path, (roomid if roomid else sender)) + return True, "发送视频文件成功" + else: + wcf.send_text(f"❌下载视频失败", + (roomid if roomid else sender), sender) + return True, "下载视频失败" + else: + # 发送卡片 + wcf.send_rich_text( + "BOT-PC直接查看", + "gh_11", + title[:30], + f"PC直接查看-{title[:20]} - {author[:10]}", + video_url, + cover, + (roomid if roomid else sender) + ) + return True, "发送卡片成功" + + except DouyinParserError as e: + self.LOG.error(f"抖音解析错误: {e}") + wcf.send_text(f"❌抖音解析失败: {str(e)}", + (roomid if roomid else sender), sender) + return True, f"解析错误: {e}" + except Exception as e: + self.LOG.error(f"处理抖音链接出错: {e}\n{traceback.format_exc()}") + wcf.send_text(f"❌处理抖音链接出错: {str(e)}", + (roomid if roomid else sender), sender) + return True, f"处理出错: {e}" + + def _clean_url(self, url: str) -> str: + """清理URL""" + cleaned_url = url.strip().replace(';', '').replace('\n', '').replace('\r', '') + self.LOG.debug(f"[抖音] 清理后的URL: {cleaned_url}") + return cleaned_url + + def _clean_response_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """清理响应数据""" + if not data: + return data + data['cover'] = "https://is1-ssl.mzstatic.com/image/thumb/Purple221/v4/7c/49/e1/7c49e1af-ce92-d1c4-9a93-0a316e47ba94/AppIcon_TikTok-0-0-1x_U007epad-0-1-0-0-85-220.png/512x512bb.jpg" + return data + + def _get_real_video_url(self, video_url: str) -> str: + """获取真实视频链接""" + max_retries = 3 # 最大重试次数 + retry_delay = 2 # 重试延迟秒数 + max_redirects = 10 # 最大重定向次数,防止死循环 + proxies = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None + redirect_history = [] + + for retry in range(max_retries): + try: + self.LOG.info(f"[抖音] 开始获取真实视频链接: {video_url} (第{retry + 1}次尝试)") + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', + 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', + 'Range': 'bytes=0-' + } + + # 默认使用 allow_redirects=True 获取历史记录 + response = requests.get(video_url, headers=headers, proxies=proxies, allow_redirects=True, timeout=60) + + if response.history: + redirect_history = [resp.url for resp in response.history] + real_url = response.url + else: + # response.history 为空,手动解析重定向 + current_url = video_url + for _ in range(max_redirects): # 限制最大重定向次数 + resp = requests.get(current_url, headers=headers, proxies=proxies, allow_redirects=False, + timeout=60) + new_url = resp.headers.get('Location') + + if not new_url: + break # 没有新的 Location,停止 + + if not new_url.startswith("http"): + from urllib.parse import urljoin + new_url = urljoin(current_url, new_url) # 处理相对路径重定向 + + if new_url in redirect_history: + self.LOG.info(f"[抖音] 检测到循环重定向: {new_url}") + break # 避免死循环 + + redirect_history.append(new_url) + self.LOG.info(f"[抖音] 发现重定向: {current_url} -> {new_url}") + current_url = new_url + + real_url = current_url + + if redirect_history: + self.LOG.info(f"[抖音] 重定向历史: {redirect_history}") + + if real_url != video_url and ('v3-' in real_url.lower() or 'douyinvod.com' in real_url.lower()): + self.LOG.info(f"[抖音] 成功获取真实链接: {real_url}") + return real_url + else: + self.LOG.info("[抖音] 未能获取到符合预期的视频链接,准备重试") + if retry < max_retries - 1: + time.sleep(retry_delay) + continue + return video_url + + except Exception as e: + self.LOG.error(f"[抖音] 获取真实链接失败: {str(e)} (第{retry + 1}次尝试)") + if retry < max_retries - 1: + time.sleep(retry_delay) + continue + return video_url + + self.LOG.error("[抖音] 获取真实链接失败,已达到最大重试次数") + return video_url + + def _parse_douyin(self, url: str) -> Dict[str, Any]: + """解析抖音链接""" + try: + api_url = "http://zj.v.api.aa1.cn/api/douyinjx/" + clean_url = self._clean_url(url) + params = {'text': clean_url, 'type': 'json'} + + self.LOG.info(f"[抖音] 请求API: {api_url}, 参数: {repr(params)}") + proxy = {"http": self.http_proxy, "https": self.http_proxy} if self.http_proxy else None + response = requests.get(api_url, params=params, timeout=30, proxies=proxy) + + if response.status_code != 200: + raise DouyinParserError(f"API请求失败,状态码: {response.status_code}") + + data = response.json() + self.LOG.info(f"[抖音] API响应数据: {data}") + + if data.get("code") == 200: + result = data.get("data", {}) + self.LOG.info(f"[抖音] API响应数据result: {result}") + if result.get('video'): + result['video'] = self._get_real_video_url(result['video']) + return self._clean_response_data(result) + else: + raise DouyinParserError(data.get("message", "未知错误")) + except Exception as e: + self.LOG.error(f"[抖音] 解析过程发生未知错误: {str(e)}\n{traceback.format_exc()}") + raise DouyinParserError(f"未知错误: {str(e)}") + + def _download_stream(self, url, save_path): + """ + 从指定URL读取视频流并保存到本地 + :param url: 视频流的URL + :param save_path: 本地保存路径(包含文件名,例如 "video.mp4") + """ + try: + # 发送GET请求,启用流式传输 + response = requests.get(url, stream=True) + + # 检查请求是否成功 + response.raise_for_status() # 如果状态码不是200,将抛出异常 + + # 确保保存路径的目录存在 + os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True) + + # 检查是否是视频流(可选,根据Content-Type判断) + content_type = response.headers.get("Content-Type", "").lower() + if "video" not in content_type and "application/octet-stream" not in content_type: + self.LOG.warning(f"警告: 返回的可能不是视频流,Content-Type: {content_type}") + self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看 + return None + + # 以二进制写入模式保存流数据 + with open(save_path, "wb") as file: + for chunk in response.iter_content(chunk_size=1024): # 分块读取,每块1KB + if chunk: # 过滤空块 + file.write(chunk) + self.LOG.info(f"视频已下载到: {save_path}") + return os.path.abspath(save_path) + except requests.RequestException as e: + self.LOG.error(f"请求失败: {e}") + except IOError as e: + self.LOG.error(f"文件写入失败: {e}") + except Exception as e: + self.LOG.error(f"发生未知错误: {e}") + return None \ No newline at end of file diff --git a/plugins/point_trade/__init__.py b/plugins/point_trade/__init__.py new file mode 100644 index 0000000..542231e --- /dev/null +++ b/plugins/point_trade/__init__.py @@ -0,0 +1,7 @@ +# 从当前包的main模块导入PointTradePlugin类 +from .main import PointTradePlugin + +# 提供get_plugin函数,返回插件实例 +def get_plugin(): + """获取插件实例""" + return PointTradePlugin() \ No newline at end of file diff --git a/point_trade/config.toml b/plugins/point_trade/config.toml similarity index 100% rename from point_trade/config.toml rename to plugins/point_trade/config.toml diff --git a/plugins/point_trade/main.py b/plugins/point_trade/main.py new file mode 100644 index 0000000..c26f28e --- /dev/null +++ b/plugins/point_trade/main.py @@ -0,0 +1,281 @@ +import logging +import re +import os +import toml +from datetime import datetime +from typing import Dict, Any, List, Optional, Tuple +import xml.etree.ElementTree as ET + +from wcferry import Wcf + +from plugin_common.message_plugin_interface import MessagePluginInterface +from plugin_common.plugin_interface import PluginStatus +from plugins.stats_collector.decorators import plugin_stats_decorator +from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager + +import mysql.connector.pooling + + +class PointTradePlugin(MessagePluginInterface): + """积分交易插件""" + + @property + def name(self) -> str: + return "积分交易" + + @property + def version(self) -> str: + return "1.0.0" + + @property + def description(self) -> str: + return "提供积分交易功能,支持用户之间的积分转账" + + @property + def author(self) -> str: + return "水牛" + + @property + def command_prefix(self) -> Optional[str]: + return "" # 不需要前缀,直接匹配命令 + + @property + def commands(self) -> List[str]: + return self._commands + + def __init__(self): + super().__init__() + self.db_pool = None + + def initialize(self, context: Dict[str, Any]) -> bool: + """初始化插件""" + self.LOG = logging.getLogger(f"Plugin.{self.name}") + self.LOG.info(f"正在初始化 {self.name} 插件...") + + # 保存上下文对象 + self.wcf = context.get("wcf") + self.event_system = context.get("event_system") + self.message_util = context.get("message_util") + self.gbm = context.get("gbm") + self.db_pool = context.get("db_pool") + + if not self.db_pool: + self.LOG.error("数据库连接池未初始化,插件无法正常工作") + return False + + # 从配置中获取参数 + point_trade_config = self._config.get("PointTrade", {}) + self._commands = point_trade_config.get("command", ["积分交易", "积分转账", "转账积分"]) + self.command_format = point_trade_config.get("command-format", "积分转账 积分数 @用户") + self.enable = point_trade_config.get("enable", True) + + self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") + return True + + def start(self) -> bool: + """启动插件""" + self.LOG.info(f"[{self.name}] 插件已启动") + self.status = PluginStatus.RUNNING + return True + + def stop(self) -> bool: + """停止插件""" + self.LOG.info(f"[{self.name}] 插件已停止") + self.status = PluginStatus.STOPPED + return True + + def can_process(self, message: Dict[str, Any]) -> bool: + """检查是否可以处理该消息""" + if not self.enable: + return False + + content = str(message.get("content", "")).strip() + command = content.split(" ")[0] + + return command in self._commands + + @plugin_stats_decorator(plugin_name="积分交易") + def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """处理消息""" + content = str(message.get("content", "")).strip() + self.LOG.info(f"插件执行: {self.name}:{content}") + command = content.split(" ") + sender = message.get("sender") + roomid = message.get("roomid", "") + wcf: Wcf = message.get("wcf") + gbm: GroupBotManager = message.get("gbm") + xml = message.get("xml", "") + + # 检查命令格式 + if len(command) < 3: + wcf.send_text(f"❌命令格式错误!{self.command_format}", + (roomid if roomid else sender), sender) + return True, "命令格式错误" + + # 检查权限 + if roomid and gbm.get_group_permission(roomid, Feature.POINT_TRADE) == PermissionStatus.DISABLED: + return False, "没有权限" + + # 检查积分数是否为正整数 + if not command[1].isdigit(): + wcf.send_text(f"🈚️转账积分无效(必须为正整数!) \n{self.command_format}", + (roomid if roomid else sender), sender) + return True, "积分无效" + + # 检查@用户是否有效 + at_users = self.at_list(xml) + if len(at_users) != 1: + wcf.send_text(f"转账失败❌\n🈚️转账人无效! \n{self.command_format}", + (roomid if roomid else sender), sender) + return True, "转账人无效" + + reward_points = int(command[1]) + target_wxid = next(iter(at_users)) + trader_wxid = sender + group_id = roomid + + try: + # 查询发信人的记录 + sender_result = self._get_user_record(trader_wxid, group_id) + if not sender_result: + wcf.send_text(f"❌打赏失败!\n没有找到你的记录,无法进行打赏!", + (roomid if roomid else sender), sender) + return True, "发送者记录不存在" + + sender_user_id = sender_result['id'] + sender_wx_id = sender_result['wx_id'] + sender_wx_nick_name = sender_result['wx_nick_name'] + sender_current_points = int(sender_result['points']) + + # 检查发信人积分是否足够 + if sender_current_points < reward_points: + wcf.send_text( + f"❌打赏失败!\n你的积分不足以进行打赏!当前积分:{sender_current_points},你需要 {reward_points} 积分。", + (roomid if roomid else sender), sender) + return True, "积分不足" + + # 查询被打赏人的记录 + recipient_result = self._get_user_record_by_nick(target_wxid, group_id) + if not recipient_result: + wcf.send_text( + f"❌打赏失败!\n接收人[{target_wxid}]无法收取积分", + (roomid if roomid else sender), sender) + return True, "接收者记录不存在" + + recipient_user_id = recipient_result['id'] + recipient_wx_id = recipient_result['wx_id'] + recipient_wx_nick_name = recipient_result['wx_nick_name'] + recipient_current_points = int(recipient_result['points']) + + # 使用 SQL 增量更新积分 + self._update_user_points(sender_user_id, -reward_points, group_id) # 减少发送者积分 + self._update_user_points(recipient_user_id, reward_points, group_id) # 增加接收者积分 + + # 获取更新后的积分值用于显示 + updated_sender = self._get_user_record(trader_wxid, group_id) + updated_recipient = self._get_user_record_by_nick(target_wxid, group_id) + new_sender_points = int(updated_sender['points']) if updated_sender else sender_current_points + new_recipient_points = int(updated_recipient['points']) if updated_recipient else recipient_current_points + + output = ( + f"✅积分转账成功!\n" + f"👤{sender_wx_nick_name} 转给 👤{recipient_wx_nick_name} {reward_points} 积分\n" + f"👤{sender_wx_nick_name} 当前积分: {new_sender_points}\n" + f"👤{recipient_wx_nick_name} 当前积分: {new_recipient_points}" + ) + + wcf.send_text(output, (roomid if roomid else sender), sender) + return True, "转账成功" + + except mysql.connector.Error as e: + self.LOG.error(f"积分交易出错: {e}") + wcf.send_text(f"❌积分交易失败!请稍后重试。错误: {str(e)}", + (roomid if roomid else sender), sender) + return True, f"数据库错误: {str(e)}" + except Exception as e: + self.LOG.error(f"积分交易出错: {e}") + wcf.send_text(f"❌积分交易失败!请稍后重试。错误: {str(e)}", + (roomid if roomid else sender), sender) + return True, f"处理出错: {str(e)}" + + def at_list(self, xml): + """ + 解析消息中的 @用户列表 + :param xml: 消息的 XML 数据 + :return: @用户的集合 + """ + try: + root = ET.fromstring(xml) + atuserlist_element = root.find('.//atuserlist') + atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip() + + atuserlist_content_no_commas = atuserlist_content.strip(',') + atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas) + atuserlist_set = set(atuserlist_content_no_commas.split(',')) + self.LOG.debug(f"解析到的 @用户列表: {atuserlist_set}") + return atuserlist_set + except ET.ParseError as e: + self.LOG.error(f"解析 XML 失败: {e}") + return set() + + def _get_db_connection(self): + """从连接池获取数据库连接""" + return self.db_pool.get_connection() + + def _get_user_record(self, wx_id, group_id): + """ + 查询用户的记录 + :param wx_id: 用户的微信ID + :param group_id: 群组ID + :return: 用户记录(字典格式) + """ + try: + with self._get_db_connection() as conn: + with conn.cursor(dictionary=True) as cursor: + cursor.execute(""" + SELECT id, wx_id, wx_nick_name, points FROM t_sign_record + WHERE wx_id = %s AND group_id = %s + """, (wx_id, group_id)) + return cursor.fetchone() + except mysql.connector.Error as e: + self.LOG.error(f"查询用户记录失败: {e}") + return None + + def _get_user_record_by_nick(self, wx_id, group_id): + """ + 根据微信ID查询用户的记录 + :param wx_id: 用户的微信ID + :param group_id: 群组ID + :return: 用户记录(字典格式) + """ + try: + with self._get_db_connection() as conn: + with conn.cursor(dictionary=True) as cursor: + cursor.execute(""" + SELECT id, wx_id, wx_nick_name, points FROM t_sign_record + WHERE wx_id = %s AND group_id = %s + """, (wx_id, group_id)) + return cursor.fetchone() + except mysql.connector.Error as e: + self.LOG.error(f"查询用户记录失败: {e}") + return None + + def _update_user_points(self, user_id, points_change, group_id): + """ + 更新用户积分,使用 SQL 增量调整 + :param user_id: 用户ID (数据库中的 id 字段) + :param points_change: 积分变化量(正数增加,负数减少) + :param group_id: 群组ID + """ + try: + with self._get_db_connection() as conn: + with conn.cursor(dictionary=True) as cursor: + cursor.execute(""" + UPDATE t_sign_record + SET points = points + %s, update_time = %s + WHERE id = %s AND group_id = %s + """, (points_change, datetime.now(), user_id, group_id)) + conn.commit() + except mysql.connector.Error as e: + self.LOG.error(f"更新用户积分失败: {e}") + raise \ No newline at end of file diff --git a/point_trade/main.py b/point_trade/main.py deleted file mode 100644 index 3c9d0d7..0000000 --- a/point_trade/main.py +++ /dev/null @@ -1,214 +0,0 @@ -import logging -import re -import tomllib -from datetime import datetime - -from wcferry import Wcf, WxMsg - -from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus - -import xml.etree.ElementTree as ET - -import mysql.connector.pooling - - -class PointTrade: - description = "积分交易" - author = "shui niu" - version = "1.0.0" - - def __init__(self, wcf: Wcf, gbm: GroupBotManager, db_pool: mysql.connector.pooling.MySQLConnectionPool): - self.LOG = logging.getLogger(__name__) - self.wcf = wcf - self.gbm = gbm - self.db_pool = db_pool - with open("point_trade/config.toml", "rb") as f: - plugin_config = tomllib.load(f) - - config = plugin_config["PointTrade"] - - self.enable = config["enable"] - self.command = config["command"] - self.command_format = config["command-format"] - self.LOG.info(f"[积分交易] 组件初始化完成,指令: {self.command}") - - def at_list(self, xml): - """ - 解析消息中的 @用户列表 - :param xml: 消息的 XML 数据 - :return: @用户的集合 - """ - try: - root = ET.fromstring(xml) - atuserlist_element = root.find('.//atuserlist') - atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip() - - atuserlist_content_no_commas = atuserlist_content.strip(',') - atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas) - atuserlist_set = set(atuserlist_content_no_commas.split(',')) - self.LOG.debug(f"解析到的 @用户列表: {atuserlist_set}") - return atuserlist_set - except ET.ParseError as e: - self.LOG.error(f"解析 XML 失败: {e}") - return set() - - def handle_text(self, message: WxMsg): - """ - 处理文本消息,进行积分交易 - :param message: 微信消息对象 - """ - if not self.enable: - return - - content = str(message.content).strip() - command = content.split(" ") - - if command[0] not in self.command: - return - - if len(command) < 3: - self.wcf.send_text(f"❌命令格式错误!{self.command_format}", - (message.roomid if message.from_group() else message.sender), message.sender) - return - - if self.gbm.get_group_permission(message.roomid, Feature.POINT_TRADE) == PermissionStatus.DISABLED: - return - - if not command[1].isdigit(): - self.wcf.send_text(f"🈚️转账积分无效(必须为正整数!) \n{self.command_format}", - (message.roomid if message.from_group() else message.sender), message.sender) - return - - if len(self.at_list(message.xml)) != 1: - self.wcf.send_text(f"转账失败❌\n🈚️转账人无效! \n{self.command_format}", - (message.roomid if message.from_group() else message.sender), message.sender) - return - - reward_points = int(command[1]) - target_wxid = next(iter(self.at_list(message.xml))) - trader_wxid = message.sender - group_id = message.roomid - - # 查询发信人的记录 - sender_result = self._get_user_record(trader_wxid, group_id) - if not sender_result: - self.wcf.send_text(f"❌打赏失败!\n没有找到你的记录,无法进行打赏!", - (message.roomid if message.from_group() else message.sender), message.sender) - return - - sender_user_id = sender_result['id'] - sender_wx_id = sender_result['wx_id'] - sender_wx_nick_name = sender_result['wx_nick_name'] - sender_current_points = int(sender_result['points']) - - # 检查发信人积分是否足够 - if sender_current_points < reward_points: - self.wcf.send_text( - f"❌打赏失败!\n你的积分不足以进行打赏!当前积分:{sender_current_points},你需要 {reward_points} 积分。", - (message.roomid if message.from_group() else message.sender), message.sender) - return - - # 查询被打赏人的记录 - recipient_result = self._get_user_record_by_nick(target_wxid, group_id) - if not recipient_result: - self.wcf.send_text( - f"❌打赏失败!\n接收人[{target_wxid}]无法收取积分", - (message.roomid if message.from_group() else message.sender), message.sender) - return - - recipient_user_id = recipient_result['id'] - recipient_wx_id = recipient_result['wx_id'] - recipient_wx_nick_name = recipient_result['wx_nick_name'] - recipient_current_points = int(recipient_result['points']) - - # 使用 SQL 增量更新积分 - try: - self._update_user_points(sender_user_id, -reward_points, group_id) # 减少发送者积分 - self._update_user_points(recipient_user_id, reward_points, group_id) # 增加接收者积分 - except mysql.connector.Error as e: - self.wcf.send_text(f"❌积分更新失败!请稍后重试。错误: {str(e)}", - (message.roomid if message.from_group() else message.sender), message.sender) - return - - # 获取更新后的积分值用于显示 - updated_sender = self._get_user_record(trader_wxid, group_id) - updated_recipient = self._get_user_record_by_nick(target_wxid, group_id) - new_sender_points = int(updated_sender['points']) if updated_sender else sender_current_points - new_recipient_points = int(updated_recipient['points']) if updated_recipient else recipient_current_points - - output = ( - f"\n" - f"✅积分赠送成功!✨\n" - f"🤝{sender_wx_nick_name} 现在有 {new_sender_points} 点积分➖\n" - f"🤝{recipient_wx_nick_name} 现在有 {new_recipient_points} 点积分➕\n" - f"⌚️时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - ) - - # 将整数转换为字符串后再 join - self.wcf.send_text( - output, - (message.roomid if message.from_group() else message.sender), - message.sender - ) - - def _get_db_connection(self): - """从连接池获取数据库连接""" - return self.db_pool.get_connection() - - def _get_user_record(self, wx_id, group_id): - """ - 查询用户的记录 - :param wx_id: 用户的微信ID - :param group_id: 群组ID - :return: 用户记录(字典格式) - """ - try: - with self._get_db_connection() as conn: - with conn.cursor(dictionary=True) as cursor: - cursor.execute(""" - SELECT id, wx_id, wx_nick_name, points FROM t_sign_record - WHERE wx_id = %s AND group_id = %s - """, (wx_id, group_id)) - return cursor.fetchone() - except mysql.connector.Error as e: - self.LOG.error(f"查询用户记录失败: {e}") - return None - - def _get_user_record_by_nick(self, wx_id, group_id): - """ - 根据微信ID查询用户的记录 - :param wx_id: 用户的微信ID - :param group_id: 群组ID - :return: 用户记录(字典格式) - """ - try: - with self._get_db_connection() as conn: - with conn.cursor(dictionary=True) as cursor: - cursor.execute(""" - SELECT id, wx_id, wx_nick_name, points FROM t_sign_record - WHERE wx_id = %s AND group_id = %s - """, (wx_id, group_id)) - return cursor.fetchone() - except mysql.connector.Error as e: - self.LOG.error(f"查询用户记录失败: {e}") - return None - - def _update_user_points(self, user_id, points_change, group_id): - """ - 更新用户积分,使用 SQL 增量调整 - :param user_id: 用户ID (数据库中的 id 字段) - :param points_change: 积分变化量(正数增加,负数减少) - :param group_id: 群组ID - """ - try: - with self._get_db_connection() as conn: - with conn.cursor(dictionary=True) as cursor: - cursor.execute(""" - UPDATE t_sign_record - SET points = points + %s, update_time = %s - WHERE id = %s AND group_id = %s - """, (points_change, datetime.now(), user_id, group_id)) - conn.commit() - except mysql.connector.Error as e: - self.LOG.error(f"更新用户积分失败: {e}") - raise \ No newline at end of file diff --git a/robot.py b/robot.py index dae2c7b..0e3cde2 100644 --- a/robot.py +++ b/robot.py @@ -22,7 +22,6 @@ from base.func_xinghuo_web import XinghuoWeb from base.func_claude import Claude from configuration import Config from constants import ChatType -from douyin_parser.main import DouyinParser from game_task.game_task_encyclopedia import game_process_message, get_group_ids,run_random_task_assignment from group_add.main import GroupAdd from group_auto.group_auto_invite import get_first_group_id, process_command @@ -34,7 +33,6 @@ from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.plugin_interface import PluginStatus from plugin_common.plugin_manager import PluginManager from plugin_common.plugin_registry import PluginRegistry -from point_trade.main import PointTrade from robot_cmd.robot_command import GroupBotManager from job_mgmt import Job from robot_cmd.robot_command import Feature @@ -118,12 +116,8 @@ class Robot(Job): self.gmc = GroupMemberChange(wcf, self.redis_pool) # 签到模块加载 self.signin = SignInSystem(wcf, self.gbm, self.allContacts, self.db_pool, self.redis_pool, self.message_util) - # 积分赠送功能加载 - self.trade = PointTrade(wcf, self.gbm, self.db_pool) # 加群测试 self.group_add = GroupAdd(wcf, self.gbm) - # 抖音转视频 - self.douyin = DouyinParser(wcf, self.gbm) if ChatType.is_in_chat_types(chat_type): if chat_type == ChatType.TIGER_BOT.value and TigerBot.value_check(self.config.TIGERBOT): @@ -328,17 +322,6 @@ class Robot(Job): except Exception as e: self.LOG.error(f"member_sign_in error: {e}") - # 加入积分赠与功能 - try: - self.trade.handle_text(message=msg) - except Exception as e: - self.LOG.error(f"point trade error: {e}") - # 抖音组件 - try: - self.douyin.handle_douyin_links(message=msg) - except Exception as e: - self.LOG.error(f"douyin.handle_douyin_links error: {e}") - if msg.is_at(self.wxid): # 被@ self.toAt(msg) return # 处理完群聊信息,后面就不需要处理了