"""
Deer check-in plugin (鹿打卡插件).

Users check in daily by sending the "🦌" emoji; the plugin records each
check-in automatically and generates a monthly check-in calendar image.
"""

import asyncio
import calendar
import os
import re
import sqlite3
import time
from contextlib import contextmanager
from datetime import date
from pathlib import Path
from typing import Dict, Optional, List

from loguru import logger
from PIL import Image, ImageDraw, ImageFont

try:
    import tomllib
except ImportError:
    import tomli as tomllib

from utils.plugin_base import PluginBase
from utils.decorators import on_text_message
from WechatHook import WechatHookClient


class DeerCheckin(PluginBase):
    """Deer check-in plugin.

    Users check in by sending one or more "🦌" emoji. Each check-in is stored
    per user and per day in a SQLite database located next to this file, and
    a calendar image for the current month is rendered with Pillow and sent
    back to the chat.

    Text commands handled (all start with "🦌"):
      * one or more "🦌"            - check in for today (count = number of emoji)
      * "🦌日历"                    - show this month's calendar
      * "🦌补签 <day> <count>"      - back-fill a day of the current month
      * "🦌帮助"                    - show the help menu
    """

    description = "鹿打卡插件 - 发送🦌表情进行打卡并生成月度日历"
    author = "Assistant"
    version = "1.0.0"

    # Names of the LLM tools this plugin owns (see get_llm_tools).
    _TOOL_NAMES = ("deer_checkin", "view_calendar", "makeup_checkin")

    # Command patterns, compiled once instead of on every message.
    _CHECKIN_PATTERN = re.compile(r'^🦌+$')
    _RETRO_PATTERN = re.compile(r'^🦌补签\s+(\d{1,2})\s+(\d+)\s*$')

    # Single upsert shared by today's check-in and back-fill: a second
    # check-in on the same day adds to the stored count.
    _UPSERT_SQL = '''
        INSERT INTO checkin (user_id, checkin_date, deer_count)
        VALUES (?, ?, ?)
        ON CONFLICT(user_id, checkin_date)
        DO UPDATE SET deer_count = deer_count + excluded.deer_count
    '''

    # Rendered calendar images are kept for a while because sending is
    # asynchronous; anything older than this is removed on the next render.
    _TEMP_IMAGE_TTL_SECONDS = 24 * 60 * 60

    _HELP_TEXT = "\n".join((
        "--- 🦌打卡帮助菜单 ---",
        "",
        "1️⃣ **🦌打卡**",
        "▸ 命令: 直接发送 🦌 (可发送多个)",
        "▸ 作用: 记录今天🦌的数量",
        "▸ 示例: 🦌🦌🦌",
        "",
        "2️⃣ **查看记录**",
        "▸ 命令: 🦌日历",
        "▸ 作用: 查看本月的打卡日历",
        "",
        "3️⃣ **补签**",
        "▸ 命令: 🦌补签 [日期] [次数]",
        "▸ 作用: 为本月指定日期补上打卡记录",
        "▸ 示例: 🦌补签 1 5",
        "",
        "4️⃣ **显示此帮助**",
        "▸ 命令: 🦌帮助",
        "",
        "祝您一🦌顺畅!",
    ))

    def __init__(self):
        """Create the plugin in an uninitialised state; see async_init."""
        super().__init__()
        self.config = None        # parsed config.toml, None until loaded
        self.db_path = None       # Path of the SQLite database file
        self.font_path = None     # Path of the TTF font used for rendering
        self.temp_dir = None      # directory receiving rendered PNG files
        self._initialized = False
        logger.info("DeerCheckin插件__init__完成")

    async def async_init(self):
        """Load config.toml, prepare paths and the database.

        On any failure the plugin stays disabled (``_initialized`` is False
        and ``config`` is None); the error is logged, not raised.
        """
        logger.info("鹿打卡插件开始初始化...")
        try:
            plugin_dir = Path(__file__).parent
            config_path = plugin_dir / "config.toml"
            if not config_path.exists():
                logger.error(f"鹿打卡插件配置文件不存在: {config_path}")
                return

            with open(config_path, "rb") as f:
                self.config = tomllib.load(f)
            logger.info("配置文件加载成功")

            # Resolve paths relative to the plugin directory.
            self.db_path = plugin_dir / "deer_checkin.db"
            self.font_path = plugin_dir / "resources" / "font.ttf"
            self.temp_dir = plugin_dir / "tmp"
            self.temp_dir.mkdir(exist_ok=True)

            # _init_db re-raises on failure so we never report the plugin as
            # loaded while its tables are missing.
            await self._init_db()
            await self._monthly_cleanup()

            self._initialized = True
            logger.success("鹿打卡插件已加载")
        except Exception as e:
            logger.error(f"鹿打卡插件初始化失败: {e}")
            self.config = None
            self._initialized = False

    @contextmanager
    def _connect(self):
        """Yield a SQLite connection that is committed/rolled back and closed.

        ``with conn`` only manages the transaction (commit on success,
        rollback on error); the ``finally`` guarantees the connection itself
        is closed even when a statement raises.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _add_deer(self, user_id: str, checkin_date: str, deer_count: int) -> None:
        """Add ``deer_count`` to the record of ``user_id`` on ``checkin_date``."""
        with self._connect() as conn:
            conn.execute(self._UPSERT_SQL, (user_id, checkin_date, deer_count))

    async def _init_db(self):
        """Create the ``checkin`` and ``metadata`` tables if they are missing.

        Raises:
            Exception: whatever sqlite3 raised, after logging it.
        """
        try:
            with self._connect() as conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS checkin (
                        user_id TEXT NOT NULL,
                        checkin_date TEXT NOT NULL,
                        deer_count INTEGER NOT NULL,
                        PRIMARY KEY (user_id, checkin_date)
                    )
                ''')
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS metadata (
                        key TEXT PRIMARY KEY,
                        value TEXT
                    )
                ''')
            logger.info("鹿打卡数据库初始化成功")
        except Exception as e:
            logger.error(f"数据库初始化失败: {e}")
            raise

    async def _monthly_cleanup(self):
        """Delete check-ins from other months, at most once per month.

        The month of the last cleanup is remembered in the ``metadata`` table
        under ``last_cleanup_month``. Failures are logged and ignored.
        """
        current_month = date.today().strftime("%Y-%m")
        try:
            with self._connect() as conn:
                row = conn.execute(
                    "SELECT value FROM metadata WHERE key = 'last_cleanup_month'"
                ).fetchone()
                needs_cleanup = not row or row[0] != current_month
                if needs_cleanup:
                    conn.execute(
                        "DELETE FROM checkin WHERE strftime('%Y-%m', checkin_date) != ?",
                        (current_month,),
                    )
                    conn.execute(
                        "INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)",
                        ("last_cleanup_month", current_month),
                    )
            if needs_cleanup:
                logger.info(f"已执行月度清理,现在是 {current_month}")
        except Exception as e:
            logger.error(f"月度数据清理失败: {e}")

    @on_text_message(priority=90)
    async def handle_deer_message(self, bot: WechatHookClient, message: dict):
        """Dispatch "🦌" text commands.

        Returns:
            False when the message was handled by this plugin, True otherwise
            (including when the plugin is disabled or not initialised).
        """
        raw_content = message.get("Content") or ""
        if not isinstance(raw_content, str):
            return True
        content = raw_content.strip()

        # Cheap prefix test first: unrelated messages must not touch the
        # config or produce log noise.
        if not content.startswith("🦌"):
            return True

        logger.info(f"收到鹿消息: {content}, 初始化状态: {self._initialized}, 配置状态: {self.config is not None}")
        if not self._initialized or not self.config:
            return True

        from_wxid = message.get("FromWxid", "")
        is_group = message.get("IsGroup", False)
        # In a group the sender is the acting user; in private chat the peer is.
        user_id = message.get("SenderWxid", "") if is_group else from_wxid
        nickname = message.get("SenderNickname", "") or "用户"

        behavior = self.config["behavior"]
        if not behavior["enabled"]:
            logger.info(f"DeerCheckin插件未启用,跳过消息: {content}")
            return True
        if is_group and not behavior["enable_group"]:
            logger.info(f"DeerCheckin插件群聊未启用,跳过消息: {content}")
            return True
        if not is_group and not behavior["enable_private"]:
            logger.info(f"DeerCheckin插件私聊未启用,跳过消息: {content}")
            return True

        logger.info(f"DeerCheckin插件处理消息: {content}")

        if self._CHECKIN_PATTERN.match(content):
            await self._handle_checkin(bot, from_wxid, user_id, nickname, content)
            return False
        if content == "🦌日历":
            await self._handle_calendar(bot, from_wxid, user_id, nickname)
            return False
        if content == "🦌帮助":
            await self._handle_help(bot, from_wxid)
            return False
        if self._RETRO_PATTERN.match(content):
            await self._handle_retro_checkin(bot, from_wxid, user_id, nickname, content)
            return False

        return True

    async def _resolve_nickname(self, bot: WechatHookClient, from_wxid: str, user_id: str, nickname: str) -> str:
        """Return the member's chatroom nickname, or ``nickname`` as fallback.

        Only group chats (``from_wxid`` ending in "@chatroom") are looked up.
        The lookup is best-effort: any error keeps the supplied nickname.
        """
        if not from_wxid.endswith("@chatroom"):
            return nickname
        try:
            user_info = await bot.get_user_info_in_chatroom(from_wxid, user_id)
            real_name = ((user_info or {}).get("nickName") or {}).get("string")
            if real_name:
                return real_name
        except Exception as e:
            # Not a bare except: task cancellation must still propagate.
            logger.debug(f"nickname lookup failed for {user_id}: {e}")
        return nickname

    async def _handle_checkin(self, bot: WechatHookClient, from_wxid: str, user_id: str, nickname: str, content: str):
        """Record today's check-in; the count is the number of "🦌" in ``content``.

        Returns:
            True when the check-in was stored, False on a database error.
        """
        return await self._checkin_and_reply(bot, from_wxid, user_id, nickname, content.count("🦌"))

    async def _checkin_and_reply(self, bot: WechatHookClient, from_wxid: str, user_id: str, nickname: str, deer_count: int) -> bool:
        """Store ``deer_count`` for today and send the updated calendar."""
        nickname = await self._resolve_nickname(bot, from_wxid, user_id, nickname)
        today_str = date.today().strftime("%Y-%m-%d")

        # Only the write is guarded here: a later rendering/sending problem
        # must not be reported as a failed check-in, because the data is
        # already committed and a retry would double count.
        try:
            self._add_deer(user_id, today_str, deer_count)
        except Exception as e:
            logger.error(f"记录打卡数据失败: {e}")
            await bot.send_text(from_wxid, "打卡失败,数据库出错了 >_<")
            return False

        logger.info(f"用户 {nickname} ({user_id}) 打卡成功,记录了 {deer_count} 个🦌")
        await self._generate_and_send_calendar(bot, from_wxid, user_id, nickname)
        return True

    async def _handle_calendar(self, bot: WechatHookClient, from_wxid: str, user_id: str, nickname: str):
        """Send the current month's calendar of ``user_id``."""
        nickname = await self._resolve_nickname(bot, from_wxid, user_id, nickname)
        logger.info(f"用户 {nickname} ({user_id}) 查询日历")
        await self._generate_and_send_calendar(bot, from_wxid, user_id, nickname)

    async def _handle_retro_checkin(self, bot: WechatHookClient, from_wxid: str, user_id: str, nickname: str, content: str):
        """Parse a "🦌补签 <day> <count>" command and back-fill that day.

        Returns:
            True when the back-fill was stored, False otherwise (the reason
            has already been sent to the chat).
        """
        match = self._RETRO_PATTERN.match(content)
        if not match:
            await bot.send_text(from_wxid, "命令格式不正确,请使用:🦌补签 日期 次数")
            return False
        try:
            day_to_checkin = int(match.group(1))
            deer_count = int(match.group(2))
        except ValueError:
            # int() rejects absurdly long digit strings on recent Pythons.
            await bot.send_text(from_wxid, "命令格式不正确,请使用:🦌补签 日期 次数")
            return False
        return await self._retro_checkin(bot, from_wxid, user_id, nickname, day_to_checkin, deer_count)

    async def _retro_checkin(self, bot: WechatHookClient, from_wxid: str, user_id: str, nickname: str, day_to_checkin: int, deer_count: int) -> bool:
        """Validate and store a back-fill for a day of the current month."""
        if deer_count <= 0:
            await bot.send_text(from_wxid, "补签次数必须是大于0的整数哦!")
            return False

        today = date.today()
        days_in_month = calendar.monthrange(today.year, today.month)[1]
        if not (1 <= day_to_checkin <= days_in_month):
            await bot.send_text(from_wxid, f"日期无效!本月只有 {days_in_month} 天")
            return False
        if day_to_checkin > today.day:
            await bot.send_text(from_wxid, "抱歉,不能对未来进行补签哦!")
            return False

        target_date_str = date(today.year, today.month, day_to_checkin).strftime("%Y-%m-%d")
        try:
            self._add_deer(user_id, target_date_str, deer_count)
        except Exception as e:
            logger.error(f"补签失败: {e}")
            await bot.send_text(from_wxid, "补签失败,数据库出错了 >_<")
            return False

        await bot.send_text(from_wxid, f"补签成功!已为 {today.month}月{day_to_checkin}日 增加了 {deer_count} 个鹿")
        nickname = await self._resolve_nickname(bot, from_wxid, user_id, nickname)
        await self._generate_and_send_calendar(bot, from_wxid, user_id, nickname)
        return True

    async def _handle_help(self, bot: WechatHookClient, from_wxid: str):
        """Send the help menu to ``from_wxid``."""
        await bot.send_text(from_wxid, self._HELP_TEXT)

    async def _generate_and_send_calendar(self, bot: WechatHookClient, from_wxid: str, user_id: str, nickname: str):
        """Render this month's calendar for ``user_id`` and send it.

        Falls back to a one-line text summary when the image cannot be
        created, and to an error message when anything else fails.
        """
        try:
            # One snapshot of "today" so year, month and the SQL filter agree
            # even if the call straddles midnight.
            today = date.today()
            current_month_str = today.strftime("%Y-%m")

            with self._connect() as conn:
                rows = conn.execute(
                    "SELECT checkin_date, deer_count FROM checkin WHERE user_id = ? AND strftime('%Y-%m', checkin_date) = ?",
                    (user_id, current_month_str),
                ).fetchall()

            if not rows:
                await bot.send_text(from_wxid, "您本月还没有打卡记录哦,发送🦌开始第一次打卡吧!")
                return

            # checkin_date is stored as YYYY-MM-DD; key the records by day number.
            checkin_records = {int(day_str.split('-')[2]): count for day_str, count in rows}
            total_deer = sum(checkin_records.values())

            image_path = await self._create_calendar_image(
                user_id, nickname, today.year, today.month, checkin_records, total_deer
            )

            if image_path:
                # The image is intentionally not deleted here: the send is
                # asynchronous. Old files are purged on later renders.
                await bot._send_data_async(11040, {"to_wxid": from_wxid, "file": str(image_path)})
            else:
                total_days = len(checkin_records)
                await bot.send_text(from_wxid, f"本月您已打卡{total_days}天,累计{total_deer}个🦌")

        except Exception as e:
            logger.error(f"生成日历失败: {e}")
            await bot.send_text(from_wxid, "生成日历时发生错误 >_<")

    def _purge_stale_images(self) -> None:
        """Best-effort removal of rendered calendars older than the TTL."""
        cutoff = time.time() - self._TEMP_IMAGE_TTL_SECONDS
        for image_file in self.temp_dir.glob("checkin_*.png"):
            try:
                if image_file.stat().st_mtime < cutoff:
                    image_file.unlink()
            except OSError as e:
                logger.debug(f"could not remove stale image {image_file}: {e}")

    def _load_fonts(self) -> Dict[str, object]:
        """Load the fonts used by the calendar, keyed by role.

        Uses the bundled TTF when it can be loaded, otherwise Pillow's
        built-in default font for every role.
        """
        sizes = {
            "header": 32,
            "weekday": 18,
            "day": 20,
            "check_mark": 28,
            "deer_count": 16,
            "summary": 18,
        }
        try:
            if not self.font_path.exists():
                raise FileNotFoundError("字体文件不存在")
            font_file = str(self.font_path)
            return {role: ImageFont.truetype(font_file, size) for role, size in sizes.items()}
        except Exception as e:
            logger.warning(f"font {self.font_path} unavailable, using Pillow default font: {e}")
            return {role: ImageFont.load_default() for role in sizes}

    @staticmethod
    def _text_size(draw, text: str, font):
        """Return ``(width, height)`` of ``text`` as measured by ``draw.textbbox``."""
        left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
        return right - left, bottom - top

    async def _create_calendar_image(self, user_id: str, nickname: str, year: int, month: int, checkin_data: Dict, total_deer: int) -> Optional[Path]:
        """Render the month calendar to a PNG in the temp directory.

        Args:
            user_id: used in the output file name.
            nickname: shown in the image header.
            year, month: month to render.
            checkin_data: mapping of day-of-month to deer count.
            total_deer: total shown in the summary line.

        Returns:
            Path of the written PNG, or None when rendering failed (the
            error is logged).
        """
        try:
            WIDTH, HEIGHT = 700, 620
            BG_COLOR = (255, 255, 255)
            HEADER_COLOR = (50, 50, 50)
            WEEKDAY_COLOR = (100, 100, 100)
            DAY_COLOR = (80, 80, 80)
            TODAY_BG_COLOR = (240, 240, 255)
            CHECKIN_MARK_COLOR = (0, 150, 50)
            DEER_COUNT_COLOR = (139, 69, 19)

            self._purge_stale_images()
            fonts = self._load_fonts()

            img = Image.new('RGB', (WIDTH, HEIGHT), BG_COLOR)
            draw = ImageDraw.Draw(img)

            # Header, horizontally centred.
            header_text = f"{year}年{month}月 - {nickname}的鹿日历"
            text_width, _ = self._text_size(draw, header_text, fonts["header"])
            draw.text(((WIDTH - text_width) // 2, 20), header_text, font=fonts["header"], fill=HEADER_COLOR)

            # Weekday row, Monday first (matches calendar.monthcalendar).
            cell_width = WIDTH / 7
            for column, label in enumerate(["一", "二", "三", "四", "五", "六", "日"]):
                text_width, _ = self._text_size(draw, label, fonts["weekday"])
                x = column * cell_width + (cell_width - text_width) // 2
                draw.text((x, 90), label, font=fonts["weekday"], fill=WEEKDAY_COLOR)

            # Day grid.
            y_offset = 120
            cell_height = 75
            today = date.today()
            # 0 never matches a real day, so nothing is highlighted when the
            # rendered month is not the current one.
            today_num = today.day if (today.year, today.month) == (year, month) else 0

            for week in calendar.monthcalendar(year, month):
                for column, day_num in enumerate(week):
                    if day_num == 0:
                        # Padding cell belonging to a neighbouring month.
                        continue
                    x_pos = column * cell_width

                    if day_num == today_num:
                        draw.rectangle(
                            [x_pos, y_offset, x_pos + cell_width, y_offset + cell_height],
                            fill=TODAY_BG_COLOR,
                        )

                    # Day number in the top-right corner of the cell.
                    day_text = str(day_num)
                    text_width, _ = self._text_size(draw, day_text, fonts["day"])
                    draw.text(
                        (x_pos + cell_width - text_width - 10, y_offset + 5),
                        day_text, font=fonts["day"], fill=DAY_COLOR,
                    )

                    if day_num in checkin_data:
                        # Check mark above the cell's vertical centre.
                        check_text = "√"
                        text_width, text_height = self._text_size(draw, check_text, fonts["check_mark"])
                        check_x = x_pos + (cell_width - text_width) // 2
                        check_y = y_offset + cell_height // 2 - text_height - 5
                        draw.text((check_x, check_y), check_text, font=fonts["check_mark"], fill=CHECKIN_MARK_COLOR)

                        # Deer count below the centre.
                        deer_text = f"鹿了 {checkin_data[day_num]} 次"
                        text_width, _ = self._text_size(draw, deer_text, fonts["deer_count"])
                        deer_x = x_pos + (cell_width - text_width) // 2
                        deer_y = y_offset + cell_height // 2 + 15
                        draw.text((deer_x, deer_y), deer_text, font=fonts["deer_count"], fill=DEER_COUNT_COLOR)

                y_offset += cell_height

            # Summary line at the bottom.
            total_days = len(checkin_data)
            summary_text = f"本月总结:累计鹿了 {total_days} 天,共鹿 {total_deer} 次"
            text_width, _ = self._text_size(draw, summary_text, fonts["summary"])
            draw.text(((WIDTH - text_width) // 2, HEIGHT - 30), summary_text, font=fonts["summary"], fill=HEADER_COLOR)

            file_path = self.temp_dir / f"checkin_{user_id}_{int(time.time())}.png"
            img.save(file_path, format='PNG')
            return file_path

        except Exception as e:
            logger.error(f"创建日历图片失败: {e}")
            return None

    def get_llm_tools(self) -> List[dict]:
        """Return the LLM tool definitions exposed to the AIChat plugin."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "deer_checkin",
                    "description": "鹿打卡,记录今天的鹿数量",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "count": {
                                "type": "integer",
                                "description": "鹿的数量,默认为1",
                                "default": 1
                            }
                        },
                        "required": []
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "view_calendar",
                    "description": "查看本月的鹿打卡日历",
                    "parameters": {
                        "type": "object",
                        "properties": {},
                        "required": []
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "makeup_checkin",
                    "description": "补签指定日期的鹿打卡记录",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "day": {
                                "type": "integer",
                                "description": "要补签的日期(1-31)"
                            },
                            "count": {
                                "type": "integer",
                                "description": "补签的鹿数量",
                                "default": 1
                            }
                        },
                        "required": ["day"]
                    }
                }
            }
        ]

    @staticmethod
    def _coerce_int(value) -> Optional[int]:
        """Return ``value`` as an int, or None when it is not integer-like."""
        if isinstance(value, bool):
            # bool is an int subclass; True/False are not meaningful counts.
            return None
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    async def execute_llm_tool(self, tool_name: str, arguments: dict, bot, from_wxid: str) -> Optional[dict]:
        """Execute one of this plugin's LLM tools on behalf of the AIChat plugin.

        Args:
            tool_name: name of the tool requested by the model.
            arguments: tool arguments; ``count``, ``day``, ``user_wxid`` and
                ``nickname`` are read when present.
            bot: client used to send replies.
            from_wxid: chat the request came from.

        Returns:
            None when ``tool_name`` is not one of this plugin's tools (so
            that other plugins can handle it), otherwise a dict with
            ``success`` and ``message``.
        """
        # Ownership check comes first: an uninitialised plugin must not
        # answer for tools that belong to other plugins.
        if tool_name not in self._TOOL_NAMES:
            return None

        try:
            if not self._initialized or not self.config:
                return {"success": False, "message": "鹿打卡插件未初始化"}

            arguments = arguments or {}
            is_group = from_wxid.endswith("@chatroom")
            user_id = arguments.get("user_wxid", from_wxid) if is_group else from_wxid
            nickname = arguments.get("nickname", "") or "用户"

            if tool_name == "view_calendar":
                await self._handle_calendar(bot, from_wxid, user_id, nickname)
                return {"success": True, "message": "日历查询已处理"}

            # Both remaining tools take a positive integer count.
            count = self._coerce_int(arguments.get("count", 1))
            if count is None or count <= 0:
                return {"success": False, "message": "鹿的数量必须是大于0的整数"}

            if tool_name == "deer_checkin":
                stored = await self._checkin_and_reply(bot, from_wxid, user_id, nickname, count)
                if not stored:
                    return {"success": False, "message": "打卡失败,数据库出错了"}
                return {"success": True, "message": f"鹿打卡已处理,记录了{count}个🦌"}

            # tool_name == "makeup_checkin"
            raw_day = arguments.get("day")
            if not raw_day:
                return {"success": False, "message": "缺少日期参数"}
            day = self._coerce_int(raw_day)
            if day is None:
                return {"success": False, "message": "日期参数无效"}

            stored = await self._retro_checkin(bot, from_wxid, user_id, nickname, day, count)
            if not stored:
                return {"success": False, "message": "补签未完成,原因已发送给用户"}
            return {"success": True, "message": f"补签请求已处理:{day}日 {count}个🦌"}

        except Exception as e:
            logger.error(f"LLM工具执行失败: {e}")
            return {"success": False, "message": f"执行失败: {str(e)}"}