Files
abot/admin/dashboard/blueprints/trendradar_webhook.py
liuwei ace2af7dba feat(webhook): 新增TrendRadar webhook适配入口
- 新增 /webhook/trendradar 接口,支持TrendRadar通用Webhook推送

- 支持 token 校验、默认目标群配置、可选payload覆盖目标群

- 将Webhook蓝图注册到Dashboard服务,并补充配置项

- 新增对接说明文档,提供TrendRadar环境变量模板示例
2026-04-21 16:27:55 +08:00

210 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
TrendRadar Webhook 适配蓝图。
设计目标:
1. 提供一个无需登录态的公开 webhook 接口,供 TrendRadar 推送热点内容;
2. 兼容 TrendRadar Generic Webhook 常见字段title/content以及常见别名
3. 将接收到的内容转发到配置的微信群,支持按 payload 指定目标群(可开关)。
"""
import asyncio
import json
import threading
from typing import Any, Dict, List, Tuple
from flask import Blueprint, current_app, jsonify, request
from loguru import logger
# 独立 webhook 路由,避免和后台管理接口混在一起。
trendradar_webhook_bp = Blueprint("trendradar_webhook", __name__, url_prefix="/webhook")
# 使用独立事件循环在线程中发送消息,避免阻塞 Flask 请求线程。
_shared_loop = None
_loop_lock = threading.Lock()
def _get_or_create_loop() -> asyncio.AbstractEventLoop:
"""获取或创建共享事件循环。"""
global _shared_loop
with _loop_lock:
if _shared_loop is None:
_shared_loop = asyncio.new_event_loop()
def _run_loop():
asyncio.set_event_loop(_shared_loop)
_shared_loop.run_forever()
t = threading.Thread(target=_run_loop, daemon=True, name="trendradar_webhook_loop")
t.start()
return _shared_loop
def _parse_payload() -> Dict[str, Any]:
"""解析请求负载,优先 JSON兼容 form 与原始文本。"""
payload = request.get_json(silent=True)
if isinstance(payload, dict):
return payload
if request.form:
return {k: v for k, v in request.form.items()}
raw_text = (request.get_data(cache=False, as_text=True) or "").strip()
if not raw_text:
return {}
try:
data = json.loads(raw_text)
if isinstance(data, dict):
return data
except Exception:
pass
return {"raw_text": raw_text}
def _extract_title_content(payload: Dict[str, Any]) -> Tuple[str, str]:
"""提取标题与正文,兼容多种字段写法。"""
title = str(
payload.get("title")
or payload.get("subject")
or (payload.get("data") or {}).get("title")
or "TrendRadar 热点推送"
).strip()
content = str(
payload.get("content")
or payload.get("text")
or payload.get("message")
or payload.get("summary")
or (payload.get("data") or {}).get("content")
or ""
).strip()
# 若未提取到标准字段,保底输出完整 payload避免静默丢消息。
if not content:
content = json.dumps(payload, ensure_ascii=False, indent=2)
return title, content
def _extract_target_groups(cfg: Dict[str, Any], payload: Dict[str, Any]) -> List[str]:
"""提取目标群列表。
优先级:
1. 当 allow_payload_target_groups = true 时,允许 payload 覆盖目标群;
2. 否则使用配置 default_group_ids。
"""
targets: List[str] = []
allow_payload = bool(cfg.get("allow_payload_target_groups", False))
if allow_payload:
raw_payload_targets = (
payload.get("target_group_ids")
or payload.get("group_ids")
or payload.get("target_group_id")
or payload.get("group_id")
or payload.get("roomid")
)
if isinstance(raw_payload_targets, str):
targets.extend([x.strip() for x in raw_payload_targets.split(",") if x.strip()])
elif isinstance(raw_payload_targets, list):
targets.extend([str(x).strip() for x in raw_payload_targets if str(x).strip()])
if not targets:
raw_cfg_targets = cfg.get("default_group_ids", [])
if isinstance(raw_cfg_targets, str):
targets.extend([x.strip() for x in raw_cfg_targets.split(",") if x.strip()])
elif isinstance(raw_cfg_targets, list):
targets.extend([str(x).strip() for x in raw_cfg_targets if str(x).strip()])
# 去重并保持顺序。
seen = set()
result: List[str] = []
for gid in targets:
if gid in seen:
continue
seen.add(gid)
result.append(gid)
return result
def _build_wechat_text(title: str, content: str, payload: Dict[str, Any]) -> str:
"""构造转发到微信群的文本。"""
source = str(payload.get("source") or payload.get("platform") or "TrendRadar").strip()
lines = [
"📡 TrendRadar 热点播报",
f"来源:{source}",
f"标题:{title}",
"",
content,
]
return "\n".join(lines).strip()
def _check_token(cfg: Dict[str, Any], payload: Dict[str, Any]) -> bool:
"""校验 webhook token。"""
expected = str(cfg.get("token", "") or "").strip()
if not expected:
return True
provided = str(
request.headers.get("X-Webhook-Token")
or request.args.get("token")
or payload.get("token")
or ""
).strip()
return provided == expected
@trendradar_webhook_bp.route("/trendradar", methods=["POST"])
def trendradar_webhook():
"""TrendRadar Webhook 接口。"""
try:
dashboard_server = current_app.dashboard_server
cfg = (dashboard_server.config or {}).get("trendradar_webhook", {})
if not bool(cfg.get("enabled", False)):
return jsonify({"success": False, "message": "trendradar webhook 未启用"}), 403
payload = _parse_payload()
if not _check_token(cfg, payload):
return jsonify({"success": False, "message": "token 校验失败"}), 401
title, content = _extract_title_content(payload)
target_groups = _extract_target_groups(cfg, payload)
if not target_groups:
return jsonify({"success": False, "message": "未配置目标群"}), 400
text = _build_wechat_text(title, content, payload)
loop = _get_or_create_loop()
sent_groups: List[str] = []
failed_groups: Dict[str, str] = {}
async def _send_once(group_id: str):
# sender 传空字符串即可,保持与现有插件调用风格一致。
await dashboard_server.client.send_text_message(group_id, text, "")
timeout_seconds = int(cfg.get("send_timeout_seconds", 20))
for group_id in target_groups:
try:
fut = asyncio.run_coroutine_threadsafe(_send_once(group_id), loop)
fut.result(timeout=max(timeout_seconds, 5))
sent_groups.append(group_id)
except Exception as e:
failed_groups[group_id] = str(e)
logger.info(
f"[TrendRadarWebhook] 接收推送: title={title}, targets={target_groups}, "
f"sent={len(sent_groups)}, failed={len(failed_groups)}"
)
return jsonify(
{
"success": len(failed_groups) == 0,
"title": title,
"sent_groups": sent_groups,
"failed_groups": failed_groups,
}
)
except Exception as e:
logger.error(f"[TrendRadarWebhook] 处理失败: {e}")
return jsonify({"success": False, "message": str(e)}), 500