Files
abot/point_trade/main.py
2025-03-05 13:25:22 +08:00

209 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import re
import tomllib
from datetime import datetime
from wcferry import Wcf, WxMsg
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
import xml.etree.ElementTree as ET
import mysql.connector.pooling
class PointTrade:
description = "积分交易"
author = "shui niu"
version = "1.0.0"
def __init__(self, wcf: Wcf, gbm: GroupBotManager, db_pool: mysql.connector.pooling.MySQLConnectionPool):
self.LOG = logging.getLogger(__name__)
self.wcf = wcf # 假设 wcf 对象在此类中初始化
self.gbm = gbm # 权限功能
self.db_pool = db_pool
with open("point_trade/config.toml", "rb") as f:
plugin_config = tomllib.load(f)
config = plugin_config["PointTrade"]
self.enable = config["enable"]
self.command = config["command"]
self.command_format = config["command-format"]
self.LOG.info(f"[积分交易] 组件初始化完成,指令: {self.command}")
def at_list(self, xml):
"""
解析消息中的 @用户列表
:param xml: 消息的 XML 数据
:return: @用户的集合
"""
try:
root = ET.fromstring(xml)
atuserlist_element = root.find('.//atuserlist')
atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip()
atuserlist_content_no_commas = atuserlist_content.strip(',')
atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas)
atuserlist_set = set(atuserlist_content_no_commas.split(','))
self.LOG.debug(f"解析到的 @用户列表: {atuserlist_set}")
return atuserlist_set
except ET.ParseError as e:
self.LOG.error(f"解析 XML 失败: {e}")
return set()
def handle_text(self, message: WxMsg):
"""
处理文本消息,进行积分交易
:param message: 微信消息对象
"""
if not self.enable:
return
content = str(message.content).strip()
command = content.split(" ")
if command[0] not in self.command:
return
if len(command) < 3:
self.wcf.send_text(f"-----Bot-----\n❌命令格式错误!{self.command_format}",
(message.roomid if message.from_group() else message.sender), message.sender)
return
if self.gbm.get_group_permission(message.roomid, Feature.POINT_TRADE) == PermissionStatus.DISABLED:
return
if not command[1].isdigit():
self.wcf.send_text(f"-----Bot-----\n🈚️转账积分无效(必须为正整数!) \n{self.command_format}",
(message.roomid if message.from_group() else message.sender), message.sender)
return
if len(self.at_list(message.xml)) != 1:
self.wcf.send_text(f"-----Bot-----\n转账失败❌\n🈚️转账人无效! \n{self.command_format}",
(message.roomid if message.from_group() else message.sender), message.sender)
return
reward_points = int(command[1])
target_wxid = next(iter(self.at_list(message.xml)))
trader_wxid = message.sender
group_id = message.roomid
# 查询发信人的记录
sender_result = self._get_user_record(trader_wxid, group_id)
if not sender_result:
self.wcf.send_text(f"-----Bot-----\n❌打赏失败!\n没有找到你的记录,无法进行打赏!",
(message.roomid if message.from_group() else message.sender), message.sender)
return
sender_user_id, sender_wx_id, sender_wx_nick_name, sender_current_points = sender_result
sender_current_points = int(sender_current_points)
# 检查发信人积分是否足够
if sender_current_points < reward_points:
self.wcf.send_text(
f"-----Bot-----\n❌打赏失败!\n你的积分不足以进行打赏!当前积分:{sender_current_points},你需要 {reward_points} 积分。",
(message.roomid if message.from_group() else message.sender), message.sender)
return
# 查询被打赏人的记录
recipient_result = self._get_user_record_by_nick(target_wxid, group_id)
if not recipient_result:
self.wcf.send_text(
f"-----Bot-----\n❌打赏失败!\n接收人[{target_wxid}]无法收取积分",
(message.roomid if message.from_group() else message.sender), message.sender)
return
recipient_user_id, recipient_wx_id, recipient_wx_nick_name, recipient_current_points = recipient_result
recipient_current_points = int(recipient_current_points)
# 使用 SQL 增量更新积分(不再在程序中计算)
try:
self._update_user_points(sender_user_id, -reward_points, group_id) # 减少发送者积分
self._update_user_points(recipient_user_id, reward_points, group_id) # 增加接收者积分
except mysql.connector.Error as e:
self.wcf.send_text(f"-----Bot-----\n❌积分更新失败!请稍后重试。错误: {str(e)}",
(message.roomid if message.from_group() else message.sender), message.sender)
return
# 获取更新后的积分值用于显示
updated_sender = self._get_user_record(trader_wxid, group_id)
updated_recipient = self._get_user_record_by_nick(target_wxid, group_id)
new_sender_points = int(updated_sender['points']) if updated_sender else sender_current_points
new_recipient_points = int(updated_recipient['points']) if updated_recipient else recipient_current_points
output = (
f"\n-----Bot-----\n"
f"✅积分赠送成功!✨\n"
f"🤝{sender_wx_nick_name} 现在有 {new_sender_points} 点积分➖\n"
f"🤝{recipient_wx_nick_name} 现在有 {new_recipient_points} 点积分➕\n"
f"⌚️时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
self.wcf.send_text(
output,
(message.roomid if message.from_group() else message.sender),
','.join({sender_user_id, recipient_user_id})
)
def _get_db_connection(self):
"""从连接池获取数据库连接"""
return self.db_pool.get_connection()
def _get_user_record(self, wx_id, group_id):
"""
查询用户的记录
:param wx_id: 用户的微信ID
:param group_id: 群组ID
:return: 用户记录id, wx_id, wx_nick_name, points
"""
try:
with self._get_db_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("""
SELECT id, wx_id, wx_nick_name, points FROM t_sign_record
WHERE wx_id = %s AND group_id = %s
""", (wx_id, group_id))
return cursor.fetchone()
except mysql.connector.Error as e:
self.LOG.error(f"查询用户记录失败: {e}")
return None
def _get_user_record_by_nick(self, wx_id, group_id):
"""
根据微信ID查询用户的记录此处与 _get_user_record 功能相同,可优化)
:param wx_id: 用户的微信ID
:param group_id: 群组ID
:return: 用户记录id, wx_id, wx_nick_name, points
"""
try:
with self._get_db_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("""
SELECT id, wx_id, wx_nick_name, points FROM t_sign_record
WHERE wx_id = %s AND group_id = %s
""", (wx_id, group_id))
return cursor.fetchone()
except mysql.connector.Error as e:
self.LOG.error(f"查询用户记录失败: {e}")
return None
def _update_user_points(self, user_id, points_change, group_id):
"""
更新用户积分,使用 SQL 增量调整
:param user_id: 用户ID (数据库中的 id 字段)
:param points_change: 积分变化量(正数增加,负数减少)
:param group_id: 群组ID
"""
try:
with self._get_db_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("""
UPDATE t_sign_record
SET points = points + %s, update_time = %s
WHERE id = %s AND group_id = %s
""", (points_change, datetime.now(), user_id, group_id))
conn.commit()
except mysql.connector.Error as e:
self.LOG.error(f"更新用户积分失败: {e}")
raise