import logging import re from datetime import datetime from typing import Dict, Any, List, Optional, Tuple import xml.etree.ElementTree as ET from wcferry import Wcf from db.connection import DBConnectionManager from db.points_db import PointsDBOperator, PointSource from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager import mysql.connector.pooling class PointTradePlugin(MessagePluginInterface): """积分交易插件""" @property def name(self) -> str: return "积分交易" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供积分交易功能,支持用户之间的积分转账" @property def author(self) -> str: return "水牛" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands def __init__(self): super().__init__() self.db_pool = None def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logging.getLogger(f"Plugin.{self.name}") self.LOG.info(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.wcf = context.get("wcf") self.event_system = context.get("event_system") self.message_util = context.get("message_util") self.gbm = context.get("gbm") self.db_manager = DBConnectionManager.get_instance() # 初始化积分数据库操作类 self.points_db = PointsDBOperator(self.db_manager) self.db_pool = self.db_manager.mysql_pool if not self.db_pool: self.LOG.error("数据库连接池未初始化,插件无法正常工作") return False # 从配置中获取参数 point_trade_config = self._config.get("PointTrade", {}) self._commands = point_trade_config.get("command", ["积分交易", "积分转账", "转账积分", "我的积分", "积分排行"]) self.command_format = point_trade_config.get("command-format", """ 积分交易指令: 积分转账 积分数 @用户 - 转账给指定用户 我的积分 - 查询个人积分详情 积分排行 - 查看群内积分排行榜 """) self.enable = point_trade_config.get("enable", True) self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands @plugin_stats_decorator(plugin_name="积分交易") def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.info(f"插件执行: {self.name}:{content}") command = content.split(" ") sender = message.get("sender") roomid = message.get("roomid", "") wcf: Wcf = message.get("wcf") gbm: GroupBotManager = message.get("gbm") xml = message.get("xml", "") # 检查权限 if roomid and gbm.get_group_permission(roomid, Feature.POINT_TRADE) == PermissionStatus.DISABLED: return False, "没有权限" # 处理不同的命令 if command[0] == "我的积分": return self._handle_my_points(message) elif command[0] == "积分排行": return self._handle_points_ranking(message) elif command[0] in self._commands: return self._handle_transfer_points(message) else: wcf.send_text(f"❌未知命令!{self.command_format}", (roomid if roomid else sender), sender) return True, "未知命令" def _handle_transfer_points(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理积分转账命令""" content = str(message.get("content", "")).strip() command = content.split(" ") sender = message.get("sender") roomid = message.get("roomid", "") wcf: Wcf = message.get("wcf") xml = message.get("xml", "") # 检查命令格式 if len(command) < 3: wcf.send_text(f"❌命令格式错误!积分转账 积分数 @用户", (roomid if roomid else sender), sender) return True, "命令格式错误" # 检查积分数是否为正整数 if not command[1].isdigit(): wcf.send_text(f"🈚️转账积分无效(必须为正整数!) \n积分转账 积分数 @用户", (roomid if roomid else sender), sender) return True, "积分无效" # 检查@用户是否有效 at_users = self.at_list(xml) if len(at_users) != 1: wcf.send_text(f"转账失败❌\n🈚️转账人无效! \n积分转账 积分数 @用户", (roomid if roomid else sender), sender) return True, "转账人无效" reward_points = int(command[1]) target_wxid = next(iter(at_users)) trader_wxid = sender group_id = roomid try: # 使用积分系统进行转账 success, result = self.points_db.transfer_points( trader_wxid, target_wxid, group_id, reward_points, f"积分转账命令执行" ) if not success: error_msg = result.get("error", "未知错误") if "积分不足" in error_msg: current_points = result.get("current_points", 0) wcf.send_text( f"❌转账失败!\n你的积分不足以进行转账!当前积分:{current_points},你需要 {reward_points} 积分。", (roomid if roomid else sender), sender) else: wcf.send_text( f"❌转账失败!\n{error_msg}", (roomid if roomid else sender), sender) return True, f"转账失败: {error_msg}" # 获取转账后的积分信息 from_user = result.get("from_user", {}) to_user = result.get("to_user", {}) # 获取用户昵称 from_user_info = self._get_user_record(trader_wxid, group_id) to_user_info = self._get_user_record(target_wxid, group_id) from_user_name = from_user_info.get('wx_nick_name', trader_wxid) if from_user_info else trader_wxid to_user_name = to_user_info.get('wx_nick_name', target_wxid) if to_user_info else target_wxid output = ( f"✅积分转账成功!\n" f"👤{from_user_name} 转给 👤{to_user_name} {reward_points} 积分\n" f"👤{from_user_name} 当前积分: {from_user.get('total_points', 0)}\n" f"👤{to_user_name} 当前积分: {to_user.get('total_points', 0)}" ) wcf.send_text(output, (roomid if roomid else sender), sender) return True, "转账成功" except Exception as e: self.LOG.error(f"积分交易出错: {e}") wcf.send_text(f"❌积分交易失败!请稍后重试。错误: {str(e)}", (roomid if roomid else sender), sender) return True, f"处理出错: {str(e)}" def _handle_my_points(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理查询个人积分命令""" sender = message.get("sender") roomid = message.get("roomid", "") wcf: Wcf = message.get("wcf") try: # 获取用户积分信息 user_points = self.points_db.get_user_points(sender, roomid) if not user_points: wcf.send_text(f"❌未找到你的积分记录!请先参与积分活动[签到,答题/t]。", (roomid if roomid else sender), sender) return True, "未找到积分记录" # 获取用户昵称 user_info = self._get_user_record(sender, roomid) user_name = user_info.get('wx_nick_name', sender) if user_info else sender # 获取用户积分交易记录 transactions = self.points_db.get_user_transactions(sender, roomid, 5) # 统计不同来源的积分 source_stats = {} for tx in transactions: source = tx.get('source', '其他') points = tx.get('points', 0) if source not in source_stats: source_stats[source] = 0 source_stats[source] += points # 构建积分详情消息 source_details = "\n".join([f"- {source}: {points}" for source, points in source_stats.items()]) # 构建最近交易记录 recent_txs = "" if transactions: recent_txs = "\n最近交易记录:\n" for i, tx in enumerate(transactions[:5], 1): tx_type = "获得" if tx.get('points', 0) > 0 else "消费" points = abs(tx.get('points', 0)) desc = tx.get('description', '无描述') date = tx.get('created_at', '').strftime('%m-%d %H:%M') if tx.get('created_at') else '未知时间' recent_txs += f"{i}. {date} {tx_type} {points} 积分 - {desc}\n" output = ( f"📊 {user_name} 的积分详情\n" f"总积分: {user_points.get('total_points', 0)}\n" f"\n积分来源统计:\n{source_details}" f"{recent_txs}" ) wcf.send_text(output, (roomid if roomid else sender), sender) return True, "查询积分成功" except Exception as e: self.LOG.error(f"查询积分出错: {e}") wcf.send_text(f"❌查询积分失败!请稍后重试。错误: {str(e)}", (roomid if roomid else sender), sender) return True, f"处理出错: {str(e)}" def _handle_points_ranking(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理积分排行榜命令""" sender = message.get("sender") roomid = message.get("roomid", "") wcf: Wcf = message.get("wcf") if not roomid: wcf.send_text("❌积分排行榜仅在群聊中可用!", sender, "") return True, "非群聊环境" try: # 获取群内积分排行 ranking = self.points_db.get_points_ranking(roomid, 10) if not ranking: wcf.send_text("❌暂无积分排行数据!请先参与积分活动。", roomid, sender) return True, "无排行数据" # 构建排行榜消息 rank_list = [] for i, user in enumerate(ranking, 1): user_id = user.get('user_id', '') points = user.get('total_points', 0) # 获取用户昵称 user_info = self._get_user_record(user_id, roomid) user_name = user_info.get('wx_nick_name', user_id) if user_info else user_id # 添加排名标记 rank_mark = "🥇" if i == 1 else "🥈" if i == 2 else "🥉" if i == 3 else f"{i}." rank_list.append(f"{rank_mark} {user_name}: {points} 积分") output = ( f"🏆 积分排行榜 🏆\n" f"\n" f"{chr(10).join(rank_list)}\n" f"\n" f"更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) wcf.send_text(output, roomid, sender) return True, "查询排行榜成功" except Exception as e: self.LOG.error(f"查询积分排行榜出错: {e}") wcf.send_text(f"❌查询积分排行榜失败!请稍后重试。错误: {str(e)}", roomid, sender) return True, f"处理出错: {str(e)}" def at_list(self, xml): """ 解析消息中的 @用户列表 :param xml: 消息的 XML 数据 :return: @用户的集合 """ try: root = ET.fromstring(xml) atuserlist_element = root.find('.//atuserlist') atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip() atuserlist_content_no_commas = atuserlist_content.strip(',') atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas) atuserlist_set = set(atuserlist_content_no_commas.split(',')) self.LOG.debug(f"解析到的 @用户列表: {atuserlist_set}") return atuserlist_set except ET.ParseError as e: self.LOG.error(f"解析 XML 失败: {e}") return set() def _get_db_connection(self): """从连接池获取数据库连接""" return self.db_pool.get_connection() def _get_user_record(self, wx_id, group_id): """ 查询用户的记录 :param wx_id: 用户的微信ID :param group_id: 群组ID :return: 用户记录(字典格式) """ try: with self._get_db_connection() as conn: with conn.cursor(dictionary=True) as cursor: cursor.execute(""" SELECT id, wx_id, wx_nick_name, points FROM t_sign_record WHERE wx_id = %s AND group_id = %s """, (wx_id, group_id)) return cursor.fetchone() except mysql.connector.Error as e: self.LOG.error(f"查询用户记录失败: {e}") return None def _get_user_record_by_nick(self, wx_id, group_id): """ 根据微信ID查询用户的记录 :param wx_id: 用户的微信ID :param group_id: 群组ID :return: 用户记录(字典格式) """ try: with self._get_db_connection() as conn: with conn.cursor(dictionary=True) as cursor: cursor.execute(""" SELECT id, wx_id, wx_nick_name, points FROM t_sign_record WHERE wx_id = %s AND group_id = %s """, (wx_id, group_id)) return cursor.fetchone() except mysql.connector.Error as e: self.LOG.error(f"查询用户记录失败: {e}") return None def _update_user_points(self, user_id, points_change, group_id): """ 更新用户积分,使用 SQL 增量调整 :param user_id: 用户ID (数据库中的 id 字段) :param points_change: 积分变化量(正数增加,负数减少) :param group_id: 群组ID """ try: with self._get_db_connection() as conn: with conn.cursor(dictionary=True) as cursor: cursor.execute(""" UPDATE t_sign_record SET points = points + %s, update_time = %s WHERE id = %s AND group_id = %s """, (points_change, datetime.now(), user_id, group_id)) conn.commit() except mysql.connector.Error as e: self.LOG.error(f"更新用户积分失败: {e}") raise