diff --git a/async_job.py b/async_job.py new file mode 100644 index 0000000..4ef679a --- /dev/null +++ b/async_job.py @@ -0,0 +1,79 @@ +import asyncio +from datetime import datetime, timedelta +from typing import Callable, Awaitable, List, Dict + + +class AsyncJob: + def __init__(self): + self.tasks: List[Callable[[], Awaitable]] = [] + + def every_seconds(self, seconds: int): + def decorator(func: Callable): + async def wrapper(): + while True: + await func() + await asyncio.sleep(seconds) + + self.tasks.append(wrapper) + return func + + return decorator + + def every_minutes(self, minutes: int): + return self.every_seconds(minutes * 60) + + def every_hours(self, hours: int): + return self.every_seconds(hours * 3600) + + def at_times(self, time_list: List[str]): + def decorator(func: Callable): + async def wrapper(): + while True: + now = datetime.now() + for t in time_list: + target = datetime.strptime(t, "%H:%M").replace(year=now.year, month=now.month, day=now.day) + if target < now: + target += timedelta(days=1) + wait_seconds = (target - now).total_seconds() + await asyncio.sleep(wait_seconds) + await func() + + self.tasks.append(wrapper) + return func + + return decorator + + def every_weekday_time(self, weekday: int, time_str: str): + """ + 每周 weekday(0=周一) 的 time_str(如10:00)时间执行 + """ + + def decorator(func: Callable): + async def wrapper(): + while True: + now = datetime.now() + target_time = datetime.strptime(time_str, "%H:%M").time() + + # 构造下一个执行时间 + days_ahead = (weekday - now.weekday() + 7) % 7 + target_date = now.date() + timedelta(days=days_ahead) + target_dt = datetime.combine(target_date, target_time) + + if target_dt <= now: + target_dt += timedelta(days=7) + + sleep_secs = (target_dt - now).total_seconds() + await asyncio.sleep(sleep_secs) + await func() + + self.tasks.append(wrapper) + return func + + return decorator + + async def run_all(self): + await asyncio.gather(*(task() for task in self.tasks)) + + +# 全局唯一 job 管理器 +async_job = AsyncJob() diff --git a/main.py b/main.py index b9f20fe..eaf22f0 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,9 @@ #! /usr/bin/env python3 # -*- coding: utf-8 -*- +import asyncio import threading + +from async_job import async_job from configuration import Config from robot import Robot @@ -46,29 +49,8 @@ def main(): robot.LOG.info("wechat_ipad客户端启动成功") else: robot.LOG.error("wechat_ipad客户端启动失败") - # # 每天 8:30 发送新闻 - robot.onEveryTime("08:30", robot.news_baidu_report_auto) - # - # # epic - robot.onEveryTime("10:30", robot.send_epic_free_games) - # - # # message report 1:数据自动从redis 转到sqllite - robot.onEveryTime("02:30", robot.message_count_to_db) - # # 从db中提取并发送给相关群 - robot.onEveryTime("09:30", robot.generate_and_send_ranking) - # - # # sehuatang - robot.onEveryTime("15:30", robot.generate_sehuatang_pdf) - # - # # 秀人网每天自动下载帖子 - robot.onEveryTime("01:30", robot.xiu_ren_download_task) - # - # # 秀人网每天自动发pdf - # robot.onEveryTime("17:30", robot.xiu_ren_pdf_send) - - # 每天进行二次登录检查 - robot.onEveryHours(3, robot.login_twice_auto_auth) - + # 注册定时任务 + # jobs(robot) # 启动Dashboard服务器 dashboard_server = None try: @@ -83,9 +65,35 @@ def main(): except Exception as e: robot.LOG.error(f"Dashboard服务器启动失败: {e}") + asyncio.run(async_job.run_all()) # 让机器人一直跑 robot.keep_running_and_block_process() +# def jobs(robot: Robot): +# # # 每天 8:30 发送新闻 +# robot.onEveryTime("08:30", robot.news_baidu_report_auto) +# # +# # # epic +# robot.onEveryTime("10:30", robot.send_epic_free_games) +# # +# # # message report 1:数据自动从redis 转到sqllite +# robot.onEveryTime("02:30", robot.message_count_to_db) +# # # 从db中提取并发送给相关群 +# robot.onEveryTime("09:30", robot.generate_and_send_ranking) +# # +# # # sehuatang +# robot.onEveryTime("15:30", robot.generate_sehuatang_pdf) +# # +# # # 秀人网每天自动下载帖子 +# robot.onEveryTime("01:30", robot.xiu_ren_download_task) +# # +# # # 秀人网每天自动发pdf +# # robot.onEveryTime("17:30", robot.xiu_ren_pdf_send) +# +# # 每天进行二次登录检查 +# robot.onEveryHours(3, robot.login_twice_auto_auth) + + if __name__ == "__main__": main() diff --git a/robot.py b/robot.py index 04f1e11..ef91b5b 100644 --- a/robot.py +++ b/robot.py @@ -8,10 +8,10 @@ import toml import wechat_ipad from loguru import logger +from async_job import async_job from base.func_epic import is_friday, get_free from base.func_news import News from configuration import Config -from job_mgmt import Job from plugin_common.event_system import EventType, EventSystem from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.plugin_interface import PluginStatus @@ -30,7 +30,7 @@ from wechat_ipad.models.message import WxMessage, MessageType from xiuren.meitu_dl import meitu_dowload_pub_pic -class Robot(Job): +class Robot: """个性化自己的机器人 """ @@ -448,7 +448,7 @@ class Robot(Job): 保持机器人运行,不让进程退出 """ while True: - self.runPendingJobs() + # self.runPendingJobs() time.sleep(1) # 添加一个方法用于刷新联系人信息 @@ -632,70 +632,71 @@ class Robot(Job): # ============================================== 业务内容========================================================== - def news_baidu_report_auto(self) -> None: + @async_job.at_times(["08:30"]) + async def news_baidu_report_auto(self) -> None: try: news = News().get_baidu_news() - self.send_group_txt_message(news, Feature.DAILY_NEWS) + await self.send_group_txt_message(news, Feature.DAILY_NEWS) except Exception as e: self.LOG.error(f"newsBaiduReportAuto error:{e}") - def news_en_report(self, website, sender: str = None) -> None: + async def news_en_report(self, website, sender: str = None) -> None: try: news = News().get_eng_news(website) - self.ipad_bot.send_text_message(sender, news) + await self.ipad_bot.send_text_message(sender, news) except Exception as e: self.LOG.error(f"newsEnReport error:{e}") # 使用装饰器标记定时任务 星期五 10:30 执行 - def send_epic_free_games(self): + @async_job.every_weekday_time(weekday=4, time_str="10:00") # 0=周一,4=周五 + async def send_epic_free_games(self): try: if is_friday(): games = get_free() - asyncio.run(self.send_group_txt_message(games, Feature.EPIC)) + await self.send_group_txt_message(games, Feature.EPIC) except Exception as e: self.LOG.error(f"sendEpicFreeGames error:{e}") # 使用装饰器标记定时任务 - - def message_count_to_db(self): + @async_job.at_times(["02:30"]) + async def message_count_to_db(self): try: self.message_storage.write_to_db() except Exception as e: self.LOG.error(f"write_to_db error:{e}") - def generate_sehuatang_pdf(self): + @async_job.at_times(["15:30"]) + async def generate_sehuatang_pdf(self): try: self.LOG.info("开始生成PDF,generate_sehuatang_pdf") path = pdf_file_path() # 暂时只发4K群 - asyncio.run(self.send_group_file_message(path, Feature.PDF_CAPABILITY)) + await self.send_group_file_message(path, Feature.PDF_CAPABILITY) except Exception as e: self.LOG.error(f"generateSehuatangPdf error:{e}") - def xiu_ren_download_task(self): + @async_job.at_times(["01:30"]) + async def xiu_ren_download_task(self): try: # 每天下载10组图,然后发一个帖子PDF meitu_dowload_pub_pic() except Exception as e: self.LOG.error(f"xiu_ren_download_task error:{e}") - def generate_and_send_ranking(self): + @async_job.at_times(["09:30"]) + async def generate_and_send_ranking(self): try: receivers = self.gbm.get_group_list() if not receivers: return - - async def send_all(): - tasks = [] - for r in receivers: - if self.gbm.get_group_permission(r, Feature.DAILY_SUMMARY) == PermissionStatus.ENABLED: - output = self.message_storage.generate_and_send_ranking(r, self.allContacts) - tasks.append(self.ipad_bot.send_text_message(r, output)) - await asyncio.gather(*tasks) - - asyncio.run(send_all()) - + for r in receivers: + if self.gbm.get_group_permission(r, Feature.DAILY_SUMMARY) == PermissionStatus.ENABLED: + output = self.message_storage.generate_and_send_ranking(r, self.allContacts) + await self.ipad_bot.send_text_message(r, output) except Exception as e: self.LOG.error(f"SendRanking error:{e}") + @async_job.at_times(["10:23"]) + async def job_test(self): + await self.ipad_bot.send_text_message("Jyunere","测试任务!") \ No newline at end of file