# -*- coding: utf-8 -*- import logging import random import time from typing import Optional from wcferry import Wcf class MessageUtil: """ 消息发送工具类,封装了发送文本和文件的方法 """ def __init__(self, wcf: Wcf, contacts: Optional[dict] = None) -> None: """ 初始化消息工具类 :param wcf: WCFerry实例 :param contacts: 联系人字典,格式为 {"wxid": "NickName"} """ self.wcf = wcf self.contacts = contacts or {} self.LOG = logging.getLogger("MessageUtil") def send_text_msg(self, msg: str, receiver: str, at_list: str = "") -> None: """ 发送文本消息 :param msg: 消息字符串 :param receiver: 接收人wxid或者群id :param at_list: 要@的wxid, @所有人的wxid为:notify@all """ # 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险 time.sleep(random.uniform(0.3, 1.0)) ats = "" if at_list: if at_list == "notify@all": # @所有人 ats = " @所有人" else: wxids = at_list.split(",") for wxid in wxids: # 根据 wxid 查找群昵称 ats += f" @{self.wcf.get_alias_in_chatroom(wxid, receiver)}" # {msg}{ats} 表示要发送的消息内容后面紧跟@,例如 北京天气情况为:xxx @张三 if ats == "": self.LOG.info(f"To {receiver}: {msg}") self.wcf.send_text(f"{msg}", receiver, at_list) else: self.LOG.info(f"To {receiver}: {ats}\r{msg}") self.wcf.send_text(f"{ats}\n{msg}", receiver, at_list) def send_file(self, file_path: str, receiver: str) -> None: """ 发送文件消息 :param file_path: 文件路径 :param receiver: 接收人wxid或者群id """ # 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险 time.sleep(random.uniform(0.5, 1.5)) self.LOG.info(f"Sending file to {receiver}: {file_path}") self.wcf.send_file(file_path, receiver) def update_contacts(self, contacts: dict) -> None: """ 更新联系人字典 :param contacts: 联系人字典,格式为 {"wxid": "NickName"} """ self.contacts.update(contacts)