"""Cloudflare DNS API 封装""" from __future__ import annotations from dataclasses import dataclass from typing import Any import requests API_BASE = "https://api.cloudflare.com/client/v4" @dataclass(frozen=True) class CloudflareAuth: auth_type: str # api_token | global_key api_token: str | None = None email: str | None = None api_key: str | None = None def headers(self) -> dict[str, str]: if self.auth_type == "api_token": if not self.api_token: raise ValueError("Cloudflare API Token 不能为空") return {"Authorization": f"Bearer {self.api_token}"} if self.auth_type == "global_key": if not self.email or not self.api_key: raise ValueError("Cloudflare Email / Global API Key 不能为空") return {"X-Auth-Email": self.email, "X-Auth-Key": self.api_key} raise ValueError("cloudflare_auth_type 只能是 api_token 或 global_key") def _check_response(resp: requests.Response) -> dict[str, Any]: try: data = resp.json() except Exception: resp.raise_for_status() raise if not resp.ok or not data.get("success"): errors = data.get("errors") or [] message = errors[0].get("message") if errors else f"Cloudflare API 请求失败: {resp.status_code}" raise RuntimeError(message) return data def _request( method: str, path: str, *, auth: CloudflareAuth, params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, timeout_seconds: int = 15, ) -> dict[str, Any]: url = f"{API_BASE}{path}" headers = {"Content-Type": "application/json", **auth.headers()} resp = requests.request( method, url, headers=headers, params=params, json=json, timeout=timeout_seconds, ) return _check_response(resp) def find_a_record( *, zone_id: str, record_name: str, auth: CloudflareAuth, ) -> dict[str, Any] | None: data = _request( "GET", f"/zones/{zone_id}/dns_records", auth=auth, params={"type": "A", "name": record_name}, ) result = data.get("result") or [] return result[0] if result else None def update_a_record( *, zone_id: str, record_id: str, record_name: str, ip: str, proxied: bool, auth: CloudflareAuth, ) -> dict[str, Any]: data = _request( "PUT", f"/zones/{zone_id}/dns_records/{record_id}", auth=auth, json={"type": "A", "name": record_name, "content": ip, "proxied": proxied}, ) return data["result"] def create_a_record( *, zone_id: str, record_name: str, ip: str, proxied: bool, auth: CloudflareAuth, ) -> dict[str, Any]: data = _request( "POST", f"/zones/{zone_id}/dns_records", auth=auth, json={"type": "A", "name": record_name, "content": ip, "proxied": proxied}, ) return data["result"] def upsert_a_record( *, zone_id: str, record_name: str, ip: str, proxied: bool, record_id: str | None, auth: CloudflareAuth, ) -> dict[str, Any]: if record_id: try: return update_a_record( zone_id=zone_id, record_id=record_id, record_name=record_name, ip=ip, proxied=proxied, auth=auth, ) except Exception: pass record = find_a_record(zone_id=zone_id, record_name=record_name, auth=auth) if record: return update_a_record( zone_id=zone_id, record_id=record["id"], record_name=record_name, ip=ip, proxied=proxied, auth=auth, ) return create_a_record( zone_id=zone_id, record_name=record_name, ip=ip, proxied=proxied, auth=auth, )