Files
abot/point_trade/main.py
2025-03-05 12:38:57 +08:00

210 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import re
import tomllib
from datetime import datetime
from wcferry import Wcf, WxMsg
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
import xml.etree.ElementTree as ET
import mysql.connector.pooling
class PointTrade:
description = "积分交易"
author = "shui niu"
version = "1.0.0"
def __init__(self, wcf: Wcf, gbm: GroupBotManager, db_pool: mysql.connector.pooling.MySQLConnectionPool):
self.LOG = logging.getLogger(__name__)
self.wcf = wcf # 假设 wcf 对象在此类中初始化
self.gbm = gbm # 权限功能
self.db_pool = db_pool
with open("point_trade/config.toml", "rb") as f:
plugin_config = tomllib.load(f)
config = plugin_config["PointTrade"]
self.enable = config["enable"]
self.command = config["command"]
self.command_format = config["command-format"]
self.LOG.info(f"[积分交易] 组件初始化完成,指令: {self.command}")
def at_list(self, xml):
# xml demo内容
# xml_data = '''<msgsource>
# <atuserlist>
# <![CDATA[,wxid_dx2ix9e2k4gz21,wxid_vd82ur5s1q4d22,wxid_kfhp3d3a1ynx22]]>
# </atuserlist>
# <pua>1</pua>
# <silence>1</silence>
# <membercount>5</membercount>
# <signature>V1_tTavqCYr|v1_tTavqCYr</signature>
# <tmp_node>
# <publisher-id />
# </tmp_node>
# <sec_msg_node>
# <alnode>
# <fr>1</fr>
# </alnode>
# </sec_msg_node>
# </msgsource>'''
# 解析 XML 数据
# 解析 XML 数据
root = ET.fromstring(xml)
# 查找 <atuserlist> 元素并提取其文本内容
atuserlist_element = root.find('.//atuserlist')
atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip() # 去除前后的所有空白字符
# 由于 CDATA 内容前后有逗号,我们需要先去除它们,然后再分割字符串
# 注意:这里我们假设 CDATA 内容只有前后的逗号是多余的,内部没有逗号需要保留
atuserlist_content_no_commas = atuserlist_content.strip(',') # 实际上这里只会去除前后的逗号,不会去除换行符等其他空白字符
# 如果需要去除换行符,可以再加一行:
atuserlist_content_no_commas = atuserlist_content_no_commas.replace('\n', '')
atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas)
# 但由于您的 CDATA 示例中没有换行符,所以这行通常是不必要的
# 按逗号分割成列表,然后转换成集合
atuserlist_set = set(atuserlist_content_no_commas.split(','))
# 输出集合
print("atuserlist 集合:", atuserlist_set)
return atuserlist_set
def handle_text(self, message: WxMsg):
if not self.enable:
return
content = str(message.content).strip()
command = content.split(" ")
if command[0] not in self.command:
return
if len(command) < 3:
self.wcf.send_text(f"-----Bot-----\n❌命令格式错误!{self.command_format}",
(message.roomid if message.from_group() else message.sender), message.sender)
return
if self.gbm.get_group_permission(message.roomid, Feature.POINT_TRADE) == PermissionStatus.DISABLED:
return
if not command[1].isdigit():
self.wcf.send_text(f"-----Bot-----\n🈚️转账积分无效(必须为正整数!) \n{self.command_format}",
(message.roomid if message.from_group() else message.sender), message.sender)
return
if len(self.at_list(message.xml)) != 1:
self.wcf.send_text(f"-----Bot-----\n转账失败❌\n🈚️转账人无效! \n{self.command_format}",
(message.roomid if message.from_group() else message.sender), message.sender)
return
reward_points = int(command[1])
target_wxid = next(iter(self.at_list(message.xml)))
trader_wxid = message.sender
group_id = message.roomid
# 查询发信人的记录(获取发信人当前积分)
sender_result = self._get_user_record(trader_wxid, group_id)
if not sender_result:
self.wcf.send_text(f"-----Bot-----\n❌打赏失败!\n没有找到你的记录,无法进行打赏!",
(message.roomid if message.from_group() else message.sender), message.sender)
return
sender_user_id, sender_wx_id, sender_wx_nick_name, sender_current_points = sender_result
sender_current_points = int(sender_current_points)
# 如果发信人积分不足
if sender_current_points < reward_points:
self.wcf.send_text(
f"-----Bot-----\n❌打赏失败!\n你的积分不足以进行打赏!当前积分:{sender_current_points},你需要 {reward_points} 积分。",
(message.roomid if message.from_group() else message.sender), message.sender)
return
# 查询被打赏人的记录
recipient_result = self._get_user_record_by_nick(target_wxid, group_id)
if not recipient_result:
self.wcf.send_text(
f"-----Bot-----\n❌打赏失败!\n接收人[{target_wxid}]无法收取积分",
(message.roomid if message.from_group() else message.sender), message.sender)
return
recipient_user_id, recipient_wx_id, recipient_wx_nick_name, recipient_current_points = recipient_result
recipient_current_points =int(recipient_current_points)
# 计算发信人和接收者的新积分
new_sender_points = sender_current_points - reward_points
new_recipient_points = recipient_current_points + reward_points
# 更新发信人和接收者的积分
self._update_user_points(sender_user_id, new_sender_points)
self._update_user_points(recipient_user_id, new_recipient_points)
output = (
f"\n-----Bot-----\n"
f"✅积分赠送成功!✨\n"
f"🤝{sender_wx_nick_name} 现在有 {new_sender_points} 点积分➖\n"
f"🤝{recipient_wx_nick_name} 现在有 {new_recipient_points} 点积分➕\n"
f"⌚️时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
self.wcf.send_text(
output,
(message.roomid if message.from_group() else message.sender),
','.join({sender_user_id, recipient_user_id})
)
def _get_db_connection(self):
"""从连接池获取数据库连接"""
return self.db_pool.get_connection()
def _get_user_record(self, wx_id, group_id):
"""
查询用户的记录
:param wx_id: 用户的微信ID
:param group_id: 群组ID
:return: 用户记录id, wx_id, wx_nick_name, points
"""
# 连接到数据库
with self._get_db_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("""
SELECT id, wx_id, wx_nick_name, points FROM t_sign_record
WHERE wx_id = %s AND group_id = %s
""", (wx_id, group_id))
return cursor.fetchone()
def _get_user_record_by_nick(self, wx_nick_name, group_id):
"""
根据微信昵称查询用户的记录
:param wx_nick_name: 用户的微信昵称
:param group_id: 群组ID
:return: 用户记录id, wx_id, wx_nick_name, points
"""
with self._get_db_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("""
SELECT id, wx_id, wx_nick_name, points FROM t_sign_record
WHERE wx_nick_name = %s AND group_id = %s
""", (wx_nick_name, group_id))
return self.cursor.fetchone()
def _update_user_points(self, user_id, new_points):
"""
更新用户积分
:param user_id: 用户ID
:param new_points: 新的积分数
"""
with self._get_db_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("""
UPDATE t_sign_record
SET points = %s, update_time = %s
WHERE id = %s
""", (new_points, datetime.now(), user_id))