""" 高德地图 API 客户端封装 提供以下功能: - 地理编码:地址 → 坐标 - 逆地理编码:坐标 → 地址 - 行政区域查询:获取城市 adcode - 天气查询:实况/预报天气 - POI 搜索:关键字搜索、周边搜索 - 路径规划:驾车、公交、步行、骑行 """ from __future__ import annotations import hashlib import aiohttp from dataclasses import dataclass from typing import Any, Dict, List, Optional, Literal from loguru import logger @dataclass class AmapConfig: """高德 API 配置""" api_key: str secret: str = "" # 安全密钥,用于数字签名 timeout: int = 30 class AmapClient: """高德地图 API 客户端""" BASE_URL = "https://restapi.amap.com" def __init__(self, config: AmapConfig): self.config = config self._session: Optional[aiohttp.ClientSession] = None @staticmethod def _safe_int(value, default: int = 0) -> int: """安全地将值转换为整数,处理列表、None、空字符串等情况""" if value is None: return default if isinstance(value, list): return default if isinstance(value, (int, float)): return int(value) if isinstance(value, str): if not value.strip(): return default try: return int(float(value)) except (ValueError, TypeError): return default return default @staticmethod def _safe_float(value, default: float = 0.0) -> float: """安全地将值转换为浮点数""" if value is None: return default if isinstance(value, list): return default if isinstance(value, (int, float)): return float(value) if isinstance(value, str): if not value.strip(): return default try: return float(value) except (ValueError, TypeError): return default return default @staticmethod def _safe_str(value, default: str = "") -> str: """安全地将值转换为字符串,处理列表等情况""" if value is None: return default if isinstance(value, list): return default return str(value) async def _get_session(self) -> aiohttp.ClientSession: """获取或创建 HTTP 会话""" if self._session is None or self._session.closed: timeout = aiohttp.ClientTimeout(total=self.config.timeout) self._session = aiohttp.ClientSession(timeout=timeout) return self._session async def close(self): """关闭 HTTP 会话""" if self._session and not self._session.closed: await self._session.close() def _generate_signature(self, params: Dict[str, Any]) -> str: """ 生成数字签名 算法: 1. 将请求参数按参数名升序排序 2. 按 key=value 格式拼接,用 & 连接 3. 最后拼接上私钥(secret) 4. 对整个字符串进行 MD5 加密 Args: params: 请求参数(不含 sig) Returns: MD5 签名字符串 """ # 按参数名升序排序 sorted_params = sorted(params.items(), key=lambda x: x[0]) # 拼接成 key=value&key=value 格式 param_str = "&".join(f"{k}={v}" for k, v in sorted_params) # 拼接私钥 sign_str = param_str + self.config.secret # MD5 加密 return hashlib.md5(sign_str.encode('utf-8')).hexdigest() async def _request(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]: """ 发送 API 请求 Args: endpoint: API 端点路径 params: 请求参数 Returns: API 响应数据 """ params["key"] = self.config.api_key params["output"] = "JSON" # 如果配置了安全密钥,生成数字签名 if self.config.secret: params["sig"] = self._generate_signature(params) url = f"{self.BASE_URL}{endpoint}" session = await self._get_session() try: async with session.get(url, params=params) as response: data = await response.json() # 检查 API 状态 status = data.get("status", "0") if status != "1": info = data.get("info", "未知错误") infocode = data.get("infocode", "") logger.warning(f"高德 API 错误: {info} (code: {infocode})") return {"success": False, "error": info, "code": infocode} return {"success": True, "data": data} except aiohttp.ClientError as e: logger.error(f"高德 API 请求失败: {e}") return {"success": False, "error": str(e)} except Exception as e: logger.error(f"高德 API 未知错误: {e}") return {"success": False, "error": str(e)} # ==================== 地理编码 ==================== async def geocode(self, address: str, city: str = None) -> Dict[str, Any]: """ 地理编码:将地址转换为坐标 Args: address: 结构化地址,如 "北京市朝阳区阜通东大街6号" city: 指定城市(可选) Returns: { "success": True, "location": "116.480881,39.989410", "adcode": "110105", "city": "北京市", "district": "朝阳区", "level": "门址" } """ params = {"address": address} if city: params["city"] = city result = await self._request("/v3/geocode/geo", params) if not result["success"]: return result geocodes = result["data"].get("geocodes", []) if not geocodes: return {"success": False, "error": "未找到该地址"} geo = geocodes[0] # 处理高德 API 返回空列表的情况(如直辖市) city_val = geo.get("city", "") if isinstance(city_val, list): city_val = "" province_val = geo.get("province", "") if isinstance(province_val, list): province_val = "" district_val = geo.get("district", "") if isinstance(district_val, list): district_val = "" # 如果城市为空,使用省份(直辖市情况) if not city_val and province_val: city_val = province_val return { "success": True, "location": geo.get("location", ""), "adcode": self._safe_str(geo.get("adcode", "")), "province": province_val, "city": city_val, "district": district_val, "level": self._safe_str(geo.get("level", "")), "formatted_address": self._safe_str(geo.get("formatted_address", address)) } async def reverse_geocode( self, location: str, radius: int = 1000, extensions: str = "base" ) -> Dict[str, Any]: """ 逆地理编码:将坐标转换为地址 Args: location: 经纬度坐标,格式 "lng,lat" radius: 搜索半径(米),0-3000 extensions: base 或 all Returns: 地址信息 """ params = { "location": location, "radius": min(radius, 3000), "extensions": extensions } result = await self._request("/v3/geocode/regeo", params) if not result["success"]: return result regeocode = result["data"].get("regeocode", {}) address_component = regeocode.get("addressComponent", {}) return { "success": True, "formatted_address": regeocode.get("formatted_address", ""), "province": address_component.get("province", ""), "city": address_component.get("city", ""), "district": address_component.get("district", ""), "adcode": address_component.get("adcode", ""), "township": address_component.get("township", ""), "pois": regeocode.get("pois", []) if extensions == "all" else [] } # ==================== 行政区域查询 ==================== async def get_district( self, keywords: str = None, subdistrict: int = 1 ) -> Dict[str, Any]: """ 行政区域查询 Args: keywords: 查询关键字(城市名、adcode 等) subdistrict: 返回下级行政区级数(0-3) Returns: 行政区域信息,包含 adcode、citycode 等 """ params = {"subdistrict": subdistrict} if keywords: params["keywords"] = keywords result = await self._request("/v3/config/district", params) if not result["success"]: return result districts = result["data"].get("districts", []) if not districts: return {"success": False, "error": "未找到该行政区域"} district = districts[0] return { "success": True, "name": district.get("name", ""), "adcode": district.get("adcode", ""), "citycode": district.get("citycode", ""), "center": district.get("center", ""), "level": district.get("level", ""), "districts": district.get("districts", []) } # ==================== 天气查询 ==================== async def get_weather( self, city: str, extensions: Literal["base", "all"] = "all" ) -> Dict[str, Any]: """ 天气查询 Args: city: 城市 adcode(如 110000)或城市名 extensions: base=实况天气,all=预报天气(未来4天) Returns: 天气信息 """ # 如果传入的是城市名,先获取 adcode if not city.isdigit(): district_result = await self.get_district(city) if not district_result["success"]: return {"success": False, "error": f"无法识别城市: {city}"} city = district_result["adcode"] params = { "city": city, "extensions": extensions } result = await self._request("/v3/weather/weatherInfo", params) if not result["success"]: return result data = result["data"] if extensions == "base": # 实况天气 lives = data.get("lives", []) if not lives: return {"success": False, "error": "未获取到天气数据"} live = lives[0] return { "success": True, "type": "live", "city": live.get("city", ""), "weather": live.get("weather", ""), "temperature": live.get("temperature", ""), "winddirection": live.get("winddirection", ""), "windpower": live.get("windpower", ""), "humidity": live.get("humidity", ""), "reporttime": live.get("reporttime", "") } else: # 预报天气 forecasts = data.get("forecasts", []) if not forecasts: return {"success": False, "error": "未获取到天气预报数据"} forecast = forecasts[0] casts = forecast.get("casts", []) return { "success": True, "type": "forecast", "city": forecast.get("city", ""), "province": forecast.get("province", ""), "reporttime": forecast.get("reporttime", ""), "forecasts": [ { "date": cast.get("date", ""), "week": cast.get("week", ""), "dayweather": cast.get("dayweather", ""), "nightweather": cast.get("nightweather", ""), "daytemp": cast.get("daytemp", ""), "nighttemp": cast.get("nighttemp", ""), "daywind": cast.get("daywind", ""), "nightwind": cast.get("nightwind", ""), "daypower": cast.get("daypower", ""), "nightpower": cast.get("nightpower", "") } for cast in casts ] } # ==================== POI 搜索 ==================== async def search_poi( self, keywords: str = None, types: str = None, city: str = None, citylimit: bool = True, offset: int = 20, page: int = 1, extensions: str = "all" ) -> Dict[str, Any]: """ 关键字搜索 POI Args: keywords: 查询关键字 types: POI 类型代码,多个用 | 分隔 city: 城市名或 adcode citylimit: 是否仅返回指定城市 offset: 每页数量(建议不超过25) page: 页码 extensions: base 或 all Returns: POI 列表 """ params = { "offset": min(offset, 25), "page": page, "extensions": extensions } if keywords: params["keywords"] = keywords if types: params["types"] = types if city: params["city"] = city params["citylimit"] = "true" if citylimit else "false" result = await self._request("/v3/place/text", params) if not result["success"]: return result pois = result["data"].get("pois", []) count = self._safe_int(result["data"].get("count", 0)) return { "success": True, "count": count, "pois": [self._format_poi(poi) for poi in pois] } async def search_around( self, location: str, keywords: str = None, types: str = None, radius: int = 3000, offset: int = 20, page: int = 1, extensions: str = "all" ) -> Dict[str, Any]: """ 周边搜索 POI Args: location: 中心点坐标,格式 "lng,lat" keywords: 查询关键字 types: POI 类型代码 radius: 搜索半径(米),0-50000 offset: 每页数量 page: 页码 extensions: base 或 all Returns: POI 列表 """ params = { "location": location, "radius": min(radius, 50000), "offset": min(offset, 25), "page": page, "extensions": extensions, "sortrule": "distance" } if keywords: params["keywords"] = keywords if types: params["types"] = types result = await self._request("/v3/place/around", params) if not result["success"]: return result pois = result["data"].get("pois", []) count = self._safe_int(result["data"].get("count", 0)) return { "success": True, "count": count, "pois": [self._format_poi(poi) for poi in pois] } def _format_poi(self, poi: Dict[str, Any]) -> Dict[str, Any]: """格式化 POI 数据""" biz_ext = poi.get("biz_ext", {}) or {} return { "id": poi.get("id", ""), "name": poi.get("name", ""), "type": poi.get("type", ""), "address": poi.get("address", ""), "location": poi.get("location", ""), "tel": poi.get("tel", ""), "distance": poi.get("distance", ""), "pname": poi.get("pname", ""), "cityname": poi.get("cityname", ""), "adname": poi.get("adname", ""), "rating": biz_ext.get("rating", ""), "cost": biz_ext.get("cost", "") } # ==================== 路径规划 ==================== async def route_driving( self, origin: str, destination: str, strategy: int = 10, waypoints: str = None, extensions: str = "base" ) -> Dict[str, Any]: """ 驾车路径规划 Args: origin: 起点坐标 "lng,lat" destination: 终点坐标 "lng,lat" strategy: 驾车策略(10=躲避拥堵,13=不走高速,14=避免收费) waypoints: 途经点,多个用 ; 分隔 extensions: base 或 all Returns: 路径规划结果 """ params = { "origin": origin, "destination": destination, "strategy": strategy, "extensions": extensions } if waypoints: params["waypoints"] = waypoints result = await self._request("/v3/direction/driving", params) if not result["success"]: return result route = result["data"].get("route", {}) paths = route.get("paths", []) if not paths: return {"success": False, "error": "未找到驾车路线"} path = paths[0] return { "success": True, "mode": "driving", "origin": route.get("origin", ""), "destination": route.get("destination", ""), "distance": self._safe_int(path.get("distance", 0)), "duration": self._safe_int(path.get("duration", 0)), "tolls": self._safe_float(path.get("tolls", 0)), "toll_distance": self._safe_int(path.get("toll_distance", 0)), "traffic_lights": self._safe_int(path.get("traffic_lights", 0)), "taxi_cost": self._safe_str(route.get("taxi_cost", "")), "strategy": path.get("strategy", ""), "steps": self._format_driving_steps(path.get("steps", [])) } async def route_transit( self, origin: str, destination: str, city: str, cityd: str = None, strategy: int = 0, extensions: str = "all" ) -> Dict[str, Any]: """ 公交路径规划(含火车、地铁) Args: origin: 起点坐标 "lng,lat" destination: 终点坐标 "lng,lat" city: 起点城市 cityd: 终点城市(跨城时必填) strategy: 0=最快,1=最省钱,2=最少换乘,3=最少步行 extensions: base 或 all Returns: 公交路径规划结果 """ params = { "origin": origin, "destination": destination, "city": city, "strategy": strategy, "extensions": extensions } if cityd: params["cityd"] = cityd result = await self._request("/v3/direction/transit/integrated", params) if not result["success"]: return result route = result["data"].get("route", {}) transits = route.get("transits", []) if not transits: return {"success": False, "error": "未找到公交路线"} # 返回前3个方案 formatted_transits = [] for transit in transits[:3]: segments = transit.get("segments", []) formatted_segments = [] for seg in segments: # 步行段 walking = seg.get("walking", {}) if walking and walking.get("distance"): formatted_segments.append({ "type": "walking", "distance": self._safe_int(walking.get("distance", 0)), "duration": self._safe_int(walking.get("duration", 0)) }) # 公交/地铁段 bus_info = seg.get("bus", {}) buslines = bus_info.get("buslines", []) if buslines: line = buslines[0] formatted_segments.append({ "type": "bus", "name": self._safe_str(line.get("name", "")), "departure_stop": self._safe_str(line.get("departure_stop", {}).get("name", "")), "arrival_stop": self._safe_str(line.get("arrival_stop", {}).get("name", "")), "via_num": self._safe_int(line.get("via_num", 0)), "distance": self._safe_int(line.get("distance", 0)), "duration": self._safe_int(line.get("duration", 0)) }) # 火车段 railway = seg.get("railway", {}) if railway and railway.get("name"): formatted_segments.append({ "type": "railway", "name": self._safe_str(railway.get("name", "")), "trip": self._safe_str(railway.get("trip", "")), "departure_stop": self._safe_str(railway.get("departure_stop", {}).get("name", "")), "arrival_stop": self._safe_str(railway.get("arrival_stop", {}).get("name", "")), "departure_time": self._safe_str(railway.get("departure_stop", {}).get("time", "")), "arrival_time": self._safe_str(railway.get("arrival_stop", {}).get("time", "")), "distance": self._safe_int(railway.get("distance", 0)), "time": self._safe_str(railway.get("time", "")) }) formatted_transits.append({ "cost": self._safe_str(transit.get("cost", "")), "duration": self._safe_int(transit.get("duration", 0)), "walking_distance": self._safe_int(transit.get("walking_distance", 0)), "segments": formatted_segments }) return { "success": True, "mode": "transit", "origin": route.get("origin", ""), "destination": route.get("destination", ""), "distance": self._safe_int(route.get("distance", 0)), "taxi_cost": self._safe_str(route.get("taxi_cost", "")), "transits": formatted_transits } async def route_walking( self, origin: str, destination: str ) -> Dict[str, Any]: """ 步行路径规划 Args: origin: 起点坐标 "lng,lat" destination: 终点坐标 "lng,lat" Returns: 步行路径规划结果 """ params = { "origin": origin, "destination": destination } result = await self._request("/v3/direction/walking", params) if not result["success"]: return result route = result["data"].get("route", {}) paths = route.get("paths", []) if not paths: return {"success": False, "error": "未找到步行路线"} path = paths[0] return { "success": True, "mode": "walking", "origin": route.get("origin", ""), "destination": route.get("destination", ""), "distance": self._safe_int(path.get("distance", 0)), "duration": self._safe_int(path.get("duration", 0)) } async def route_bicycling( self, origin: str, destination: str ) -> Dict[str, Any]: """ 骑行路径规划 Args: origin: 起点坐标 "lng,lat" destination: 终点坐标 "lng,lat" Returns: 骑行路径规划结果 """ params = { "origin": origin, "destination": destination } # 骑行用 v4 接口 result = await self._request("/v4/direction/bicycling", params) if not result["success"]: return result data = result["data"].get("data", {}) paths = data.get("paths", []) if not paths: return {"success": False, "error": "未找到骑行路线"} path = paths[0] return { "success": True, "mode": "bicycling", "origin": data.get("origin", ""), "destination": data.get("destination", ""), "distance": self._safe_int(path.get("distance", 0)), "duration": self._safe_int(path.get("duration", 0)) } def _format_driving_steps(self, steps: List[Dict]) -> List[Dict]: """格式化驾车步骤""" return [ { "instruction": step.get("instruction", ""), "road": step.get("road", ""), "distance": self._safe_int(step.get("distance", 0)), "duration": self._safe_int(step.get("duration", 0)), "orientation": step.get("orientation", "") } for step in steps[:10] # 只返回前10步 ] # ==================== 距离测量 ==================== async def get_distance( self, origins: str, destination: str, type: int = 1 ) -> Dict[str, Any]: """ 距离测量 Args: origins: 起点坐标,多个用 | 分隔 destination: 终点坐标 type: 0=直线距离,1=驾车距离,3=步行距离 Returns: 距离信息 """ params = { "origins": origins, "destination": destination, "type": type } result = await self._request("/v3/distance", params) if not result["success"]: return result results = result["data"].get("results", []) if not results: return {"success": False, "error": "无法计算距离"} return { "success": True, "results": [ { "origin_id": r.get("origin_id", ""), "distance": self._safe_int(r.get("distance", 0)), "duration": self._safe_int(r.get("duration", 0)) } for r in results ] } # ==================== 输入提示 ==================== async def input_tips( self, keywords: str, city: str = None, citylimit: bool = False, datatype: str = "all" ) -> Dict[str, Any]: """ 输入提示 Args: keywords: 查询关键字 city: 城市名或 adcode citylimit: 是否仅返回指定城市 datatype: all/poi/bus/busline Returns: 提示列表 """ params = { "keywords": keywords, "datatype": datatype } if city: params["city"] = city params["citylimit"] = "true" if citylimit else "false" result = await self._request("/v3/assistant/inputtips", params) if not result["success"]: return result tips = result["data"].get("tips", []) return { "success": True, "tips": [ { "id": tip.get("id", ""), "name": tip.get("name", ""), "district": tip.get("district", ""), "adcode": tip.get("adcode", ""), "location": tip.get("location", ""), "address": tip.get("address", "") } for tip in tips if tip.get("location") # 过滤无坐标的结果 ] }