天气订阅功能优化

This commit is contained in:
liuwei
2025-12-17 13:08:30 +08:00
parent f1df8eb372
commit 7d6986ae3d

View File

@@ -1,6 +1,9 @@
from loguru import logger from loguru import logger
import aiohttp import aiohttp
import os import os
import json
import asyncio
import datetime
from typing import Dict, Any, List, Optional, Tuple from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.message_plugin_interface import MessagePluginInterface
@@ -11,12 +14,65 @@ from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient from wechat_ipad import WechatAPIClient
class WeatherPlugin(MessagePluginInterface): # ================= Redis 管理器 =================
"""天气查询插件"""
class WeatherRedisManager:
"""处理 Redis 数据存取"""
def __init__(self, db_manager):
self.redis = db_manager
self.prefix = "bot:weather:"
async def add_subscription(self, key: str, data: dict):
"""添加订阅 (存入标准城市名和ID)"""
redis_key = f"{self.prefix}subs"
# 存入 JSON 字符串
await self.redis.hset(redis_key, key, json.dumps(data, ensure_ascii=False))
async def remove_subscription(self, key: str):
"""取消订阅"""
redis_key = f"{self.prefix}subs"
await self.redis.hdel(redis_key, key)
async def get_all_subscriptions(self) -> Dict[str, dict]:
"""获取所有订阅数据"""
redis_key = f"{self.prefix}subs"
try:
raw_data = await self.redis.hgetall(redis_key)
result = {}
if raw_data:
for k, v in raw_data.items():
k_str = k.decode('utf-8') if isinstance(k, bytes) else k
v_str = v.decode('utf-8') if isinstance(v, bytes) else v
result[k_str] = json.loads(v_str)
return result
except Exception as e:
logger.error(f"Redis 读取订阅失败: {e}")
return {}
async def get_history(self, city_id: str) -> Optional[dict]:
"""获取某城市(ID)昨天的历史数据"""
# 注意:这里改用 city_id 作为 Key确保唯一性
redis_key = f"{self.prefix}history:{city_id}"
data = await self.redis.get(redis_key)
if data:
return json.loads(data)
return None
async def save_history(self, city_id: str, data: dict):
"""保存今天的核心数据 (Key 为 city_id)"""
redis_key = f"{self.prefix}history:{city_id}"
# 有效期 48 小时
await self.redis.set(redis_key, json.dumps(data, ensure_ascii=False), ex=172800)
# ================= 插件主体类 =================
class WeatherPlugin(MessagePluginInterface):
"""天气查询插件 (v3.0 - 城市ID精准订阅版)"""
# 功能权限常量
FEATURE_KEY = "WEATHER" FEATURE_KEY = "WEATHER"
FEATURE_DESCRIPTION = "🌤️ 天气查询 [上海天气, 天气上海]" FEATURE_DESCRIPTION = "🌤️ 天气查询与订阅 [天气 长沙, 订阅天气 长沙]"
@property @property
def name(self) -> str: def name(self) -> str:
@@ -24,11 +80,11 @@ class WeatherPlugin(MessagePluginInterface):
@property @property
def version(self) -> str: def version(self) -> str:
return "1.0.0" return "3.0.0"
@property @property
def description(self) -> str: def description(self) -> str:
return "提供天气查询功能,支持查询城市实时天气和未来天气预报" return "提供精准的天气订阅服务,支持城市模糊搜索自动纠正。"
@property @property
def author(self) -> str: def author(self) -> str:
@@ -36,7 +92,7 @@ class WeatherPlugin(MessagePluginInterface):
@property @property
def command_prefix(self) -> Optional[str]: def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令 return ""
@property @property
def commands(self) -> List[str]: def commands(self) -> List[str]:
@@ -53,203 +109,415 @@ class WeatherPlugin(MessagePluginInterface):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.plugin_dir = os.path.dirname(os.path.abspath(__file__)) self.plugin_dir = os.path.dirname(os.path.abspath(__file__))
# 注册功能权限
self.feature = self.register_feature() self.feature = self.register_feature()
self.redis_manager = None
self.bot_client = None
self.task = None
def initialize(self, context: Dict[str, Any]) -> bool: def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件""" """初始化"""
self.LOG = logger self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...") self.LOG.info(f"正在初始化 {self.name} 插件...")
# 从TOML配置文件加载配置 self._config = context.get("config", {})
self._commands = self._config.get("Weather", {}).get("command", ["天气"])
self.command_format = self._config.get("Weather", {}).get("command-format", """⚙️获取天气: # Redis 初始化
天气 城市名 db_manager = context.get("db_manager")
天气城市名 if db_manager:
城市名天气 self.redis_manager = WeatherRedisManager(db_manager)
城市名 天气""") else:
self.LOG.error("❌ 未找到 db_manager功能受限")
self._commands = self._config.get("Weather", {}).get("command", ["天气", "订阅天气", "取消订阅"])
self.enable = self._config.get("Weather", {}).get("enable", True) self.enable = self._config.get("Weather", {}).get("enable", True)
# 加载API配置
self.api_key = self._config.get("Weather", {}).get("API_KEY", "") self.api_key = self._config.get("Weather", {}).get("API_KEY", "")
self.api_domain = self._config.get("Weather", {}).get("API_DOMAIN", "") self.api_domain = self._config.get("Weather", {}).get("API_DOMAIN", "https://api.qweather.com")
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True return True
def start(self) -> bool: def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动") self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING self.status = PluginStatus.RUNNING
self.task = asyncio.create_task(self._daily_scheduler())
return True return True
def stop(self) -> bool: def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止") self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED self.status = PluginStatus.STOPPED
if self.task:
self.task.cancel()
return True return True
def can_process(self, message: Dict[str, Any]) -> bool: def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息""" if not self.enable: return False
if not self.enable:
return False
content = str(message.get("content", "")).strip() content = str(message.get("content", "")).strip()
for cmd in ["天气", "订阅天气", "取消订阅"]:
if content.startswith("天气") or content.endswith("天气"): if content.startswith(cmd): return True
return True return content.endswith("天气")
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="天气查询") @plugin_stats_decorator(plugin_name="天气查询")
@plugin_points_cost(10, "天气查询消耗积分", FEATURE_KEY) @plugin_points_cost(10, "天气服务消耗积分", FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip() content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender") sender = message.get("sender")
roomid = message.get("roomid", "") roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm") gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot") bot: WechatAPIClient = message.get("bot")
# 检查权限 if not self.bot_client: self.bot_client = bot
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限" return False, "没有权限"
# 处理消息内容 - 不再使用jieba分词
city_name = self._extract_city_name(content)
self.LOG.debug(f"城市名称:{city_name}")
if not city_name:
# await bot.send_text_message((roomid if roomid else sender), f"\n{self.command_format}", sender)
return False, "命令格式错误"
# 配置密钥检查
if not self.api_key: if not self.api_key:
await bot.send_text_message((roomid if roomid else sender),
"\n你还没配置天气API密钥请在config.toml中配置Weather.API_KEY", sender)
return False, "API密钥未配置" return False, "API密钥未配置"
try: # --- 业务分流 ---
# 获取天气信息
weather_info = await self._get_weather_info(city_name)
if not weather_info:
self.LOG.debug("\n⚠️查询天气失败!")
return False, "查询天气失败"
# 发送天气信息 # 1. 订阅逻辑 (核心修改点)
await bot.send_text_message((roomid if roomid else sender), weather_info, sender) if content.startswith("订阅天气"):
if not self.redis_manager: return False, "数据库未连接"
input_city = content.replace("订阅天气", "").strip()
if not input_city: return False, "请指定城市"
# Step A: 模糊查询获取 唯一ID 和 标准名
city_info = await self._lookup_city_info(input_city)
if not city_info:
return False, f"找不到城市 [{input_city}],请检查输入。"
std_city_name = city_info['name'] # 例如 "长沙"
city_id = city_info['id'] # 例如 "101250101"
# Step B: 存入 Redis (存储标准信息)
unique_id = f"{roomid}_{sender}" if roomid else f"private_{sender}"
sub_data = {
"city_id": city_id, # 核心:以 ID 为准
"city_name": std_city_name, # 存标准名用于展示
"input_name": input_city, # 用户原始输入(可选)
"room_id": roomid,
"sender_id": sender,
"created_at": str(datetime.datetime.now())
}
await self.redis_manager.add_subscription(unique_id, sub_data)
# 反馈时告诉用户标准名
msg = f"✅ 订阅成功!\n已锁定城市:{city_info['adm1']} - {std_city_name}\n每天 08:00 推送早报。"
await bot.send_text_message(roomid or sender, msg, sender)
# Step C: 顺便存历史 (用于明天对比)
# 使用 ID 获取天气,避免再次 Geo 查询
weather_data = await self._fetch_weather_by_id(city_id)
if weather_data:
await self._save_today_as_history(city_id, weather_data)
return True, "订阅成功"
# 2. 取消订阅
elif content.startswith("取消订阅"):
if not self.redis_manager: return False, "数据库不可用"
unique_id = f"{roomid}_{sender}" if roomid else f"private_{sender}"
await self.redis_manager.remove_subscription(unique_id)
await bot.send_text_message(roomid or sender, "✅ 已取消天气订阅。", sender)
return True, "取消成功"
# 3. 普通查询 (保持原逻辑,但也建议先 lookup 再查,或者复用 lookup)
else:
input_city = self._extract_city_name(content)
if not input_city: return False, "无法识别城市"
# 这里为了简单,依然使用旧的 Name -> Search -> Weather 流程
# 或者你可以复用 _lookup_city_info + _fetch_weather_by_id 组合
weather_text = await self._get_weather_text_response(input_city)
if not weather_text: return False, "查询失败"
await bot.send_text_message(roomid or sender, weather_text, sender)
return True, "发送成功" return True, "发送成功"
# ================= 定时任务系统 =================
async def _daily_scheduler(self):
"""每日 08:00 调度器"""
self.LOG.info("⏰ 天气订阅定时任务已启动...")
while self.status == PluginStatus.RUNNING:
try:
now = datetime.datetime.now()
target_time = now.replace(hour=8, minute=0, second=0, microsecond=0)
if now > target_time:
target_time += datetime.timedelta(days=1)
wait_seconds = (target_time - now).total_seconds()
self.LOG.info(f"⏳ 下一次天气推送将在 {wait_seconds:.0f} 秒后")
await asyncio.sleep(wait_seconds)
if self.bot_client and self.redis_manager:
await self._execute_daily_push()
await asyncio.sleep(60)
except asyncio.CancelledError:
break
except Exception as e:
self.LOG.error(f"定时任务异常: {e}")
await asyncio.sleep(60)
async def _execute_daily_push(self):
"""执行全量推送 (基于 ID 聚合)"""
self.LOG.info("🚀 执行每日推送...")
subs = await self.redis_manager.get_all_subscriptions()
if not subs: return
# 1. 按 [city_id] 聚合 (真正的去重)
# 结构: {"101250101": {"name": "长沙", "users": [...]}, ...}
agg_map = {}
for key, sub in subs.items():
cid = sub.get('city_id')
cname = sub.get('city_name')
if not cid: continue # 兼容旧数据或错误数据
if cid not in agg_map:
agg_map[cid] = {"name": cname, "users": []}
agg_map[cid]["users"].append(sub)
# 2. 遍历 ID 获取天气
for city_id, info in agg_map.items():
try:
city_name = info["name"]
user_list = info["users"]
# 直接用 ID 查天气 (无需 Geo API极速)
api_data = await self._fetch_weather_by_id(city_id)
if not api_data: continue
# 获取 Redis 历史 (Key 也是 ID)
history_data = await self.redis_manager.get_history(city_id)
# 生成文案
push_content = self._analyze_weather_change(city_name, api_data, history_data)
if push_content:
today_str = datetime.datetime.now().strftime("%m月%d")
final_msg = f"📅 {today_str} 天气早报\n{push_content}"
for user in user_list:
room_id = user.get('room_id')
sender_id = user.get('sender_id')
target_id = room_id if room_id else sender_id
await asyncio.sleep(0.5)
try:
if self.bot_client:
await self.bot_client.send_text_message(target_id, final_msg,
sender_id if room_id else None)
except Exception as send_e:
self.LOG.error(f"推送给 {target_id} 失败: {send_e}")
# 存档
await self._save_today_as_history(city_id, api_data)
except Exception as e:
self.LOG.error(f"处理城市ID {city_id} 推送失败: {e}")
# ================= 核心分析算法 (逻辑不变) =================
def _analyze_weather_change(self, city_name: str, api_data: dict, history_data: Optional[dict]) -> str:
"""分析天气变化"""
daily_forecast = api_data.get('daily_forecast', [])
if not daily_forecast: return ""
today = daily_forecast[0]
t_max = int(today['tempMax'])
t_min = int(today['tempMin'])
text_day = today['textDay']
alerts = self._check_alerts_and_history(today, history_data)
life_tips = self._generate_life_tips(today)
astronomy = self._get_astronomy_info(today)
clothing = self._get_clothing_advice((t_max + t_min) / 2)
msg = f"📍 {city_name} | {text_day} | {t_min}°C ~ {t_max}°C\n"
if alerts: msg += "\n⚠️ **特别关注**\n" + "\n".join(alerts) + "\n"
if life_tips: msg += "\n💡 **温馨提示**\n" + "\n".join(life_tips) + "\n"
msg += f"\n👔 **穿衣**{clothing}"
if astronomy: msg += f"\n🌙 **天文**{astronomy}"
msg += "\n\n(回复 '取消订阅' 退订)"
return msg
def _check_alerts_and_history(self, today: dict, history_data: Optional[dict]) -> List[str]:
alerts = []
t_max = int(today['tempMax'])
text_day = today['textDay']
wind_scale = today.get('windScaleDay', '0')
if history_data:
last_max = int(history_data.get('tempMax', t_max))
diff = t_max - last_max
if diff <= -6:
alerts.append(f"🥶 **气温骤降**:比昨天冷了 {abs(diff)}°C")
elif diff >= 6:
alerts.append(f"🥵 **气温飙升**:比昨天热了 {diff}°C。")
if "暴雨" in text_day:
alerts.append("🌧️ **暴雨预警**:减少户外活动。")
elif "" in text_day:
alerts.append("☔️ 今天有雨,出门带伞。")
if "" in text_day: alerts.append("❄️ 雪天路滑,注意安全。")
try:
max_wind = int(wind_scale.split('-')[-1])
if max_wind >= 5: alerts.append(f"🌬️ **大风警报**{wind_scale}级大风。")
except:
pass
return alerts
def _generate_life_tips(self, data: dict) -> List[str]:
tips = []
uv = int(data.get('uvIndex', 0))
if uv >= 8:
tips.append("☀️ 紫外线极强(UV>8),防晒!")
elif uv >= 5:
tips.append("☂️ 紫外线较强,注意防晒。")
hum = int(data.get('humidity', 0))
if hum >= 90:
tips.append("💧 潮湿(>90%),注意防潮。")
elif hum <= 20:
tips.append("🌵 干燥(<20%),多喝水。")
vis = int(data.get('vis', 25))
if vis < 1:
tips.append("🌫️ **大雾**:能见度<1km慢行")
elif vis < 5:
tips.append("👀 轻雾或霾,能见度一般。")
moon = data.get('moonPhase', '')
text_night = data.get('textNight', '')
if "" in text_night and ("满月" in moon or "" in moon):
tips.append(f"🌕 天晴,宜赏月({moon})。")
return tips
def _get_astronomy_info(self, data: dict) -> str:
sunrise = data.get('sunrise', '-')
sunset = data.get('sunset', '-')
moon = data.get('moonPhase', '')
parts = []
if sunrise != '-' and sunset != '-': parts.append(f"日出{sunrise}|日落{sunset}")
if moon: parts.append(moon)
return " ".join(parts)
def _get_clothing_advice(self, temp: float) -> str:
if temp < 0: return "羽绒服+围巾 (严寒)"
if temp < 10: return "棉衣/大衣 (冷)"
if temp < 18: return "风衣/卫衣 (凉)"
if temp < 26: return "衬衫/T恤 (舒适)"
return "短袖/裙子 (热)"
# ================= 新版数据获取 (分离 ID 和 搜索) =================
async def _lookup_city_info(self, city_name: str) -> Optional[Dict]:
"""[Step 1] 通过名称查找城市 ID ( Geo API )"""
try:
headers = {"X-QW-Api-Key": f'{self.api_key}'}
params = {"location": city_name}
geo_url = f"{self.api_domain}/geo/v2/city/lookup"
conn = aiohttp.TCPConnector(ssl=False)
async with aiohttp.request('GET', geo_url, connector=conn, headers=headers, params=params) as resp:
geo_json = await resp.json()
if geo_json.get('code') != '200' or not geo_json.get('location'):
return None
# 返回最匹配的第一个结果
return geo_json["location"][0]
except Exception as e: except Exception as e:
self.LOG.error(f"处理天气请求出错: {e}") self.LOG.error(f"GeoAPI 失败: {e}")
# await bot.send_text_message((roomid if roomid else sender), f"\n⚠处理出错: {str(e)}", sender) return None
return False, f"处理出错: {e}"
async def _fetch_weather_by_id(self, city_id: str) -> Optional[Dict]:
"""[Step 2] 通过 ID 直接获取天气 ( Now + 7D ) - 极速模式"""
try:
# 获取实时天气
conn = aiohttp.TCPConnector(ssl=False)
now_url = f'{self.api_domain}/v7/weather/now?key={self.api_key}&location={city_id}'
async with aiohttp.request('GET', now_url, connector=conn) as resp:
now_json = await resp.json()
# 获取预报
conn = aiohttp.TCPConnector(ssl=False)
daily_url = f'{self.api_domain}/v7/weather/7d?key={self.api_key}&location={city_id}'
async with aiohttp.request('GET', daily_url, connector=conn) as resp:
daily_json = await resp.json()
if now_json.get('code') != '200' or daily_json.get('code') != '200':
return None
return {
"now": now_json.get('now'),
"daily_forecast": daily_json.get('daily'),
"updateTime": now_json.get('updateTime')
# 注意:这里没有 Geo 信息了,因为是直接用 ID 查的
}
except Exception as e:
self.LOG.error(f"WeatherAPI(ID) 失败: {e}")
return None
async def _save_today_as_history(self, city_id: str, api_data: dict):
"""保存历史 (Key使用 ID)"""
if not self.redis_manager: return
try:
today_fc = api_data.get('daily_forecast', [])[0]
data = {
"date": datetime.datetime.now().strftime("%Y-%m-%d"),
"tempMax": today_fc['tempMax'],
"tempMin": today_fc['tempMin'],
"text": today_fc['textDay']
}
await self.redis_manager.save_history(city_id, data)
except Exception as e:
self.LOG.error(f"存档失败: {e}")
# --- 旧版兼容方法,用于普通查询 ---
async def _get_weather_text_response(self, city_name: str) -> str:
"""普通查询流程:查找 -> 获取 -> 拼接"""
# 1. 查 ID
city_info = await self._lookup_city_info(city_name)
if not city_info: return ""
cid = city_info['id']
# 2. 查天气
raw = await self._fetch_weather_by_id(cid)
if not raw: return ""
# 3. 拼接
geo_str = f"{city_info['country']}{city_info['adm1']}{city_info['adm2']}"
now = raw['now']
daily = raw['daily_forecast']
msg = (
f"{geo_str} 实时天气☁️\n"
f"{raw['updateTime'][11:16]}\n\n"
f"🌡️ {now['temp']}℃ (体感{now['feelsLike']})\n"
f"☁️ {now['text']}\n"
f"🌬️ {now['windDir']} {now['windScale']}\n"
f"💦 湿度{now['humidity']}%\n"
f"👀 能见度{now['vis']}km\n\n"
f"☁️ 未来预报:\n"
)
for day in daily[0:3]:
d = day['fxDate'][5:]
msg += f"{d} {day['textDay']}|{day['tempMin']}~{day['tempMax']}\n"
return msg
def _extract_city_name(self, content: str) -> str: def _extract_city_name(self, content: str) -> str:
"""提取城市名称替代jieba分词"""
# 去除空格
content = content.replace(" ", "") content = content.replace(" ", "")
# 处理几种常见格式
if content.startswith("天气"): if content.startswith("天气"):
# 格式: "天气北京"
return content[2:] return content[2:]
elif content.endswith("天气"): elif content.endswith("天气"):
# 格式: "北京天气"
return content[:-2] return content[:-2]
# else:
# # 尝试分离"天气"和城市名
# parts = content.split("天气")
# if len(parts) == 2:
# # 选择非空的部分作为城市名
# return parts[0] if parts[0] else parts[1]
#
return "" return ""
async def _get_weather_info(self, city_name: str) -> str:
"""获取天气信息"""
try:
# 查询城市信息
headers = {
"X-QW-Api-Key": f'{self.api_key}'
}
params = {
"location": city_name
}
geo_api_url = f"{self.api_domain}/geo/v2/city/lookup"
conn_ssl = aiohttp.TCPConnector(ssl=False)
async with aiohttp.request('GET', url=geo_api_url, connector=conn_ssl, headers=headers,
params=params) as response:
geoapi_json = await response.json()
await conn_ssl.close()
if 'code' not in geoapi_json:
self.LOG.debug(f"未查询到城市信息:{city_name},{geoapi_json}")
return ""
if geoapi_json['code'] == '404':
return ""
elif geoapi_json['code'] != '200':
return ""
country = geoapi_json["location"][0]["country"]
adm1 = geoapi_json["location"][0]["adm1"]
adm2 = geoapi_json["location"][0]["adm2"]
city_id = geoapi_json["location"][0]["id"]
# 请求现在天气api
conn_ssl = aiohttp.TCPConnector(verify_ssl=False)
now_weather_api_url = f'{self.api_domain}/v7/weather/now?key={self.api_key}&location={city_id}'
async with aiohttp.request('GET', url=now_weather_api_url, connector=conn_ssl) as response:
now_weather_api_json = await response.json()
await conn_ssl.close()
# 请求预报天气api
conn_ssl = aiohttp.TCPConnector(verify_ssl=False)
weather_forecast_api_url = f'{self.api_domain}/v7/weather/7d?key={self.api_key}&location={city_id}'
async with aiohttp.request('GET', url=weather_forecast_api_url, connector=conn_ssl) as response:
weather_forecast_api_json = await response.json()
await conn_ssl.close()
# 组合天气信息
return self._compose_weather_message(country, adm1, adm2, now_weather_api_json, weather_forecast_api_json)
except Exception as e:
self.LOG.error(f"获取天气信息出错: {e}")
return ""
def _compose_weather_message(self, country, adm1, adm2, now_weather_api_json, weather_forecast_api_json):
"""组合天气信息"""
update_time = now_weather_api_json['updateTime']
now_temperature = now_weather_api_json['now']['temp']
now_feelslike = now_weather_api_json['now']['feelsLike']
now_weather = now_weather_api_json['now']['text']
now_wind_direction = now_weather_api_json['now']['windDir']
now_wind_scale = now_weather_api_json['now']['windScale']
now_humidity = now_weather_api_json['now']['humidity']
now_precip = now_weather_api_json['now']['precip']
now_visibility = now_weather_api_json['now']['vis']
now_uvindex = weather_forecast_api_json['daily'][0]['uvIndex']
message = (
f"{country}{adm1}{adm2} 实时天气☁️\n"
f"⏰更新时间:{update_time}\n\n"
f"🌡️当前温度:{now_temperature}\n"
f"🌡️体感温度:{now_feelslike}\n"
f"☁️天气:{now_weather}\n"
f"☀️紫外线指数:{now_uvindex}\n"
f"🌬️风向:{now_wind_direction}\n"
f"🌬️风力:{now_wind_scale}\n"
f"💦湿度:{now_humidity}%\n"
f"🌧️降水量:{now_precip}mm/h\n"
f"👀能见度:{now_visibility}km\n\n"
f"未来3天 {adm2} 天气:\n"
)
for day in weather_forecast_api_json['daily'][1:4]:
date = '.'.join([i.lstrip('0') for i in day['fxDate'].split('-')[1:]])
weather = day['textDay']
max_temp = day['tempMax']
min_temp = day['tempMin']
uv_index = day['uvIndex']
message += f'{date} {weather} 最高🌡️{max_temp}℃ 最低🌡️{min_temp}℃ ☀️紫外线:{uv_index}\n'
return message