新增消息工具包,防止代码重复,方便@ 逻辑
This commit is contained in:
74
message_util.py
Normal file
74
message_util.py
Normal file
@@ -0,0 +1,74 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from wcferry import Wcf
|
||||
|
||||
|
||||
class MessageUtil:
|
||||
"""
|
||||
消息发送工具类,封装了发送文本和文件的方法
|
||||
"""
|
||||
|
||||
def __init__(self, wcf: Wcf, contacts: Optional[dict] = None) -> None:
|
||||
"""
|
||||
初始化消息工具类
|
||||
|
||||
:param wcf: WCFerry实例
|
||||
:param contacts: 联系人字典,格式为 {"wxid": "NickName"}
|
||||
"""
|
||||
self.wcf = wcf
|
||||
self.contacts = contacts or {}
|
||||
self.LOG = logging.getLogger("MessageUtil")
|
||||
|
||||
def send_text_msg(self, msg: str, receiver: str, at_list: str = "") -> None:
|
||||
"""
|
||||
发送文本消息
|
||||
|
||||
:param msg: 消息字符串
|
||||
:param receiver: 接收人wxid或者群id
|
||||
:param at_list: 要@的wxid, @所有人的wxid为:notify@all
|
||||
"""
|
||||
# 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险
|
||||
time.sleep(random.uniform(0.3, 1.0))
|
||||
|
||||
ats = ""
|
||||
if at_list:
|
||||
if at_list == "notify@all": # @所有人
|
||||
ats = " @所有人"
|
||||
else:
|
||||
wxids = at_list.split(",")
|
||||
for wxid in wxids:
|
||||
# 根据 wxid 查找群昵称
|
||||
ats += f" @{self.wcf.get_alias_in_chatroom(wxid, receiver)}"
|
||||
|
||||
# {msg}{ats} 表示要发送的消息内容后面紧跟@,例如 北京天气情况为:xxx @张三
|
||||
if ats == "":
|
||||
self.LOG.info(f"To {receiver}: {msg}")
|
||||
self.wcf.send_text(f"{msg}", receiver, at_list)
|
||||
else:
|
||||
self.LOG.info(f"To {receiver}: {ats}\r{msg}")
|
||||
self.wcf.send_text(f"{ats}\n\n{msg}", receiver, at_list)
|
||||
|
||||
def send_file(self, file_path: str, receiver: str) -> None:
|
||||
"""
|
||||
发送文件消息
|
||||
|
||||
:param file_path: 文件路径
|
||||
:param receiver: 接收人wxid或者群id
|
||||
"""
|
||||
# 风控处理,随机延迟发送,解决群消息高频发送导致的微信风险
|
||||
time.sleep(random.uniform(0.5, 1.5))
|
||||
|
||||
self.LOG.info(f"Sending file to {receiver}: {file_path}")
|
||||
self.wcf.send_file(file_path, receiver)
|
||||
|
||||
def update_contacts(self, contacts: dict) -> None:
|
||||
"""
|
||||
更新联系人字典
|
||||
|
||||
:param contacts: 联系人字典,格式为 {"wxid": "NickName"}
|
||||
"""
|
||||
self.contacts.update(contacts)
|
||||
Reference in New Issue
Block a user