新增点歌插件。

This commit is contained in:
liuwei
2025-03-18 15:45:07 +08:00
parent adfa3505a5
commit ede83d6c94
8 changed files with 417 additions and 37 deletions

63
job_decorators.py Normal file
View File

@@ -0,0 +1,63 @@
import functools
import inspect
import logging
from typing import Callable, Optional, Dict, List, Any
LOG = logging.getLogger("JobDecorator")
# 存储所有被装饰的任务
scheduled_tasks = []
def scheduled_job(cron: str, name: Optional[str] = None, enabled: bool = True):
"""
定时任务装饰器,用于标记需要定时执行的方法
:param cron: cron表达式例如 "0 0 * * * *" 表示每小时执行一次
:param name: 任务名称,默认使用方法名
:param enabled: 是否启用该任务默认为True
"""
def decorator(func):
task_name = name or func.__name__
# 记录任务信息
task_info = {
"func": func,
"cron": cron,
"name": task_name,
"enabled": enabled
}
scheduled_tasks.append(task_info)
LOG.info(f"注册定时任务: {task_name}, cron: {cron}, enabled: {enabled}")
@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
return decorator
def register_scheduled_jobs(job_instance):
"""
注册所有被装饰的定时任务到Job实例
:param job_instance: Job类实例必须有add_job方法
:return: 注册的任务数量
"""
count = 0
# 获取实例的所有方法
for name, method in inspect.getmembers(job_instance, predicate=inspect.ismethod):
# 检查原始函数是否在scheduled_tasks中
original_func = method.__func__
for task in scheduled_tasks:
if task["func"].__name__ == original_func.__name__ and task["enabled"]:
# 注册任务
job_instance.add_job(method, task["cron"], task["name"])
count += 1
break
LOG.info(f"已注册 {count} 个定时任务")
return count

25
main.py
View File

@@ -2,7 +2,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import signal import signal
import sys # 添加sys模块导入
from argparse import ArgumentParser from argparse import ArgumentParser
from configuration import Config from configuration import Config
@@ -30,35 +29,11 @@ def main(chat_type: int):
robot.send_text_msg("启动成功!", "filehelper") robot.send_text_msg("启动成功!", "filehelper")
# 接收消息 # 接收消息
# robot.enableRecvMsg() # 可能会丢消息?
robot.enableReceivingMsg() # 加队列 robot.enableReceivingMsg() # 加队列
# 加载插件系统 # 加载插件系统
robot.plugin_manager.load_all_plugins() robot.plugin_manager.load_all_plugins()
# 每天 8:30 发送新闻
robot.onEveryTime("08:30", robot.news_baidu_report_auto)
# epic
robot.onEveryTime("10:30", robot.send_epic_free_games)
# message report 1:数据自动从redis 转到sqllite
robot.onEveryTime("00:30", robot.message_count_to_db)
# 从db中提取并发送给相关群
robot.onEveryTime("09:30", robot.generate_and_send_ranking)
# sehuatang
robot.onEveryTime("15:00", robot.generate_sehuatang_pdf)
# 游戏的定时任务每小时执行
robot.onEveryTime("18:00", robot.game_auto_tasks)
# 秀人网每天自动下载帖子
robot.onEveryTime("01:30", robot.xiu_ren_download_task)
# 秀人网每天自动发pdf
robot.onEveryTime("17:30", robot.xiu_ren_pdf_send)
# 让机器人一直跑 # 让机器人一直跑
robot.keep_running_and_block_process() robot.keep_running_and_block_process()

View File

@@ -3,7 +3,7 @@ import toml
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import logging import logging
from enum import Enum from enum import Enum
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional, Tuple
class PluginStatus(Enum): class PluginStatus(Enum):
@@ -168,3 +168,33 @@ class PluginInterface(ABC):
清理是否成功 清理是否成功
""" """
return True return True
# ... 其他现有方法 ...
def can_process(self, data: Any) -> bool:
"""检查插件是否可以处理给定的数据
这是一个通用方法,用于检查插件是否可以处理特定类型的数据。
子类可以根据需要重写此方法。
Args:
data: 要检查的数据
Returns:
如果插件可以处理该数据则返回True否则返回False
"""
return False
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
处理消息
Args:
message: 消息字典,包含消息的各种属性,以及发送消息所需的对象
- wcf: WcfAPI对象可用于发送消息
- message_util: 消息工具类,提供更高级的消息处理功能
Returns:
(是否已处理, 处理结果)
"""
raise NotImplementedError("子类必须实现此方法")

View File

@@ -41,13 +41,10 @@ class MessageSummaryPlugin(MessagePluginInterface):
def initialize(self, context: Dict[str, Any]) -> bool: def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件""" """初始化插件"""
try: try:
# 从插件配置中获取API密钥和URL # 从插件配置中获取API密钥和URL
api_config = self._config.get("api", {}) api_config = self._config.get("api", {})
self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo") self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo")
self._api_url = api_config.get("api_url", "http://192.168.2.240/v1/chat-messages") self._api_url = api_config.get("api_url", "http://192.168.2.240/v1/chat-messages")
self.all_contacts = context["all_contacts"]
self.message_storage = MessageStorage() self.message_storage = MessageStorage()
self.LOG.info(f"初始化 {self.name} 插件成功") self.LOG.info(f"初始化 {self.name} 插件成功")
@@ -96,11 +93,13 @@ class MessageSummaryPlugin(MessagePluginInterface):
if gbm and gbm.get_group_permission(group_id, Feature.AI_CAPABILITY) == PermissionStatus.DISABLED: if gbm and gbm.get_group_permission(group_id, Feature.AI_CAPABILITY) == PermissionStatus.DISABLED:
return True, None return True, None
# 从消息历史中获取群聊记录 # 从消息历史中获取群聊记录
chat_content = self.message_storage.get_messages(group_id, self.all_contacts) all_contacts: dict = message.get("all_contacts")
chat_content = self.message_storage.get_messages(group_id, all_contacts)
if len(chat_content) < 100: if len(chat_content) < 100:
return False, None return False, None
# 生成总结 # 生成总结
summary, image_path = self._generate_summary(chat_content, self.all_contacts.get(group_id, group_id)) summary, image_path = self._generate_summary(chat_content, all_contacts.get(group_id, group_id))
# 发送总结结果 # 发送总结结果
wcf = message.get("wcf") wcf = message.get("wcf")

View File

@@ -0,0 +1 @@
from .main import MusicPlugin

View File

@@ -0,0 +1,8 @@
[Music]
enable = true
command = ["点歌", "音乐", "音乐点播", "点播音乐", "音乐点歌"]
command-format = """
-----Bot-----
🎵点歌指令:
点歌 歌曲名
"""

290
plugins/music/main.py Normal file
View File

@@ -0,0 +1,290 @@
import logging
import requests
import lz4.block as lb
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
class MusicPlugin(MessagePluginInterface):
"""音乐点播插件"""
@property
def name(self) -> str:
return "音乐点播"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供音乐点播功能支持QQ音乐和网易云音乐"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
# 加载配置
self.load_config()
self._commands = self._config.get("Music", {}).get("command", ["点歌", "音乐"])
self.command_format = self._config.get("Music", {}).get("command-format", "点歌 歌曲名")
self.enable = self._config.get("Music", {}).get("enable", True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
gbm: GroupBotManager = message.get("gbm")
# 检查命令格式
if len(content.split(" ")) == 1:
wcf.send_text(f"-----Bot-----\n❌命令格式错误!\n{self.command_format}",
(roomid if roomid else sender), sender)
return True, "命令格式错误"
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.MUSIC) == PermissionStatus.DISABLED:
return False, "没有权限"
# 提取歌曲名
user_song_name = content[len(command):].strip()
try:
# 搜索歌曲
song_info = self._search_song(user_song_name)
if not song_info or not song_info.get("play_url"):
wcf.send_text(f"-----Bot-----\n❌未找到歌曲:{user_song_name}",
(roomid if roomid else sender), sender)
return True, "未找到歌曲"
# 发送音乐
self._send_music_message(wcf, song_info, roomid or sender)
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理音乐请求出错: {e}")
wcf.send_text(f"-----Bot-----\n❌请求出错:{e}",
(roomid if roomid else sender), sender)
return True, f"处理出错: {e}"
def _search_song(self, song_name: str) -> Dict[str, Any]:
"""搜索歌曲信息"""
try:
# 尝试QQ音乐API
short_play_api = f"https://qqmusic.qqovo.cn/getSearchByKey?key={song_name}&page=1&limit=1"
fallback_api = f"https://www.hhlqilongzhu.cn/api/dg_wyymusic.php?gm={song_name}&n=1&num=1&type=json"
response = requests.get(short_play_api)
if response.status_code == 400:
response = requests.get(fallback_api)
if response.status_code != 200:
self.LOG.error(f"API 请求失败,状态码: {response.status_code}")
return {}
json_data = response.json()
result = json_data.get('response', {}).get('data', {}).get('song', {}).get('list', [])
if not result:
# 尝试备用API
response = requests.get(fallback_api)
if response.status_code == 200:
music_response = response.json()
return {
"song_name": music_response.get('title', ''),
"singer_name": music_response.get('singer', ''),
"play_url": music_response.get('music_url', ''),
"singer_pic": music_response.get('cover', ''),
"data_url": music_response.get('link', '')
}
return {}
# 解析QQ音乐结果
first_song = result[0]
song_name = first_song.get('songname', '')
song_mid = first_song.get('songmid', '')
first_singer_name = first_song.get('singer', [{}])[0].get('name', '')
zhida_singer = json_data.get('response', {}).get('data', {}).get('zhida', {}).get('zhida_singer', {})
singer_pic = zhida_singer.get('singerPic', '') if zhida_singer else None
# 获取播放链接
music_play_api = f"https://qqmusic.qqovo.cn/getMusicPlay?songmid={song_mid}&quality=m4a"
music_response = requests.get(music_play_api)
if music_response.status_code == 400:
# 尝试备用API
response = requests.get(fallback_api)
if response.status_code == 200:
music_response = response.json()
return {
"song_name": music_response.get('title', song_name),
"singer_name": music_response.get('singer', first_singer_name),
"play_url": music_response.get('music_url', ''),
"singer_pic": music_response.get('cover', singer_pic),
"data_url": music_response.get('link', '')
}
return {}
if music_response.status_code != 200:
self.LOG.error(f"获取播放链接失败,状态码: {music_response.status_code}")
return {}
music_data = music_response.json()
play_url = music_data.get('data', {}).get('playUrl', {}).get(song_mid, {}).get('url', '')
if not play_url:
# 尝试备用API
response = requests.get(fallback_api)
if response.status_code == 200:
music_response = response.json()
return {
"song_name": music_response.get('title', song_name),
"singer_name": music_response.get('singer', first_singer_name),
"play_url": music_response.get('music_url', ''),
"singer_pic": music_response.get('cover', singer_pic),
"data_url": music_response.get('link', '')
}
return {}
data_url = f"https://y.qq.com/n/ryqq/songDetail/{song_mid}"
return {
"song_name": song_name,
"singer_name": first_singer_name,
"play_url": play_url,
"singer_pic": singer_pic,
"data_url": data_url
}
except Exception as e:
self.LOG.error(f"搜索歌曲出错: {e}")
return {}
def _send_music_message(self, wcf, song_info: Dict[str, Any], receiver: str) -> bool:
"""发送音乐消息"""
try:
song_name = song_info.get("song_name", "")
singer_name = song_info.get("singer_name", "")
play_url = song_info.get("play_url", "")
singer_pic = song_info.get("singer_pic", "")
data_url = song_info.get("data_url", "")
xml_message = f"""
<?xml version="1.0"?>
<msg>
<appmsg appid="wx8dd6ecd81906fd84" sdkver="0">
<title>{song_name}</title>
<des>{singer_name}\n❤Bot-祝您天天开心❤</des>
<action>view</action>
<type>3</type>
<showtype>0</showtype>
<content />
<url>{data_url}</url>
<dataurl>{play_url}</dataurl>
<lowurl/>
<lowdataurl/>
<recorditem />
<thumburl />
<messageaction />
<laninfo />
<extinfo />
<sourceusername />
<sourcedisplayname />
<commenturl />
<appattach>
<totallen>0</totallen>
<attachid />
<emoticonmd5></emoticonmd5>
<fileext />
<aeskey></aeskey>
</appattach>
<webviewshared>
<publisherId />
<publisherReqId>0</publisherReqId>
</webviewshared>
<weappinfo>
<pagepath />
<username />
<appid />
<appservicetype>0</appservicetype>
</weappinfo>
<websearch />
<songalbumurl>{singer_pic}</songalbumurl>
</appmsg>
<scene>0</scene>
<appinfo>
<version>49</version>
<appname></appname>
</appinfo>
<commenturl />
</msg>"""
# 修改消息数据库里面的消息content内容
text_bytes = xml_message.encode('utf-8')
compressed_data = lb.compress(text_bytes, store_size=False).hex()
data = wcf.query_sql('MSG0.db', "SELECT * FROM MSG where type = 49 limit 1")
wcf.query_sql('MSG0.db',
f"""UPDATE MSG SET CompressContent = x'{compressed_data}', BytesExtra=x'', type=49, SubType=3,
IsSender=0, TalkerId=2 WHERE MsgSvrID={data[0]['MsgSvrID']}"""
)
result = wcf.forward_msg(data[0]["MsgSvrID"], receiver)
self.LOG.info(f"点歌发送结果: {result}")
return True
except Exception as e:
self.LOG.error(f"发送音乐消息出错: {e}")
return False

View File

@@ -66,6 +66,9 @@ from xiuren.xiuren_pdf import generate_pdf_from_images
from db.connection import DBConnectionManager from db.connection import DBConnectionManager
from message_util import MessageUtil from message_util import MessageUtil
# 在文件顶部导入装饰器
from job_decorators import scheduled_job, register_scheduled_jobs
class Robot(Job): class Robot(Job):
"""个性化自己的机器人 """个性化自己的机器人
@@ -75,6 +78,9 @@ class Robot(Job):
self.wcf = wcf self.wcf = wcf
self.config = config self.config = config
self.LOG = logging.getLogger("Robot") self.LOG = logging.getLogger("Robot")
# 在初始化结束时注册所有被装饰的定时任务
register_scheduled_jobs(self)
self.wxid = self.wcf.get_self_wxid() self.wxid = self.wcf.get_self_wxid()
self.allContacts = self.get_all_contacts() self.allContacts = self.get_all_contacts()
self.LOG.info(f"DB+REDIS 连接池开始初始化") self.LOG.info(f"DB+REDIS 连接池开始初始化")
@@ -111,7 +117,6 @@ class Robot(Job):
"plugin_registry": self.plugin_registry, "plugin_registry": self.plugin_registry,
"db_pool": self.db_pool, "db_pool": self.db_pool,
"redis_pool": self.redis_pool, "redis_pool": self.redis_pool,
"all_contacts": self.allContacts,
"message_util": self.message_util "message_util": self.message_util
} }
@@ -571,7 +576,8 @@ class Robot(Job):
"timestamp": time.time(), "timestamp": time.time(),
"wcf": self.wcf, # 提供wcf对象让插件可以直接发送消息 "wcf": self.wcf, # 提供wcf对象让插件可以直接发送消息
"message_util": self.message_util, # 提供消息工具类 "message_util": self.message_util, # 提供消息工具类
"gbm": self.gbm # 每次从程序变量中取,保证最新 "gbm": self.gbm, # 每次从程序变量中取,保证最新
"all_contacts": self.allContacts
} }
# 检查插件是否可以处理该消息 # 检查插件是否可以处理该消息
@@ -591,7 +597,7 @@ class Robot(Job):
return False return False
# ============================================== 业务内容========================================================== # ============================================== 业务内容==========================================================
@scheduled_job(cron="0 0 8 * * *", name="每日新闻推送")
def news_baidu_report_auto(self) -> None: def news_baidu_report_auto(self) -> None:
try: try:
news = News().get_baidu_news() news = News().get_baidu_news()
@@ -613,6 +619,8 @@ class Robot(Job):
except Exception as e: except Exception as e:
self.LOG.error(f"newsEnReport error{e}") self.LOG.error(f"newsEnReport error{e}")
# 使用装饰器标记定时任务 星期五 10:30 执行
@scheduled_job(cron="0 30 10 * * 5", name="Epic免费游戏推送")
def send_epic_free_games(self): def send_epic_free_games(self):
try: try:
if is_friday(): if is_friday():
@@ -621,12 +629,15 @@ class Robot(Job):
except Exception as e: except Exception as e:
self.LOG.error(f"sendEpicFreeGames error{e}") self.LOG.error(f"sendEpicFreeGames error{e}")
# 使用装饰器标记定时任务
@scheduled_job(cron="0 0 * * * *", name="消息统计入库")
def message_count_to_db(self): def message_count_to_db(self):
try: try:
self.message_storage.write_to_db() self.message_storage.write_to_db()
except Exception as e: except Exception as e:
self.LOG.error(f"write_to_db error{e}") self.LOG.error(f"write_to_db error{e}")
@scheduled_job(cron="0 0 15 * * *", name="发送色花堂")
def generate_sehuatang_pdf(self): def generate_sehuatang_pdf(self):
try: try:
path = pdf_file_path() path = pdf_file_path()
@@ -635,6 +646,7 @@ class Robot(Job):
except Exception as e: except Exception as e:
self.LOG.error(f"generateSehuatangPdf error{e}") self.LOG.error(f"generateSehuatangPdf error{e}")
@scheduled_job(cron="0 30 9 * * *", name="发送消息排行榜")
def generate_and_send_ranking(self): def generate_and_send_ranking(self):
try: try:
receivers = self.gbm.get_group_list() receivers = self.gbm.get_group_list()
@@ -661,6 +673,7 @@ class Robot(Job):
self.LOG.error(f"message_summary_robot error{e}") self.LOG.error(f"message_summary_robot error{e}")
# 设置定时任务 # 设置定时任务
@scheduled_job(cron="0 0 18 * * *", name="每天发一个游戏任务")
def game_auto_tasks(self): def game_auto_tasks(self):
try: try:
group_ids = get_group_ids() group_ids = get_group_ids()
@@ -675,6 +688,7 @@ class Robot(Job):
except Exception as e: except Exception as e:
self.LOG.error(f"message_summary_robot error{e}") self.LOG.error(f"message_summary_robot error{e}")
@scheduled_job(cron="0 30 1 * * *", name="每天下载10组图")
def xiu_ren_download_task(self): def xiu_ren_download_task(self):
try: try:
# 每天下载10组图然后发一个帖子PDF # 每天下载10组图然后发一个帖子PDF
@@ -682,9 +696,9 @@ class Robot(Job):
except Exception as e: except Exception as e:
self.LOG.error(f"xiu_ren_download_task error{e}") self.LOG.error(f"xiu_ren_download_task error{e}")
@scheduled_job(cron="0 30 17 * * *", name="发送秀人PDF")
def xiu_ren_pdf_send(self): def xiu_ren_pdf_send(self):
try: try:
pub_path = generate_pdf_from_images("xiuren") pub_path = generate_pdf_from_images("xiuren")
self.wcf.send_file(pub_path, "45317011307@chatroom") self.wcf.send_file(pub_path, "45317011307@chatroom")
except Exception as e: except Exception as e: