diff --git a/admin/dashboard/templates/index.html b/admin/dashboard/templates/index.html index ab428e9..f3f8c65 100644 --- a/admin/dashboard/templates/index.html +++ b/admin/dashboard/templates/index.html @@ -14,7 +14,7 @@
- 用户头像 + 用户头像
diff --git a/plugins/message_summary/main.py b/plugins/message_summary/main.py index 25fcab5..e123954 100644 --- a/plugins/message_summary/main.py +++ b/plugins/message_summary/main.py @@ -106,7 +106,7 @@ class MessageSummaryPlugin(MessagePluginInterface): @plugin_stats_decorator(plugin_name="群聊总结") @plugin_points_cost(10, "群聊总结消耗积分", FEATURE_KEY) - @group_feature_rate_limit(max_per_minute=1, feature_key=FEATURE_KEY) + @group_feature_rate_limit(max_per_minute=30, feature_key=FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" try: diff --git a/plugins/openclaw/__init__.py b/plugins/openclaw/__init__.py new file mode 100644 index 0000000..11a9ba1 --- /dev/null +++ b/plugins/openclaw/__init__.py @@ -0,0 +1,7 @@ +# Import OpenClawPlugin from the current package +from .main import OpenClawPlugin + + +def get_plugin(): + """Return plugin instance.""" + return OpenClawPlugin() diff --git a/plugins/openclaw/config.toml b/plugins/openclaw/config.toml new file mode 100644 index 0000000..89bed99 --- /dev/null +++ b/plugins/openclaw/config.toml @@ -0,0 +1,18 @@ +[OpenClaw] +enable = true + +# Command prefix that triggers forwarding to OpenClaw +command_prefix = "龙虾>" + +# OpenClaw inbound webhook (forward WeChat messages to OpenClaw) +webhook_url = "http://192.168.2.142:18789/wechat_abot/webhook" +webhook_secret = "change_me" +webhook_timeout = 5 + +# HTTP server for OpenClaw to call back (send messages to WeChat) +server_host = "0.0.0.0" +server_port = 18790 +bot_token = "change_me" + +# Optional IP allowlist for the callback server (empty = allow all) +allowed_ips = [] diff --git a/plugins/openclaw/main.py b/plugins/openclaw/main.py new file mode 100644 index 0000000..7a7d4b8 --- /dev/null +++ b/plugins/openclaw/main.py @@ -0,0 +1,300 @@ +# -*- coding: utf-8 -*- +import asyncio +import hmac +import hashlib +import json +import logging +import os +import threading +import time +from collections import deque +from typing import Dict, Any, Tuple, Optional, List + +import requests +from flask import Flask, jsonify, request +from loguru import logger +from werkzeug.serving import make_server + +from base.plugin_common.message_plugin_interface import MessagePluginInterface +from base.plugin_common.plugin_interface import PluginStatus + + +class OpenClawPlugin(MessagePluginInterface): + """OpenClaw webhook bridge plugin.""" + + FEATURE_KEY = "OPENCLAW" + FEATURE_DESCRIPTION = "🦞 OpenClaw 指令转发 [龙虾>]" + + @property + def name(self) -> str: + return "OpenClaw" + + @property + def version(self) -> str: + return "1.0.0" + + @property + def description(self) -> str: + return "将指定指令消息转发到 OpenClaw,并提供回调接口发送微信消息" + + @property + def author(self) -> str: + return "liu.wei" + + @property + def command_prefix(self) -> Optional[str]: + return self._command_prefix + + @property + def feature_key(self) -> Optional[str]: + return self.FEATURE_KEY + + @property + def feature_description(self) -> Optional[str]: + return self.FEATURE_DESCRIPTION + + def __init__(self): + super().__init__() + self.feature = self.register_feature() + self._command_prefix = "龙虾>" + self._recent_ids = deque(maxlen=200) + self._recent_ids_set = set() + self._recent_lock = threading.Lock() + + self._server = None + self._server_thread = None + self._app = None + self._warned_no_token = False + + def initialize(self, context: Dict[str, Any]) -> bool: + """Initialize plugin.""" + self.LOG = logger + cfg = self._config.get("OpenClaw", {}) + + self.enable = cfg.get("enable", True) + self._command_prefix = cfg.get("command_prefix", "龙虾>") + + self.webhook_url = os.getenv( + "OPENCLAW_WEBHOOK_URL", + cfg.get("webhook_url", "http://127.0.0.1:18789/wechat_abot/webhook"), + ) + self.webhook_secret = os.getenv( + "OPENCLAW_WEBHOOK_SECRET", + cfg.get("webhook_secret", ""), + ) + self.webhook_timeout = cfg.get("webhook_timeout", 5) + + self.server_host = cfg.get("server_host", "0.0.0.0") + self.server_port = cfg.get("server_port", 18790) + self.bot_token = os.getenv("OPENCLAW_BOT_TOKEN", cfg.get("bot_token", "")) + self.allowed_ips = cfg.get("allowed_ips", []) + + self._app = self._create_app() + self.LOG.debug(f"[{self.name}] Initialized with webhook_url={self.webhook_url}") + return True + + def start(self) -> bool: + """Start plugin.""" + if not self._app: + self.LOG.error(f"[{self.name}] Flask app not initialized") + self.status = PluginStatus.ERROR + return False + + if self._server_thread and self._server_thread.is_alive(): + self.status = PluginStatus.RUNNING + return True + + self._server_thread = threading.Thread(target=self._run_server, daemon=True) + self._server_thread.start() + self.status = PluginStatus.RUNNING + self.LOG.debug(f"[{self.name}] Plugin started") + return True + + def stop(self) -> bool: + """Stop plugin.""" + if self._server: + try: + self._server.shutdown() + except Exception as e: + self.LOG.error(f"[{self.name}] Server shutdown error: {e}") + + self.status = PluginStatus.STOPPED + self.LOG.info(f"[{self.name}] Plugin stopped") + return True + + def can_process(self, message: Dict[str, Any]) -> bool: + if not self.enable: + return False + + content = str(message.get("content", "")).strip() + return content.startswith(self._command_prefix) + + async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """Forward message to OpenClaw when command is triggered.""" + try: + content = str(message.get("content", "")).strip() + if not content.startswith(self._command_prefix): + return False, None + + full_msg = message.get("full_wx_msg") + raw_data = getattr(full_msg, "raw_data", None) + if not isinstance(raw_data, dict): + self.LOG.warning(f"[{self.name}] Missing raw_data, fallback to message dict") + raw_data = {"content": content} + + msg_id = self._extract_message_id(raw_data) + if msg_id and self._is_duplicate(msg_id): + self.LOG.info(f"[{self.name}] Duplicate message ignored: {msg_id}") + return True, "duplicate" + + raw_body = json.dumps( + raw_data, + ensure_ascii=False, + separators=(",", ":"), + sort_keys=False, + ).encode("utf-8") + + ok, info = await asyncio.to_thread(self._post_to_openclaw, raw_body) + if ok: + if msg_id: + self._mark_recent(msg_id) + return True, "forwarded" + + return False, info + except Exception as e: + self.LOG.error(f"[{self.name}] Process message error: {e}") + return False, f"error: {e}" + + def _create_app(self) -> Flask: + app = Flask(__name__) + + # Reduce werkzeug logs + logging.getLogger("werkzeug").setLevel(logging.ERROR) + + @app.get("/healthz") + def healthz(): + return jsonify({"ok": True, "name": self.name, "ts": int(time.time())}) + + @app.post("/api/messages/send") + def api_messages_send(): + if not self._ip_allowed(request): + return jsonify({"error": "forbidden"}), 403 + + auth = request.headers.get("Authorization", "") + if not self._verify_bearer(auth): + return jsonify({"error": "unauthorized"}), 401 + + data = request.get_json(silent=True) or {} + to = data.get("to") + text = data.get("text") or data.get("content") + reply_to = data.get("replyToMessageId") + + if not to or not text: + return jsonify({"error": "missing to/text"}), 400 + + if not getattr(self, "bot", None) or not getattr(self.bot, "wxid", ""): + return jsonify({"error": "bot_not_ready"}), 503 + + at_user = data.get("at") or data.get("at_user") or data.get("atUser") or "" + ok, message_id = self._send_text_via_wechat_api(to, text, at_user) + if not ok: + return jsonify({"error": "send_failed", "detail": message_id}), 502 + + if reply_to: + self.LOG.debug(f"[{self.name}] replyToMessageId ignored: {reply_to}") + + return jsonify({"messageId": message_id}), 200 + + return app + + def _run_server(self): + self.LOG.info(f"[{self.name}] Webhook server starting: {self.server_host}:{self.server_port}") + try: + self._server = make_server(self.server_host, self.server_port, self._app) + self._server.serve_forever() + except Exception as e: + self.LOG.error(f"[{self.name}] Webhook server error: {e}") + + def _post_to_openclaw(self, raw_body: bytes) -> Tuple[bool, str]: + headers = {"Content-Type": "application/json"} + if self.webhook_secret is not None: + signature = self._sign_sha256(raw_body, self.webhook_secret) + headers["X-Wechat-Abot-Signature"] = f"sha256={signature}" + + try: + resp = requests.post( + self.webhook_url, + data=raw_body, + headers=headers, + timeout=self.webhook_timeout, + ) + if resp.status_code >= 200 and resp.status_code < 300: + return True, f"ok:{resp.status_code}" + return False, f"status:{resp.status_code}" + except requests.RequestException as e: + self.LOG.error(f"[{self.name}] Webhook request failed: {e}") + return False, f"request_error:{e}" + + def _send_text_via_wechat_api(self, to: str, text: str, at_user: Any = "") -> Tuple[bool, str]: + """Use existing bot API to send message.""" + if not getattr(self, "bot", None): + return False, "bot_not_ready" + + try: + client_msg_id, create_time, new_msg_id = asyncio.run( + self.bot.send_text_message(to, text, at_user) + ) + message_id = client_msg_id or new_msg_id or create_time + return True, str(message_id) + except Exception as e: + self.LOG.error(f"[{self.name}] bot.send_text_message error: {e}") + return False, str(e) + + def _verify_bearer(self, auth_header: str) -> bool: + if not self.bot_token: + # If no token configured, allow (but log once) + if not self._warned_no_token: + self.LOG.warning(f"[{self.name}] bot_token not set, allowing all requests") + self._warned_no_token = True + return True + if not auth_header or not auth_header.startswith("Bearer "): + return False + token = auth_header[7:].strip() + return hmac.compare_digest(token, self.bot_token) + + def _ip_allowed(self, req) -> bool: + if not self.allowed_ips: + return True + # Try X-Forwarded-For first + forwarded = req.headers.get("X-Forwarded-For", "") + if forwarded: + ip = forwarded.split(",")[0].strip() + else: + ip = req.remote_addr + return ip in self.allowed_ips + + @staticmethod + def _sign_sha256(raw_body: bytes, secret: str) -> str: + return hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest() + + @staticmethod + def _extract_message_id(raw_data: Dict[str, Any]) -> Optional[str]: + for key in ("MsgId", "NewMsgId", "ClientMsgId", "clientMsgId", "newMsgId"): + value = raw_data.get(key) + if value: + return str(value) + return None + + def _is_duplicate(self, msg_id: str) -> bool: + with self._recent_lock: + return msg_id in self._recent_ids_set + + def _mark_recent(self, msg_id: str): + with self._recent_lock: + if msg_id in self._recent_ids_set: + return + if len(self._recent_ids) >= self._recent_ids.maxlen: + old = self._recent_ids.popleft() + self._recent_ids_set.discard(old) + self._recent_ids.append(msg_id) + self._recent_ids_set.add(msg_id) diff --git a/plugins/openclaw/webhook.MD b/plugins/openclaw/webhook.MD new file mode 100644 index 0000000..91b20f4 --- /dev/null +++ b/plugins/openclaw/webhook.MD @@ -0,0 +1,126 @@ +你要的 IM实现逻辑清单(按这个做就能稳定接): + +入站推送到 OpenClaw +调用:POST /wechat_abot/webhook +Body:直接发你现在 message.py 的原始 JSON +Header:X-Wechat-Abot-Signature: sha256= +超时建议:3~5s;失败重试(指数退避) +签名实现(必须) +算法:HMAC-SHA256 +输入:HTTP原始body字节(不要重排JSON) +secret:和 channels.wechat_abot.webhookSecret 一致 +防重放:建议带时间戳并限制窗口(可选增强) +出站发送接口(你IM提供) +POST /messages/send +Header:Authorization: Bearer +入参:{ to, text, replyToMessageId? } +返回:{ messageId } +错误码要清晰(401/403/429/5xx) +幂等与去重 +入站按 MsgId/NewMsgId 去重(你侧做一次,插件侧也做一次更稳) +出站按你生成的 requestId/messageId 防重复发送 +群聊处理 +保持原始 Content.string(sender:\n正文)格式即可 +插件会自动提取真实发送者并清理前缀 +群里是否触发可在你侧先做过滤(如必须@机器人) +可靠性 +出站接口支持重试(网络错误/502/503) +429 限流要返回 retry_after +全链路打日志:eventId、sender、conversationId、status、耗时 +安全 +webhook 只允许你的 IM 源IP(可选) +token/secret 不写死代码,走环境变量 +关闭调试日志里的敏感字段(token、签名原文) + + + + +可以,给你一份 Python 直连接口参考逻辑(入站推 OpenClaw + 出站发送接口),你按自己项目替换发送实现就行。 + +import os +import json +import hmac +import hashlib +import requests +from flask import Flask, request, jsonify + +app = Flask(__name__) + +# ===== 配置 ===== +OPENCLAW_WEBHOOK_URL = os.getenv("OPENCLAW_WEBHOOK_URL", "http://127.0.0.1:18789/wechat_abot/webhook") +OPENCLAW_WEBHOOK_SECRET = os.getenv("OPENCLAW_WEBHOOK_SECRET", "change_me") + +IM_BOT_TOKEN = os.getenv("IM_BOT_TOKEN", "change_me") # 给 OpenClaw 插件鉴权用 +# 你自己的 IM SDK/client 在这里初始化 +# im_client = YourIMClient(...) + +def sign_sha256(raw_body: bytes, secret: str) -> str: +return hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest() + +def verify_bearer(auth_header: str, expected_token: str) -> bool: +if not auth_header or not auth_header.startswith("Bearer "): +return False +token = auth_header[7:].strip() +return hmac.compare_digest(token, expected_token) + +# 1) 你 IM 收到消息后的入口(示例) +@app.post("/im/inbound") +def im_inbound(): +# 这里 body 就是你 message.py 能解析的原始 JSON +raw = request.get_data() # 原始字节,签名必须用它 +if not raw: +return jsonify({"error": "empty body"}), 400 + +# 直接转发给 OpenClaw 插件 webhook +sig = sign_sha256(raw, OPENCLAW_WEBHOOK_SECRET) +headers = { +"Content-Type": "application/json", +"X-Wechat-Abot-Signature": f"sha256={sig}", +} + +try: +resp = requests.post( +OPENCLAW_WEBHOOK_URL, +data=raw, +headers=headers, +timeout=5 +) +# 建议你自己加重试策略(429/5xx) +return jsonify({ +"ok": resp.ok, +"status": resp.status_code, +"body": resp.json() if "application/json" in resp.headers.get("Content-Type", "") else resp.text +}), 200 +except requests.RequestException as e: +return jsonify({"ok": False, "error": str(e)}), 502 + +# 2) 给 OpenClaw 插件调用的发送接口 +@app.post("/api/messages/send") +def api_messages_send(): +# 鉴权:对应 channels.wechat_abot.botToken +auth = request.headers.get("Authorization", "") +if not verify_bearer(auth, IM_BOT_TOKEN): +return jsonify({"error": "unauthorized"}), 401 + +data = request.get_json(silent=True) or {} +to = data.get("to") +text = data.get("text", "") +reply_to = data.get("replyToMessageId") + +if not to or not text: +return jsonify({"error": "missing to/text"}), 400 + +# TODO: 这里换成你真实 IM 发送逻辑 +# result = im_client.send_text(to=to, text=text, reply_to=reply_to) +# message_id = result["message_id"] + +message_id = f"out_{hashlib.md5((to + text).encode()).hexdigest()[:12]}" # demo +return jsonify({"messageId": message_id}), 200 + + +if __name__ == "__main__": +app.run(host="0.0.0.0", port=8088, debug=False) +你只需要改两处: + +/im/inbound:挂到你现有消息接收流程里 +/api/messages/send:替换成你真实 IM 发消息函数 \ No newline at end of file