Files
abot/message_util.py
2025-04-23 11:47:39 +08:00

125 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import logging
import os.path
import random
import time
from typing import Optional
from gewechat_client import GewechatClient
from gewechat.client import gewe_client
from utils.wechat.contact_manager import ContactManager
class MessageUtil:
"""
消息发送工具类,封装了发送文本和文件的方法
"""
# 修改 MessageUtil 类的初始化方法,接受联系人管理器而不是联系人字典
def __init__(self):
self.app_id = gewe_client.app_id
self.client = gewe_client.client
self.contact_manager = ContactManager.get_instance()
self.LOG = logging.getLogger("MessageUtil")
def send_text(self, msg: str, receiver: str, at_list: str = "") -> None:
"""
发送文本消息
:param msg: 消息字符串
:param receiver: 接收人wxid或者群id
:param at_list: 要@的wxid, @所有人的wxid为notify@all
"""
# 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险
time.sleep(random.uniform(0.3, 1.0))
ats = ""
if at_list:
if at_list == "notify@all": # @所有人
ats = " @所有人"
else:
wxids = at_list.split(",")
if len(wxids) > 0:
ats += self.get_user_chatroom_nickname(receiver, wxids)
# {msg}{ats} 表示要发送的消息内容后面紧跟@,例如 北京天气情况为xxx @张三
if ats == "":
self.LOG.info(f"To {receiver}: {msg}")
self.client.post_text(self.app_id, receiver, "{msg}", "")
else:
self.LOG.info(f"To {receiver}: {ats}\r{msg}")
self.client.post_text(self.app_id, receiver, f"{ats}\n{msg}", at_list)
def send_file(self, file_path: str, receiver: str) -> str:
"""
发送文件消息
:param file_path: 文件路径
:param receiver: 接收人wxid或者群id
"""
# 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险
time.sleep(random.uniform(0.5, 1.5))
self.LOG.info(f"Sending file to {receiver}: {file_path}")
(path, filename) = os.path.split(file_path)
self.LOG.info(f"Sending file to {path}: {filename}")
return self.client.post_file(self.app_id, receiver, file_path, filename)
def send_image(self, image_path: str, receiver: str) -> str:
"""
发送文件消息
:param image_path: 文件路径
:param receiver: 接收人wxid或者群id
"""
# 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险
time.sleep(random.uniform(0.5, 1.5))
self.LOG.info(f"Sending file to {receiver}: {image_path}")
return self.client.post_image(self.app_id, receiver, image_path)
def send_rich_text(self, name: str, account: str, title: str, digest: str, url: str, thumburl: str,
receiver: str) -> int:
"""
发送富文本消息
卡片样式:
|-------------------------------------|
|title, 最长两行 |
|(长标题, 标题短的话这行没有) |
|digest, 最多三行,会占位 |--------|
|digest, 最多三行,会占位 |thumburl|
|digest, 最多三行,会占位 |--------|
|(account logo) name |
|-------------------------------------|
:param name: 左下显示的名字
:param account: 填公众号 id 可以显示对应的头像gh_ 开头的)
:param title: 标题,最多两行
:param digest: 摘要,三行
:param url: 点击后跳转的链接
:param thumburl: 缩略图的链接
:param receiver: 接收人, wxid 或者 roomid
:return: 0 为成功,其他失败
"""
# 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险
time.sleep(random.uniform(0.5, 1.5))
self.LOG.info(f"Sending rich text to {receiver}: {title}")
return self.client.post_link(self.app_id, receiver, title, digest, url, thumburl)
def get_user_chatroom_nickname(self, chatroom_id: str, member_wxids: list[str]) -> str:
data = self.client.get_chatroom_member_detail(self.app_id, chatroom_id, member_wxids)
nicknames_with_at = [" @" + member["nickName"] for member in data["data"] if member.get("nickName")]
return " ".join(nicknames_with_at)
def invite_member(self, group_id, sender):
return self.client.invite_member(self.app_id, sender, group_id, "自动加群邀请")
def get_chatroom_members(self, group_id) -> dict:
data = self.client.get_chatroom_member_list(self.app_id, group_id)
members = {member["wxid"]: member["nickName"] for member in data["data"]["memberList"]}
return members