Files
abot/admin/dashboard/blueprints/contacts.py

1110 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import base64
import os
import re
import threading
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import quote
from flask import Blueprint, render_template, jsonify, request, current_app, redirect, send_file
from .auth import login_required
from loguru import logger
from utils.wechat.emoji_semantic_parser import (
dedupe_emoji_semantic_candidates as shared_dedupe_emoji_semantic_candidates,
extract_emoji_meta as shared_extract_emoji_meta,
extract_emoji_semantic_info as shared_extract_emoji_semantic_info,
)
# 创建联系人管理蓝图
contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts')
# 创建线程池
message_thread_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="message_sender_")
# 创建共享的事件循环
shared_loop = None
loop_lock = threading.Lock()
contacts_refresh_lock = threading.Lock()
contacts_refresh_running = False
_EMOJI_MD5_RE = re.compile(r'md5\s*=\s*[\"\']([0-9a-fA-F]{16,64})[\"\']', re.IGNORECASE)
_EMOJI_TOTALLEN_RE = re.compile(r'(?:totallen|total_len|len)\s*=\s*[\"\'](\d+)[\"\']', re.IGNORECASE)
_EMOJI_BASE64_RE = re.compile(r"^[A-Za-z0-9+/=]+$")
_EMOJI_LOCALE_KEYS = {"zh_cn", "zh_tw", "zh_hk", "default", "en", "ja", "ko"}
_EMOJI_SEMANTIC_STOPWORDS = {
"default",
"zh_cn",
"zh_tw",
"zh_hk",
"en",
"ja",
"ko",
"opus",
"gif",
"png",
"jpg",
"jpeg",
"webp",
}
def get_or_create_loop():
"""获取或创建共享的事件循环"""
global shared_loop
with loop_lock:
if shared_loop is None:
shared_loop = asyncio.new_event_loop()
# 在新线程中运行事件循环
def run_loop():
asyncio.set_event_loop(shared_loop)
shared_loop.run_forever()
loop_thread = threading.Thread(target=run_loop, daemon=True)
loop_thread.start()
return shared_loop
def send_message_in_thread(func, *args, **kwargs):
"""使用共享事件循环发送消息"""
def run():
try:
loop = get_or_create_loop()
# 创建异步任务
async def send():
try:
await func(*args, **kwargs)
except Exception as e:
logger.error(f"发送消息失败: {e}")
# 在共享事件循环中运行任务
future = asyncio.run_coroutine_threadsafe(send(), loop)
# 等待任务完成,设置超时时间
future.result(timeout=30)
except Exception as e:
logger.error(f"消息发送任务执行失败: {e}")
# 使用线程池提交任务
message_thread_pool.submit(run)
def run_member_context_refresh_in_thread(func, *args, **kwargs):
"""在线程池中异步刷新成员交互摘要,避免阻塞请求线程"""
def run():
try:
func(*args, **kwargs)
except Exception as e:
logger.error(f"成员交互摘要后台刷新失败: {e}")
message_thread_pool.submit(run)
def run_contacts_refresh_in_thread(server):
"""将通讯录刷新放到后台线程执行,避免阻塞后台请求与系统主链路。"""
global contacts_refresh_running
with contacts_refresh_lock:
if contacts_refresh_running:
return False
contacts_refresh_running = True
def run():
global contacts_refresh_running
try:
logger.info("通讯录后台刷新任务开始执行")
asyncio.run(server.robot.refresh_contacts_db())
logger.info("通讯录后台刷新任务执行完成")
except Exception as e:
logger.error(f"通讯录后台刷新任务执行失败: {e}")
finally:
with contacts_refresh_lock:
contacts_refresh_running = False
message_thread_pool.submit(run)
return True
def _safe_text(value):
return "" if value is None else str(value)
def _parse_app_message_payload(content: str):
payload = {
"title": "",
"description": "",
"url": "",
"app_type": ""
}
if not content:
return payload
text = _safe_text(content).strip()
if not text.startswith("<"):
payload["description"] = text
return payload
try:
root = ET.fromstring(text)
payload["title"] = _safe_text(root.findtext('.//title')).strip()
payload["description"] = _safe_text(root.findtext('.//des')).strip()
payload["url"] = _safe_text(root.findtext('.//url')).strip()
payload["app_type"] = _safe_text(root.findtext('.//type')).strip()
except Exception:
payload["description"] = text
return payload
def _parse_sys_message_payload(content: str):
payload = {
"sysmsg_type": "",
"summary": "",
"replace_msg": "",
"session": "",
"msgid": "",
"newmsgid": ""
}
text = _safe_text(content).strip()
if not text.startswith("<sysmsg"):
payload["summary"] = text
return payload
try:
root = ET.fromstring(text)
except Exception:
payload["summary"] = text
return payload
payload["sysmsg_type"] = _safe_text(root.attrib.get("type")).strip()
if payload["sysmsg_type"] == "revokemsg":
revoke_node = root.find("revokemsg")
if revoke_node is not None:
payload["session"] = _safe_text(revoke_node.findtext("session")).strip()
payload["msgid"] = _safe_text(revoke_node.findtext("msgid")).strip()
payload["newmsgid"] = _safe_text(revoke_node.findtext("newmsgid")).strip()
payload["replace_msg"] = _safe_text(revoke_node.findtext("replacemsg")).strip()
payload["summary"] = payload["replace_msg"] or "撤回了一条消息"
return payload
payload["summary"] = _safe_text(root.findtext(".//content")).strip() or text
return payload
def _compact_media_caption(content: str, fallback: str) -> str:
text = _safe_text(content).strip()
if not text:
return fallback
if text.startswith("<"):
return fallback
return text
def _extract_emoji_meta(attachment_url: str, image_path: str):
# 这里保留原函数签名,内部切到共享解析模块,避免后台和自动回复两边出现语义漂移。
return shared_extract_emoji_meta(attachment_url)
def _read_protobuf_varint(payload: bytes, offset: int):
"""读取 protobuf varint。
说明:
1. 微信表情的 desc / emojiattr 经常是 base64 后的 protobuf 片段;
2. 这里不依赖 schema只做最小化的通用 varint 解析,便于递归提取字符串字段;
3. 一旦遇到异常字节,直接抛错交给上层兜底,避免误读出脏语义。
"""
result = 0
shift = 0
index = offset
while index < len(payload) and shift <= 63:
current = payload[index]
index += 1
result |= (current & 0x7F) << shift
if not (current & 0x80):
return result, index
shift += 7
raise ValueError("protobuf varint 读取失败")
def _extract_protobuf_strings(payload: bytes, depth: int = 0):
"""递归提取 protobuf length-delimited 字段中的 UTF-8 文本。
说明:
1. 这里的目标不是完整反序列化,而是从未知结构中尽量稳定地把“可读文本”捞出来;
2. desc 常见格式是 zh_cn/default 语言包嵌套结构,递归 2 层就足够覆盖;
3. 如果字段本身是纯文本,递归会自然停掉,不会影响最终结果。
"""
if not payload:
return []
results = []
index = 0
while index < len(payload):
try:
tag, index = _read_protobuf_varint(payload, index)
except Exception:
break
if tag <= 0:
break
wire_type = tag & 0x07
if wire_type == 0:
try:
_, index = _read_protobuf_varint(payload, index)
except Exception:
break
continue
if wire_type == 1:
index += 8
continue
if wire_type == 5:
index += 4
continue
if wire_type != 2:
break
try:
length, index = _read_protobuf_varint(payload, index)
except Exception:
break
if length < 0 or index + length > len(payload):
break
chunk = payload[index:index + length]
index += length
if not chunk:
continue
try:
decoded = chunk.decode("utf-8")
except Exception:
decoded = ""
if decoded:
results.append(decoded)
if depth < 2:
results.extend(_extract_protobuf_strings(chunk, depth + 1))
return results
def _sanitize_emoji_semantic_text(value: str):
"""清洗候选语义文本,去掉控制字符和多余空白。"""
text = "".join(ch for ch in _safe_text(value) if ch.isprintable()).strip()
text = re.sub(r"\s+", " ", text)
return text.strip()
def _is_emoji_semantic_candidate(value: str):
"""判断一个候选文本是否像“可读的表情语义”。
说明:
1. 过滤 locale key、文件扩展名、产品 ID 这类元数据;
2. 只保留包含中文或英文字母的短文本,避免把长链接、哈希、协议字段误当语义;
3. 单字语义也允许保留,例如“害”这类表情实际就有意义。
"""
text = _sanitize_emoji_semantic_text(value)
if not text:
return False
lowered = text.lower()
if lowered in _EMOJI_LOCALE_KEYS or lowered in _EMOJI_SEMANTIC_STOPWORDS:
return False
if any(locale_key in lowered for locale_key in _EMOJI_LOCALE_KEYS):
return False
if lowered.startswith("com.tencent.") or lowered.startswith("finder:"):
return False
if re.fullmatch(r"[0-9a-f]{16,64}", lowered):
return False
if len(text) >= 8 and _EMOJI_BASE64_RE.fullmatch(text):
return False
if len(text) > 40:
return False
return bool(re.search(r"[\u4e00-\u9fffA-Za-z]", text))
def _dedupe_emoji_semantic_candidates(values):
"""按出现顺序去重候选语义文本。"""
return shared_dedupe_emoji_semantic_candidates(values)
def _maybe_decode_base64_payload(value: str):
"""尽量把字段值解成 base64 原始字节,失败时返回空字节。
说明:
1. 微信的 desc / emojiattr 并不总是明文,有不少是 base64 包起来的 protobuf
2. 这里先做格式筛选,避免把普通中文直接当 base64 解坏;
3. 允许缺省 padding兼容历史数据里的非标准尾部。
"""
normalized = re.sub(r"\s+", "", _safe_text(value))
if len(normalized) < 4 or not _EMOJI_BASE64_RE.fullmatch(normalized):
return b""
normalized += "=" * (-len(normalized) % 4)
try:
return base64.b64decode(normalized, validate=False)
except Exception:
return b""
def _decode_emoji_semantic_value(value: str):
"""解析单个表情语义字段,输出候选语义文本列表。
说明:
1. 若字段本身就是明文中文,直接保留;
2. 若字段是 base64则先尝试整段 UTF-8再递归提取 protobuf 内嵌字符串;
3. 最终统一做去重和脏值过滤,避免把 locale key 一起带回前端。
"""
raw_text = _safe_text(value).strip()
if not raw_text:
return []
candidates = []
if _is_emoji_semantic_candidate(raw_text):
candidates.append(raw_text)
decoded_bytes = _maybe_decode_base64_payload(raw_text)
if not decoded_bytes:
return _dedupe_emoji_semantic_candidates(candidates)
protobuf_texts = _extract_protobuf_strings(decoded_bytes)
candidates.extend(protobuf_texts)
# 某些 emojiattr 不是 protobuf而是“base64 后的纯文本”。
# 只有在 protobuf 路径没抽到结果时,才退回整段 UTF-8 解码,避免把外层语言包拼接串带进来。
if not _dedupe_emoji_semantic_candidates(candidates):
try:
decoded_text = decoded_bytes.decode("utf-8")
except Exception:
decoded_text = ""
if decoded_text:
candidates.append(decoded_text)
return _dedupe_emoji_semantic_candidates(candidates)
def _extract_emoji_semantic_info(attachment_url: str):
"""从表情 XML 中提取“可读语义”。
这里统一走共享解析模块,保证后台展示、持久化回填和自动回复使用同一套语义规则。
"""
return shared_extract_emoji_semantic_info(attachment_url)
def _parse_positive_int(value):
"""将任意输入尽量解析为正整数,失败时返回 0。
说明:
1. 前端可能传 total_length / totalLength / len类型也可能是字符串
2. 统一在这里收口,避免每个分支都重复写 try/except。
"""
try:
parsed = int(value)
except Exception:
return 0
return parsed if parsed > 0 else 0
def _get_emoji_asset_by_md5(message_storage, md5: str):
"""从消息存储中按 md5 反查表情原始记录。
说明:
1. 优先走 message_storage 自身方法,兼容未来把查询逻辑上移;
2. 若当前实例没有该方法,则回退到底层 message_db
3. 查不到时返回 None让上层决定是否报错。
"""
if not message_storage or not md5:
return None
# 优先读取持久化表情资产:
# 1. 这张表已经做过语义和参数收敛,命中速度更快;
# 2. 若拿不到,再回退到原始 messages 表反查,兼容老数据和初始化阶段;
# 3. 这样后台发送、表情库展示、自动回复三条链路都共享统一资产源。
emoji_asset_db = getattr(message_storage, "emoji_asset_db", None)
if emoji_asset_db and hasattr(emoji_asset_db, "get_persisted_emoji_asset_by_md5"):
asset = emoji_asset_db.get_persisted_emoji_asset_by_md5(md5)
if asset:
return {
"attachment_url": "",
"image_path": _safe_text(asset.get("preview_url")).strip(),
"message_id": _safe_text(asset.get("sample_message_id")).strip(),
"group_id": _safe_text(asset.get("sample_group_id")).strip(),
"sender": _safe_text(asset.get("sample_sender")).strip(),
"md5": _safe_text(asset.get("md5")).strip(),
"total_length": asset.get("total_length"),
}
if hasattr(message_storage, "get_emoji_asset_by_md5"):
return message_storage.get_emoji_asset_by_md5(md5)
message_db = getattr(message_storage, "message_db", None)
if message_db and hasattr(message_db, "get_emoji_asset_by_md5"):
return message_db.get_emoji_asset_by_md5(md5)
return None
def _resolve_emoji_send_meta(message_storage, md5: str, total_length: int):
"""补全发送表情所需的 md5 与 total_length。
说明:
1. wechat_ipad 的 SendEmoji 接口并不是只要 md5还必须带 TotalLen
2. 当前端只传了 md5 或长度为空时,这里尝试从历史消息里反查原始 XML
3. 返回值始终是“规整后的 md5 + total_length”方便发送分支直接使用。
"""
normalized_md5 = _safe_text(md5).strip().lower()
normalized_total_length = _parse_positive_int(total_length)
if not re.fullmatch(r"[0-9a-f]{16,64}", normalized_md5):
return "", 0
if normalized_total_length > 0:
return normalized_md5, normalized_total_length
asset = _get_emoji_asset_by_md5(message_storage, normalized_md5)
if not asset:
return normalized_md5, 0
resolved_total_length = _parse_positive_int(asset.get("total_length"))
resolved_md5 = _safe_text(asset.get("md5")).strip().lower()
if not resolved_md5 or resolved_total_length <= 0:
resolved_md5, resolved_total_length = _extract_emoji_meta(
_safe_text(asset.get("attachment_url")),
_safe_text(asset.get("image_path"))
)
if resolved_md5 and resolved_md5 != normalized_md5:
# 历史数据如果出现大小写或异常值,以前端传入的 md5 为准,避免串表情。
logger.warning(f"表情参数回填命中 md5 不一致request_md5={normalized_md5}, record_md5={resolved_md5}")
return normalized_md5, _parse_positive_int(resolved_total_length)
def _normalize_recent_message(server, raw_message: dict, chat_type: str, target_wxid: str):
sender = _safe_text(raw_message.get("sender")).strip()
message_type = str(raw_message.get("message_type", ""))
content = _safe_text(raw_message.get("content")).strip()
image_path = _safe_text(raw_message.get("image_path")).strip()
attachment_url = _safe_text(raw_message.get("attachment_url")).strip()
message_thumb = _safe_text(raw_message.get("message_thumb")).strip()
self_wxid = _safe_text(getattr(server.robot, "wxid", "") or getattr(server.client, "wxid", "")).strip()
sender_name = sender or "未知发送者"
if chat_type == "group":
sender_name = server.contact_manager.get_group_name(target_wxid, sender) or sender_name
elif sender:
sender_name = server.contact_manager.get_nickname(sender) or sender_name
display_type = "text"
display_content = content
media_url = image_path or attachment_url or message_thumb
link_payload = None
if message_type == "3":
display_type = "image"
display_content = _compact_media_caption(content, "[图片]")
elif message_type in {"47", "1048625", "1090519089"}:
display_type = "image" if media_url else "text"
display_content = _compact_media_caption(content, "[表情]")
elif message_type == "34":
display_type = "voice"
display_content = _compact_media_caption(content, "[语音]")
elif message_type == "43":
display_type = "video"
display_content = _compact_media_caption(content, "[视频]")
elif message_type == "49":
app_payload = _parse_app_message_payload(content)
if app_payload.get("url") or app_payload.get("title"):
display_type = "link"
link_payload = app_payload
display_content = app_payload.get("title") or app_payload.get("description") or "[链接]"
else:
display_type = "text"
display_content = app_payload.get("description") or content or "[应用消息]"
elif message_type in {"10000", "10002"}:
display_type = "system"
sys_payload = _parse_sys_message_payload(content)
display_content = sys_payload.get("summary") or content or "[系统消息]"
link_payload = sys_payload
return {
"timestamp": _safe_text(raw_message.get("timestamp")),
"sender": sender,
"sender_name": sender_name,
"content": content,
"message_type": message_type,
"display_type": display_type,
"display_content": display_content,
"image_path": image_path,
"attachment_url": attachment_url,
"media_url": media_url,
"message_thumb": message_thumb,
"message_id": raw_message.get("message_id"),
"link_payload": link_payload,
"is_self": bool(self_wxid and sender == self_wxid),
"sysmsg_type": (link_payload or {}).get("sysmsg_type", "") if display_type == "system" else "",
}
def _build_dashboard_head_images(contact_manager):
"""构造后台可直接使用的头像地址映射。
说明:
1. 前端统一访问本蓝图的头像代理接口,这样可以优先命中本地缓存;
2. 头像 URL 哈希会拼到查询参数里,头像变更后浏览器会自然拉取最新版本;
3. 即便本地缓存暂时不存在,代理接口也还能回退到远端头像地址,不影响页面展示。
"""
result = {}
for wxid, remote_url in (contact_manager.get_all_head_images() or {}).items():
if not remote_url:
result[wxid] = ""
continue
version = contact_manager.get_head_image_version(wxid)
avatar_url = f"/contacts/api/avatar/{quote(str(wxid), safe='')}"
if version:
avatar_url = f"{avatar_url}?v={version}"
result[wxid] = avatar_url
return result
# 联系人管理页面
@contacts_bp.route('/')
@login_required
def contacts_management():
"""通讯录管理页面"""
return render_template('contacts_management.html')
# API路由
@contacts_bp.route('/api/all', methods=['GET'])
@login_required
def api_contacts_all():
"""获取所有联系人信息API"""
try:
server = current_app.dashboard_server
contacts = server.contact_manager.get_contacts()
return jsonify({
"success": True,
"data": {
"contacts": contacts
}
})
except Exception as e:
logger.error(f"获取所有联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/statistics', methods=['GET'])
@login_required
def api_contacts_statistics():
"""获取联系人统计信息API"""
try:
server = current_app.dashboard_server
# 使用新的联系人分类方法获取统计信息
total, groups, personal, public, official = server.contact_manager.get_contact_statistics()
return jsonify({
"success": True,
"data": {
"total": total,
"groups": groups,
"personal": personal,
"public": public,
"official": official
}
})
except Exception as e:
logger.error(f"获取联系人统计信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/groups', methods=['GET'])
@login_required
def api_contacts_groups():
"""获取群组联系人信息API"""
try:
server = current_app.dashboard_server
group_contacts = server.contact_manager.get_group_contacts()
return jsonify({
"success": True,
"data": {
"groups": group_contacts
}
})
except Exception as e:
logger.error(f"获取群组联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/personal', methods=['GET'])
@login_required
def api_contacts_personal():
"""获取个人联系人信息API"""
try:
server = current_app.dashboard_server
personal_contacts = server.contact_manager.get_personal_contacts()
return jsonify({
"success": True,
"data": {
"personal": personal_contacts
}
})
except Exception as e:
logger.error(f"获取个人联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/official', methods=['GET'])
@login_required
def api_contacts_official():
"""获取公众号联系人信息API"""
try:
server = current_app.dashboard_server
official_accounts = server.contact_manager.get_official_accounts()
return jsonify({
"success": True,
"data": {
"official": official_accounts
}
})
except Exception as e:
logger.error(f"获取公众号联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/public', methods=['GET'])
@login_required
def api_contacts_public():
"""获取公共好友信息API"""
try:
server = current_app.dashboard_server
public_contacts = server.contact_manager.get_public_contacts()
return jsonify({
"success": True,
"data": {
"public": public_contacts
}
})
except Exception as e:
logger.error(f"获取公共好友信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/head_images', methods=['GET'])
@login_required
def api_head_images():
"""获取联系人头像信息API"""
try:
server = current_app.dashboard_server
# 后台页拿到的是“可展示地址”而不是原始远端 URL
# 这样通讯录页会优先读本地缓存,头像变化时也能自动刷新最新版本。
head_images = _build_dashboard_head_images(server.contact_manager)
return jsonify({
"success": True,
"data": {
"head_images": head_images
}
})
except Exception as e:
logger.error(f"获取联系人头像信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/avatar/<path:wxid>', methods=['GET'])
@login_required
def api_contact_avatar(wxid):
"""返回通讯录头像,本地缓存优先,远端地址兜底。"""
try:
server = current_app.dashboard_server
# 先尝试把头像补齐到本地缓存。
# 这样页面首次访问某个联系人时,也能顺手把缓存热起来。
cached_path = server.contact_manager.ensure_head_image_cached(wxid)
if cached_path and os.path.exists(cached_path):
return send_file(cached_path, conditional=True, max_age=86400)
remote_url = str(server.contact_manager.get_head_image(wxid) or "").strip()
if remote_url:
return redirect(remote_url, code=302)
return jsonify({"success": False, "error": "头像不存在"}), 404
except Exception as e:
logger.error(f"读取联系人头像失败 wxid={wxid}: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/group_members/<roomid>', methods=['GET'])
@login_required
def api_group_members(roomid):
"""获取指定群的成员列表API
Args:
roomid: 群ID
"""
try:
server = current_app.dashboard_server
group_members = server.contact_db.get_chatroom_small_member_list(roomid)
context_enabled = bool(server.member_context_service) and server.member_context_service.is_group_enabled(roomid)
if context_enabled:
contexts = server.member_context_db.list_group_member_contexts(roomid)
context_map = {item.get("wxid"): item for item in contexts}
for member in group_members:
context = context_map.get(member.get("wxid"), {})
member["activity_level"] = context.get("activity_level", "")
member["response_style_hint"] = context.get("response_style_hint", "")
member["summary_text"] = context.get("summary_text", "")
member["last_profiled_at"] = context.get("last_profiled_at", "")
else:
for member in group_members:
member["activity_level"] = ""
member["response_style_hint"] = ""
member["summary_text"] = ""
member["last_profiled_at"] = ""
return jsonify({
"success": True,
"data": {
"members": group_members,
"member_context_enabled": context_enabled
}
})
except Exception as e:
logger.error(f"获取群成员列表失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/group_profile/<roomid>', methods=['GET'])
@login_required
def api_group_profile(roomid):
"""获取指定群的资料信息(群公告、群主、管理员、成员数)"""
try:
server = current_app.dashboard_server
# 直接复用联系人库中已有身份字段,按群聚合成页面可展示的资料结构。
profile = server.contact_db.get_chatroom_profile(roomid)
return jsonify({
"success": True,
"data": profile
})
except Exception as e:
logger.error(f"获取群资料失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/group_profile/<roomid>/sync_announcement', methods=['POST'])
@login_required
def api_sync_group_announcement(roomid):
"""手动同步指定群公告(调用 /Group/GetChatRoomInfoDetail"""
try:
server = current_app.dashboard_server
if not roomid:
return jsonify({"success": False, "error": "缺少群ID"}), 400
if not getattr(server, "robot", None) or not getattr(server.robot, "ipad_bot", None):
return jsonify({"success": False, "error": "机器人实例未初始化"}), 503
async def fetch_and_merge():
# 先拉基础群信息,再拉 Detail 信息,最后合并,避免只用 Detail 导致字段不完整。
base_info = await server.robot.ipad_bot.get_chatroom_info(roomid)
detail_info = await server.robot.ipad_bot.get_chatroom_announce(roomid)
merged_info = dict(base_info or {})
detail_contact = None
if isinstance(detail_info, dict):
contact_list = detail_info.get("ContactList")
if isinstance(contact_list, list) and contact_list:
first = contact_list[0]
if isinstance(first, dict):
detail_contact = first
if detail_contact:
merged_info.update(detail_contact)
if isinstance(detail_info, dict):
merged_info.update(detail_info)
# 统一公告字段命名,供 contacts_db.save_chatroom_info 直接提取入库。
announcement = (
merged_info.get("ChatRoomAnnouncement")
or merged_info.get("Announcement")
or merged_info.get("Annoucement")
or merged_info.get("AnnouncementContent")
or merged_info.get("chatRoomAnnouncement")
)
if announcement:
merged_info["ChatRoomAnnouncement"] = announcement
# 保底补上群ID避免少数字段缺失导致无法更新到对应群。
if not merged_info.get("UserName"):
merged_info["UserName"] = roomid
return merged_info
merged_info = asyncio.run(fetch_and_merge())
if not merged_info:
return jsonify({"success": False, "error": "获取群详情失败"}), 500
save_ok = server.contact_db.save_chatroom_info(merged_info)
if not save_ok:
return jsonify({"success": False, "error": "保存群公告失败"}), 500
profile = server.contact_db.get_chatroom_profile(roomid)
return jsonify({
"success": True,
"message": "群公告同步成功",
"data": {
"profile": profile
}
})
except Exception as e:
logger.error(f"手动同步群公告失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/group_member_context/<roomid>/<wxid>', methods=['GET'])
@login_required
def api_group_member_context(roomid, wxid):
"""获取群成员交互摘要"""
try:
server = current_app.dashboard_server
if not server.member_context_service:
return jsonify({"success": False, "error": "成员交互摘要插件未加载"}), 503
if not server.member_context_service.is_group_enabled(roomid):
return jsonify({"success": False, "error": "该群未启用成员交互摘要功能"}), 403
context = server.member_context_db.get_member_context(roomid, wxid)
return jsonify({
"success": True,
"data": {
"context": context,
"ready": bool(context)
}
})
except Exception as e:
logger.error(f"获取群成员交互摘要失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/group_member_context/refresh', methods=['POST'])
@login_required
def api_refresh_group_member_context():
"""刷新群成员交互摘要"""
try:
server = current_app.dashboard_server
if not server.member_context_service:
return jsonify({"success": False, "error": "成员交互摘要插件未加载"}), 503
data = request.json or {}
roomid = data.get("roomid")
wxid = data.get("wxid")
if roomid and wxid:
if not server.member_context_service.is_group_enabled(roomid):
return jsonify({"success": False, "error": "该群未启用成员交互摘要功能"}), 403
run_member_context_refresh_in_thread(server.member_context_service.refresh_member_context, roomid, wxid)
return jsonify({"success": True, "message": "成员交互摘要刷新任务已提交"})
if roomid:
if not server.member_context_service.is_group_enabled(roomid):
return jsonify({"success": False, "error": "该群未启用成员交互摘要功能"}), 403
run_member_context_refresh_in_thread(server.member_context_service.refresh_group_contexts, roomid)
return jsonify({"success": True, "message": "本群成员交互摘要刷新任务已提交"})
run_member_context_refresh_in_thread(server.member_context_service.refresh_all_chatrooms)
return jsonify({"success": True, "message": "全量成员交互摘要刷新任务已提交"})
except Exception as e:
logger.error(f"刷新群成员交互摘要失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/update', methods=['POST'])
@login_required
def api_contacts_update():
"""更新通讯录信息API"""
try:
server = current_app.dashboard_server
# 通讯录刷新改成后台异步任务:
# 1. 远端拉联系人详情 + 群成员详情 + 头像缓存同步都可能较慢;
# 2. 若接口同步等待,会直接卡住后台请求线程,影响整体使用体验;
# 3. 这里提交任务后立刻返回,等后台线程慢慢完成刷新。
submitted = run_contacts_refresh_in_thread(server)
if submitted:
return jsonify({"success": True, "message": "通讯录更新任务已提交,请稍后手动刷新查看结果"})
return jsonify({"success": True, "message": "通讯录更新任务已在执行中,请稍后再刷新"})
except Exception as e:
logger.error(f"更新通讯录失败: {e}")
return jsonify({"success": False, "message": f"更新通讯录失败: {str(e)}"}), 500
@contacts_bp.route('/api/recent_messages', methods=['GET'])
@login_required
def api_recent_messages():
"""获取最近聊天消息"""
try:
server = current_app.dashboard_server
wxid = _safe_text(request.args.get("wxid")).strip()
chat_type = _safe_text(request.args.get("chat_type")).strip() or "personal"
limit = min(max(int(request.args.get("limit", 20)), 1), 50)
if not wxid:
return jsonify({"success": False, "message": "缺少聊天对象"}), 400
if chat_type == "group":
raw_messages = server.message_storage.get_recent_group_chat_messages(wxid, limit=limit)
history_tip = f"最近 {limit} 条群消息"
else:
raw_messages = server.message_storage.get_recent_personal_messages(wxid, limit=limit)
history_tip = f"最近 {limit} 条已归档消息(私聊历史可能不完整)"
messages = [
_normalize_recent_message(server, item, chat_type, wxid)
for item in raw_messages
]
return jsonify({
"success": True,
"data": {
"messages": messages,
"chat_type": chat_type,
"history_tip": history_tip
}
})
except Exception as e:
logger.exception(f"获取最近聊天消息失败: {e}")
return jsonify({"success": False, "message": str(e)}), 500
@contacts_bp.route('/api/emojis', methods=['GET'])
@login_required
def api_emoji_library():
"""获取已下载表情库(从历史消息聚合)。"""
try:
server = current_app.dashboard_server
limit = min(max(int(request.args.get("limit", 200)), 1), 500)
emoji_asset_db = getattr(server, "emoji_asset_db", None)
if emoji_asset_db is None:
return jsonify({"success": False, "message": "表情资产库未初始化"}), 503
emojis = emoji_asset_db.list_emoji_assets(limit=limit, require_preview=True)
if not emojis:
# 只有当持久化表还是空的时候,才对最近一小批历史消息做一次兜底回填。
# 正常运行时,新表情会在“消息归档 + 媒体补偿”阶段自动写入资产表,不需要每次接口都回扫历史。
emoji_asset_db.sync_recent_emoji_assets(limit=min(max(limit, 50), 120))
emojis = emoji_asset_db.list_emoji_assets(limit=limit, require_preview=True)
return jsonify({
"success": True,
"data": {
"emojis": emojis,
"count": len(emojis)
}
})
except Exception as e:
logger.exception(f"获取表情库失败: {e}")
return jsonify({"success": False, "message": str(e)}), 500
@contacts_bp.route('/api/send_message', methods=['POST'])
@login_required
def api_send_message():
"""发送消息API
支持的消息类型:
- text: 文本消息
- image: 图片消息
- voice: 语音消息
- video: 视频消息
- link: 链接消息
"""
try:
data = request.form if request.files else request.json
wxid = data.get('wxid')
msg_type = data.get('type')
content = data.get('content')
if not wxid or not msg_type:
return jsonify({'success': False, 'message': '缺少必要参数'})
# 获取机器人实例
server = current_app.dashboard_server
if not server or not server.client:
return jsonify({'success': False, 'message': '机器人未初始化'})
# 根据消息类型发送消息
if msg_type == 'text':
send_message_in_thread(server.client.send_text_message, wxid, content)
return jsonify({
'success': True,
'message': '消息发送中'
})
elif msg_type == 'image':
if 'file' not in request.files:
return jsonify({'success': False, 'message': '未上传文件'})
file = request.files['file']
send_message_in_thread(server.client.send_image_message, wxid, file.read())
return jsonify({
'success': True,
'message': '消息发送中'
})
elif msg_type == 'voice':
if 'file' not in request.files:
return jsonify({'success': False, 'message': '未上传文件'})
file = request.files['file']
if file.filename.endswith('.mp3'):
format_str = "mp3"
elif file.filename.endswith('.wav'):
format_str = "wav"
else:
return jsonify({
'success': False,
'message': '不支持的音频格式'
})
send_message_in_thread(server.client.send_voice_message, wxid, file.read(), format=format_str)
return jsonify({
'success': True,
'message': '消息发送中'
})
elif msg_type == 'video':
if 'file' not in request.files:
return jsonify({'success': False, 'message': '未上传文件'})
file = request.files['file']
send_message_in_thread(server.client.send_video_message, wxid, file.read())
return jsonify({
'success': True,
'message': '消息发送中'
})
elif msg_type == 'link':
url = content.get('url')
title = content.get('title', '')
description = content.get('description', '')
send_message_in_thread(server.client.send_link_message, wxid, url, title, description)
return jsonify({
'success': True,
'message': '消息发送中'
})
elif msg_type == 'emoji':
if not isinstance(content, dict):
return jsonify({'success': False, 'message': '表情参数格式错误'})
# 表情发送必须同时具备 md5 和 total_length。
# 当前前端有时只拿得到 md5因此这里优先使用请求体里的长度
# 拿不到时再去历史消息表里反查,避免“参数明明看起来对,但接口还是发不出去”。
md5, total_length = _resolve_emoji_send_meta(
getattr(server, "message_storage", None),
content.get('md5'),
content.get('total_length') or content.get('totalLength') or content.get('len')
)
if not md5:
return jsonify({'success': False, 'message': '表情 md5 格式不正确'})
if total_length <= 0:
return jsonify({'success': False, 'message': '该表情缺少 total_length无法仅凭 md5 发送'})
# 表情发送改为和文本/图片一致的异步提交通道,避免 HTTP 请求线程
# 同步等待队列结果导致“高概率卡住”的体验问题。
logger.info(f"提交表情发送任务 wxid={wxid} md5={md5} total_length={total_length}")
send_message_in_thread(server.client.send_emoji_message, wxid, md5, total_length)
return jsonify({
'success': True,
'message': '表情消息已提交到 iPad 通道',
'data': {
'md5': md5,
'total_length': total_length
}
})
else:
return jsonify({'success': False, 'message': '不支持的消息类型'})
except Exception as e:
logger.exception(f"发送消息失败: {e}")
return jsonify({'success': False, 'message': str(e)}), 500