269 lines
10 KiB
Python
269 lines
10 KiB
Python
import logging
|
||
import re
|
||
from datetime import datetime
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
import xml.etree.ElementTree as ET
|
||
|
||
from wcferry import Wcf
|
||
|
||
from db.connection import DBConnectionManager
|
||
from db.points_db import PointsDBOperator, PointSource
|
||
from plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from plugin_common.plugin_interface import PluginStatus
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||
|
||
import mysql.connector.pooling
|
||
|
||
|
||
class PointTradePlugin(MessagePluginInterface):
|
||
"""积分交易插件"""
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "积分交易"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "提供积分交易功能,支持用户之间的积分转账"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "水牛"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return "" # 不需要前缀,直接匹配命令
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return self._commands
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.db_pool = None
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG = logging.getLogger(f"Plugin.{self.name}")
|
||
self.LOG.info(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 保存上下文对象
|
||
self.wcf = context.get("wcf")
|
||
self.event_system = context.get("event_system")
|
||
self.message_util = context.get("message_util")
|
||
self.gbm = context.get("gbm")
|
||
self.db_manager = DBConnectionManager.get_instance()
|
||
|
||
# 初始化积分数据库操作类
|
||
self.points_db = PointsDBOperator(self.db_manager)
|
||
|
||
self.db_pool = self.db_manager.mysql_pool
|
||
|
||
if not self.db_pool:
|
||
self.LOG.error("数据库连接池未初始化,插件无法正常工作")
|
||
return False
|
||
|
||
# 从配置中获取参数
|
||
point_trade_config = self._config.get("PointTrade", {})
|
||
self._commands = point_trade_config.get("command", ["积分交易", "积分转账", "转账积分"])
|
||
self.command_format = point_trade_config.get("command-format", "积分转账 积分数 @用户")
|
||
self.enable = point_trade_config.get("enable", True)
|
||
|
||
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""检查是否可以处理该消息"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
content = str(message.get("content", "")).strip()
|
||
command = content.split(" ")[0]
|
||
|
||
return command in self._commands
|
||
|
||
@plugin_stats_decorator(plugin_name="积分交易")
|
||
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
content = str(message.get("content", "")).strip()
|
||
self.LOG.info(f"插件执行: {self.name}:{content}")
|
||
command = content.split(" ")
|
||
sender = message.get("sender")
|
||
roomid = message.get("roomid", "")
|
||
wcf: Wcf = message.get("wcf")
|
||
gbm: GroupBotManager = message.get("gbm")
|
||
xml = message.get("xml", "")
|
||
|
||
# 检查命令格式
|
||
if len(command) < 3:
|
||
wcf.send_text(f"❌命令格式错误!{self.command_format}",
|
||
(roomid if roomid else sender), sender)
|
||
return True, "命令格式错误"
|
||
|
||
# 检查权限
|
||
if roomid and gbm.get_group_permission(roomid, Feature.POINT_TRADE) == PermissionStatus.DISABLED:
|
||
return False, "没有权限"
|
||
|
||
# 检查积分数是否为正整数
|
||
if not command[1].isdigit():
|
||
wcf.send_text(f"🈚️转账积分无效(必须为正整数!) \n{self.command_format}",
|
||
(roomid if roomid else sender), sender)
|
||
return True, "积分无效"
|
||
|
||
# 检查@用户是否有效
|
||
at_users = self.at_list(xml)
|
||
if len(at_users) != 1:
|
||
wcf.send_text(f"转账失败❌\n🈚️转账人无效! \n{self.command_format}",
|
||
(roomid if roomid else sender), sender)
|
||
return True, "转账人无效"
|
||
|
||
reward_points = int(command[1])
|
||
target_wxid = next(iter(at_users))
|
||
trader_wxid = sender
|
||
group_id = roomid
|
||
|
||
try:
|
||
# 使用新的积分系统进行转账
|
||
success, result = self.points_db.transfer_points(
|
||
trader_wxid, target_wxid, group_id,
|
||
reward_points, f"积分转账命令执行"
|
||
)
|
||
|
||
if not success:
|
||
error_msg = result.get("error", "未知错误")
|
||
if "积分不足" in error_msg:
|
||
current_points = result.get("current_points", 0)
|
||
wcf.send_text(
|
||
f"❌转账失败!\n你的积分不足以进行转账!当前积分:{current_points},你需要 {reward_points} 积分。",
|
||
(roomid if roomid else sender), sender)
|
||
else:
|
||
wcf.send_text(
|
||
f"❌转账失败!\n{error_msg}",
|
||
(roomid if roomid else sender), sender)
|
||
return True, f"转账失败: {error_msg}"
|
||
|
||
# 获取转账后的积分信息
|
||
from_user = result.get("from_user", {})
|
||
to_user = result.get("to_user", {})
|
||
|
||
# 获取用户昵称
|
||
from_user_info = self._get_user_record(trader_wxid, group_id)
|
||
to_user_info = self._get_user_record(target_wxid, group_id)
|
||
|
||
from_user_name = from_user_info.get('wx_nick_name', trader_wxid) if from_user_info else trader_wxid
|
||
to_user_name = to_user_info.get('wx_nick_name', target_wxid) if to_user_info else target_wxid
|
||
|
||
output = (
|
||
f"✅积分转账成功!\n"
|
||
f"👤{from_user_name} 转给 👤{to_user_name} {reward_points} 积分\n"
|
||
f"👤{from_user_name} 当前积分: {from_user.get('total_points', 0)}\n"
|
||
f"👤{to_user_name} 当前积分: {to_user.get('total_points', 0)}"
|
||
)
|
||
|
||
wcf.send_text(output, (roomid if roomid else sender), sender)
|
||
return True, "转账成功"
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"积分交易出错: {e}")
|
||
wcf.send_text(f"❌积分交易失败!请稍后重试。错误: {str(e)}",
|
||
(roomid if roomid else sender), sender)
|
||
return True, f"处理出错: {str(e)}"
|
||
|
||
def at_list(self, xml):
|
||
"""
|
||
解析消息中的 @用户列表
|
||
:param xml: 消息的 XML 数据
|
||
:return: @用户的集合
|
||
"""
|
||
try:
|
||
root = ET.fromstring(xml)
|
||
atuserlist_element = root.find('.//atuserlist')
|
||
atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip()
|
||
|
||
atuserlist_content_no_commas = atuserlist_content.strip(',')
|
||
atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas)
|
||
atuserlist_set = set(atuserlist_content_no_commas.split(','))
|
||
self.LOG.debug(f"解析到的 @用户列表: {atuserlist_set}")
|
||
return atuserlist_set
|
||
except ET.ParseError as e:
|
||
self.LOG.error(f"解析 XML 失败: {e}")
|
||
return set()
|
||
|
||
def _get_db_connection(self):
|
||
"""从连接池获取数据库连接"""
|
||
return self.db_pool.get_connection()
|
||
|
||
def _get_user_record(self, wx_id, group_id):
|
||
"""
|
||
查询用户的记录
|
||
:param wx_id: 用户的微信ID
|
||
:param group_id: 群组ID
|
||
:return: 用户记录(字典格式)
|
||
"""
|
||
try:
|
||
with self._get_db_connection() as conn:
|
||
with conn.cursor(dictionary=True) as cursor:
|
||
cursor.execute("""
|
||
SELECT id, wx_id, wx_nick_name, points FROM t_sign_record
|
||
WHERE wx_id = %s AND group_id = %s
|
||
""", (wx_id, group_id))
|
||
return cursor.fetchone()
|
||
except mysql.connector.Error as e:
|
||
self.LOG.error(f"查询用户记录失败: {e}")
|
||
return None
|
||
|
||
def _get_user_record_by_nick(self, wx_id, group_id):
|
||
"""
|
||
根据微信ID查询用户的记录
|
||
:param wx_id: 用户的微信ID
|
||
:param group_id: 群组ID
|
||
:return: 用户记录(字典格式)
|
||
"""
|
||
try:
|
||
with self._get_db_connection() as conn:
|
||
with conn.cursor(dictionary=True) as cursor:
|
||
cursor.execute("""
|
||
SELECT id, wx_id, wx_nick_name, points FROM t_sign_record
|
||
WHERE wx_id = %s AND group_id = %s
|
||
""", (wx_id, group_id))
|
||
return cursor.fetchone()
|
||
except mysql.connector.Error as e:
|
||
self.LOG.error(f"查询用户记录失败: {e}")
|
||
return None
|
||
|
||
def _update_user_points(self, user_id, points_change, group_id):
|
||
"""
|
||
更新用户积分,使用 SQL 增量调整
|
||
:param user_id: 用户ID (数据库中的 id 字段)
|
||
:param points_change: 积分变化量(正数增加,负数减少)
|
||
:param group_id: 群组ID
|
||
"""
|
||
try:
|
||
with self._get_db_connection() as conn:
|
||
with conn.cursor(dictionary=True) as cursor:
|
||
cursor.execute("""
|
||
UPDATE t_sign_record
|
||
SET points = points + %s, update_time = %s
|
||
WHERE id = %s AND group_id = %s
|
||
""", (points_change, datetime.now(), user_id, group_id))
|
||
conn.commit()
|
||
except mysql.connector.Error as e:
|
||
self.LOG.error(f"更新用户积分失败: {e}")
|
||
raise |