From 050ff9e6cf72a4a2c63dbe5cfcb0f65e5eaaab48 Mon Sep 17 00:00:00 2001 From: Changhua Date: Tue, 27 Sep 2022 21:48:20 +0800 Subject: [PATCH] Demo Weather Robot --- main.py | 29 +++++++++++++--- requirements.txt | 1 + robot/base_robot.py | 22 ++++++++++-- robot/job_mgmt.py | 82 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 7 deletions(-) create mode 100644 robot/job_mgmt.py diff --git a/main.py b/main.py index 049beb3..385d6aa 100644 --- a/main.py +++ b/main.py @@ -2,7 +2,6 @@ # -*- coding: utf-8 -*- import re -import time import robot.sdk.wcferry as WxSDK from robot.base_robot import BaseRobot @@ -10,6 +9,9 @@ from robot.configuration import Config class Robot(BaseRobot): + """个性化自己的机器人 + """ + def __init__(self, sdk: WxSDK, config: Config) -> None: super().__init__(sdk, config) @@ -36,14 +38,33 @@ class Robot(BaseRobot): self.allContacts[msg.wxId] = nickName +def weather_report(robot: Robot): + """模拟发送天气预报 + """ + # 获取接收人 + receivers = ["filehelper"] + + # 获取天气,需要自己实现,可以参考 https://gitee.com/lch0821/WeatherScrapy 获取天气。 + report = "这就是获取到的天气情况了" + + for r in receivers: + robot.sendTextMsg(r, report) + + def main(): robot = Robot(WxSDK, Config()) + + # 初始化机器人 robot.initSDK() + + # 接收消息 robot.enableRecvMsg() - while True: - time.sleep(1) - # 不让进程退出,否则机器人就退出了 + # 每天7点发送天气预报 + robot.onEveryTime("07:00", weather_report, robot=robot) + + # 让机器人一直跑 + robot.keepRunningAndBlockProcess() if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt index 5500f00..d80ccf3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ PyYAML +schedule diff --git a/robot/base_robot.py b/robot/base_robot.py index d71016b..9581ccb 100644 --- a/robot/base_robot.py +++ b/robot/base_robot.py @@ -1,11 +1,18 @@ # -*- coding: utf-8 -*- import re +import time import logging import xml.etree.ElementTree as ET +from robot.job_mgmt import Job + + +class BaseRobot(Job): + """ + 机器人基类。用户需要实现 `processMsg` 方法以个性化处理消息。 + """ -class BaseRobot(object): def __init__(self, sdk, config) -> None: self.sdk = sdk self.config = config @@ -67,8 +74,9 @@ class BaseRobot(object): self.LOG.info(rmsg) def getAllContacts(self): - """ 获取联系人(包括好友、公众号、服务号、群成员……) - {"wxid": "NickName"} + """ + 获取联系人(包括好友、公众号、服务号、群成员……) + 格式: {"wxid": "NickName"} """ contacts = self.sdk.WxExecDbQuery("MicroMsg.db", "SELECT UserName, NickName FROM Contact;") return {contact["UserName"]: contact["NickName"] for contact in contacts} @@ -85,3 +93,11 @@ class BaseRobot(object): def processMsg(self, msg) -> None: raise NotImplementedError("Method [processMsg] should be implemented.") + + def keepRunningAndBlockProcess(self) -> None: + """ + 保持机器人运行,不让进程退出 + """ + while True: + self.runPendingJobs() + time.sleep(1) diff --git a/robot/job_mgmt.py b/robot/job_mgmt.py new file mode 100644 index 0000000..24a1499 --- /dev/null +++ b/robot/job_mgmt.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- + +import time +import schedule + + +class Job(object): + def __init__(self) -> None: + pass + + def onEverySeconds(self, seconds, task, *args, **kwargs): + """ + 每 seconds 秒执行 + :param seconds: 间隔,秒 + :param task: 定时执行的方法 + :return: None + """ + schedule.every(seconds).seconds.do(task, *args, **kwargs) + + def onEveryMinutes(self, minutes, task, *args, **kwargs): + """ + 每 minutes 分钟执行 + :param minutes: 间隔,分钟 + :param task: 定时执行的方法 + :return: None + """ + schedule.every(minutes).minutes.do(task, *args, **kwargs) + + def onEveryHours(self, hours, task, *args, **kwargs): + """ + 每 hours 小时执行 + :param hours: 间隔,小时 + :param task: 定时执行的方法 + :return: None + """ + schedule.every(hours).hours.do(task, *args, **kwargs) + + def onEveryDays(self, days, task, *args, **kwargs): + """ + 每 days 天执行 + :param days: 间隔,天 + :param task: 定时执行的方法 + :return: None + """ + schedule.every(days).days.do(task, *args, **kwargs) + + def onEveryTime(self, times, task, *args, **kwargs): + """ + 每天定时执行 + :param times: 时间字符串列表,格式: + - For daily jobs -> HH:MM:SS or HH:MM + - For hourly jobs -> MM:SS or :MM + - For minute jobs -> :SS + :param task: 定时执行的方法 + :return: None + + 例子: times=["10:30", "10:45", "11:00"] + """ + if not isinstance(times, list): + times = [times] + + for t in times: + schedule.every(1).days.at(t).do(task, *args, **kwargs) + + def runPendingJobs(self): + schedule.run_pending() + + +if __name__ == "__main__": + def printStr(s): + print(s) + + job = Job() + job.onEverySeconds(59, printStr, "onEverySeconds 59") + job.onEveryMinutes(59, printStr, "onEveryMinutes 59") + job.onEveryHours(23, printStr, "onEveryHours 23") + job.onEveryDays(1, printStr, "onEveryDays 1") + job.onEveryTime("23:59", printStr, "onEveryTime 23:59") + + while True: + job.runPendingJobs() + time.sleep(1)