# -*- coding: utf-8 -*-
import asyncio
import hmac
import hashlib
import json
import logging
import os
import threading
import time
from collections import deque
from typing import Dict, Any, Tuple, Optional, List

import requests
from flask import Flask, jsonify, request
from loguru import logger
from werkzeug.serving import make_server

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus


class OpenClawPlugin(MessagePluginInterface):
    """OpenClaw webhook bridge plugin.

    Outbound: messages whose content starts with the command prefix are
    serialized to JSON and POSTed to the configured OpenClaw webhook URL,
    optionally signed with HMAC-SHA256.

    Inbound: a small Flask app (served by werkzeug in a daemon thread) exposes
    ``GET /healthz`` and ``POST /api/messages/send`` so that OpenClaw can ask
    the bot to send a text message.
    """

    FEATURE_KEY = "OPENCLAW"
    FEATURE_DESCRIPTION = "🦞 OpenClaw 指令转发 [龙虾>]"

    # Seconds stop() waits for the server thread to exit after shutdown().
    _SHUTDOWN_JOIN_TIMEOUT = 5

    @property
    def name(self) -> str:
        return "OpenClaw"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def description(self) -> str:
        return "将指定指令消息转发到 OpenClaw,并提供回调接口发送微信消息"

    @property
    def author(self) -> str:
        return "liu.wei"

    @property
    def command_prefix(self) -> Optional[str]:
        return self._command_prefix

    @property
    def feature_key(self) -> Optional[str]:
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        super().__init__()
        self.feature = self.register_feature()
        self._command_prefix = "龙虾>"
        # Bounded FIFO of recently forwarded message ids, mirrored in a set
        # for O(1) membership tests; both are guarded by _recent_lock.
        self._recent_ids = deque(maxlen=200)
        self._recent_ids_set = set()
        self._recent_lock = threading.Lock()
        self._server = None
        self._server_thread = None
        self._app = None
        self._warned_no_token = False
        # X-Forwarded-For is client-controlled; it is only honoured when
        # explicitly enabled in the config (see initialize()).
        self.trust_forwarded_for = False

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Read the ``OpenClaw`` config section and build the Flask app.

        The environment variables ``OPENCLAW_WEBHOOK_URL``,
        ``OPENCLAW_WEBHOOK_SECRET`` and ``OPENCLAW_BOT_TOKEN`` take precedence
        over the corresponding config keys.

        Args:
            context: Plugin context supplied by the host; not read here.

        Returns:
            Always ``True``.
        """
        self.LOG = logger
        cfg = self._config.get("OpenClaw", {})
        self.enable = cfg.get("enable", True)
        self._command_prefix = cfg.get("command_prefix", "龙虾>")
        self.webhook_url = os.getenv(
            "OPENCLAW_WEBHOOK_URL",
            cfg.get("webhook_url", "http://127.0.0.1:18789/wechat_abot/webhook"),
        )
        self.webhook_secret = os.getenv(
            "OPENCLAW_WEBHOOK_SECRET",
            cfg.get("webhook_secret", ""),
        )
        self.webhook_timeout = cfg.get("webhook_timeout", 5)
        self.server_host = cfg.get("server_host", "0.0.0.0")
        self.server_port = cfg.get("server_port", 18790)
        self.bot_token = os.getenv("OPENCLAW_BOT_TOKEN", cfg.get("bot_token", ""))

        # A bare string would turn the `ip in allowed_ips` check into a
        # substring match, so always normalise to a list of stripped strings.
        allowed = cfg.get("allowed_ips", [])
        if isinstance(allowed, str):
            allowed = [allowed]
        self.allowed_ips = [
            str(ip).strip() for ip in (allowed or []) if str(ip).strip()
        ]
        # Enable only when the server sits behind a reverse proxy that
        # overwrites X-Forwarded-For; otherwise clients could spoof their IP.
        self.trust_forwarded_for = bool(cfg.get("trust_forwarded_for", False))

        self._app = self._create_app()
        self.LOG.debug(f"[{self.name}] Initialized with webhook_url={self.webhook_url}")
        return True

    def start(self) -> bool:
        """Start the webhook server thread if it is not already running.

        Returns:
            ``False`` when the Flask app has not been created, else ``True``.
        """
        if not self._app:
            self.LOG.error(f"[{self.name}] Flask app not initialized")
            self.status = PluginStatus.ERROR
            return False
        if self._server_thread and self._server_thread.is_alive():
            self.status = PluginStatus.RUNNING
            return True
        # Set RUNNING before the thread starts so that a startup failure in
        # _run_server (which sets ERROR) is not overwritten afterwards.
        self.status = PluginStatus.RUNNING
        self._server_thread = threading.Thread(target=self._run_server, daemon=True)
        self._server_thread.start()
        self.LOG.debug(f"[{self.name}] Plugin started")
        return True

    def stop(self) -> bool:
        """Shut the webhook server down and wait briefly for its thread.

        Returns:
            Always ``True``.
        """
        server = self._server
        if server:
            try:
                server.shutdown()
            except Exception as e:
                self.LOG.error(f"[{self.name}] Server shutdown error: {e}")
            # Wait for _run_server to close the listening socket so that a
            # later start() can bind the same port again.
            thread = self._server_thread
            if thread is not None and thread is not threading.current_thread():
                thread.join(timeout=self._SHUTDOWN_JOIN_TIMEOUT)
                if not thread.is_alive():
                    self._server_thread = None
        self.status = PluginStatus.STOPPED
        self.LOG.info(f"[{self.name}] Plugin stopped")
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return True when the plugin is enabled and the content has the prefix."""
        if not self.enable:
            return False
        content = str(message.get("content", "")).strip()
        return content.startswith(self._command_prefix)

    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Forward message to OpenClaw when command is triggered.

        Args:
            message: Message dict; ``content`` and ``full_wx_msg`` are read.

        Returns:
            ``(handled, info)``: ``(False, None)`` when the prefix does not
            match, ``(True, "duplicate")`` for an already forwarded id,
            ``(True, "forwarded")`` on success, otherwise ``(False, reason)``.
        """
        try:
            content = str(message.get("content", "")).strip()
            if not content.startswith(self._command_prefix):
                return False, None

            full_msg = message.get("full_wx_msg")
            raw_data = getattr(full_msg, "raw_data", None)
            if not isinstance(raw_data, dict):
                self.LOG.warning(f"[{self.name}] Missing raw_data, fallback to message dict")
                raw_data = {"content": content}

            msg_id = self._extract_message_id(raw_data)
            if msg_id and self._is_duplicate(msg_id):
                self.LOG.info(f"[{self.name}] Duplicate message ignored: {msg_id}")
                return True, "duplicate"

            # Compact serialization; these exact bytes are what gets signed
            # and sent, so the receiver can verify the signature on the body.
            raw_body = json.dumps(
                raw_data,
                ensure_ascii=False,
                separators=(",", ":"),
                sort_keys=False,
            ).encode("utf-8")

            # requests is blocking; run it off the event loop.
            ok, info = await asyncio.to_thread(self._post_to_openclaw, raw_body)
            if ok:
                # Only remember ids that were actually delivered, so a failed
                # forward can be retried.
                if msg_id:
                    self._mark_recent(msg_id)
                return True, "forwarded"
            return False, info
        except Exception as e:
            self.LOG.error(f"[{self.name}] Process message error: {e}")
            return False, f"error: {e}"

    def _create_app(self) -> Flask:
        """Build the Flask app exposing the health and send-message endpoints."""
        app = Flask(__name__)

        # Reduce werkzeug logs
        logging.getLogger("werkzeug").setLevel(logging.ERROR)

        @app.get("/healthz")
        def healthz():
            return jsonify({"ok": True, "name": self.name, "ts": int(time.time())})

        @app.post("/api/messages/send")
        def api_messages_send():
            if not self._ip_allowed(request):
                return jsonify({"error": "forbidden"}), 403

            auth = request.headers.get("Authorization", "")
            if not self._verify_bearer(auth):
                return jsonify({"error": "unauthorized"}), 401

            data = request.get_json(silent=True) or {}
            to = data.get("to")
            text = data.get("text") or data.get("content")
            reply_to = data.get("replyToMessageId")
            if not to or not text:
                return jsonify({"error": "missing to/text"}), 400

            if not getattr(self, "bot", None) or not getattr(self.bot, "wxid", ""):
                return jsonify({"error": "bot_not_ready"}), 503

            at_user = data.get("at") or data.get("at_user") or data.get("atUser") or ""
            ok, message_id = self._send_text_via_wechat_api(to, text, at_user)
            if not ok:
                return jsonify({"error": "send_failed", "detail": message_id}), 502

            if reply_to:
                self.LOG.debug(f"[{self.name}] replyToMessageId ignored: {reply_to}")
            return jsonify({"messageId": message_id}), 200

        return app

    def _run_server(self):
        """Thread target: serve the Flask app until shutdown() is called."""
        self.LOG.info(f"[{self.name}] Webhook server starting: {self.server_host}:{self.server_port}")
        server = None
        try:
            server = make_server(self.server_host, self.server_port, self._app)
            self._server = server
            server.serve_forever()
        except Exception as e:
            self.LOG.error(f"[{self.name}] Webhook server error: {e}")
            # Surface bind/serve failures instead of leaving status RUNNING.
            self.status = PluginStatus.ERROR
        finally:
            if server is not None:
                # Release the listening socket so the port can be reused.
                server.server_close()
                if self._server is server:
                    self._server = None

    def _post_to_openclaw(self, raw_body: bytes) -> Tuple[bool, str]:
        """POST the serialized message to the OpenClaw webhook.

        Args:
            raw_body: JSON payload bytes, sent unchanged as the request body.

        Returns:
            ``(True, "ok:<status>")`` for a 2xx response, otherwise
            ``(False, "status:<status>")`` or ``(False, "request_error:<exc>")``.
        """
        headers = {"Content-Type": "application/json"}
        # The secret defaults to "", so test truthiness: an unset secret must
        # not produce a signature computed with an empty key.
        if self.webhook_secret:
            signature = self._sign_sha256(raw_body, self.webhook_secret)
            headers["X-Wechat-Abot-Signature"] = f"sha256={signature}"
        try:
            resp = requests.post(
                self.webhook_url,
                data=raw_body,
                headers=headers,
                timeout=self.webhook_timeout,
            )
            if 200 <= resp.status_code < 300:
                return True, f"ok:{resp.status_code}"
            return False, f"status:{resp.status_code}"
        except requests.RequestException as e:
            self.LOG.error(f"[{self.name}] Webhook request failed: {e}")
            return False, f"request_error:{e}"

    def _send_text_via_wechat_api(self, to: str, text: str, at_user: Any = "") -> Tuple[bool, str]:
        """Use existing bot API to send message.

        Returns:
            ``(True, message_id)`` on success, ``(False, reason)`` otherwise.
        """
        if not getattr(self, "bot", None):
            return False, "bot_not_ready"
        try:
            # NOTE(review): asyncio.run() drives the coroutine on a fresh event
            # loop in the calling (HTTP server) thread. If the bot client is
            # bound to the host's main loop this may need
            # asyncio.run_coroutine_threadsafe instead - confirm with the bot API.
            client_msg_id, create_time, new_msg_id = asyncio.run(
                self.bot.send_text_message(to, text, at_user)
            )
            message_id = client_msg_id or new_msg_id or create_time
            return True, str(message_id)
        except Exception as e:
            self.LOG.error(f"[{self.name}] bot.send_text_message error: {e}")
            return False, str(e)

    def _verify_bearer(self, auth_header: str) -> bool:
        """Check an ``Authorization: Bearer <token>`` header against bot_token.

        When no token is configured every request is accepted (a warning is
        logged once).
        """
        if not self.bot_token:
            # If no token configured, allow (but log once)
            if not self._warned_no_token:
                self.LOG.warning(f"[{self.name}] bot_token not set, allowing all requests")
                self._warned_no_token = True
            return True
        if not auth_header or not auth_header.startswith("Bearer "):
            return False
        token = auth_header[7:].strip()
        # compare_digest() raises TypeError for str arguments containing
        # non-ASCII characters; comparing bytes makes such a header simply
        # fail authentication instead of raising.
        return hmac.compare_digest(
            token.encode("utf-8"),
            str(self.bot_token).encode("utf-8"),
        )

    def _ip_allowed(self, req) -> bool:
        """Return True when the request's client IP passes the allowlist.

        An empty allowlist permits everything. The socket peer address is
        used by default; the first ``X-Forwarded-For`` entry is used only when
        ``trust_forwarded_for`` is enabled, because that header is supplied by
        the client and could otherwise be forged to bypass the allowlist.
        """
        if not self.allowed_ips:
            return True
        ip = req.remote_addr
        if self.trust_forwarded_for:
            forwarded = req.headers.get("X-Forwarded-For", "")
            if forwarded:
                ip = forwarded.split(",")[0].strip()
        return ip in self.allowed_ips

    @staticmethod
    def _sign_sha256(raw_body: bytes, secret: str) -> str:
        """Return the hex HMAC-SHA256 of ``raw_body`` keyed with ``secret``."""
        return hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()

    @staticmethod
    def _extract_message_id(raw_data: Dict[str, Any]) -> Optional[str]:
        """Return the first truthy id among the known id keys, as a string."""
        for key in ("MsgId", "NewMsgId", "ClientMsgId", "clientMsgId", "newMsgId"):
            value = raw_data.get(key)
            if value:
                return str(value)
        return None

    def _is_duplicate(self, msg_id: str) -> bool:
        """Return True when ``msg_id`` was recently recorded by _mark_recent."""
        with self._recent_lock:
            return msg_id in self._recent_ids_set

    def _mark_recent(self, msg_id: str):
        """Record ``msg_id`` as forwarded, evicting the oldest id when full."""
        with self._recent_lock:
            if msg_id in self._recent_ids_set:
                return
            # Evict explicitly (rather than relying on deque's maxlen) so the
            # companion set stays in sync with the deque.
            if len(self._recent_ids) >= self._recent_ids.maxlen:
                old = self._recent_ids.popleft()
                self._recent_ids_set.discard(old)
            self._recent_ids.append(msg_id)
            self._recent_ids_set.add(msg_id)