Files
abot/wechat_ipad/client/friend_circle.py
2026-04-07 12:50:50 +08:00

146 lines
6.2 KiB
Python

import base64
import os
from typing import Union
import aiofiles
import aiohttp
from wechat_ipad import UserLoggedOut
from wechat_ipad.client.base import WechatAPIClientBase
from wechat_ipad.models.friend_circle_info import build_friend_circle_xml
class FriendCircleMixin(WechatAPIClientBase):
async def get_friend_circle_list(self, max_id: int = 0, first_page_md5: str = "") -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Maxid": max_id, "Fristpagemd5": first_page_md5}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/GetList", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def get_friend_circle_detail(self, towxid: str, max_id: int = 0, first_page_md5: str = "") -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {
"Wxid": self.wxid,
"Towxid": towxid,
"Maxid": max_id,
"Fristpagemd5": first_page_md5
}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/GetDetail", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def get_friend_circle_id_detail(self, object_id: Union[str, int], towxid: str = "") -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Towxid": towxid, "Id": int(object_id)}
response = await session.post(
f"http://{self.ip}:{self.port}/api/FriendCircle/GetIdDetail",
json=json_param
)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def publish_friend_circle(self, content: str, media_items: list[dict] | None = None,
blacklist: str = "", with_user_list: str = "") -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
xml_content = build_friend_circle_xml(self.wxid, content, media_items=media_items)
async with aiohttp.ClientSession() as session:
json_param = {
"Wxid": self.wxid,
"Content": xml_content,
"BlackList": blacklist,
"WithUserList": with_user_list
}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Messages", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def friend_circle_comment(self, object_id: str, content: str = "", type: int = 2,
reply_comment_id: int = 0) -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {
"Wxid": self.wxid,
"Id": str(object_id),
"Type": int(type),
"Content": content,
"ReplyCommnetId": int(reply_comment_id)
}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Comment", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def friend_circle_operation(self, object_id: str, type: int, comment_id: int = 0) -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {
"Wxid": self.wxid,
"Id": str(object_id),
"Type": int(type),
"CommnetId": int(comment_id)
}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Operation", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def sync_friend_circle(self, sync_key: str = "") -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Synckey": sync_key}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/MmSnsSync", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)
async def upload_friend_circle_media(self, media: Union[str, bytes, os.PathLike]) -> dict:
if not self.wxid:
raise UserLoggedOut("请先登录")
if isinstance(media, str):
media_base64 = media.split(",", 1)[1] if "," in media else media
elif isinstance(media, bytes):
media_base64 = base64.b64encode(media).decode()
elif isinstance(media, os.PathLike):
async with aiofiles.open(media, "rb") as f:
media_base64 = base64.b64encode(await f.read()).decode()
else:
raise ValueError("media should be str, bytes, or path")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Base64": media_base64}
response = await session.post(f"http://{self.ip}:{self.port}/api/FriendCircle/Upload", json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data", {})
self.error_handler(json_resp)