162 lines
3.9 KiB
Python
162 lines
3.9 KiB
Python
"""Cloudflare DNS API 封装"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
import requests
|
|
|
|
|
|
API_BASE = "https://api.cloudflare.com/client/v4"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class CloudflareAuth:
|
|
auth_type: str # api_token | global_key
|
|
api_token: str | None = None
|
|
email: str | None = None
|
|
api_key: str | None = None
|
|
|
|
def headers(self) -> dict[str, str]:
|
|
if self.auth_type == "api_token":
|
|
if not self.api_token:
|
|
raise ValueError("Cloudflare API Token 不能为空")
|
|
return {"Authorization": f"Bearer {self.api_token}"}
|
|
|
|
if self.auth_type == "global_key":
|
|
if not self.email or not self.api_key:
|
|
raise ValueError("Cloudflare Email / Global API Key 不能为空")
|
|
return {"X-Auth-Email": self.email, "X-Auth-Key": self.api_key}
|
|
|
|
raise ValueError("cloudflare_auth_type 只能是 api_token 或 global_key")
|
|
|
|
|
|
def _check_response(resp: requests.Response) -> dict[str, Any]:
|
|
try:
|
|
data = resp.json()
|
|
except Exception:
|
|
resp.raise_for_status()
|
|
raise
|
|
|
|
if not resp.ok or not data.get("success"):
|
|
errors = data.get("errors") or []
|
|
message = errors[0].get("message") if errors else f"Cloudflare API 请求失败: {resp.status_code}"
|
|
raise RuntimeError(message)
|
|
|
|
return data
|
|
|
|
|
|
def _request(
|
|
method: str,
|
|
path: str,
|
|
*,
|
|
auth: CloudflareAuth,
|
|
params: dict[str, Any] | None = None,
|
|
json: dict[str, Any] | None = None,
|
|
timeout_seconds: int = 15,
|
|
) -> dict[str, Any]:
|
|
url = f"{API_BASE}{path}"
|
|
headers = {"Content-Type": "application/json", **auth.headers()}
|
|
resp = requests.request(
|
|
method,
|
|
url,
|
|
headers=headers,
|
|
params=params,
|
|
json=json,
|
|
timeout=timeout_seconds,
|
|
)
|
|
return _check_response(resp)
|
|
|
|
|
|
def find_a_record(
|
|
*,
|
|
zone_id: str,
|
|
record_name: str,
|
|
auth: CloudflareAuth,
|
|
) -> dict[str, Any] | None:
|
|
data = _request(
|
|
"GET",
|
|
f"/zones/{zone_id}/dns_records",
|
|
auth=auth,
|
|
params={"type": "A", "name": record_name},
|
|
)
|
|
result = data.get("result") or []
|
|
return result[0] if result else None
|
|
|
|
|
|
def update_a_record(
|
|
*,
|
|
zone_id: str,
|
|
record_id: str,
|
|
record_name: str,
|
|
ip: str,
|
|
proxied: bool,
|
|
auth: CloudflareAuth,
|
|
) -> dict[str, Any]:
|
|
data = _request(
|
|
"PUT",
|
|
f"/zones/{zone_id}/dns_records/{record_id}",
|
|
auth=auth,
|
|
json={"type": "A", "name": record_name, "content": ip, "proxied": proxied},
|
|
)
|
|
return data["result"]
|
|
|
|
|
|
def create_a_record(
|
|
*,
|
|
zone_id: str,
|
|
record_name: str,
|
|
ip: str,
|
|
proxied: bool,
|
|
auth: CloudflareAuth,
|
|
) -> dict[str, Any]:
|
|
data = _request(
|
|
"POST",
|
|
f"/zones/{zone_id}/dns_records",
|
|
auth=auth,
|
|
json={"type": "A", "name": record_name, "content": ip, "proxied": proxied},
|
|
)
|
|
return data["result"]
|
|
|
|
|
|
def upsert_a_record(
|
|
*,
|
|
zone_id: str,
|
|
record_name: str,
|
|
ip: str,
|
|
proxied: bool,
|
|
record_id: str | None,
|
|
auth: CloudflareAuth,
|
|
) -> dict[str, Any]:
|
|
if record_id:
|
|
try:
|
|
return update_a_record(
|
|
zone_id=zone_id,
|
|
record_id=record_id,
|
|
record_name=record_name,
|
|
ip=ip,
|
|
proxied=proxied,
|
|
auth=auth,
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
record = find_a_record(zone_id=zone_id, record_name=record_name, auth=auth)
|
|
if record:
|
|
return update_a_record(
|
|
zone_id=zone_id,
|
|
record_id=record["id"],
|
|
record_name=record_name,
|
|
ip=ip,
|
|
proxied=proxied,
|
|
auth=auth,
|
|
)
|
|
|
|
return create_a_record(
|
|
zone_id=zone_id,
|
|
record_name=record_name,
|
|
ip=ip,
|
|
proxied=proxied,
|
|
auth=auth,
|
|
)
|