使用异步任务注解
This commit is contained in:
79
async_job.py
Normal file
79
async_job.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
import asyncio
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from typing import Callable, Awaitable, List, Dict
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncJob:
|
||||||
|
def __init__(self):
|
||||||
|
self.tasks: List[Callable[[], Awaitable]] = []
|
||||||
|
|
||||||
|
def every_seconds(self, seconds: int):
|
||||||
|
def decorator(func: Callable):
|
||||||
|
async def wrapper():
|
||||||
|
while True:
|
||||||
|
await func()
|
||||||
|
await asyncio.sleep(seconds)
|
||||||
|
|
||||||
|
self.tasks.append(wrapper)
|
||||||
|
return func
|
||||||
|
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
def every_minutes(self, minutes: int):
|
||||||
|
return self.every_seconds(minutes * 60)
|
||||||
|
|
||||||
|
def every_hours(self, hours: int):
|
||||||
|
return self.every_seconds(hours * 3600)
|
||||||
|
|
||||||
|
def at_times(self, time_list: List[str]):
|
||||||
|
def decorator(func: Callable):
|
||||||
|
async def wrapper():
|
||||||
|
while True:
|
||||||
|
now = datetime.now()
|
||||||
|
for t in time_list:
|
||||||
|
target = datetime.strptime(t, "%H:%M").replace(year=now.year, month=now.month, day=now.day)
|
||||||
|
if target < now:
|
||||||
|
target += timedelta(days=1)
|
||||||
|
wait_seconds = (target - now).total_seconds()
|
||||||
|
await asyncio.sleep(wait_seconds)
|
||||||
|
await func()
|
||||||
|
|
||||||
|
self.tasks.append(wrapper)
|
||||||
|
return func
|
||||||
|
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
def every_weekday_time(self, weekday: int, time_str: str):
|
||||||
|
"""
|
||||||
|
每周 weekday(0=周一) 的 time_str(如10:00)时间执行
|
||||||
|
"""
|
||||||
|
|
||||||
|
def decorator(func: Callable):
|
||||||
|
async def wrapper():
|
||||||
|
while True:
|
||||||
|
now = datetime.now()
|
||||||
|
target_time = datetime.strptime(time_str, "%H:%M").time()
|
||||||
|
|
||||||
|
# 构造下一个执行时间
|
||||||
|
days_ahead = (weekday - now.weekday() + 7) % 7
|
||||||
|
target_date = now.date() + timedelta(days=days_ahead)
|
||||||
|
target_dt = datetime.combine(target_date, target_time)
|
||||||
|
|
||||||
|
if target_dt <= now:
|
||||||
|
target_dt += timedelta(days=7)
|
||||||
|
|
||||||
|
sleep_secs = (target_dt - now).total_seconds()
|
||||||
|
await asyncio.sleep(sleep_secs)
|
||||||
|
await func()
|
||||||
|
|
||||||
|
self.tasks.append(wrapper)
|
||||||
|
return func
|
||||||
|
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
async def run_all(self):
|
||||||
|
await asyncio.gather(*(task() for task in self.tasks))
|
||||||
|
|
||||||
|
|
||||||
|
# 全局唯一 job 管理器
|
||||||
|
async_job = AsyncJob()
|
||||||
54
main.py
54
main.py
@@ -1,6 +1,9 @@
|
|||||||
#! /usr/bin/env python3
|
#! /usr/bin/env python3
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
import asyncio
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
|
from async_job import async_job
|
||||||
from configuration import Config
|
from configuration import Config
|
||||||
from robot import Robot
|
from robot import Robot
|
||||||
|
|
||||||
@@ -46,29 +49,8 @@ def main():
|
|||||||
robot.LOG.info("wechat_ipad客户端启动成功")
|
robot.LOG.info("wechat_ipad客户端启动成功")
|
||||||
else:
|
else:
|
||||||
robot.LOG.error("wechat_ipad客户端启动失败")
|
robot.LOG.error("wechat_ipad客户端启动失败")
|
||||||
# # 每天 8:30 发送新闻
|
# 注册定时任务
|
||||||
robot.onEveryTime("08:30", robot.news_baidu_report_auto)
|
# jobs(robot)
|
||||||
#
|
|
||||||
# # epic
|
|
||||||
robot.onEveryTime("10:30", robot.send_epic_free_games)
|
|
||||||
#
|
|
||||||
# # message report 1:数据自动从redis 转到sqllite
|
|
||||||
robot.onEveryTime("02:30", robot.message_count_to_db)
|
|
||||||
# # 从db中提取并发送给相关群
|
|
||||||
robot.onEveryTime("09:30", robot.generate_and_send_ranking)
|
|
||||||
#
|
|
||||||
# # sehuatang
|
|
||||||
robot.onEveryTime("15:30", robot.generate_sehuatang_pdf)
|
|
||||||
#
|
|
||||||
# # 秀人网每天自动下载帖子
|
|
||||||
robot.onEveryTime("01:30", robot.xiu_ren_download_task)
|
|
||||||
#
|
|
||||||
# # 秀人网每天自动发pdf
|
|
||||||
# robot.onEveryTime("17:30", robot.xiu_ren_pdf_send)
|
|
||||||
|
|
||||||
# 每天进行二次登录检查
|
|
||||||
robot.onEveryHours(3, robot.login_twice_auto_auth)
|
|
||||||
|
|
||||||
# 启动Dashboard服务器
|
# 启动Dashboard服务器
|
||||||
dashboard_server = None
|
dashboard_server = None
|
||||||
try:
|
try:
|
||||||
@@ -83,9 +65,35 @@ def main():
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
robot.LOG.error(f"Dashboard服务器启动失败: {e}")
|
robot.LOG.error(f"Dashboard服务器启动失败: {e}")
|
||||||
|
|
||||||
|
asyncio.run(async_job.run_all())
|
||||||
# 让机器人一直跑
|
# 让机器人一直跑
|
||||||
robot.keep_running_and_block_process()
|
robot.keep_running_and_block_process()
|
||||||
|
|
||||||
|
|
||||||
|
# def jobs(robot: Robot):
|
||||||
|
# # # 每天 8:30 发送新闻
|
||||||
|
# robot.onEveryTime("08:30", robot.news_baidu_report_auto)
|
||||||
|
# #
|
||||||
|
# # # epic
|
||||||
|
# robot.onEveryTime("10:30", robot.send_epic_free_games)
|
||||||
|
# #
|
||||||
|
# # # message report 1:数据自动从redis 转到sqllite
|
||||||
|
# robot.onEveryTime("02:30", robot.message_count_to_db)
|
||||||
|
# # # 从db中提取并发送给相关群
|
||||||
|
# robot.onEveryTime("09:30", robot.generate_and_send_ranking)
|
||||||
|
# #
|
||||||
|
# # # sehuatang
|
||||||
|
# robot.onEveryTime("15:30", robot.generate_sehuatang_pdf)
|
||||||
|
# #
|
||||||
|
# # # 秀人网每天自动下载帖子
|
||||||
|
# robot.onEveryTime("01:30", robot.xiu_ren_download_task)
|
||||||
|
# #
|
||||||
|
# # # 秀人网每天自动发pdf
|
||||||
|
# # robot.onEveryTime("17:30", robot.xiu_ren_pdf_send)
|
||||||
|
#
|
||||||
|
# # 每天进行二次登录检查
|
||||||
|
# robot.onEveryHours(3, robot.login_twice_auto_auth)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
53
robot.py
53
robot.py
@@ -8,10 +8,10 @@ import toml
|
|||||||
import wechat_ipad
|
import wechat_ipad
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
|
from async_job import async_job
|
||||||
from base.func_epic import is_friday, get_free
|
from base.func_epic import is_friday, get_free
|
||||||
from base.func_news import News
|
from base.func_news import News
|
||||||
from configuration import Config
|
from configuration import Config
|
||||||
from job_mgmt import Job
|
|
||||||
from plugin_common.event_system import EventType, EventSystem
|
from plugin_common.event_system import EventType, EventSystem
|
||||||
from plugin_common.message_plugin_interface import MessagePluginInterface
|
from plugin_common.message_plugin_interface import MessagePluginInterface
|
||||||
from plugin_common.plugin_interface import PluginStatus
|
from plugin_common.plugin_interface import PluginStatus
|
||||||
@@ -30,7 +30,7 @@ from wechat_ipad.models.message import WxMessage, MessageType
|
|||||||
from xiuren.meitu_dl import meitu_dowload_pub_pic
|
from xiuren.meitu_dl import meitu_dowload_pub_pic
|
||||||
|
|
||||||
|
|
||||||
class Robot(Job):
|
class Robot:
|
||||||
"""个性化自己的机器人
|
"""个性化自己的机器人
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -448,7 +448,7 @@ class Robot(Job):
|
|||||||
保持机器人运行,不让进程退出
|
保持机器人运行,不让进程退出
|
||||||
"""
|
"""
|
||||||
while True:
|
while True:
|
||||||
self.runPendingJobs()
|
# self.runPendingJobs()
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# 添加一个方法用于刷新联系人信息
|
# 添加一个方法用于刷新联系人信息
|
||||||
@@ -632,70 +632,71 @@ class Robot(Job):
|
|||||||
|
|
||||||
# ============================================== 业务内容==========================================================
|
# ============================================== 业务内容==========================================================
|
||||||
|
|
||||||
def news_baidu_report_auto(self) -> None:
|
@async_job.at_times(["08:30"])
|
||||||
|
async def news_baidu_report_auto(self) -> None:
|
||||||
try:
|
try:
|
||||||
news = News().get_baidu_news()
|
news = News().get_baidu_news()
|
||||||
self.send_group_txt_message(news, Feature.DAILY_NEWS)
|
await self.send_group_txt_message(news, Feature.DAILY_NEWS)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.LOG.error(f"newsBaiduReportAuto error:{e}")
|
self.LOG.error(f"newsBaiduReportAuto error:{e}")
|
||||||
|
|
||||||
def news_en_report(self, website, sender: str = None) -> None:
|
async def news_en_report(self, website, sender: str = None) -> None:
|
||||||
try:
|
try:
|
||||||
news = News().get_eng_news(website)
|
news = News().get_eng_news(website)
|
||||||
self.ipad_bot.send_text_message(sender, news)
|
await self.ipad_bot.send_text_message(sender, news)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.LOG.error(f"newsEnReport error:{e}")
|
self.LOG.error(f"newsEnReport error:{e}")
|
||||||
|
|
||||||
# 使用装饰器标记定时任务 星期五 10:30 执行
|
# 使用装饰器标记定时任务 星期五 10:30 执行
|
||||||
|
|
||||||
def send_epic_free_games(self):
|
@async_job.every_weekday_time(weekday=4, time_str="10:00") # 0=周一,4=周五
|
||||||
|
async def send_epic_free_games(self):
|
||||||
try:
|
try:
|
||||||
if is_friday():
|
if is_friday():
|
||||||
games = get_free()
|
games = get_free()
|
||||||
asyncio.run(self.send_group_txt_message(games, Feature.EPIC))
|
await self.send_group_txt_message(games, Feature.EPIC)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.LOG.error(f"sendEpicFreeGames error:{e}")
|
self.LOG.error(f"sendEpicFreeGames error:{e}")
|
||||||
|
|
||||||
# 使用装饰器标记定时任务
|
# 使用装饰器标记定时任务
|
||||||
|
@async_job.at_times(["02:30"])
|
||||||
def message_count_to_db(self):
|
async def message_count_to_db(self):
|
||||||
try:
|
try:
|
||||||
self.message_storage.write_to_db()
|
self.message_storage.write_to_db()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.LOG.error(f"write_to_db error:{e}")
|
self.LOG.error(f"write_to_db error:{e}")
|
||||||
|
|
||||||
def generate_sehuatang_pdf(self):
|
@async_job.at_times(["15:30"])
|
||||||
|
async def generate_sehuatang_pdf(self):
|
||||||
try:
|
try:
|
||||||
self.LOG.info("开始生成PDF,generate_sehuatang_pdf")
|
self.LOG.info("开始生成PDF,generate_sehuatang_pdf")
|
||||||
path = pdf_file_path()
|
path = pdf_file_path()
|
||||||
# 暂时只发4K群
|
# 暂时只发4K群
|
||||||
asyncio.run(self.send_group_file_message(path, Feature.PDF_CAPABILITY))
|
await self.send_group_file_message(path, Feature.PDF_CAPABILITY)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.LOG.error(f"generateSehuatangPdf error:{e}")
|
self.LOG.error(f"generateSehuatangPdf error:{e}")
|
||||||
|
|
||||||
def xiu_ren_download_task(self):
|
@async_job.at_times(["01:30"])
|
||||||
|
async def xiu_ren_download_task(self):
|
||||||
try:
|
try:
|
||||||
# 每天下载10组图,然后发一个帖子PDF
|
# 每天下载10组图,然后发一个帖子PDF
|
||||||
meitu_dowload_pub_pic()
|
meitu_dowload_pub_pic()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.LOG.error(f"xiu_ren_download_task error:{e}")
|
self.LOG.error(f"xiu_ren_download_task error:{e}")
|
||||||
|
|
||||||
def generate_and_send_ranking(self):
|
@async_job.at_times(["09:30"])
|
||||||
|
async def generate_and_send_ranking(self):
|
||||||
try:
|
try:
|
||||||
receivers = self.gbm.get_group_list()
|
receivers = self.gbm.get_group_list()
|
||||||
if not receivers:
|
if not receivers:
|
||||||
return
|
return
|
||||||
|
for r in receivers:
|
||||||
async def send_all():
|
if self.gbm.get_group_permission(r, Feature.DAILY_SUMMARY) == PermissionStatus.ENABLED:
|
||||||
tasks = []
|
output = self.message_storage.generate_and_send_ranking(r, self.allContacts)
|
||||||
for r in receivers:
|
await self.ipad_bot.send_text_message(r, output)
|
||||||
if self.gbm.get_group_permission(r, Feature.DAILY_SUMMARY) == PermissionStatus.ENABLED:
|
|
||||||
output = self.message_storage.generate_and_send_ranking(r, self.allContacts)
|
|
||||||
tasks.append(self.ipad_bot.send_text_message(r, output))
|
|
||||||
await asyncio.gather(*tasks)
|
|
||||||
|
|
||||||
asyncio.run(send_all())
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.LOG.error(f"SendRanking error:{e}")
|
self.LOG.error(f"SendRanking error:{e}")
|
||||||
|
|
||||||
|
@async_job.at_times(["10:23"])
|
||||||
|
async def job_test(self):
|
||||||
|
await self.ipad_bot.send_text_message("Jyunere","测试任务!")
|
||||||
Reference in New Issue
Block a user