# Source: WechatHookBot/plugins/TravelPlanner/amap_client.py
# Repository viewer snapshot: 2026-01-08 18:46:14 +08:00, 876 lines, 28 KiB, Python.
# Note: the viewer flagged this file as containing ambiguous Unicode characters,
# i.e. characters that might be confused with other characters.
"""
高德地图 API 客户端封装
提供以下功能:
- 地理编码:地址 → 坐标
- 逆地理编码:坐标 → 地址
- 行政区域查询:获取城市 adcode
- 天气查询:实况/预报天气
- POI 搜索:关键字搜索、周边搜索
- 路径规划:驾车、公交、步行、骑行
"""
from __future__ import annotations
import hashlib
import aiohttp
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Literal
from loguru import logger
@dataclass
class AmapConfig:
    """Settings for the Amap (Gaode) API client."""

    # API key; sent with every request as the ``key`` query parameter.
    api_key: str
    # Security secret used for the digital signature; empty means requests are not signed.
    secret: str = ""
    # Passed as ``total`` to ``aiohttp.ClientTimeout`` when the session is created.
    timeout: int = 30
class AmapClient:
    """Client for the Amap (Gaode) map REST API."""

    BASE_URL = "https://restapi.amap.com"

    def __init__(self, config: "AmapConfig"):
        """Remember the configuration; no HTTP session is opened yet.

        Args:
            config: API key, optional signing secret and timeout.
        """
        # The session is created on first use by ``_get_session``.
        self._session: Optional[aiohttp.ClientSession] = None
        self.config = config
@staticmethod
def _safe_int(value, default: int = 0) -> int:
    """Convert ``value`` to ``int`` without ever raising.

    Accepts the loosely typed values found in API payloads: numbers, numeric
    strings (decimals such as ``"12.7"`` are truncated toward zero), ``None``,
    lists and blank strings.

    Args:
        value: Arbitrary value to convert.
        default: Fallback returned when ``value`` cannot be converted.

    Returns:
        The converted integer, or ``default`` for ``None``, lists, blank or
        non-numeric strings, NaN/infinite numbers and any other type.
    """
    if isinstance(value, str):
        if not value.strip():
            return default
        try:
            value = float(value)
        except ValueError:
            return default
    if not isinstance(value, (int, float)):
        # None, lists and every other type have no integer representation here.
        return default
    try:
        return int(value)
    except (ValueError, OverflowError):
        # int() raises ValueError for NaN and OverflowError for +/-infinity.
        return default
@staticmethod
def _safe_float(value, default: float = 0.0) -> float:
    """Convert ``value`` to ``float``, falling back to ``default``.

    Args:
        value: Arbitrary value to convert.
        default: Returned for ``None``, lists, blank or unparsable strings
            and any type other than ``int``/``float``/``str``.

    Returns:
        The converted float, or ``default``.
    """
    if isinstance(value, (int, float)):
        return float(value)
    # Everything that is not a non-blank string is unsupported.
    if not isinstance(value, str) or not value.strip():
        return default
    try:
        return float(value)
    except (ValueError, TypeError):
        return default
@staticmethod
def _safe_str(value, default: str = "") -> str:
    """Convert ``value`` to ``str``, mapping ``None`` and lists to ``default``.

    Args:
        value: Arbitrary value to convert.
        default: Returned when ``value`` is ``None`` or a list.

    Returns:
        ``str(value)``, or ``default``.
    """
    unusable = value is None or isinstance(value, list)
    return default if unusable else str(value)
async def _get_session(self) -> "aiohttp.ClientSession":
    """Return the cached HTTP session, creating a new one when needed.

    Returns:
        The existing session if it is still open; otherwise a new
        ``aiohttp.ClientSession`` whose total timeout is ``config.timeout``.
    """
    current = self._session
    if current is not None and not current.closed:
        return current
    self._session = aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=self.config.timeout)
    )
    return self._session
async def close(self):
    """Close the HTTP session if one exists and is still open."""
    session = self._session
    if not session or session.closed:
        return
    await session.close()
def _generate_signature(self, params: Dict[str, Any]) -> str:
    """Compute the digital signature for a set of request parameters.

    Steps:
        1. Sort the parameters by name in ascending order.
        2. Join them as ``key=value`` pairs separated by ``&``.
        3. Append the configured secret.
        4. Take the MD5 hex digest of the UTF-8 encoded string.

    Args:
        params: Request parameters, not including ``sig``.

    Returns:
        The MD5 signature as a hex string.
    """
    pairs = [f"{name}={params[name]}" for name in sorted(params)]
    payload = "&".join(pairs) + self.config.secret
    return hashlib.md5(payload.encode("utf-8")).hexdigest()
async def _request(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Send a GET request to an API endpoint and normalise the outcome.

    The API key and ``output=JSON`` are added to the query, plus a ``sig``
    signature when a secret is configured. The caller's ``params`` dict is
    left untouched.

    Two response envelopes are recognised:
        * ``status`` / ``info`` / ``infocode``, where ``status == "1"`` means success;
        * ``errcode`` / ``errmsg`` (used by the v4 bicycling endpoint), where
          ``errcode == 0`` means success.

    Args:
        endpoint: Endpoint path appended to ``BASE_URL``.
        params: Endpoint-specific query parameters.

    Returns:
        ``{"success": True, "data": <decoded JSON>}`` on success, otherwise
        ``{"success": False, "error": <message>}`` with an additional
        ``"code"`` entry when the API itself reported the error. Errors raised
        while sending or decoding the request are logged and reported the same
        way rather than propagated.
    """
    # Copy so that key/output/sig never leak into the caller's dict.
    query = dict(params)
    query["key"] = self.config.api_key
    query["output"] = "JSON"
    # Sign the request only when a security secret is configured.
    if self.config.secret:
        query["sig"] = self._generate_signature(query)
    url = f"{self.BASE_URL}{endpoint}"
    session = await self._get_session()
    try:
        async with session.get(url, params=query) as response:
            data = await response.json()
            if "status" in data:
                succeeded = str(data["status"]) == "1"
                info = data.get("info", "未知错误")
                infocode = data.get("infocode", "")
            elif "errcode" in data:
                # Envelope without "status": success is signalled by errcode 0.
                succeeded = str(data["errcode"]) == "0"
                info = data.get("errmsg") or "未知错误"
                infocode = str(data["errcode"])
            else:
                succeeded, info, infocode = False, "未知错误", ""
            if not succeeded:
                logger.warning(f"高德 API 错误: {info} (code: {infocode})")
                return {"success": False, "error": info, "code": infocode}
            return {"success": True, "data": data}
    except aiohttp.ClientError as e:
        logger.error(f"高德 API 请求失败: {e}")
        return {"success": False, "error": str(e)}
    except Exception as e:
        # Deliberate catch-all: callers rely on a result dict, never an exception.
        logger.error(f"高德 API 未知错误: {e}")
        return {"success": False, "error": str(e)}
# ==================== 地理编码 ====================
async def geocode(self, address: str, city: str = None) -> Dict[str, Any]:
    """Geocode: turn an address into coordinates.

    Args:
        address: Structured address, e.g. "北京市朝阳区阜通东大街6号".
        city: City to restrict the lookup to (optional).

    Returns:
        The failed request result unchanged, an error dict when no match is
        found, or the first match as::

            {
                "success": True,
                "location": "116.480881,39.989410",
                "adcode": "110105",
                "province": "北京市",
                "city": "北京市",
                "district": "朝阳区",
                "level": "门址",
                "formatted_address": "..."
            }
    """
    query = {"address": address}
    if city:
        query["city"] = city
    response = await self._request("/v3/geocode/geo", query)
    if not response["success"]:
        return response
    matches = response["data"].get("geocodes", [])
    if not matches:
        return {"success": False, "error": "未找到该地址"}
    best = matches[0]

    def _blank_if_list(field):
        # The API returns an empty list for missing values (e.g. municipalities).
        raw = best.get(field, "")
        return "" if isinstance(raw, list) else raw

    province_name = _blank_if_list("province")
    city_name = _blank_if_list("city")
    district_name = _blank_if_list("district")
    # Municipalities have no city value: fall back to the province.
    if province_name and not city_name:
        city_name = province_name
    return {
        "success": True,
        "location": best.get("location", ""),
        "adcode": self._safe_str(best.get("adcode", "")),
        "province": province_name,
        "city": city_name,
        "district": district_name,
        "level": self._safe_str(best.get("level", "")),
        "formatted_address": self._safe_str(best.get("formatted_address", address)),
    }
async def reverse_geocode(
    self,
    location: str,
    radius: int = 1000,
    extensions: str = "base"
) -> Dict[str, Any]:
    """Reverse geocode: turn coordinates into an address.

    Text fields are normalised the same way ``geocode`` does: the API returns
    an empty list instead of an empty string for missing values (for example
    ``city`` of a municipality), so list values become ``""`` and an empty
    ``city`` falls back to the province.

    Args:
        location: Coordinates formatted as "lng,lat".
        radius: Search radius; clamped to the range 0-3000.
        extensions: "base" or "all".

    Returns:
        The failed request result unchanged, or a dict with ``success``,
        ``formatted_address``, ``province``, ``city``, ``district``,
        ``adcode``, ``township`` and ``pois`` (only filled when
        ``extensions == "all"``).
    """
    params = {
        "location": location,
        "radius": max(0, min(radius, 3000)),
        "extensions": extensions
    }
    result = await self._request("/v3/geocode/regeo", params)
    if not result["success"]:
        return result
    regeocode = result["data"].get("regeocode")
    if not isinstance(regeocode, dict):
        regeocode = {}
    component = regeocode.get("addressComponent")
    if not isinstance(component, dict):
        # An empty list here would otherwise break the .get() calls below.
        component = {}
    text = self._safe_str
    province = text(component.get("province"))
    # Municipalities have no city value: fall back to the province.
    city = text(component.get("city")) or province
    pois = (regeocode.get("pois") or []) if extensions == "all" else []
    return {
        "success": True,
        "formatted_address": text(regeocode.get("formatted_address")),
        "province": province,
        "city": city,
        "district": text(component.get("district")),
        "adcode": text(component.get("adcode")),
        "township": text(component.get("township")),
        "pois": pois
    }
# ==================== 行政区域查询 ====================
async def get_district(
    self,
    keywords: str = None,
    subdistrict: int = 1
) -> Dict[str, Any]:
    """Look up an administrative district.

    Args:
        keywords: Search keyword (city name, adcode, ...).
        subdistrict: Number of sub-district levels to return (0-3).

    Returns:
        The failed request result unchanged, an error dict when nothing
        matches, or the first match with ``name``, ``adcode``, ``citycode``,
        ``center``, ``level`` and its nested ``districts``.
    """
    query = {"subdistrict": subdistrict}
    if keywords:
        query["keywords"] = keywords
    response = await self._request("/v3/config/district", query)
    if not response["success"]:
        return response
    matches = response["data"].get("districts", [])
    if not matches:
        return {"success": False, "error": "未找到该行政区域"}
    top = matches[0]
    summary = {"success": True}
    for field in ("name", "adcode", "citycode", "center", "level"):
        summary[field] = top.get(field, "")
    summary["districts"] = top.get("districts", [])
    return summary
# ==================== 天气查询 ====================
async def get_weather(
    self,
    city: str,
    extensions: Literal["base", "all"] = "all"
) -> Dict[str, Any]:
    """Query the weather for a city.

    Args:
        city: City adcode (e.g. 110000) or a city name. A value that is not
            all digits is first resolved to an adcode via ``get_district``.
        extensions: "base" for live weather, "all" for the forecast
            (described in the original documentation as the next 4 days).

    Returns:
        An error dict, the failed request result unchanged, a live report
        (``type == "live"``) or a forecast (``type == "forecast"`` with a
        ``forecasts`` list of per-day entries).
    """
    # A city name has to be translated into an adcode first.
    if not city.isdigit():
        lookup = await self.get_district(city)
        if not lookup["success"]:
            return {"success": False, "error": f"无法识别城市: {city}"}
        city = lookup["adcode"]
    query = {"city": city, "extensions": extensions}
    response = await self._request("/v3/weather/weatherInfo", query)
    if not response["success"]:
        return response
    payload = response["data"]

    if extensions == "base":
        # Live weather.
        observations = payload.get("lives", [])
        if not observations:
            return {"success": False, "error": "未获取到天气数据"}
        current = observations[0]
        report = {"success": True, "type": "live"}
        for field in (
            "city", "weather", "temperature", "winddirection",
            "windpower", "humidity", "reporttime",
        ):
            report[field] = current.get(field, "")
        return report

    # Forecast weather.
    daily = payload.get("forecasts", [])
    if not daily:
        return {"success": False, "error": "未获取到天气预报数据"}
    outlook = daily[0]
    cast_fields = (
        "date", "week", "dayweather", "nightweather", "daytemp",
        "nighttemp", "daywind", "nightwind", "daypower", "nightpower",
    )
    days = []
    for entry in outlook.get("casts", []):
        days.append({field: entry.get(field, "") for field in cast_fields})
    return {
        "success": True,
        "type": "forecast",
        "city": outlook.get("city", ""),
        "province": outlook.get("province", ""),
        "reporttime": outlook.get("reporttime", ""),
        "forecasts": days,
    }
# ==================== POI 搜索 ====================
async def search_poi(
    self,
    keywords: str = None,
    types: str = None,
    city: str = None,
    citylimit: bool = True,
    offset: int = 20,
    page: int = 1,
    extensions: str = "all"
) -> Dict[str, Any]:
    """Search POIs by keyword.

    Args:
        keywords: Search keywords.
        types: POI type codes, multiple values separated by ``|``.
        city: City name or adcode.
        citylimit: Whether to return only results inside ``city``; only sent
            when ``city`` is given.
        offset: Page size; capped at 25.
        page: Page number.
        extensions: "base" or "all".

    Returns:
        The failed request result unchanged, or ``success``, ``count`` and
        the formatted ``pois`` list.
    """
    query = {
        "offset": min(offset, 25),
        "page": page,
        "extensions": extensions,
    }
    optional = (("keywords", keywords), ("types", types), ("city", city))
    query.update({name: value for name, value in optional if value})
    if city:
        query["citylimit"] = "true" if citylimit else "false"
    response = await self._request("/v3/place/text", query)
    if not response["success"]:
        return response
    payload = response["data"]
    formatted = []
    for entry in payload.get("pois", []):
        formatted.append(self._format_poi(entry))
    return {
        "success": True,
        "count": self._safe_int(payload.get("count", 0)),
        "pois": formatted,
    }
async def search_around(
    self,
    location: str,
    keywords: str = None,
    types: str = None,
    radius: int = 3000,
    offset: int = 20,
    page: int = 1,
    extensions: str = "all"
) -> Dict[str, Any]:
    """Search POIs around a point, sorted by distance.

    Args:
        location: Centre coordinates formatted as "lng,lat".
        keywords: Search keywords.
        types: POI type codes.
        radius: Search radius; capped at 50000.
        offset: Page size; capped at 25.
        page: Page number.
        extensions: "base" or "all".

    Returns:
        The failed request result unchanged, or ``success``, ``count`` and
        the formatted ``pois`` list.
    """
    query = {
        "location": location,
        "radius": min(radius, 50000),
        "offset": min(offset, 25),
        "page": page,
        "extensions": extensions,
        "sortrule": "distance",
    }
    optional = (("keywords", keywords), ("types", types))
    query.update({name: value for name, value in optional if value})
    response = await self._request("/v3/place/around", query)
    if not response["success"]:
        return response
    payload = response["data"]
    formatted = []
    for entry in payload.get("pois", []):
        formatted.append(self._format_poi(entry))
    return {
        "success": True,
        "count": self._safe_int(payload.get("count", 0)),
        "pois": formatted,
    }
def _format_poi(self, poi: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce a raw POI record to the fields used by callers.

    Every value is passed through ``_safe_str`` because the API returns an
    empty list instead of an empty string for missing values; callers always
    receive strings.

    Args:
        poi: One POI record from a search response.

    Returns:
        A dict with ``id``, ``name``, ``type``, ``address``, ``location``,
        ``tel``, ``distance``, ``pname``, ``cityname``, ``adname`` plus
        ``rating`` and ``cost`` taken from ``biz_ext``.
    """
    text = self._safe_str
    biz_ext = poi.get("biz_ext")
    if not isinstance(biz_ext, dict):
        # Missing, None or an empty-list placeholder.
        biz_ext = {}
    plain_fields = (
        "id", "name", "type", "address", "location",
        "tel", "distance", "pname", "cityname", "adname",
    )
    formatted = {field: text(poi.get(field)) for field in plain_fields}
    formatted["rating"] = text(biz_ext.get("rating"))
    formatted["cost"] = text(biz_ext.get("cost"))
    return formatted
# ==================== 路径规划 ====================
async def route_driving(
    self,
    origin: str,
    destination: str,
    strategy: int = 10,
    waypoints: str = None,
    extensions: str = "base"
) -> Dict[str, Any]:
    """Plan a driving route.

    Args:
        origin: Start coordinates "lng,lat".
        destination: End coordinates "lng,lat".
        strategy: Driving strategy (10 = avoid congestion, 13 = no highways,
            14 = avoid tolls).
        waypoints: Via points, multiple values separated by ``;``.
        extensions: "base" or "all".

    Returns:
        The failed request result unchanged, an error dict when no path is
        found, or a summary of the first path (distance, duration, tolls,
        taxi cost, formatted steps, ...).
    """
    query = {
        "origin": origin,
        "destination": destination,
        "strategy": strategy,
        "extensions": extensions,
    }
    if waypoints:
        query["waypoints"] = waypoints
    response = await self._request("/v3/direction/driving", query)
    if not response["success"]:
        return response
    route = response["data"].get("route", {})
    candidates = route.get("paths", [])
    if not candidates:
        return {"success": False, "error": "未找到驾车路线"}
    best = candidates[0]

    summary = {
        "success": True,
        "mode": "driving",
        "origin": route.get("origin", ""),
        "destination": route.get("destination", ""),
    }
    for field in ("distance", "duration"):
        summary[field] = self._safe_int(best.get(field, 0))
    summary["tolls"] = self._safe_float(best.get("tolls", 0))
    for field in ("toll_distance", "traffic_lights"):
        summary[field] = self._safe_int(best.get(field, 0))
    summary["taxi_cost"] = self._safe_str(route.get("taxi_cost", ""))
    summary["strategy"] = best.get("strategy", "")
    summary["steps"] = self._format_driving_steps(best.get("steps", []))
    return summary
async def route_transit(
    self,
    origin: str,
    destination: str,
    city: str,
    cityd: str = None,
    strategy: int = 0,
    extensions: str = "all"
) -> Dict[str, Any]:
    """Plan a public-transport route (bus, subway, train).

    Nested objects in the response are checked before use: the API can return
    an empty list where an object is expected, which would otherwise raise
    ``AttributeError`` on ``.get()``.

    Args:
        origin: Start coordinates "lng,lat".
        destination: End coordinates "lng,lat".
        city: Origin city.
        cityd: Destination city (required for cross-city trips).
        strategy: 0 = fastest, 1 = cheapest, 2 = fewest transfers,
            3 = least walking.
        extensions: "base" or "all".

    Returns:
        The failed request result unchanged, an error dict when no plan is
        found, or a summary whose ``transits`` holds at most the first three
        plans, each with ``cost``, ``duration``, ``walking_distance`` and a
        list of ``walking`` / ``bus`` / ``railway`` segments.
    """

    def as_dict(value):
        # Treat missing values and empty-list placeholders as an empty object.
        return value if isinstance(value, dict) else {}

    to_int = self._safe_int
    to_str = self._safe_str

    params = {
        "origin": origin,
        "destination": destination,
        "city": city,
        "strategy": strategy,
        "extensions": extensions
    }
    if cityd:
        params["cityd"] = cityd
    result = await self._request("/v3/direction/transit/integrated", params)
    if not result["success"]:
        return result
    route = as_dict(result["data"].get("route"))
    transits = route.get("transits") or []
    if not transits:
        return {"success": False, "error": "未找到公交路线"}

    formatted_transits = []
    # Only the first three plans are returned.
    for transit in transits[:3]:
        transit = as_dict(transit)
        formatted_segments = []
        for seg in transit.get("segments") or []:
            seg = as_dict(seg)

            # Walking leg.
            walking = as_dict(seg.get("walking"))
            if walking.get("distance"):
                formatted_segments.append({
                    "type": "walking",
                    "distance": to_int(walking.get("distance", 0)),
                    "duration": to_int(walking.get("duration", 0))
                })

            # Bus / subway leg: only the first candidate line is reported.
            buslines = as_dict(seg.get("bus")).get("buslines") or []
            if buslines:
                line = as_dict(buslines[0])
                formatted_segments.append({
                    "type": "bus",
                    "name": to_str(line.get("name", "")),
                    "departure_stop": to_str(as_dict(line.get("departure_stop")).get("name", "")),
                    "arrival_stop": to_str(as_dict(line.get("arrival_stop")).get("name", "")),
                    "via_num": to_int(line.get("via_num", 0)),
                    "distance": to_int(line.get("distance", 0)),
                    "duration": to_int(line.get("duration", 0))
                })

            # Railway leg.
            railway = as_dict(seg.get("railway"))
            if railway.get("name"):
                departure = as_dict(railway.get("departure_stop"))
                arrival = as_dict(railway.get("arrival_stop"))
                formatted_segments.append({
                    "type": "railway",
                    "name": to_str(railway.get("name", "")),
                    "trip": to_str(railway.get("trip", "")),
                    "departure_stop": to_str(departure.get("name", "")),
                    "arrival_stop": to_str(arrival.get("name", "")),
                    "departure_time": to_str(departure.get("time", "")),
                    "arrival_time": to_str(arrival.get("time", "")),
                    "distance": to_int(railway.get("distance", 0)),
                    "time": to_str(railway.get("time", ""))
                })

        formatted_transits.append({
            "cost": to_str(transit.get("cost", "")),
            "duration": to_int(transit.get("duration", 0)),
            "walking_distance": to_int(transit.get("walking_distance", 0)),
            "segments": formatted_segments
        })

    return {
        "success": True,
        "mode": "transit",
        "origin": route.get("origin", ""),
        "destination": route.get("destination", ""),
        "distance": to_int(route.get("distance", 0)),
        "taxi_cost": to_str(route.get("taxi_cost", "")),
        "transits": formatted_transits
    }
async def route_walking(
    self,
    origin: str,
    destination: str
) -> Dict[str, Any]:
    """Plan a walking route.

    Args:
        origin: Start coordinates "lng,lat".
        destination: End coordinates "lng,lat".

    Returns:
        The failed request result unchanged, an error dict when no path is
        found, or the distance and duration of the first path.
    """
    query = {"origin": origin, "destination": destination}
    response = await self._request("/v3/direction/walking", query)
    if not response["success"]:
        return response
    route = response["data"].get("route", {})
    candidates = route.get("paths", [])
    if not candidates:
        return {"success": False, "error": "未找到步行路线"}
    best = candidates[0]
    summary = {
        "success": True,
        "mode": "walking",
        "origin": route.get("origin", ""),
        "destination": route.get("destination", ""),
    }
    for field in ("distance", "duration"):
        summary[field] = self._safe_int(best.get(field, 0))
    return summary
async def route_bicycling(
    self,
    origin: str,
    destination: str
) -> Dict[str, Any]:
    """Plan a cycling route.

    Args:
        origin: Start coordinates "lng,lat".
        destination: End coordinates "lng,lat".

    Returns:
        The failed request result unchanged, an error dict when no path is
        found, or the distance and duration of the first path.
    """
    query = {"origin": origin, "destination": destination}
    # Cycling uses the v4 endpoint; its route sits under the "data" key.
    response = await self._request("/v4/direction/bicycling", query)
    if not response["success"]:
        return response
    body = response["data"].get("data", {})
    candidates = body.get("paths", [])
    if not candidates:
        return {"success": False, "error": "未找到骑行路线"}
    best = candidates[0]
    summary = {
        "success": True,
        "mode": "bicycling",
        "origin": body.get("origin", ""),
        "destination": body.get("destination", ""),
    }
    for field in ("distance", "duration"):
        summary[field] = self._safe_int(best.get(field, 0))
    return summary
def _format_driving_steps(self, steps: List[Dict]) -> List[Dict]:
    """Format the first ten driving steps.

    Text fields go through ``_safe_str`` because the API returns an empty
    list instead of an empty string for missing values (e.g. a step on an
    unnamed road); callers always receive strings.

    Args:
        steps: Raw step records of a driving path; ``None`` is treated as
            an empty list.

    Returns:
        At most ten dicts with ``instruction``, ``road``, ``distance``,
        ``duration`` and ``orientation``.
    """
    text = self._safe_str
    number = self._safe_int
    return [
        {
            "instruction": text(step.get("instruction")),
            "road": text(step.get("road")),
            "distance": number(step.get("distance", 0)),
            "duration": number(step.get("duration", 0)),
            "orientation": text(step.get("orientation")),
        }
        for step in (steps or [])[:10]  # only the first 10 steps are returned
    ]
# ==================== 距离测量 ====================
async def get_distance(
    self,
    origins: str,
    destination: str,
    type: int = 1
) -> Dict[str, Any]:
    """Measure the distance from one or more origins to a destination.

    Args:
        origins: Origin coordinates, multiple values separated by ``|``.
        destination: Destination coordinates.
        type: 0 = straight-line distance, 1 = driving distance,
            3 = walking distance.

    Returns:
        The failed request result unchanged, an error dict when the response
        has no results, or ``results`` with ``origin_id``, ``distance`` and
        ``duration`` per origin.
    """
    query = {"origins": origins, "destination": destination, "type": type}
    response = await self._request("/v3/distance", query)
    if not response["success"]:
        return response
    rows = response["data"].get("results", [])
    if not rows:
        return {"success": False, "error": "无法计算距离"}
    measured = []
    for row in rows:
        measured.append({
            "origin_id": row.get("origin_id", ""),
            "distance": self._safe_int(row.get("distance", 0)),
            "duration": self._safe_int(row.get("duration", 0)),
        })
    return {"success": True, "results": measured}
# ==================== 输入提示 ====================
async def input_tips(
    self,
    keywords: str,
    city: str = None,
    citylimit: bool = False,
    datatype: str = "all"
) -> Dict[str, Any]:
    """Fetch input suggestions for a keyword.

    Args:
        keywords: Search keywords.
        city: City name or adcode.
        citylimit: Whether to return only results inside ``city``; only sent
            when ``city`` is given.
        datatype: all / poi / bus / busline.

    Returns:
        The failed request result unchanged, or ``tips`` holding the
        suggestions that carry a location.
    """
    query = {"keywords": keywords, "datatype": datatype}
    if city:
        query["city"] = city
        query["citylimit"] = "true" if citylimit else "false"
    response = await self._request("/v3/assistant/inputtips", query)
    if not response["success"]:
        return response
    fields = ("id", "name", "district", "adcode", "location", "address")
    suggestions = []
    for tip in response["data"].get("tips", []):
        # Skip suggestions that come without coordinates.
        if not tip.get("location"):
            continue
        suggestions.append({field: tip.get(field, "") for field in fields})
    return {"success": True, "tips": suggestions}