- 新建 trendradar_permission 空插件,仅注册 TRENDRADAR_WEBHOOK 功能用于后台群级开关 - webhook 发送前强制校验群权限,未开启群加入 blocked_groups 并拦截 - 更新对接文档,补充权限开关的启用步骤与返回字段说明
244 lines
8.4 KiB
Python
244 lines
8.4 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
TrendRadar Webhook 适配蓝图。
|
||
|
||
设计目标:
|
||
1. 提供一个无需登录态的公开 webhook 接口,供 TrendRadar 推送热点内容;
|
||
2. 兼容 TrendRadar Generic Webhook 常见字段(title/content)以及常见别名;
|
||
3. 将接收到的内容转发到配置的微信群,支持按 payload 指定目标群(可开关)。
|
||
"""
|
||
import asyncio
import hmac
import json
import threading
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Any, Dict, List, Tuple

from flask import Blueprint, current_app, jsonify, request
from loguru import logger

from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus
|
||
|
||
|
||
# Dedicated blueprint for the webhook route, kept apart from the admin/dashboard API routes.
trendradar_webhook_bp = Blueprint("trendradar_webhook", __name__, url_prefix="/webhook")

# Messages are sent on a separate event loop that runs in its own thread.
# The loop is created lazily by _get_or_create_loop(); None means "not created yet".
_shared_loop = None
# Serialises the lazy creation of the shared loop.
_loop_lock = threading.Lock()
|
||
|
||
|
||
def _get_or_create_loop() -> asyncio.AbstractEventLoop:
    """Return the shared event loop, creating and starting it on first use.

    The first caller creates a new event loop, stores it in the module-level
    ``_shared_loop`` and starts a daemon thread named
    ``trendradar_webhook_loop`` that runs the loop forever. Later callers get
    the same loop back. The whole check-and-create step runs under
    ``_loop_lock``.

    Returns:
        The shared ``asyncio`` event loop.
    """
    global _shared_loop
    with _loop_lock:
        # Fast exit: the loop already exists.
        if _shared_loop is not None:
            return _shared_loop

        loop = asyncio.new_event_loop()
        _shared_loop = loop

        def _serve_forever():
            # Bind the loop to the worker thread, then block in it.
            asyncio.set_event_loop(loop)
            loop.run_forever()

        worker = threading.Thread(
            target=_serve_forever,
            daemon=True,
            name="trendradar_webhook_loop",
        )
        worker.start()
        return loop
|
||
|
||
|
||
def _parse_payload() -> Dict[str, Any]:
    """Read the current request body into a dict.

    Sources are tried in this order:

    1. the JSON body, when it decodes to a dict;
    2. the form fields, when any are present;
    3. the raw body text, decoded as JSON when that yields a dict;
    4. otherwise the raw text wrapped as ``{"raw_text": <text>}``.

    Returns:
        The parsed payload; an empty dict when the body is empty.
    """
    json_body = request.get_json(silent=True)
    if isinstance(json_body, dict):
        return json_body

    form_fields = request.form
    if form_fields:
        return dict(form_fields.items())

    body_text = request.get_data(cache=False, as_text=True) or ""
    body_text = body_text.strip()
    if not body_text:
        return {}

    try:
        decoded = json.loads(body_text)
    except Exception:
        # Deliberate best effort: anything that is not valid JSON is passed
        # through as raw text below.
        decoded = None

    if isinstance(decoded, dict):
        return decoded
    return {"raw_text": body_text}
|
||
|
||
|
||
def _extract_title_content(payload: Dict[str, Any]) -> Tuple[str, str]:
    """Extract the title and body text from a webhook payload.

    Title lookup order: ``title``, ``subject``, ``data.title``, then the
    default title. Content lookup order: ``content``, ``text``, ``message``,
    ``summary``, ``data.content``. Falsy values are skipped at each step.

    Args:
        payload: The parsed webhook payload.

    Returns:
        ``(title, content)``, both stripped. When no content field yields
        text, ``content`` is the whole payload dumped as indented JSON so the
        push is never silently dropped.
    """
    default_title = "TrendRadar 热点推送"

    # ``data`` is caller-controlled: it may be a string, list or number. Only
    # a dict can carry nested fields; calling ``.get`` on anything else would
    # raise and turn a valid push into a 500.
    nested = payload.get("data")
    if not isinstance(nested, dict):
        nested = {}

    raw_title = (
        payload.get("title")
        or payload.get("subject")
        or nested.get("title")
        or default_title
    )
    # A whitespace-only title strips to "", so fall back to the default again.
    title = str(raw_title).strip() or default_title

    raw_content = (
        payload.get("content")
        or payload.get("text")
        or payload.get("message")
        or payload.get("summary")
        or nested.get("content")
        or ""
    )
    content = str(raw_content).strip()

    # No recognised content field: forward the full payload instead.
    if not content:
        content = json.dumps(payload, ensure_ascii=False, indent=2)
    return title, content
|
||
|
||
|
||
def _normalize_group_ids(raw: Any) -> List[str]:
    """Coerce one raw target value into a list of stripped, non-empty IDs."""
    if isinstance(raw, str):
        candidates = raw.split(",")
    elif isinstance(raw, (list, tuple)):
        # ``None`` items are skipped so they do not become the literal ID "None".
        candidates = [str(item) for item in raw if item is not None]
    elif isinstance(raw, int) and not isinstance(raw, bool):
        # A bare numeric ID (e.g. JSON ``"group_id": 123``) is a single target.
        candidates = [str(raw)]
    else:
        candidates = []
    return [gid.strip() for gid in candidates if gid.strip()]


def _extract_target_groups(cfg: Dict[str, Any], payload: Dict[str, Any]) -> List[str]:
    """Work out which groups a push should be sent to.

    Priority:
    1. When ``allow_payload_target_groups`` is truthy in ``cfg``, the payload
       may name the targets through ``target_group_ids``, ``group_ids``,
       ``target_group_id``, ``group_id`` or ``roomid`` (first truthy wins);
    2. otherwise, or when the payload yields no usable ID, ``cfg``'s
       ``default_group_ids`` is used.

    Each source may be a comma-separated string, a list/tuple of IDs, or a
    single integer ID.

    Args:
        cfg: The ``trendradar_webhook`` configuration section.
        payload: The parsed webhook payload.

    Returns:
        Group IDs, de-duplicated with first-seen order preserved.
    """
    targets: List[str] = []

    if bool(cfg.get("allow_payload_target_groups", False)):
        targets = _normalize_group_ids(
            payload.get("target_group_ids")
            or payload.get("group_ids")
            or payload.get("target_group_id")
            or payload.get("group_id")
            or payload.get("roomid")
        )

    if not targets:
        targets = _normalize_group_ids(cfg.get("default_group_ids", []))

    # dict keys keep insertion order, which gives an order-preserving de-dup.
    return list(dict.fromkeys(targets))
|
||
|
||
|
||
def _build_wechat_text(title: str, content: str, payload: Dict[str, Any]) -> str:
    """Compose the text message that is forwarded to the WeChat groups.

    The message is a banner line, a source line, a title line, a blank line
    and then the content. The source label comes from the payload's ``source``
    field, then ``platform``, and defaults to ``TrendRadar``.

    Args:
        title: The push title.
        content: The push body.
        payload: The parsed webhook payload (only read for the source label).

    Returns:
        The assembled message with surrounding whitespace removed.
    """
    origin = payload.get("source") or payload.get("platform") or "TrendRadar"
    origin = str(origin).strip()

    banner = "📡 TrendRadar 热点播报"
    source_line = f"来源:{origin}"
    title_line = f"标题:{title}"

    message = "\n".join((banner, source_line, title_line, "", content))
    return message.strip()
|
||
|
||
|
||
def _filter_groups_by_permission(groups: List[str]) -> Tuple[List[str], List[str]]:
    """Split target groups by their ``TRENDRADAR_WEBHOOK`` permission.

    Args:
        groups: Candidate group IDs.

    Returns:
        ``(allowed_groups, blocked_groups)``: groups whose permission status
        is ``PermissionStatus.ENABLED``, and every other group. When the
        feature lookup returns nothing, a warning is logged and every group
        is reported as blocked.
    """
    feature = Feature.get_feature("TRENDRADAR_WEBHOOK")

    if feature:
        permitted: List[str] = []
        rejected: List[str] = []
        for group_id in groups:
            status = GroupBotManager.get_group_permission(group_id, feature)
            bucket = permitted if status == PermissionStatus.ENABLED else rejected
            bucket.append(group_id)
        return permitted, rejected

    # Feature not registered: block everything rather than risk sending to a
    # group that never opted in, and make the reason visible in the log.
    logger.warning("[TrendRadarWebhook] 未发现 TRENDRADAR_WEBHOOK 功能注册,当前请求将被拦截")
    return [], list(groups)
|
||
|
||
|
||
def _check_token(cfg: Dict[str, Any], payload: Dict[str, Any]) -> bool:
    """Validate the webhook token of the current request.

    The expected token is ``cfg["token"]``. When it is missing or blank,
    token checking is disabled and every request passes. Otherwise the
    caller's token is taken from the ``X-Webhook-Token`` header, then the
    ``token`` query parameter, then the payload's ``token`` field (first
    truthy wins), and compared after stripping whitespace.

    Args:
        cfg: The ``trendradar_webhook`` configuration section.
        payload: The parsed webhook payload.

    Returns:
        ``True`` when no token is configured or the provided token matches.
    """
    expected = str(cfg.get("token", "") or "").strip()
    if not expected:
        return True

    provided = str(
        request.headers.get("X-Webhook-Token")
        or request.args.get("token")
        or payload.get("token")
        or ""
    ).strip()

    # This endpoint is public, so compare in constant time to avoid leaking
    # the secret through response timing. compare_digest rejects non-ASCII
    # str, hence the comparison on bytes; "surrogatepass" keeps a malformed
    # caller-supplied string from raising during encoding.
    return hmac.compare_digest(
        provided.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )
|
||
|
||
|
||
@trendradar_webhook_bp.route("/trendradar", methods=["POST"])
def trendradar_webhook():
    """Handle a TrendRadar webhook push and forward it to WeChat groups.

    Flow: read the ``trendradar_webhook`` config section, parse the payload,
    check the token, resolve the target groups, drop groups without the
    TrendRadar permission, then send the text to each remaining group in turn
    on the shared event loop.

    Responses:
        403: the webhook is disabled, or no target group has the permission
            (the body then lists ``blocked_groups``).
        401: token check failed.
        400: no target group is configured.
        200: sending was attempted; ``success`` is true only when no group
            failed, and the body lists ``sent_groups``, ``blocked_groups``
            and ``failed_groups`` (group ID -> reason).
        500: any unexpected error.
    """
    try:
        dashboard_server = current_app.dashboard_server
        cfg = (dashboard_server.config or {}).get("trendradar_webhook", {})
        if not bool(cfg.get("enabled", False)):
            return jsonify({"success": False, "message": "trendradar webhook 未启用"}), 403

        payload = _parse_payload()
        if not _check_token(cfg, payload):
            return jsonify({"success": False, "message": "token 校验失败"}), 401

        title, content = _extract_title_content(payload)
        target_groups = _extract_target_groups(cfg, payload)
        if not target_groups:
            return jsonify({"success": False, "message": "未配置目标群"}), 400

        allowed_groups, blocked_groups = _filter_groups_by_permission(target_groups)
        if not allowed_groups:
            return jsonify(
                {
                    "success": False,
                    "message": "目标群未开启 TrendRadar webhook 权限",
                    "blocked_groups": blocked_groups,
                }
            ), 403

        text = _build_wechat_text(title, content, payload)
        loop = _get_or_create_loop()
        sent_groups: List[str] = []
        failed_groups: Dict[str, str] = {}

        async def _send_once(group_id: str):
            # An empty sender string matches how the other plugins call this.
            await dashboard_server.client.send_text_message(group_id, text, "")

        # Loop-invariant: the per-group wait, never shorter than 5 seconds.
        wait_seconds = max(int(cfg.get("send_timeout_seconds", 20)), 5)

        for group_id in allowed_groups:
            fut = None
            try:
                fut = asyncio.run_coroutine_threadsafe(_send_once(group_id), loop)
                fut.result(timeout=wait_seconds)
            except FutureTimeoutError as e:
                if fut is not None and not fut.done():
                    # Our wait expired while the send is still pending: cancel
                    # it so it does not keep running on the loop, and record a
                    # readable reason (str() of this exception is empty).
                    fut.cancel()
                    failed_groups[group_id] = f"发送超时({wait_seconds}s)"
                else:
                    # The send itself finished by raising a timeout error.
                    failed_groups[group_id] = str(e) or type(e).__name__
            except Exception as e:
                # Some exceptions carry no message; fall back to the type name
                # so the caller never sees an empty failure reason.
                failed_groups[group_id] = str(e) or type(e).__name__
            else:
                sent_groups.append(group_id)

        logger.info(
            f"[TrendRadarWebhook] 接收推送: title={title}, targets={target_groups}, "
            f"sent={len(sent_groups)}, failed={len(failed_groups)}"
        )
        return jsonify(
            {
                "success": len(failed_groups) == 0,
                "title": title,
                "sent_groups": sent_groups,
                "blocked_groups": blocked_groups,
                "failed_groups": failed_groups,
            }
        )
    except Exception as e:
        # Top-level boundary: log with traceback and answer with a 500.
        logger.exception(f"[TrendRadarWebhook] 处理失败: {e}")
        return jsonify({"success": False, "message": str(e)}), 500
|