import logging import re import os import toml from datetime import datetime from typing import Dict, Any, List, Optional, Tuple import xml.etree.ElementTree as ET from wcferry import Wcf from db.connection import DBConnectionManager from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.plugin_interface import PluginStatus from plugins.stats_collector.decorators import plugin_stats_decorator from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager import mysql.connector.pooling class PointTradePlugin(MessagePluginInterface): """积分交易插件""" @property def name(self) -> str: return "积分交易" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供积分交易功能,支持用户之间的积分转账" @property def author(self) -> str: return "水牛" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands def __init__(self): super().__init__() self.db_pool = None def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logging.getLogger(f"Plugin.{self.name}") self.LOG.info(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.wcf = context.get("wcf") self.event_system = context.get("event_system") self.message_util = context.get("message_util") self.gbm = context.get("gbm") self.db_manager = DBConnectionManager.get_instance() self.db_pool = self.db_manager.mysql_pool if not self.db_pool: self.LOG.error("数据库连接池未初始化,插件无法正常工作") return False # 从配置中获取参数 point_trade_config = self._config.get("PointTrade", {}) self._commands = point_trade_config.get("command", ["积分交易", "积分转账", "转账积分"]) self.command_format = point_trade_config.get("command-format", "积分转账 积分数 @用户") self.enable = point_trade_config.get("enable", True) self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands @plugin_stats_decorator(plugin_name="积分交易") def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.info(f"插件执行: {self.name}:{content}") command = content.split(" ") sender = message.get("sender") roomid = message.get("roomid", "") wcf: Wcf = message.get("wcf") gbm: GroupBotManager = message.get("gbm") xml = message.get("xml", "") # 检查命令格式 if len(command) < 3: wcf.send_text(f"❌命令格式错误!{self.command_format}", (roomid if roomid else sender), sender) return True, "命令格式错误" # 检查权限 if roomid and gbm.get_group_permission(roomid, Feature.POINT_TRADE) == PermissionStatus.DISABLED: return False, "没有权限" # 检查积分数是否为正整数 if not command[1].isdigit(): wcf.send_text(f"🈚️转账积分无效(必须为正整数!) \n{self.command_format}", (roomid if roomid else sender), sender) return True, "积分无效" # 检查@用户是否有效 at_users = self.at_list(xml) if len(at_users) != 1: wcf.send_text(f"转账失败❌\n🈚️转账人无效! \n{self.command_format}", (roomid if roomid else sender), sender) return True, "转账人无效" reward_points = int(command[1]) target_wxid = next(iter(at_users)) trader_wxid = sender group_id = roomid try: # 查询发信人的记录 sender_result = self._get_user_record(trader_wxid, group_id) if not sender_result: wcf.send_text(f"❌打赏失败!\n没有找到你的记录,无法进行打赏!", (roomid if roomid else sender), sender) return True, "发送者记录不存在" sender_user_id = sender_result['id'] sender_wx_id = sender_result['wx_id'] sender_wx_nick_name = sender_result['wx_nick_name'] sender_current_points = int(sender_result['points']) # 检查发信人积分是否足够 if sender_current_points < reward_points: wcf.send_text( f"❌打赏失败!\n你的积分不足以进行打赏!当前积分:{sender_current_points},你需要 {reward_points} 积分。", (roomid if roomid else sender), sender) return True, "积分不足" # 查询被打赏人的记录 recipient_result = self._get_user_record_by_nick(target_wxid, group_id) if not recipient_result: wcf.send_text( f"❌打赏失败!\n接收人[{target_wxid}]无法收取积分", (roomid if roomid else sender), sender) return True, "接收者记录不存在" recipient_user_id = recipient_result['id'] recipient_wx_id = recipient_result['wx_id'] recipient_wx_nick_name = recipient_result['wx_nick_name'] recipient_current_points = int(recipient_result['points']) # 使用 SQL 增量更新积分 self._update_user_points(sender_user_id, -reward_points, group_id) # 减少发送者积分 self._update_user_points(recipient_user_id, reward_points, group_id) # 增加接收者积分 # 获取更新后的积分值用于显示 updated_sender = self._get_user_record(trader_wxid, group_id) updated_recipient = self._get_user_record_by_nick(target_wxid, group_id) new_sender_points = int(updated_sender['points']) if updated_sender else sender_current_points new_recipient_points = int(updated_recipient['points']) if updated_recipient else recipient_current_points output = ( f"✅积分转账成功!\n" f"👤{sender_wx_nick_name} 转给 👤{recipient_wx_nick_name} {reward_points} 积分\n" f"👤{sender_wx_nick_name} 当前积分: {new_sender_points}\n" f"👤{recipient_wx_nick_name} 当前积分: {new_recipient_points}" ) wcf.send_text(output, (roomid if roomid else sender), sender) return True, "转账成功" except mysql.connector.Error as e: self.LOG.error(f"积分交易出错: {e}") wcf.send_text(f"❌积分交易失败!请稍后重试。错误: {str(e)}", (roomid if roomid else sender), sender) return True, f"数据库错误: {str(e)}" except Exception as e: self.LOG.error(f"积分交易出错: {e}") wcf.send_text(f"❌积分交易失败!请稍后重试。错误: {str(e)}", (roomid if roomid else sender), sender) return True, f"处理出错: {str(e)}" def at_list(self, xml): """ 解析消息中的 @用户列表 :param xml: 消息的 XML 数据 :return: @用户的集合 """ try: root = ET.fromstring(xml) atuserlist_element = root.find('.//atuserlist') atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip() atuserlist_content_no_commas = atuserlist_content.strip(',') atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas) atuserlist_set = set(atuserlist_content_no_commas.split(',')) self.LOG.debug(f"解析到的 @用户列表: {atuserlist_set}") return atuserlist_set except ET.ParseError as e: self.LOG.error(f"解析 XML 失败: {e}") return set() def _get_db_connection(self): """从连接池获取数据库连接""" return self.db_pool.get_connection() def _get_user_record(self, wx_id, group_id): """ 查询用户的记录 :param wx_id: 用户的微信ID :param group_id: 群组ID :return: 用户记录(字典格式) """ try: with self._get_db_connection() as conn: with conn.cursor(dictionary=True) as cursor: cursor.execute(""" SELECT id, wx_id, wx_nick_name, points FROM t_sign_record WHERE wx_id = %s AND group_id = %s """, (wx_id, group_id)) return cursor.fetchone() except mysql.connector.Error as e: self.LOG.error(f"查询用户记录失败: {e}") return None def _get_user_record_by_nick(self, wx_id, group_id): """ 根据微信ID查询用户的记录 :param wx_id: 用户的微信ID :param group_id: 群组ID :return: 用户记录(字典格式) """ try: with self._get_db_connection() as conn: with conn.cursor(dictionary=True) as cursor: cursor.execute(""" SELECT id, wx_id, wx_nick_name, points FROM t_sign_record WHERE wx_id = %s AND group_id = %s """, (wx_id, group_id)) return cursor.fetchone() except mysql.connector.Error as e: self.LOG.error(f"查询用户记录失败: {e}") return None def _update_user_points(self, user_id, points_change, group_id): """ 更新用户积分,使用 SQL 增量调整 :param user_id: 用户ID (数据库中的 id 字段) :param points_change: 积分变化量(正数增加,负数减少) :param group_id: 群组ID """ try: with self._get_db_connection() as conn: with conn.cursor(dictionary=True) as cursor: cursor.execute(""" UPDATE t_sign_record SET points = points + %s, update_time = %s WHERE id = %s AND group_id = %s """, (points_change, datetime.now(), user_id, group_id)) conn.commit() except mysql.connector.Error as e: self.LOG.error(f"更新用户积分失败: {e}") raise