# -*- coding: utf-8 -*- from loguru import logger import os.path import random import time from typing import Any, Tuple, Coroutine from utils.wechat.contact_manager import ContactManager from wechat_ipad import WechatAPIClient class MessageUtil: """ 消息发送工具类,封装了发送文本和文件的方法 """ # 修改 MessageUtil 类的初始化方法,接受联系人管理器而不是联系人字典 def __init__(self, client: WechatAPIClient = None): self.client = client self.contact_manager = ContactManager.get_instance() self.LOG = logger def send_text(self, msg: str, receiver: str, at_list: str = "") -> None: """ 发送文本消息 :param msg: 消息字符串 :param receiver: 接收人wxid或者群id :param at_list: 要@的wxid, @所有人的wxid为:notify@all """ # 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险 time.sleep(random.uniform(0.3, 1.0)) ats = "" if at_list: if at_list == "notify@all": # @所有人 ats = " @所有人" else: wxids = at_list.split(",") if len(wxids) > 0: ats += self.client.get_nickname(receiver) # {msg}{ats} 表示要发送的消息内容后面紧跟@,例如 北京天气情况为:xxx @张三 if ats == "": self.LOG.info(f"To {receiver}: {msg}") self.client.send_text_message(receiver, "{msg}", "") else: self.LOG.info(f"To {receiver}: {ats}\r{msg}") self.client.send_text_message(receiver, f"{ats}\n{msg}", at_list) def send_file(self, file_path: str, receiver: str) -> str: """ 发送文件消息 :param file_path: 文件路径 :param receiver: 接收人wxid或者群id """ # 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险 time.sleep(random.uniform(0.5, 1.5)) self.LOG.info(f"Sending file to {receiver}: {file_path}") (path, filename) = os.path.split(file_path) self.LOG.info(f"Sending file to {path}: {filename}") return self.client.post_file(self.app_id, receiver, file_path, filename) def send_image(self, image_path: str, receiver: str) -> Coroutine[Any, Any, tuple[int, int, int]]: """ 发送文件消息 :param image_path: 文件路径 :param receiver: 接收人wxid或者群id """ # 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险 time.sleep(random.uniform(0.5, 1.5)) self.LOG.info(f"Sending file to {receiver}: {image_path}") return self.client.send_image_message(receiver, image_path) def send_rich_text(self, name: str, account: str, title: str, digest: str, url: str, thumburl: str, receiver: str) -> int: """ 发送富文本消息 卡片样式: |-------------------------------------| |title, 最长两行 | |(长标题, 标题短的话这行没有) | |digest, 最多三行,会占位 |--------| |digest, 最多三行,会占位 |thumburl| |digest, 最多三行,会占位 |--------| |(account logo) name | |-------------------------------------| :param name: 左下显示的名字 :param account: 填公众号 id 可以显示对应的头像(gh_ 开头的) :param title: 标题,最多两行 :param digest: 摘要,三行 :param url: 点击后跳转的链接 :param thumburl: 缩略图的链接 :param receiver: 接收人, wxid 或者 roomid :return: 0 为成功,其他失败 """ # 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险 time.sleep(random.uniform(0.5, 1.5)) self.LOG.info(f"Sending rich text to {receiver}: {title}") return self.client.send_link_message(self.app_id, receiver, title, digest, url, thumburl) def invite_member(self, group_id, sender): return self.client.invite_chatroom_member( sender, group_id) def get_chatroom_members(self, group_id) -> dict: data = self.client.get_chatroom_member_list(self.app_id, group_id) members = {member["wxid"]: member["nickName"] for member in data["data"]["memberList"]} return members