Files
WechatHookBot/plugins/SignInPlugin/main.py
2025-12-12 18:35:39 +08:00

1614 lines
69 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
签到插件
用户发送签到关键词即可进行签到随机获得3-10积分
每天只能签到一次,支持连续签到奖励
"""
import asyncio
import random
import tomllib
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Optional, Tuple, Dict, List
import os
import aiohttp
from io import BytesIO
import pymysql
from loguru import logger
from utils.plugin_base import PluginBase
from utils.decorators import on_text_message, schedule
from utils.redis_cache import get_cache
from WechatHook import WechatHookClient
try:
from PIL import Image, ImageDraw, ImageFont, ImageFilter
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
logger.warning("PIL库未安装将使用文本模式")
class SignInPlugin(PluginBase):
"""签到插件"""
description = "签到插件 - 每日签到获取积分"
author = "ShiHao"
version = "1.0.0"
def __init__(self):
    """Create the plugin with empty state; real setup happens in ``async_init``."""
    super().__init__()
    # Parsed contents of config.toml; populated by async_init().
    self.config = None
    # The "database" table of the config; populated by async_init().
    self.db_config = None
    # Working directories, also assigned by async_init(). They are declared
    # here so that every instance attribute is visible in one place.
    self.temp_dir = None
    self.images_dir = None
async def async_init(self):
    """Load ``config.toml`` next to this file and prepare the working directories."""
    plugin_dir = Path(__file__).parent

    # Read the plugin configuration (tomllib needs a binary file handle).
    with open(plugin_dir / "config.toml", "rb") as config_file:
        self.config = tomllib.load(config_file)
    self.db_config = self.config["database"]

    # Scratch directory for generated cards and downloaded avatars.
    self.temp_dir = plugin_dir / "temp"
    self.temp_dir.mkdir(exist_ok=True)

    # Directory searched for card background images.
    self.images_dir = plugin_dir / "images"

    logger.success("签到插件初始化完成")
def get_db_connection(self):
    """Open a new autocommit MySQL connection from the ``database`` config table."""
    settings = self.db_config
    # Every one of these keys is required; a missing key raises KeyError.
    connect_kwargs = {
        key: settings[key]
        for key in ("host", "port", "user", "password", "database", "charset")
    }
    return pymysql.connect(autocommit=True, **connect_kwargs)
def get_user_info(self, wxid: str) -> Optional[dict]:
    """Fetch the ``user_signin`` row for ``wxid``.

    Returns:
        The row as a dict, or None when no row matches or the query fails
        (failures are logged, never raised).
    """
    query = """
        SELECT wxid, nickname, city, points, last_signin_date,
               signin_streak, total_signin_days
        FROM user_signin
        WHERE wxid = %s
    """
    try:
        with self.get_db_connection() as conn, \
                conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(query, (wxid,))
            row = cursor.fetchone()
        return row
    except Exception as e:
        logger.error(f"获取用户信息失败: {e}")
        return None
def create_or_update_user(self, wxid: str, nickname: str = "", city: str = "") -> bool:
    """Insert a zero-point user row, or refresh nickname/city of an existing one.

    An empty ``nickname`` or ``city`` never overwrites the stored value.

    Returns:
        True when the statement executed, False when it raised (logged).
    """
    # ON DUPLICATE KEY UPDATE covers the case where the user already exists.
    upsert = """
        INSERT INTO user_signin (wxid, nickname, city, points, last_signin_date,
                                 signin_streak, total_signin_days)
        VALUES (%s, %s, %s, 0, NULL, 0, 0)
        ON DUPLICATE KEY UPDATE
            nickname = CASE
                WHEN VALUES(nickname) != '' THEN VALUES(nickname)
                ELSE nickname
            END,
            city = CASE
                WHEN VALUES(city) != '' THEN VALUES(city)
                ELSE city
            END
    """
    try:
        with self.get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(upsert, (wxid, nickname, city))
    except Exception as e:
        logger.error(f"创建/更新用户失败: {e}")
        return False
    return True
def update_user_city(self, wxid: str, city: str) -> bool:
    """Store ``city`` for the user, overwriting any previous value.

    A zero-point row is created when the user does not exist yet.

    Returns:
        True when the statement executed, False when it raised (logged).
    """
    upsert = """
        INSERT INTO user_signin (wxid, city, points, last_signin_date,
                                 signin_streak, total_signin_days)
        VALUES (%s, %s, 0, NULL, 0, 0)
        ON DUPLICATE KEY UPDATE
            city = VALUES(city),
            updated_at = NOW()
    """
    try:
        with self.get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(upsert, (wxid, city))
    except Exception as e:
        logger.error(f"更新用户城市失败: {e}")
        return False
    return True
def update_user_nickname(self, wxid: str, nickname: str) -> bool:
    """Set the stored nickname of an existing user.

    Returns:
        True when the UPDATE reported at least one affected row; False when
        no row was affected or the statement raised (logged).
    """
    statement = """
        UPDATE user_signin
        SET nickname = %s, updated_at = NOW()
        WHERE wxid = %s
    """
    try:
        with self.get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(statement, (nickname, wxid))
            affected = cursor.rowcount
        return affected > 0
    except Exception as e:
        logger.error(f"更新用户昵称失败: {e}")
        return False
def record_points_change(self, wxid: str, nickname: str, change_type: str,
                         points_change: int, points_before: int, points_after: int,
                         description: str = "", related_id: str = "") -> bool:
    """Append one row to ``points_history``.

    Args:
        wxid: WeChat id of the user.
        nickname: Nickname of the user.
        change_type: Kind of change (signin/bonus/consume/admin/other).
        points_change: Amount of the change (positive adds, negative removes).
        points_before: Balance before the change.
        points_after: Balance after the change.
        description: Free-text explanation of the change.
        related_id: Identifier of a related record.

    Returns:
        True when the insert executed, False when it raised (logged).
    """
    insert_sql = """
        INSERT INTO points_history
            (wxid, nickname, change_type, points_change, points_before, points_after, description, related_id)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """
    row = (
        wxid, nickname, change_type, points_change,
        points_before, points_after, description, related_id,
    )
    try:
        with self.get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(insert_sql, row)
    except Exception as e:
        logger.error(f"记录积分变动失败: {e}")
        return False
    return True
def add_points(self, wxid: str, points: int, change_type: str = "other",
               description: str = "", related_id: str = "") -> Tuple[bool, int]:
    """Add ``points`` to a user's balance and log the change (generic helper).

    The current balance is read through ``get_user_info``, the new balance is
    written back, and a ``points_history`` row is recorded.

    Returns:
        ``(success, new_points)``; ``(False, 0)`` when the user does not
        exist or any step raises (logged).
    """
    try:
        account = self.get_user_info(wxid)
        if not account:
            return False, 0

        balance_before = account.get("points", 0)
        balance_after = balance_before + points

        update_sql = """
            UPDATE user_signin
            SET points = %s, updated_at = NOW()
            WHERE wxid = %s
        """
        with self.get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(update_sql, (balance_after, wxid))

        # Keep an audit trail of the change.
        self.record_points_change(
            wxid, account.get("nickname", ""),
            change_type, points, balance_before, balance_after,
            description, related_id
        )
        return True, balance_after
    except Exception as e:
        logger.error(f"增加积分失败: {e}")
        return False, 0
def deduct_points(self, wxid: str, points: int, change_type: str = "consume",
                  description: str = "", related_id: str = "") -> Tuple[bool, int]:
    """Subtract ``points`` from a user's balance and log the change (generic helper).

    Returns:
        ``(success, new_points)``. When the balance is too low the result is
        ``(False, current_points)``; when the user does not exist or a step
        raises (logged) it is ``(False, 0)``.
    """
    try:
        account = self.get_user_info(wxid)
        if not account:
            return False, 0

        balance_before = account.get("points", 0)
        if balance_before < points:
            logger.warning(f"用户 {wxid} 积分不足: {balance_before} < {points}")
            return False, balance_before

        balance_after = balance_before - points
        update_sql = """
            UPDATE user_signin
            SET points = %s, updated_at = NOW()
            WHERE wxid = %s
        """
        with self.get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(update_sql, (balance_after, wxid))

        # The history row stores the change as a negative number.
        self.record_points_change(
            wxid, account.get("nickname", ""),
            change_type, -points, balance_before, balance_after,
            description, related_id
        )
        return True, balance_after
    except Exception as e:
        logger.error(f"扣除积分失败: {e}")
        return False, 0
def get_points_history(self, wxid: str, limit: int = 20) -> List[dict]:
    """Return the newest ``limit`` ``points_history`` rows of a user.

    Rows are ordered by ``created_at`` descending. An empty list is returned
    when the query fails (logged).
    """
    query = """
        SELECT change_type, points_change, points_before, points_after,
               description, created_at
        FROM points_history
        WHERE wxid = %s
        ORDER BY created_at DESC
        LIMIT %s
    """
    try:
        with self.get_db_connection() as conn, \
                conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(query, (wxid, limit))
            rows = cursor.fetchall()
        return rows
    except Exception as e:
        logger.error(f"获取积分历史失败: {e}")
        return []
def get_points_leaderboard(self, wxid_list: List[str] = None, limit: int = 20) -> List[dict]:
    """Return users ordered by points, highest first.

    Args:
        wxid_list: Restrict the ranking to these wxids (group ranking). A
            None or empty value yields the global ranking.
        limit: Maximum number of rows returned.

    Returns:
        List of row dicts; an empty list when the query fails (logged).
    """
    try:
        if wxid_list:
            # Group ranking: one %s placeholder per requested wxid.
            placeholders = ','.join(['%s'] * len(wxid_list))
            query = f"""
                SELECT wxid, nickname, points, signin_streak, total_signin_days
                FROM user_signin
                WHERE wxid IN ({placeholders})
                ORDER BY points DESC
                LIMIT %s
            """
            params = (*wxid_list, limit)
        else:
            # Global ranking.
            query = """
                SELECT wxid, nickname, points, signin_streak, total_signin_days
                FROM user_signin
                ORDER BY points DESC
                LIMIT %s
            """
            params = (limit,)

        with self.get_db_connection() as conn, \
                conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(query, params)
            rows = cursor.fetchall()
        return rows
    except Exception as e:
        logger.error(f"获取积分排行榜失败: {e}")
        return []
async def update_group_members_info(self, client: WechatHookClient, group_wxid: str) -> Tuple[int, int]:
    """Refresh the cached info of every member of a group, one member at a time.

    Members are fetched sequentially (never concurrently) with a short pause
    after each successful request.

    Returns:
        ``(succeeded, total)``; ``(0, 0)`` when the cache is disabled, the
        member list is empty, or the member list request fails.
    """
    cache = get_cache()
    if not (cache and cache.enabled):
        logger.warning("Redis 缓存未启用,无法更新群成员信息")
        return 0, 0

    try:
        logger.info(f"开始获取群成员列表: {group_wxid}")
        members = await client.get_chatroom_members(group_wxid)
        if not members:
            logger.warning(f"获取群成员列表为空: {group_wxid}")
            return 0, 0

        total = len(members)
        success = 0
        logger.info(f"获取到 {total} 个群成员,开始逐个更新信息")

        for position, member in enumerate(members, start=1):
            wxid = member.get("wxid", "")
            if not wxid:
                continue
            try:
                detail = await client.get_user_info_in_chatroom(group_wxid, wxid)
                if detail:
                    cache.set_user_info(group_wxid, wxid, detail)
                    success += 1
                    logger.debug(f"[{position}/{total}] 更新成功: {wxid}")
                else:
                    logger.debug(f"[{position}/{total}] 获取信息失败: {wxid}")
                # Pause between requests so the API is not hit too quickly.
                await asyncio.sleep(0.3)
            except Exception as e:
                # One failing member must not abort the whole refresh.
                logger.error(f"更新成员信息失败 {wxid}: {e}")

        logger.success(f"群成员信息更新完成: {group_wxid}, 成功 {success}/{total}")
        return success, total
    except Exception as e:
        logger.error(f"更新群成员信息失败: {e}")
        return 0, 0
def get_group_member_wxids(self, group_wxid: str) -> List[str]:
    """List the member wxids of a group that have an entry in the Redis cache.

    Keys matching ``user_info:<group_wxid>:*`` are looked up and the third
    colon-separated segment of each key is returned.

    Returns:
        List of wxids; empty when the cache is unavailable or the lookup
        fails (logged).
    """
    cache = get_cache()
    if not (cache and cache.enabled and cache.client):
        return []

    try:
        raw_keys = cache.client.keys(f"user_info:{group_wxid}:*")
        # Keys are already str when the client decodes responses; otherwise
        # they arrive as bytes and are decoded here.
        text_keys = (
            key.decode('utf-8') if isinstance(key, bytes) else key
            for key in raw_keys
        )
        segments = (key.split(':') for key in text_keys)
        return [parts[2] for parts in segments if len(parts) >= 3]
    except Exception as e:
        logger.error(f"获取群成员 wxid 列表失败: {e}")
        return []
async def markdown_to_image(self, markdown_content: str) -> Optional[str]:
    """Render Markdown to a PNG through the oiapi.net MarkdownToImage endpoint.

    Args:
        markdown_content: Markdown text to render.

    Returns:
        Path of the saved image under ``self.temp_dir``, or None when the
        request fails, returns a non-200 status, or returns a non-image body.
    """
    import urllib.parse

    try:
        # The Markdown travels in the query string, so it must be URL-encoded.
        encoded_content = urllib.parse.quote(markdown_content)
        api_url = f"https://oiapi.net/api/MarkdownToImage?content={encoded_content}&height=1"

        async with aiohttp.ClientSession() as session:
            async with session.get(api_url, timeout=aiohttp.ClientTimeout(total=180)) as resp:
                if resp.status != 200:
                    logger.error(f"Markdown 转图片 API 返回错误: {resp.status}")
                    return None

                # Reject error pages and other non-image responses.
                content_type = resp.headers.get("Content-Type", "")
                if "image" not in content_type.lower():
                    logger.error(f"API 返回非图片类型: {content_type}")
                    return None

                image_bytes = await resp.read()
                stamp = int(datetime.now().timestamp())
                output_path = self.temp_dir / f"leaderboard_{stamp}.png"
                output_path.write_bytes(image_bytes)

                logger.success(f"Markdown 转图片成功: {output_path}")
                return str(output_path)
    except Exception as e:
        logger.error(f"Markdown 转图片失败: {e}")
        return None
async def get_user_nickname_from_group(self, client: WechatHookClient,
                                       group_wxid: str, user_wxid: str) -> str:
    """Resolve a group member's nickname, preferring the cache over the API.

    Returns:
        The nickname, or an empty string when it cannot be determined or any
        step raises (logged).
    """
    try:
        # The cache instance is looked up on every call.
        redis_cache = get_cache()

        # 1. Try the Redis cache first.
        if redis_cache and redis_cache.enabled:
            cached_info = redis_cache.get_user_basic_info(group_wxid, user_wxid)
            if cached_info and cached_info.get("nickname"):
                logger.debug(f"[缓存命中] {user_wxid}: {cached_info['nickname']}")
                return cached_info["nickname"]

        # 2. Cache miss: ask the API.
        logger.debug(f"[缓存未命中] 调用API获取用户昵称: {user_wxid}")
        user_info = await client.get_user_info_in_chatroom(group_wxid, user_wxid)
        if not user_info:
            logger.warning(f"未找到用户 {user_wxid} 在群 {group_wxid} 中的信息")
            return ""

        # The nickname is read from the nested {"nickName": {"string": ...}} field.
        nickname = user_info.get("nickName", {}).get("string", "")
        if not nickname:
            logger.warning(f"用户 {user_wxid} 的昵称字段为空")
            return ""

        logger.success(f"API获取用户昵称成功: {user_wxid} -> {nickname}")

        # 3. Store the fetched info in the cache for later lookups.
        if redis_cache and redis_cache.enabled:
            redis_cache.set_user_info(group_wxid, user_wxid, user_info)
            logger.debug(f"[已缓存] {user_wxid}: {nickname}")
        return nickname
    except Exception as e:
        logger.error(f"获取群成员昵称失败: {e}")
        return ""
async def download_avatar(self, avatar_url: str, user_wxid: str) -> Optional[str]:
    """Download an avatar to ``temp_dir/avatar_<user_wxid>.jpg``.

    Returns:
        Path of the saved file, or None when the URL is empty, the response
        status is not 200, or the download raises (logged).
    """
    if not avatar_url:
        return None

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(avatar_url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.read()
                target = self.temp_dir / f"avatar_{user_wxid}.jpg"
                target.write_bytes(payload)
                return str(target)
    except Exception as e:
        logger.error(f"下载头像失败: {e}")
    return None
def get_random_background(self) -> Optional[str]:
    """Pick a random ``*.jpg`` or ``*.png`` file from ``self.images_dir``.

    Returns:
        Path of the chosen file as a string, or None when the directory is
        missing or contains no matching file.
    """
    if not self.images_dir.exists():
        return None
    candidates = [*self.images_dir.glob("*.jpg"), *self.images_dir.glob("*.png")]
    return str(random.choice(candidates)) if candidates else None
async def generate_signin_card(self, user_info: dict, points_earned: int,
                               streak: int, avatar_url: str = None) -> Optional[str]:
    """Render the sign-in result as an 800x450 JPEG card.

    Args:
        user_info: Row-like dict; the keys ``wxid``, ``nickname``, ``points``
            and ``city`` are read, each with a fallback when the value is
            missing, None or empty.
        points_earned: Points granted by this sign-in (drawn as ``+N``).
        streak: Consecutive sign-in count; drives the 30-day progress bar.
        avatar_url: Optional avatar URL. A grey placeholder is drawn when it
            is absent or the avatar cannot be downloaded or decoded.

    Returns:
        Path of the JPEG written under ``self.temp_dir``, or None when Pillow
        is unavailable or rendering fails (the failure is logged).
    """
    if not PIL_AVAILABLE:
        return None

    try:
        # Landscape canvas (16:9).
        canvas_width, canvas_height = 800, 450

        bg_path = self.get_random_background()
        if not bg_path:
            # No background image available: use a flat sky-blue canvas.
            img = Image.new('RGB', (canvas_width, canvas_height), (135, 206, 235))
        else:
            # The context manager releases the file handle once the pixels
            # have been copied into the resized image.
            with Image.open(bg_path) as bg_img:
                bg_width, bg_height = bg_img.size
                # Scale so the image covers the canvas while keeping its
                # aspect ratio, then crop the centre (no stretching).
                scale = max(canvas_width / bg_width, canvas_height / bg_height)
                new_width = int(bg_width * scale)
                new_height = int(bg_height * scale)
                resized = bg_img.resize((new_width, new_height), Image.Resampling.LANCZOS)
            left = (new_width - canvas_width) // 2
            top = (new_height - canvas_height) // 2
            img = resized.crop((left, top, left + canvas_width, top + canvas_height))
            # Slightly blur the background so the text stays readable.
            img = img.filter(ImageFilter.GaussianBlur(radius=1.5))

        img = img.convert('RGBA')

        # Semi-transparent white rounded card with a 50px margin.
        card_x, card_y = 50, 50
        card_width, card_height = canvas_width - 100, canvas_height - 100
        card_overlay = Image.new('RGBA', (canvas_width, canvas_height), (0, 0, 0, 0))
        card_draw = ImageDraw.Draw(card_overlay)
        card_draw.rounded_rectangle(
            (card_x, card_y, card_x + card_width, card_y + card_height),
            radius=20, fill=(255, 255, 255, 240)
        )
        img = Image.alpha_composite(img, card_overlay)
        draw = ImageDraw.Draw(img)

        # Fonts: Microsoft YaHei when present, otherwise Pillow's default.
        # truetype() raises OSError when the font file cannot be read.
        try:
            title_font = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 32)
            text_font = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 20)
            small_font = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 16)
            big_font = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 24)
        except OSError:
            title_font = ImageFont.load_default()
            text_font = ImageFont.load_default()
            small_font = ImageFont.load_default()
            big_font = ImageFont.load_default()

        # Download the avatar and cut it into an 80x80 circle.
        avatar_img = None
        if avatar_url:
            avatar_path = await self.download_avatar(avatar_url, user_info.get('wxid', ''))
            if avatar_path and os.path.exists(avatar_path):
                try:
                    with Image.open(avatar_path) as raw_avatar:
                        # Normalise to RGBA so palette/greyscale/CMYK avatars
                        # can take an alpha mask and be pasted onto the card.
                        avatar_img = raw_avatar.convert('RGBA').resize((80, 80))
                    mask = Image.new('L', (80, 80), 0)
                    mask_draw = ImageDraw.Draw(mask)
                    mask_draw.ellipse((0, 0, 80, 80), fill=255)
                    avatar_img.putalpha(mask)
                except Exception as e:
                    logger.error(f"处理头像失败: {e}")
                    avatar_img = None

        avatar_x, avatar_y = 80, 80
        if avatar_img:
            img.paste(avatar_img, (avatar_x, avatar_y), avatar_img)
        else:
            # Placeholder avatar.
            draw.ellipse((avatar_x, avatar_y, avatar_x + 80, avatar_y + 80),
                         fill=(200, 200, 200), outline=(150, 150, 150), width=2)
            draw.text((avatar_x + 40, avatar_y + 40), "头像", font=text_font,
                      fill=(100, 100, 100), anchor="mm")

        # `or` instead of a .get() default: database rows carry the key with
        # a None/empty value, which a .get() default would not replace.
        nickname = user_info.get('nickname') or '用户'
        today = datetime.now().strftime("%m/%d")

        draw.text((avatar_x + 100, avatar_y + 10), nickname, font=title_font, fill=(50, 50, 50))

        # Date in the top-right corner, right-aligned inside the card.
        date_bbox = draw.textbbox((0, 0), today, font=big_font)
        date_width = date_bbox[2] - date_bbox[0]
        draw.text((canvas_width - date_width - 70, 70), today, font=big_font, fill=(100, 100, 100))

        # Sign-in result.
        draw.text((avatar_x + 100, avatar_y + 50), "签到成功!", font=text_font, fill=(76, 175, 80))
        draw.text((avatar_x + 100, avatar_y + 80), f"+{points_earned} 积分", font=text_font, fill=(255, 152, 0))

        # Totals.
        total_points = user_info.get('points') or 0
        info_y = 220
        draw.text((100, info_y), f"总积分: {total_points}", font=text_font, fill=(50, 50, 50))
        draw.text((350, info_y), f"连续签到: {streak}", font=text_font, fill=(50, 50, 50))

        # Progress bar towards a 30-day streak.
        progress_x, progress_y = 100, 270
        progress_width, progress_height = 600, 20
        draw.rounded_rectangle(
            (progress_x, progress_y, progress_x + progress_width, progress_y + progress_height),
            radius=10, fill=(230, 230, 230)
        )
        progress = min(streak / 30, 1.0)
        fill_width = int(progress_width * progress)
        if fill_width > 0:
            draw.rounded_rectangle(
                (progress_x, progress_y, progress_x + fill_width, progress_y + progress_height),
                radius=10, fill=(76, 175, 80)
            )
        progress_text = f"{streak}/30 天"
        draw.text((progress_x + progress_width//2, progress_y + progress_height + 15),
                  progress_text, font=small_font, fill=(100, 100, 100), anchor="mm")

        # Footer.
        city = user_info.get('city') or '未设置'
        draw.text((100, 340), f"城市: {city}", font=small_font, fill=(120, 120, 120))

        # JPEG has no alpha channel, so flatten to RGB before saving.
        output_path = self.temp_dir / f"signin_{user_info.get('wxid', 'unknown')}_{int(datetime.now().timestamp())}.jpg"
        img = img.convert('RGB')
        img.save(output_path, 'JPEG', quality=95)
        return str(output_path)
    except Exception as e:
        logger.error(f"生成签到卡片失败: {e}")
        return None
async def generate_profile_card(self, user_info: dict, avatar_url: str = None) -> Optional[str]:
    """Render a user's profile as an 800x450 JPEG card (same layout as the sign-in card).

    Args:
        user_info: Row-like dict; the keys ``wxid``, ``nickname``, ``points``,
            ``signin_streak``, ``total_signin_days`` and ``city`` are read,
            each with a fallback when the value is missing, None or empty.
        avatar_url: Optional avatar URL. A grey placeholder is drawn when it
            is absent or the avatar cannot be downloaded or decoded.

    Returns:
        Path of the JPEG written under ``self.temp_dir``, or None when Pillow
        is unavailable or rendering fails (the failure is logged).
    """
    if not PIL_AVAILABLE:
        return None

    try:
        # Landscape canvas (16:9), identical to the sign-in card.
        canvas_width, canvas_height = 800, 450

        bg_path = self.get_random_background()
        if not bg_path:
            # No background image available: use a flat sky-blue canvas.
            img = Image.new('RGB', (canvas_width, canvas_height), (135, 206, 235))
        else:
            # The context manager releases the file handle once the pixels
            # have been copied into the resized image.
            with Image.open(bg_path) as bg_img:
                bg_width, bg_height = bg_img.size
                # Scale so the image covers the canvas while keeping its
                # aspect ratio, then crop the centre (no stretching).
                scale = max(canvas_width / bg_width, canvas_height / bg_height)
                new_width = int(bg_width * scale)
                new_height = int(bg_height * scale)
                resized = bg_img.resize((new_width, new_height), Image.Resampling.LANCZOS)
            left = (new_width - canvas_width) // 2
            top = (new_height - canvas_height) // 2
            img = resized.crop((left, top, left + canvas_width, top + canvas_height))
            # Slightly blur the background so the text stays readable.
            img = img.filter(ImageFilter.GaussianBlur(radius=1.5))

        img = img.convert('RGBA')

        # Semi-transparent white rounded card with a 50px margin.
        card_x, card_y = 50, 50
        card_width, card_height = canvas_width - 100, canvas_height - 100
        card_overlay = Image.new('RGBA', (canvas_width, canvas_height), (0, 0, 0, 0))
        card_draw = ImageDraw.Draw(card_overlay)
        card_draw.rounded_rectangle(
            (card_x, card_y, card_x + card_width, card_y + card_height),
            radius=20, fill=(255, 255, 255, 240)
        )
        img = Image.alpha_composite(img, card_overlay)
        draw = ImageDraw.Draw(img)

        # Fonts: Microsoft YaHei when present, otherwise Pillow's default.
        # truetype() raises OSError when the font file cannot be read.
        try:
            title_font = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 32)
            text_font = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 20)
            small_font = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 16)
            big_font = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 24)
        except OSError:
            title_font = ImageFont.load_default()
            text_font = ImageFont.load_default()
            small_font = ImageFont.load_default()
            big_font = ImageFont.load_default()

        # Download the avatar and cut it into an 80x80 circle.
        avatar_img = None
        if avatar_url:
            avatar_path = await self.download_avatar(avatar_url, user_info.get('wxid', ''))
            if avatar_path and os.path.exists(avatar_path):
                try:
                    with Image.open(avatar_path) as raw_avatar:
                        # Normalise to RGBA so palette/greyscale/CMYK avatars
                        # can take an alpha mask and be pasted onto the card.
                        avatar_img = raw_avatar.convert('RGBA').resize((80, 80))
                    mask = Image.new('L', (80, 80), 0)
                    mask_draw = ImageDraw.Draw(mask)
                    mask_draw.ellipse((0, 0, 80, 80), fill=255)
                    avatar_img.putalpha(mask)
                except Exception as e:
                    logger.error(f"处理头像失败: {e}")
                    avatar_img = None

        avatar_x, avatar_y = 80, 80
        if avatar_img:
            img.paste(avatar_img, (avatar_x, avatar_y), avatar_img)
        else:
            # Placeholder avatar.
            draw.ellipse((avatar_x, avatar_y, avatar_x + 80, avatar_y + 80),
                         fill=(200, 200, 200), outline=(150, 150, 150), width=2)
            draw.text((avatar_x + 40, avatar_y + 40), "头像", font=text_font,
                      fill=(100, 100, 100), anchor="mm")

        # `or` instead of a .get() default: database rows carry the key with
        # a None/empty value, which a .get() default would not replace.
        nickname = user_info.get('nickname') or '用户'
        today = datetime.now().strftime("%m/%d")

        draw.text((avatar_x + 100, avatar_y + 10), nickname, font=title_font, fill=(50, 50, 50))

        # Date in the top-right corner, right-aligned inside the card.
        date_bbox = draw.textbbox((0, 0), today, font=big_font)
        date_width = date_bbox[2] - date_bbox[0]
        draw.text((canvas_width - date_width - 70, 70), today, font=big_font, fill=(100, 100, 100))

        # Section title.
        draw.text((avatar_x + 100, avatar_y + 50), "个人信息", font=text_font, fill=(33, 150, 243))

        # Totals; None values are treated as 0 so the arithmetic below works.
        total_points = user_info.get('points') or 0
        streak = user_info.get('signin_streak') or 0
        total_days = user_info.get('total_signin_days') or 0
        info_y = 220
        draw.text((100, info_y), f"总积分: {total_points}", font=text_font, fill=(50, 50, 50))
        draw.text((350, info_y), f"连续签到: {streak}", font=text_font, fill=(50, 50, 50))
        draw.text((100, info_y + 30), f"累计签到: {total_days}", font=text_font, fill=(50, 50, 50))

        # Progress bar towards a 30-day streak.
        progress_x, progress_y = 100, 290
        progress_width, progress_height = 600, 20
        draw.rounded_rectangle(
            (progress_x, progress_y, progress_x + progress_width, progress_y + progress_height),
            radius=10, fill=(230, 230, 230)
        )
        progress = min(streak / 30, 1.0)
        fill_width = int(progress_width * progress)
        if fill_width > 0:
            draw.rounded_rectangle(
                (progress_x, progress_y, progress_x + fill_width, progress_y + progress_height),
                radius=10, fill=(33, 150, 243)
            )
        progress_text = f"{streak}/30 天"
        draw.text((progress_x + progress_width//2, progress_y + progress_height + 15),
                  progress_text, font=small_font, fill=(100, 100, 100), anchor="mm")

        # Footer.
        city = user_info.get('city') or '未设置'
        draw.text((100, 360), f"城市: {city}", font=small_font, fill=(120, 120, 120))

        # JPEG has no alpha channel, so flatten to RGB before saving.
        output_path = self.temp_dir / f"profile_{user_info.get('wxid', 'unknown')}_{int(datetime.now().timestamp())}.jpg"
        img = img.convert('RGB')
        img.save(output_path, 'JPEG', quality=95)
        return str(output_path)
    except Exception as e:
        logger.error(f"生成个人信息卡片失败: {e}")
        return None
async def send_image_file(self, client, to_wxid: str, image_path: str) -> bool:
    """Send a local image file to ``to_wxid`` using message type 11040.

    Returns:
        True when the client reports a truthy result; False when the file is
        missing, the client reports failure, or sending raises (logged with
        a traceback).
    """
    try:
        if not os.path.exists(image_path):
            logger.error(f"图片文件不存在: {image_path}")
            return False

        file_size = os.path.getsize(image_path)
        logger.info(f"准备发送图片文件: {image_path}, 大小: {file_size} bytes")

        # Payload for the type=11040 (send image) request.
        data = {
            "to_wxid": to_wxid,
            "file": image_path
        }
        logger.info(f"使用API文档格式发送图片: type=11040, data={data}")

        result = await client._send_data_async(11040, data)
        if not result:
            logger.error(f"图片发送失败: {image_path}")
            return False

        logger.success(f"图片发送成功: {image_path}")
        return True
    except Exception as e:
        import traceback

        logger.error(f"发送图片文件失败: {e}")
        logger.error(f"详细错误: {traceback.format_exc()}")
        return False
def is_admin(self, wxid: str) -> bool:
    """Return True when ``wxid`` is listed under ``signin.admins`` in the config."""
    return wxid in self.config["signin"].get("admins", [])
def check_can_signin(self, wxid: str) -> Tuple[bool, Optional[dict]]:
    """Decide whether ``wxid`` may sign in now.

    Returns:
        ``(allowed, user_info)`` where ``user_info`` is the stored row, or
        None when the user has no row yet. Admins are always allowed when
        ``admin_unlimited_signin`` is enabled.
    """
    unlimited = (
        self.config["signin"].get("admin_unlimited_signin", False)
        and self.is_admin(wxid)
    )
    user_info = self.get_user_info(wxid)

    if unlimited:
        # Admins may sign in any number of times.
        return True, user_info
    if not user_info:
        # Unknown user: the first sign-in is always allowed.
        return True, None

    last_signin = user_info.get("last_signin_date")
    if not last_signin:
        # The row exists but the user has never signed in.
        return True, user_info

    # The stored date may be a "YYYY-MM-DD" string; parse it to a date.
    if isinstance(last_signin, str):
        last_signin = datetime.strptime(last_signin, "%Y-%m-%d").date()
    return last_signin < date.today(), user_info
def calculate_signin_reward(self, current_streak: int) -> Tuple[int, int]:
    """Compute the points for one sign-in.

    Args:
        current_streak: Streak length *before* this sign-in; the bonus is
            granted when ``current_streak + 1`` is a multiple of
            ``signin.bonus_streak_days``.

    Returns:
        ``(base_points, bonus_points)``. ``base_points`` is drawn uniformly
        from ``[min_points, max_points]``; ``bonus_points`` is 0 unless the
        streak milestone is reached.
    """
    signin_cfg = self.config["signin"]

    # Random base reward.
    base_points = random.randint(signin_cfg["min_points"], signin_cfg["max_points"])

    # Streak bonus. A non-positive bonus_streak_days disables the bonus
    # instead of raising ZeroDivisionError on the modulo below.
    bonus_points = 0
    bonus_streak_days = signin_cfg["bonus_streak_days"]
    if (
        bonus_streak_days > 0
        and current_streak > 0
        and (current_streak + 1) % bonus_streak_days == 0
    ):
        bonus_points = signin_cfg["bonus_points"]

    return base_points, bonus_points
def update_signin_record(self, wxid: str, nickname: str, points_earned: int,
                         new_streak: int, user_info: Optional[dict] = None) -> bool:
    """Persist one sign-in: user totals, the daily record and the points history.

    The three writes run inside a single transaction, so a failure in a later
    statement (for example a duplicate daily record) rolls back the points
    that were already granted instead of leaving the tables inconsistent.

    Args:
        wxid: WeChat id of the user.
        nickname: Nickname to store; an empty string keeps the stored one.
        points_earned: Points granted by this sign-in.
        new_streak: Streak length after this sign-in.
        user_info: Existing ``user_signin`` row, or None/empty for a new user.

    Returns:
        True when all statements were committed, False otherwise (logged).
    """
    try:
        with self.get_db_connection() as conn:
            # The connection is opened in autocommit mode; begin() starts an
            # explicit transaction that lasts until commit()/rollback().
            conn.begin()
            try:
                with conn.cursor() as cursor:
                    today = date.today()
                    points_before = user_info["points"] if user_info else 0
                    points_after = points_before + points_earned

                    if user_info:
                        # Existing user: update totals in place.
                        new_total_days = user_info["total_signin_days"] + 1
                        sql_update = """
                            UPDATE user_signin
                            SET nickname = CASE WHEN %s != '' THEN %s ELSE nickname END,
                                points = %s,
                                last_signin_date = %s,
                                signin_streak = %s,
                                total_signin_days = %s,
                                updated_at = NOW()
                            WHERE wxid = %s
                        """
                        cursor.execute(sql_update, (
                            nickname, nickname, points_after, today,
                            new_streak, new_total_days, wxid
                        ))
                    else:
                        # New user: create the row with this first sign-in.
                        sql_insert = """
                            INSERT INTO user_signin
                                (wxid, nickname, points, last_signin_date, signin_streak, total_signin_days)
                            VALUES (%s, %s, %s, %s, %s, 1)
                        """
                        cursor.execute(sql_insert, (
                            wxid, nickname, points_earned, today, new_streak
                        ))

                    if self.config["signin"].get("admin_unlimited_signin", False) and self.is_admin(wxid):
                        # Admins may sign in repeatedly on one day: accumulate
                        # into the existing daily record instead of failing.
                        sql_record = """
                            INSERT INTO signin_records
                                (wxid, nickname, signin_date, points_earned, signin_streak)
                            VALUES (%s, %s, %s, %s, %s)
                            ON DUPLICATE KEY UPDATE
                                points_earned = points_earned + VALUES(points_earned),
                                signin_streak = VALUES(signin_streak)
                        """
                    else:
                        # Regular users: plain insert.
                        sql_record = """
                            INSERT INTO signin_records
                                (wxid, nickname, signin_date, points_earned, signin_streak)
                            VALUES (%s, %s, %s, %s, %s)
                        """
                    cursor.execute(sql_record, (
                        wxid, nickname, today, points_earned, new_streak
                    ))

                    # Audit trail in points_history.
                    sql_points_history = """
                        INSERT INTO points_history
                            (wxid, nickname, change_type, points_change, points_before, points_after, description, related_id)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    """
                    cursor.execute(sql_points_history, (
                        wxid, nickname, "signin", points_earned, points_before, points_after,
                        f"签到获得 {points_earned} 积分(连续{new_streak}天)", str(today)
                    ))
                conn.commit()
            except Exception:
                # Undo partial writes, then let the outer handler report it.
                conn.rollback()
                raise
        return True
    except Exception as e:
        logger.error(f"更新签到记录失败: {e}")
        return False
def calculate_new_streak(self, user_info: Optional[dict]) -> int:
    """Compute the streak length that results from signing in today.

    Args:
        user_info: Stored user row, or None for a user without a row. The
            keys ``last_signin_date`` (date, datetime or ``YYYY-MM-DD``
            string) and ``signin_streak`` are read.

    Returns:
        1 for a first sign-in or after a missed day; the stored streak plus
        one when the last sign-in was yesterday; the stored streak (at least
        1) when the last sign-in was already today.
    """
    if not user_info or not user_info.get("last_signin_date"):
        return 1  # First sign-in ever.

    last_signin = user_info["last_signin_date"]
    if isinstance(last_signin, str):
        last_signin = datetime.strptime(last_signin, "%Y-%m-%d").date()
    elif isinstance(last_signin, datetime):
        # A datetime never compares equal to a date; reduce it first.
        last_signin = last_signin.date()

    current_streak = user_info.get("signin_streak") or 0
    today = date.today()

    if last_signin == today - timedelta(days=1):
        # Consecutive day: extend the streak.
        return current_streak + 1
    if last_signin == today:
        # Repeated sign-in on the same day (admin unlimited mode): the
        # streak must not be reset by the second sign-in.
        return max(current_streak, 1)
    # At least one day was missed: start over.
    return 1
@on_text_message(priority=60)
async def handle_messages(self, client: WechatHookClient, message: dict):
    """Route a text message to the matching sign-in plugin command.

    Returns:
        False when the message was handled by this plugin, True when it was
        not a recognised command.
    """
    content = message.get("Content", "").strip()
    from_wxid = message.get("FromWxid", "")
    sender_wxid = message.get("SenderWxid", "")
    is_group = message.get("IsGroup", False)

    # The acting user is SenderWxid in a group chat and FromWxid in a private chat.
    user_wxid = sender_wxid if is_group else from_wxid

    signin_cfg = self.config["signin"]

    # Sign-in keyword (exact match).
    if content in signin_cfg["keywords"]:
        await self.handle_signin(client, message, user_wxid, from_wxid, is_group)
        return False

    # Profile query (exact match).
    profile_keywords = signin_cfg["profile_keywords"]
    logger.info(f"检查个人信息关键词: content='{content}', keywords={profile_keywords}")
    if content in profile_keywords:
        logger.info(f"匹配到个人信息查询关键词: {content}")
        await self.handle_profile_query(client, message, user_wxid, from_wxid, is_group)
        return False

    # City registration (prefix match).
    if content.startswith(tuple(signin_cfg["register_keywords"])):
        await self.handle_city_register(client, message, user_wxid, from_wxid, content)
        return False

    # Leaderboard query (exact match).
    if content in signin_cfg.get("leaderboard_keywords", ["/积分榜", "积分榜", "积分排行"]):
        await self.handle_leaderboard(client, message, from_wxid, is_group)
        return False

    # Refresh of cached group member info (group chats only).
    if content in signin_cfg.get("update_keywords", ["/更新信息", "更新信息"]) and is_group:
        await self.handle_update_members(client, from_wxid)
        return False

    return True  # Not one of our commands.
async def handle_signin(self, client: WechatHookClient, message: dict,
                        user_wxid: str, from_wxid: str, is_group: bool):
    """Process a sign-in request and reply with an image card or a text message.

    Args:
        client: Client used to query group member info and send replies.
        message: The raw incoming message (not inspected here).
        user_wxid: The user who is signing in.
        from_wxid: Conversation the reply is sent to (the group or the user).
        is_group: Whether the message came from a group chat.
    """
    logger.info(f"用户 {user_wxid} 尝试签到")

    try:
        # The nickname can only be resolved through the group member API.
        nickname = ""
        if is_group:
            nickname = await self.get_user_nickname_from_group(client, from_wxid, user_wxid)

        can_signin, user_info = self.check_can_signin(user_wxid)

        # Keep the stored nickname in sync with the one just resolved.
        if nickname and user_info and user_info.get("nickname") != nickname:
            self.update_user_nickname(user_wxid, nickname)

        if not can_signin:
            # Already signed in today.
            current_points = user_info["points"] if user_info else 0
            current_streak = user_info["signin_streak"] if user_info else 0
            reply = self.config["messages"]["already_signed"].format(
                total_points=current_points,
                streak=current_streak
            )
            await client.send_text(from_wxid, reply)
            return

        # Make sure a row exists for the user.
        if not user_info:
            self.create_or_update_user(user_wxid, nickname)
            user_info = self.get_user_info(user_wxid)

        new_streak = self.calculate_new_streak(user_info)

        # The streak bonus is only earned when this sign-in extends the
        # stored streak. Passing the stored streak unconditionally would
        # still award the bonus after the streak was broken and reset to 1.
        previous_streak = (user_info.get("signin_streak") or 0) if user_info else 0
        streak_advanced = new_streak == previous_streak + 1
        base_points, bonus_points = self.calculate_signin_reward(
            previous_streak if streak_advanced else 0
        )
        total_earned = base_points + bonus_points

        if not self.update_signin_record(user_wxid, nickname, total_earned, new_streak, user_info):
            await client.send_text(from_wxid, self.config["messages"]["error"])
            logger.error(f"用户 {user_wxid} 签到失败")
            return

        # Re-read the row to report the up-to-date balance. If the re-read
        # fails, fall back to values known locally instead of raising on a
        # None row after the sign-in has already been stored.
        updated_user = self.get_user_info(user_wxid)
        if not updated_user:
            known_points = (user_info.get("points") or 0) if user_info else 0
            updated_user = {
                "wxid": user_wxid,
                "nickname": nickname,
                "points": known_points + total_earned,
            }
        current_points = updated_user["points"]

        # Resolve the avatar URL, preferring the cache over the API.
        avatar_url = None
        if is_group:
            try:
                redis_cache = get_cache()
                if redis_cache and redis_cache.enabled:
                    cached_info = redis_cache.get_user_basic_info(from_wxid, user_wxid)
                    if cached_info:
                        avatar_url = cached_info.get("avatar_url", "")

                # Cache miss: ask the API and cache the answer.
                if not avatar_url:
                    user_detail = await client.get_user_info_in_chatroom(from_wxid, user_wxid)
                    if user_detail:
                        avatar_url = user_detail.get("bigHeadImgUrl", "")
                        if redis_cache and redis_cache.enabled:
                            redis_cache.set_user_info(from_wxid, user_wxid, user_detail)
            except Exception as e:
                logger.warning(f"获取用户头像失败: {e}")

        # Try the image card first.
        logger.info(f"开始生成签到卡片: user={updated_user}, earned={total_earned}, streak={new_streak}")
        card_path = await self.generate_signin_card(updated_user, total_earned, new_streak, avatar_url)
        logger.info(f"签到卡片生成结果: {card_path}")

        card_sent = False
        if card_path and os.path.exists(card_path):
            logger.info(f"签到卡片文件存在,准备发送: {card_path}")
            try:
                card_sent = await self.send_image_file(client, from_wxid, card_path)
            except Exception as e:
                logger.error(f"发送图片卡片失败: {e}")
            else:
                if card_sent:
                    logger.success(f"用户 {user_wxid} 签到成功,发送图片卡片")
                    # The generated file is kept on disk for debugging.
                    logger.info(f"签到卡片已保存: {card_path}")
                else:
                    logger.error("发送图片卡片失败: CDN图片发送失败")

        if not card_sent:
            # Text fallback when the card could not be generated or sent.
            reply = self.config["messages"]["success"].format(
                points=base_points,
                total_points=current_points,
                streak=new_streak
            )
            if bonus_points > 0:
                reply += self.config["messages"]["bonus_message"].format(
                    days=self.config["signin"]["bonus_streak_days"],
                    bonus=bonus_points
                )
            await client.send_text(from_wxid, reply)
            logger.success(f"用户 {user_wxid} 签到成功,发送文本消息")

    except Exception as e:
        logger.error(f"处理签到消息失败: {e}")
        await client.send_text(from_wxid, self.config["messages"]["error"])
async def handle_profile_query(self, client: WechatHookClient, message: dict,
                               user_wxid: str, from_wxid: str, is_group: bool):
    """Reply with the sender's profile: nickname, points, sign-in streak and city.

    The user record is created when it does not exist yet. An image card is
    sent when it can be generated and delivered; otherwise the configured
    ``messages.profile`` text template is sent instead.

    Args:
        client: Client used to query group member data and to send replies.
        message: Raw message dict (not read by this handler).
        user_wxid: wxid of the user whose profile is requested.
        from_wxid: Chat the reply is sent to; also the group id for lookups.
        is_group: Whether the request came from a group chat. Nickname and
            avatar are only looked up for group chats.
    """
    logger.info(f"用户 {user_wxid} 查询个人信息")

    try:
        # The nickname can only be resolved through the group member API.
        nickname = ""
        if is_group:
            nickname = await self.get_user_nickname_from_group(client, from_wxid, user_wxid)

        user_info = self.get_user_info(user_wxid)
        if not user_info:
            # Unknown user: create the record, then read it back.
            self.create_or_update_user(user_wxid, nickname)
            user_info = self.get_user_info(user_wxid)
            if user_info is None:
                # Creation/read-back failed. Stop here instead of handing
                # None to the card generator and crashing on `.get` later.
                logger.error(f"用户 {user_wxid} 个人信息查询失败: 无法读取或创建用户记录")
                await client.send_text(from_wxid, self.config["messages"]["error"])
                return
        elif nickname and user_info.get("nickname") != nickname:
            # Keep the stored nickname in sync with the one seen in the group.
            self.update_user_nickname(user_wxid, nickname)
            user_info["nickname"] = nickname

        # Avatar lookup is best effort: cache first, then the API.
        avatar_url = None
        if is_group:
            try:
                redis_cache = get_cache()
                if redis_cache and redis_cache.enabled:
                    cached_info = redis_cache.get_user_basic_info(from_wxid, user_wxid)
                    if cached_info:
                        avatar_url = cached_info.get("avatar_url", "")

                # Cache miss: ask the API and store the answer for next time.
                if not avatar_url:
                    user_detail = await client.get_user_info_in_chatroom(from_wxid, user_wxid)
                    if user_detail:
                        avatar_url = user_detail.get("bigHeadImgUrl", "")
                        if redis_cache and redis_cache.enabled:
                            redis_cache.set_user_info(from_wxid, user_wxid, user_detail)
            except Exception as e:
                logger.warning(f"获取用户头像失败: {e}")

        logger.info(f"PIL_AVAILABLE: {PIL_AVAILABLE}")
        card_path = await self.generate_profile_card(user_info, avatar_url)
        logger.info(f"生成的卡片路径: {card_path}")

        if card_path and os.path.exists(card_path):
            try:
                image_sent = await self.send_image_file(client, from_wxid, card_path)
            except Exception as e:
                logger.error(f"发送个人信息图片卡片失败: {e}")
            else:
                if image_sent:
                    logger.success(f"用户 {user_wxid} 个人信息查询成功,发送图片卡片")
                    # The card file is intentionally kept on disk for debugging.
                    logger.info(f"个人信息卡片已保存: {card_path}")
                    return
                logger.error("发送个人信息图片卡片失败: CDN图片发送失败")

        # Single text fallback for both "no card generated" and "card not sent".
        reply = self.config["messages"]["profile"].format(
            nickname=user_info.get("nickname") or "未设置",
            points=user_info.get("points", 0),
            streak=user_info.get("signin_streak", 0),
            city=user_info.get("city") or "未设置"
        )
        await client.send_text(from_wxid, reply)
        logger.success(f"用户 {user_wxid} 个人信息查询成功,发送文本消息")

    except Exception as e:
        logger.error(f"处理个人信息查询失败: {e}")
        await client.send_text(from_wxid, self.config["messages"]["error"])
async def handle_city_register(self, client: WechatHookClient, message: dict,
                               user_wxid: str, from_wxid: str, content: str):
    """Store the city given in a "<command> <city>" message for the user.

    Everything after the first whitespace-separated token is taken as the
    city, so multi-word names (e.g. "New York") are kept whole rather than
    cut to their first word. Inner whitespace runs collapse to one space.

    Args:
        client: Client used to send replies.
        message: Raw message dict (not read by this handler).
        user_wxid: wxid of the user registering a city.
        from_wxid: Chat the reply is sent to.
        content: Full message text, command keyword included.
    """
    logger.info(f"用户 {user_wxid} 尝试注册城市")

    try:
        # str.split() without arguments never yields empty tokens, so a
        # length check is all that is needed to detect a missing city.
        parts = content.split()
        if len(parts) < 2:
            await client.send_text(from_wxid, self.config["messages"]["register_format"])
            return

        city = " ".join(parts[1:])

        if self.update_user_city(user_wxid, city):
            reply = self.config["messages"]["register_success"].format(city=city)
            await client.send_text(from_wxid, reply)
            logger.success(f"用户 {user_wxid} 城市注册成功: {city}")
        else:
            await client.send_text(from_wxid, self.config["messages"]["error"])
            logger.error(f"用户 {user_wxid} 城市注册失败")

    except Exception as e:
        logger.error(f"处理城市注册失败: {e}")
        await client.send_text(from_wxid, self.config["messages"]["error"])
async def handle_leaderboard(self, client: WechatHookClient, message: dict,
                             from_wxid: str, is_group: bool):
    """Build the points leaderboard and send it as an image, or as text on failure.

    In a group chat with the Redis cache enabled, the ranking is limited to
    the cached member list of that group; if no member list is cached, or the
    request is not from a group, the unfiltered ranking is used. The ranking
    is rendered as Markdown with inline HTML and converted to an image. When
    the image cannot be produced or sent, a plain-text ranking is sent.

    Args:
        client: Client used to send replies.
        message: Raw message dict (not read by this handler).
        from_wxid: Chat the reply is sent to; also the group id for lookups.
        is_group: Whether the request came from a group chat.
    """
    # Function-scope import: only this handler needs HTML escaping.
    from html import escape

    logger.info(f"查询积分榜: from={from_wxid}, is_group={is_group}")

    try:
        limit = self.config["signin"].get("leaderboard_limit", 10)

        redis_cache = get_cache()
        group_cache_usable = bool(is_group and redis_cache and redis_cache.enabled)
        group_member_wxids = None
        user_avatars = {}
        is_filtered = False  # True once the ranking is restricted to this group

        if group_cache_usable:
            group_member_wxids = self.get_group_member_wxids(from_wxid)
            if group_member_wxids:
                logger.info(f"从缓存获取到 {len(group_member_wxids)} 个群成员")
                is_filtered = True
            else:
                logger.warning("未找到群成员缓存,将显示全局排行。请先执行 /更新信息")

        if group_member_wxids:
            leaderboard = self.get_points_leaderboard(wxid_list=group_member_wxids, limit=limit)
        else:
            leaderboard = self.get_points_leaderboard(limit=limit)

        if not leaderboard:
            await client.send_text(from_wxid, "暂无排行数据\n提示:请先执行 /更新信息 更新群成员")
            return

        # Avatars come from the per-group cache only; no API calls here.
        if group_cache_usable:
            for user in leaderboard:
                wxid = user.get("wxid", "")
                cached_info = redis_cache.get_user_basic_info(from_wxid, wxid)
                if cached_info and cached_info.get("avatar_url"):
                    user_avatars[wxid] = cached_info["avatar_url"]

        markdown_lines = [
            "# 🏆 积分排行榜",
            ""
        ]
        medals = ["🥇", "🥈", "🥉"]

        for rank, user in enumerate(leaderboard, start=1):
            wxid = user.get("wxid", "")
            nickname = user.get("nickname") or "未知用户"
            points = user.get("points", 0)
            streak = user.get("signin_streak", 0)

            # Truncate first, escape afterwards, so an HTML entity is never
            # cut in half. Nicknames are user-controlled and are embedded in
            # HTML below, so they must not be able to inject markup.
            if len(nickname) > 12:
                nickname = nickname[:11] + "…"
            nickname = escape(nickname)

            # Avatar as a fixed 32x32 circle, or a grey placeholder.
            avatar_url = user_avatars.get(wxid, "")
            if avatar_url:
                safe_url = escape(avatar_url, quote=True)
                avatar_html = f'<img src="{safe_url}" width="32" height="32" style="border-radius: 50%; vertical-align: middle; margin-right: 8px;">'
            else:
                avatar_html = '<span style="display: inline-block; width: 32px; height: 32px; border-radius: 50%; background: #ddd; text-align: center; line-height: 32px; margin-right: 8px; vertical-align: middle;">👤</span>'

            if rank <= 3:
                # Top three get a medal and a bold name.
                prefix = medals[rank - 1]
                markdown_lines.append(f'{prefix} {avatar_html} **{nickname}** — {points}分 · 连签{streak}')
            else:
                markdown_lines.append(f'`{rank}.` {avatar_html} {nickname} — {points}分 · 连签{streak}')

            # Blank line between rows so they render as separate paragraphs.
            markdown_lines.append("")

        markdown_lines.append("---")

        # Footer states whether this is the group ranking or the global one.
        if is_group and is_filtered:
            markdown_lines.append(f"*本群共 {len(leaderboard)} 人上榜*")
        elif is_group:
            markdown_lines.append(f"*全局排行(共 {len(leaderboard)} 人)*")
            markdown_lines.append("*提示:发送 /更新信息 可查看本群排行*")
        else:
            markdown_lines.append(f"*共 {len(leaderboard)} 人上榜*")

        markdown_content = "\n".join(markdown_lines)
        logger.debug(f"生成的 Markdown:\n{markdown_content}")

        image_path = await self.markdown_to_image(markdown_content)

        image_sent = False
        if image_path and os.path.exists(image_path):
            try:
                image_sent = await self.send_image_file(client, from_wxid, image_path)
            except Exception as e:
                # A raised send error must still reach the text fallback.
                logger.error(f"积分榜图片发送失败: {e}")

        if image_sent:
            logger.success("积分榜图片发送成功")
        else:
            await self._send_leaderboard_text(client, from_wxid, leaderboard, is_filtered)

    except Exception as e:
        logger.error(f"处理积分榜查询失败: {e}")
        import traceback
        logger.error(traceback.format_exc())
        await client.send_text(from_wxid, self.config["messages"]["error"])
async def _send_leaderboard_text(self, client: WechatHookClient, from_wxid: str,
                                 leaderboard: List[dict], is_filtered: bool = False):
    """Send the leaderboard as plain text (fallback when no image can be sent).

    Args:
        client: Client used to send the message.
        from_wxid: Chat the ranking is sent to.
        leaderboard: Ranked user rows; ``nickname``, ``points`` and
            ``signin_streak`` are read from each row.
        is_filtered: True when ``leaderboard`` is already limited to the
            members of the current group.
    """
    divider = "─" * 20
    lines = ["🏆 积分排行榜", divider]
    medals = ["🥇", "🥈", "🥉"]

    for rank, user in enumerate(leaderboard, start=1):
        nickname = user.get("nickname") or "未知用户"
        points = user.get("points", 0)
        streak = user.get("signin_streak", 0)

        prefix = medals[rank - 1] if rank <= 3 else f"{rank}."

        # Keep rows short; the ellipsis marks a truncated nickname.
        if len(nickname) > 8:
            nickname = nickname[:7] + "…"

        lines.append(f"{prefix} {nickname} {points}分 连签{streak}")

    lines.append(divider)

    if is_filtered:
        lines.append(f"本群共 {len(leaderboard)} 人上榜")
    else:
        lines.append(f"共 {len(leaderboard)} 人上榜")
        # The per-group hint only makes sense inside a group chat; group
        # ids end with "@chatroom".
        if from_wxid.endswith("@chatroom"):
            lines.append("提示:发送 /更新信息 可查看本群排行")

    await client.send_text(from_wxid, "\n".join(lines))
    logger.success("积分榜文本发送成功")
async def handle_update_members(self, client: WechatHookClient, group_wxid: str):
    """Refresh the member information of one group and report the outcome there.

    A "please wait" notice is sent first. After the refresh, the group gets
    either the ``updated/total`` counts or a failure notice when the total
    is not positive.

    Args:
        client: Client used for the refresh and for sending messages.
        group_wxid: Id of the group to refresh; also the reply target.
    """
    logger.info(f"开始更新群成员信息: {group_wxid}")

    try:
        await client.send_text(group_wxid, "⏳ 正在更新群成员信息,请稍候...")

        updated, member_total = await self.update_group_members_info(client, group_wxid)

        # A non-positive total means the member list could not be fetched.
        outcome = (
            f"✅ 群成员信息更新完成\n成功: {updated}/{member_total}"
            if member_total > 0
            else "❌ 更新失败,无法获取群成员列表"
        )
        await client.send_text(group_wxid, outcome)

    except Exception as err:
        logger.error(f"处理更新群成员信息失败: {err}")
        await client.send_text(group_wxid, "❌ 更新失败,请稍后重试")
@schedule('cron', day_of_week='wed', hour=3, minute=0)
async def scheduled_update_members(self, bot=None):
    """Scheduled job: refresh member info for every configured group.

    Registered with a cron trigger for Wednesdays at 03:00. The job does
    nothing unless ``signin.auto_update_enabled`` is true and
    ``signin.auto_update_groups`` lists at least one group. Groups are
    processed one at a time, with a pause between consecutive groups.

    Args:
        bot: Client passed to ``update_group_members_info``. When falsy, it
            is taken from ``PluginManager().bot``.
    """
    if not self.config["signin"].get("auto_update_enabled", False):
        logger.debug("自动更新群成员信息未启用")
        return

    logger.info("开始执行定时任务:更新群成员信息")

    try:
        if not bot:
            # Imported lazily, only when no bot was handed in.
            from utils.plugin_manager import PluginManager
            bot = PluginManager().bot

        if not bot:
            logger.error("定时任务:无法获取 bot 实例")
            return

        target_groups = self.config["signin"].get("auto_update_groups", [])
        if not target_groups:
            logger.warning("未配置自动更新群组列表,跳过定时任务")
            return

        total_success = 0
        total_count = 0

        # Sequential on purpose (no concurrency).
        for index, group_wxid in enumerate(target_groups):
            # Pause between groups only: never after the last one, and
            # also after a failed group so a failure is not retried into
            # immediately.
            if index:
                await asyncio.sleep(5)

            logger.info(f"定时任务:更新群 {group_wxid} 的成员信息")
            try:
                success, total = await self.update_group_members_info(bot, group_wxid)
            except Exception as e:
                logger.error(f"更新群 {group_wxid} 失败: {e}")
                continue

            total_success += success
            total_count += total
            logger.info(f"群 {group_wxid} 更新完成: {success}/{total}")

        logger.success(f"定时任务完成:共更新 {len(target_groups)} 个群,成功 {total_success}/{total_count}")

    except Exception as e:
        logger.error(f"定时任务执行失败: {e}")
def get_llm_tools(self) -> List[dict]:
    """Return the function-calling tool definitions exposed by this plugin.

    Three tools are described: ``user_signin`` and ``check_profile`` take no
    parameters, while ``register_city`` requires a string ``city``. A fresh
    list of fresh dicts is built on every call.
    """
    def describe(name, description, properties=None, required=None):
        # One tool entry; parameterless tools get their own empty containers.
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": {} if properties is None else properties,
                    "required": [] if required is None else required
                }
            }
        }

    city_property = {
        "city": {
            "type": "string",
            "description": "城市名称"
        }
    }

    return [
        describe("user_signin", "用户签到,获取积分奖励"),
        describe("check_profile", "查看用户个人信息,包括积分、连续签到天数等"),
        describe("register_city", "注册或更新用户城市信息", city_property, ["city"]),
    ]
async def execute_llm_tool(self, tool_name: str, arguments: dict, bot, from_wxid: str) -> Optional[dict]:
    """Run one of this plugin's LLM tools by delegating to the chat handlers.

    Args:
        tool_name: ``user_signin``, ``check_profile`` or ``register_city``.
        arguments: Tool arguments. ``user_wxid`` and ``is_group`` are read
            when present (defaults: ``from_wxid`` and whether ``from_wxid``
            ends with ``@chatroom``); ``city`` is required for
            ``register_city``.
        bot: Client object passed through to the handlers.
        from_wxid: Chat the request came from and replies go to.

    Returns:
        ``{"success": bool, "message": str}`` for a known tool, or ``None``
        when ``tool_name`` is not one of this plugin's tools. ``success``
        only means the handler was invoked; the handlers report their own
        errors to the chat.
    """
    try:
        is_group = arguments.get("is_group", from_wxid.endswith("@chatroom"))
        # In a private chat the sender is always the chat peer itself.
        user_wxid = arguments.get("user_wxid", from_wxid) if is_group else from_wxid

        # Minimal message dict in the shape the handlers accept.
        message = {
            "FromWxid": from_wxid,
            "SenderWxid": user_wxid,
            "IsGroup": is_group,
            "Content": ""
        }

        if tool_name == "user_signin":
            await self.handle_signin(bot, message, user_wxid, from_wxid, is_group)
            return {"success": True, "message": "签到请求已处理"}

        if tool_name == "check_profile":
            await self.handle_profile_query(bot, message, user_wxid, from_wxid, is_group)
            return {"success": True, "message": "个人信息查询已处理"}

        if tool_name == "register_city":
            city = arguments.get("city")
            if isinstance(city, str):
                # A whitespace-only city is a missing city, and padding
                # must not end up in the stored value.
                city = city.strip()
            if not city:
                return {"success": False, "message": "缺少城市参数"}

            content = f"注册城市 {city}"
            await self.handle_city_register(bot, message, user_wxid, from_wxid, content)
            return {"success": True, "message": f"城市注册请求已处理: {city}"}

        # Not one of this plugin's tools.
        return None

    except Exception as e:
        logger.error(f"LLM工具执行失败: {e}")
        return {"success": False, "message": f"执行失败: {str(e)}"}