diff --git a/plugins/weather/main.py b/plugins/weather/main.py index c634593..82ae603 100644 --- a/plugins/weather/main.py +++ b/plugins/weather/main.py @@ -1,6 +1,9 @@ from loguru import logger import aiohttp import os +import json +import asyncio +import datetime from typing import Dict, Any, List, Optional, Tuple from base.plugin_common.message_plugin_interface import MessagePluginInterface @@ -11,12 +14,65 @@ from utils.decorator.points_decorator import plugin_points_cost from wechat_ipad import WechatAPIClient -class WeatherPlugin(MessagePluginInterface): - """天气查询插件""" +# ================= Redis 管理器 ================= + +class WeatherRedisManager: + """处理 Redis 数据存取""" + + def __init__(self, db_manager): + self.redis = db_manager + self.prefix = "bot:weather:" + + async def add_subscription(self, key: str, data: dict): + """添加订阅 (存入标准城市名和ID)""" + redis_key = f"{self.prefix}subs" + # 存入 JSON 字符串 + await self.redis.hset(redis_key, key, json.dumps(data, ensure_ascii=False)) + + async def remove_subscription(self, key: str): + """取消订阅""" + redis_key = f"{self.prefix}subs" + await self.redis.hdel(redis_key, key) + + async def get_all_subscriptions(self) -> Dict[str, dict]: + """获取所有订阅数据""" + redis_key = f"{self.prefix}subs" + try: + raw_data = await self.redis.hgetall(redis_key) + result = {} + if raw_data: + for k, v in raw_data.items(): + k_str = k.decode('utf-8') if isinstance(k, bytes) else k + v_str = v.decode('utf-8') if isinstance(v, bytes) else v + result[k_str] = json.loads(v_str) + return result + except Exception as e: + logger.error(f"Redis 读取订阅失败: {e}") + return {} + + async def get_history(self, city_id: str) -> Optional[dict]: + """获取某城市(ID)昨天的历史数据""" + # 注意:这里改用 city_id 作为 Key,确保唯一性 + redis_key = f"{self.prefix}history:{city_id}" + data = await self.redis.get(redis_key) + if data: + return json.loads(data) + return None + + async def save_history(self, city_id: str, data: dict): + """保存今天的核心数据 (Key 为 city_id)""" + redis_key = f"{self.prefix}history:{city_id}" + # 有效期 48 小时 + await self.redis.set(redis_key, json.dumps(data, ensure_ascii=False), ex=172800) + + +# ================= 插件主体类 ================= + +class WeatherPlugin(MessagePluginInterface): + """天气查询插件 (v3.0 - 城市ID精准订阅版)""" - # 功能权限常量 FEATURE_KEY = "WEATHER" - FEATURE_DESCRIPTION = "🌤️ 天气查询 [上海天气, 天气上海]" + FEATURE_DESCRIPTION = "🌤️ 天气查询与订阅 [天气 长沙, 订阅天气 长沙]" @property def name(self) -> str: @@ -24,11 +80,11 @@ class WeatherPlugin(MessagePluginInterface): @property def version(self) -> str: - return "1.0.0" + return "3.0.0" @property def description(self) -> str: - return "提供天气查询功能,支持查询城市实时天气和未来天气预报" + return "提供精准的天气订阅服务,支持城市模糊搜索自动纠正。" @property def author(self) -> str: @@ -36,7 +92,7 @@ class WeatherPlugin(MessagePluginInterface): @property def command_prefix(self) -> Optional[str]: - return "" # 不需要前缀,直接匹配命令 + return "" @property def commands(self) -> List[str]: @@ -53,203 +109,415 @@ class WeatherPlugin(MessagePluginInterface): def __init__(self): super().__init__() self.plugin_dir = os.path.dirname(os.path.abspath(__file__)) - # 注册功能权限 self.feature = self.register_feature() + self.redis_manager = None + self.bot_client = None + self.task = None def initialize(self, context: Dict[str, Any]) -> bool: - """初始化插件""" + """初始化""" self.LOG = logger self.LOG.info(f"正在初始化 {self.name} 插件...") - # 从TOML配置文件加载配置 - self._commands = self._config.get("Weather", {}).get("command", ["天气"]) - self.command_format = self._config.get("Weather", {}).get("command-format", """⚙️获取天气: - 天气 城市名 - 天气城市名 - 城市名天气 - 城市名 天气""") + self._config = context.get("config", {}) + + # Redis 初始化 + db_manager = context.get("db_manager") + if db_manager: + self.redis_manager = WeatherRedisManager(db_manager) + else: + self.LOG.error("❌ 未找到 db_manager,功能受限!") + + self._commands = self._config.get("Weather", {}).get("command", ["天气", "订阅天气", "取消订阅"]) self.enable = self._config.get("Weather", {}).get("enable", True) - - # 加载API配置 self.api_key = self._config.get("Weather", {}).get("API_KEY", "") - self.api_domain = self._config.get("Weather", {}).get("API_DOMAIN", "") + self.api_domain = self._config.get("Weather", {}).get("API_DOMAIN", "https://api.qweather.com") - self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: - """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING + self.task = asyncio.create_task(self._daily_scheduler()) return True def stop(self) -> bool: - """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED + if self.task: + self.task.cancel() return True def can_process(self, message: Dict[str, Any]) -> bool: - """检查是否可以处理该消息""" - if not self.enable: - return False - + if not self.enable: return False content = str(message.get("content", "")).strip() - - if content.startswith("天气") or content.endswith("天气"): - return True - command = content.split(" ")[0] - return command in self._commands + for cmd in ["天气", "订阅天气", "取消订阅"]: + if content.startswith(cmd): return True + return content.endswith("天气") @plugin_stats_decorator(plugin_name="天气查询") - @plugin_points_cost(10, "天气查询消耗积分", FEATURE_KEY) + @plugin_points_cost(10, "天气服务消耗积分", FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """处理消息""" content = str(message.get("content", "")).strip() - self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") - # 检查权限 + if not self.bot_client: self.bot_client = bot + if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" - # 处理消息内容 - 不再使用jieba分词 - city_name = self._extract_city_name(content) - self.LOG.debug(f"城市名称:{city_name}") - if not city_name: - # await bot.send_text_message((roomid if roomid else sender), f"\n{self.command_format}", sender) - return False, "命令格式错误" - - # 配置密钥检查 if not self.api_key: - await bot.send_text_message((roomid if roomid else sender), - "\n你还没配置天气API密钥!请在config.toml中配置Weather.API_KEY", sender) return False, "API密钥未配置" - try: - # 获取天气信息 - weather_info = await self._get_weather_info(city_name) - if not weather_info: - self.LOG.debug("\n⚠️查询天气失败!") - return False, "查询天气失败" + # --- 业务分流 --- - # 发送天气信息 - await bot.send_text_message((roomid if roomid else sender), weather_info, sender) + # 1. 订阅逻辑 (核心修改点) + if content.startswith("订阅天气"): + if not self.redis_manager: return False, "数据库未连接" + + input_city = content.replace("订阅天气", "").strip() + if not input_city: return False, "请指定城市" + + # Step A: 模糊查询获取 唯一ID 和 标准名 + city_info = await self._lookup_city_info(input_city) + if not city_info: + return False, f"找不到城市 [{input_city}],请检查输入。" + + std_city_name = city_info['name'] # 例如 "长沙" + city_id = city_info['id'] # 例如 "101250101" + + # Step B: 存入 Redis (存储标准信息) + unique_id = f"{roomid}_{sender}" if roomid else f"private_{sender}" + sub_data = { + "city_id": city_id, # 核心:以 ID 为准 + "city_name": std_city_name, # 存标准名用于展示 + "input_name": input_city, # 用户原始输入(可选) + "room_id": roomid, + "sender_id": sender, + "created_at": str(datetime.datetime.now()) + } + + await self.redis_manager.add_subscription(unique_id, sub_data) + + # 反馈时告诉用户标准名 + msg = f"✅ 订阅成功!\n已锁定城市:{city_info['adm1']} - {std_city_name}\n每天 08:00 推送早报。" + await bot.send_text_message(roomid or sender, msg, sender) + + # Step C: 顺便存历史 (用于明天对比) + # 使用 ID 获取天气,避免再次 Geo 查询 + weather_data = await self._fetch_weather_by_id(city_id) + if weather_data: + await self._save_today_as_history(city_id, weather_data) + + return True, "订阅成功" + + # 2. 取消订阅 + elif content.startswith("取消订阅"): + if not self.redis_manager: return False, "数据库不可用" + unique_id = f"{roomid}_{sender}" if roomid else f"private_{sender}" + await self.redis_manager.remove_subscription(unique_id) + await bot.send_text_message(roomid or sender, "✅ 已取消天气订阅。", sender) + return True, "取消成功" + + # 3. 普通查询 (保持原逻辑,但也建议先 lookup 再查,或者复用 lookup) + else: + input_city = self._extract_city_name(content) + if not input_city: return False, "无法识别城市" + + # 这里为了简单,依然使用旧的 Name -> Search -> Weather 流程 + # 或者你可以复用 _lookup_city_info + _fetch_weather_by_id 组合 + weather_text = await self._get_weather_text_response(input_city) + if not weather_text: return False, "查询失败" + + await bot.send_text_message(roomid or sender, weather_text, sender) return True, "发送成功" + # ================= 定时任务系统 ================= + + async def _daily_scheduler(self): + """每日 08:00 调度器""" + self.LOG.info("⏰ 天气订阅定时任务已启动...") + while self.status == PluginStatus.RUNNING: + try: + now = datetime.datetime.now() + target_time = now.replace(hour=8, minute=0, second=0, microsecond=0) + if now > target_time: + target_time += datetime.timedelta(days=1) + + wait_seconds = (target_time - now).total_seconds() + self.LOG.info(f"⏳ 下一次天气推送将在 {wait_seconds:.0f} 秒后") + + await asyncio.sleep(wait_seconds) + + if self.bot_client and self.redis_manager: + await self._execute_daily_push() + + await asyncio.sleep(60) + except asyncio.CancelledError: + break + except Exception as e: + self.LOG.error(f"定时任务异常: {e}") + await asyncio.sleep(60) + + async def _execute_daily_push(self): + """执行全量推送 (基于 ID 聚合)""" + self.LOG.info("🚀 执行每日推送...") + subs = await self.redis_manager.get_all_subscriptions() + if not subs: return + + # 1. 按 [city_id] 聚合 (真正的去重) + # 结构: {"101250101": {"name": "长沙", "users": [...]}, ...} + agg_map = {} + for key, sub in subs.items(): + cid = sub.get('city_id') + cname = sub.get('city_name') + if not cid: continue # 兼容旧数据或错误数据 + + if cid not in agg_map: + agg_map[cid] = {"name": cname, "users": []} + agg_map[cid]["users"].append(sub) + + # 2. 遍历 ID 获取天气 + for city_id, info in agg_map.items(): + try: + city_name = info["name"] + user_list = info["users"] + + # 直接用 ID 查天气 (无需 Geo API,极速) + api_data = await self._fetch_weather_by_id(city_id) + if not api_data: continue + + # 获取 Redis 历史 (Key 也是 ID) + history_data = await self.redis_manager.get_history(city_id) + + # 生成文案 + push_content = self._analyze_weather_change(city_name, api_data, history_data) + + if push_content: + today_str = datetime.datetime.now().strftime("%m月%d日") + final_msg = f"📅 {today_str} 天气早报\n{push_content}" + + for user in user_list: + room_id = user.get('room_id') + sender_id = user.get('sender_id') + target_id = room_id if room_id else sender_id + + await asyncio.sleep(0.5) + try: + if self.bot_client: + await self.bot_client.send_text_message(target_id, final_msg, + sender_id if room_id else None) + except Exception as send_e: + self.LOG.error(f"推送给 {target_id} 失败: {send_e}") + + # 存档 + await self._save_today_as_history(city_id, api_data) + + except Exception as e: + self.LOG.error(f"处理城市ID {city_id} 推送失败: {e}") + + # ================= 核心分析算法 (逻辑不变) ================= + + def _analyze_weather_change(self, city_name: str, api_data: dict, history_data: Optional[dict]) -> str: + """分析天气变化""" + daily_forecast = api_data.get('daily_forecast', []) + if not daily_forecast: return "" + today = daily_forecast[0] + + t_max = int(today['tempMax']) + t_min = int(today['tempMin']) + text_day = today['textDay'] + + alerts = self._check_alerts_and_history(today, history_data) + life_tips = self._generate_life_tips(today) + astronomy = self._get_astronomy_info(today) + clothing = self._get_clothing_advice((t_max + t_min) / 2) + + msg = f"📍 {city_name} | {text_day} | {t_min}°C ~ {t_max}°C\n" + + if alerts: msg += "\n⚠️ **特别关注**\n" + "\n".join(alerts) + "\n" + if life_tips: msg += "\n💡 **温馨提示**\n" + "\n".join(life_tips) + "\n" + + msg += f"\n👔 **穿衣**:{clothing}" + if astronomy: msg += f"\n🌙 **天文**:{astronomy}" + + msg += "\n\n(回复 '取消订阅' 退订)" + return msg + + def _check_alerts_and_history(self, today: dict, history_data: Optional[dict]) -> List[str]: + alerts = [] + t_max = int(today['tempMax']) + text_day = today['textDay'] + wind_scale = today.get('windScaleDay', '0') + + if history_data: + last_max = int(history_data.get('tempMax', t_max)) + diff = t_max - last_max + if diff <= -6: + alerts.append(f"🥶 **气温骤降**:比昨天冷了 {abs(diff)}°C!") + elif diff >= 6: + alerts.append(f"🥵 **气温飙升**:比昨天热了 {diff}°C。") + + if "暴雨" in text_day: + alerts.append("🌧️ **暴雨预警**:减少户外活动。") + elif "雨" in text_day: + alerts.append("☔️ 今天有雨,出门带伞。") + if "雪" in text_day: alerts.append("❄️ 雪天路滑,注意安全。") + + try: + max_wind = int(wind_scale.split('-')[-1]) + if max_wind >= 5: alerts.append(f"🌬️ **大风警报**:{wind_scale}级大风。") + except: + pass + return alerts + + def _generate_life_tips(self, data: dict) -> List[str]: + tips = [] + uv = int(data.get('uvIndex', 0)) + if uv >= 8: + tips.append("☀️ 紫外线极强(UV>8),防晒!") + elif uv >= 5: + tips.append("☂️ 紫外线较强,注意防晒。") + + hum = int(data.get('humidity', 0)) + if hum >= 90: + tips.append("💧 潮湿(>90%),注意防潮。") + elif hum <= 20: + tips.append("🌵 干燥(<20%),多喝水。") + + vis = int(data.get('vis', 25)) + if vis < 1: + tips.append("🌫️ **大雾**:能见度<1km,慢行!") + elif vis < 5: + tips.append("👀 轻雾或霾,能见度一般。") + + moon = data.get('moonPhase', '') + text_night = data.get('textNight', '') + if "晴" in text_night and ("满月" in moon or "圆" in moon): + tips.append(f"🌕 天晴,宜赏月({moon})。") + return tips + + def _get_astronomy_info(self, data: dict) -> str: + sunrise = data.get('sunrise', '-') + sunset = data.get('sunset', '-') + moon = data.get('moonPhase', '') + parts = [] + if sunrise != '-' and sunset != '-': parts.append(f"日出{sunrise}|日落{sunset}") + if moon: parts.append(moon) + return " ".join(parts) + + def _get_clothing_advice(self, temp: float) -> str: + if temp < 0: return "羽绒服+围巾 (严寒)" + if temp < 10: return "棉衣/大衣 (冷)" + if temp < 18: return "风衣/卫衣 (凉)" + if temp < 26: return "衬衫/T恤 (舒适)" + return "短袖/裙子 (热)" + + # ================= 新版数据获取 (分离 ID 和 搜索) ================= + + async def _lookup_city_info(self, city_name: str) -> Optional[Dict]: + """[Step 1] 通过名称查找城市 ID ( Geo API )""" + try: + headers = {"X-QW-Api-Key": f'{self.api_key}'} + params = {"location": city_name} + geo_url = f"{self.api_domain}/geo/v2/city/lookup" + + conn = aiohttp.TCPConnector(ssl=False) + async with aiohttp.request('GET', geo_url, connector=conn, headers=headers, params=params) as resp: + geo_json = await resp.json() + + if geo_json.get('code') != '200' or not geo_json.get('location'): + return None + + # 返回最匹配的第一个结果 + return geo_json["location"][0] except Exception as e: - self.LOG.error(f"处理天气请求出错: {e}") - # await bot.send_text_message((roomid if roomid else sender), f"\n⚠️处理出错: {str(e)}", sender) - return False, f"处理出错: {e}" + self.LOG.error(f"GeoAPI 失败: {e}") + return None + + async def _fetch_weather_by_id(self, city_id: str) -> Optional[Dict]: + """[Step 2] 通过 ID 直接获取天气 ( Now + 7D ) - 极速模式""" + try: + # 获取实时天气 + conn = aiohttp.TCPConnector(ssl=False) + now_url = f'{self.api_domain}/v7/weather/now?key={self.api_key}&location={city_id}' + async with aiohttp.request('GET', now_url, connector=conn) as resp: + now_json = await resp.json() + + # 获取预报 + conn = aiohttp.TCPConnector(ssl=False) + daily_url = f'{self.api_domain}/v7/weather/7d?key={self.api_key}&location={city_id}' + async with aiohttp.request('GET', daily_url, connector=conn) as resp: + daily_json = await resp.json() + + if now_json.get('code') != '200' or daily_json.get('code') != '200': + return None + + return { + "now": now_json.get('now'), + "daily_forecast": daily_json.get('daily'), + "updateTime": now_json.get('updateTime') + # 注意:这里没有 Geo 信息了,因为是直接用 ID 查的 + } + except Exception as e: + self.LOG.error(f"WeatherAPI(ID) 失败: {e}") + return None + + async def _save_today_as_history(self, city_id: str, api_data: dict): + """保存历史 (Key使用 ID)""" + if not self.redis_manager: return + try: + today_fc = api_data.get('daily_forecast', [])[0] + data = { + "date": datetime.datetime.now().strftime("%Y-%m-%d"), + "tempMax": today_fc['tempMax'], + "tempMin": today_fc['tempMin'], + "text": today_fc['textDay'] + } + await self.redis_manager.save_history(city_id, data) + except Exception as e: + self.LOG.error(f"存档失败: {e}") + + # --- 旧版兼容方法,用于普通查询 --- + async def _get_weather_text_response(self, city_name: str) -> str: + """普通查询流程:查找 -> 获取 -> 拼接""" + # 1. 查 ID + city_info = await self._lookup_city_info(city_name) + if not city_info: return "" + + cid = city_info['id'] + + # 2. 查天气 + raw = await self._fetch_weather_by_id(cid) + if not raw: return "" + + # 3. 拼接 + geo_str = f"{city_info['country']}{city_info['adm1']}{city_info['adm2']}" + now = raw['now'] + daily = raw['daily_forecast'] + + msg = ( + f"{geo_str} 实时天气☁️\n" + f"⏰ {raw['updateTime'][11:16]}\n\n" + f"🌡️ {now['temp']}℃ (体感{now['feelsLike']})\n" + f"☁️ {now['text']}\n" + f"🌬️ {now['windDir']} {now['windScale']}级\n" + f"💦 湿度{now['humidity']}%\n" + f"👀 能见度{now['vis']}km\n\n" + f"☁️ 未来预报:\n" + ) + for day in daily[0:3]: + d = day['fxDate'][5:] + msg += f"{d} {day['textDay']}|{day['tempMin']}~{day['tempMax']}℃\n" + return msg def _extract_city_name(self, content: str) -> str: - """提取城市名称,替代jieba分词""" - # 去除空格 content = content.replace(" ", "") - - # 处理几种常见格式 if content.startswith("天气"): - # 格式: "天气北京" return content[2:] elif content.endswith("天气"): - # 格式: "北京天气" return content[:-2] - # else: - # # 尝试分离"天气"和城市名 - # parts = content.split("天气") - # if len(parts) == 2: - # # 选择非空的部分作为城市名 - # return parts[0] if parts[0] else parts[1] - # return "" - - async def _get_weather_info(self, city_name: str) -> str: - """获取天气信息""" - try: - # 查询城市信息 - headers = { - "X-QW-Api-Key": f'{self.api_key}' - } - params = { - "location": city_name - } - geo_api_url = f"{self.api_domain}/geo/v2/city/lookup" - conn_ssl = aiohttp.TCPConnector(ssl=False) - async with aiohttp.request('GET', url=geo_api_url, connector=conn_ssl, headers=headers, - params=params) as response: - geoapi_json = await response.json() - await conn_ssl.close() - if 'code' not in geoapi_json: - self.LOG.debug(f"未查询到城市信息:{city_name},{geoapi_json}") - return "" - - if geoapi_json['code'] == '404': - return "" - elif geoapi_json['code'] != '200': - return "" - - country = geoapi_json["location"][0]["country"] - adm1 = geoapi_json["location"][0]["adm1"] - adm2 = geoapi_json["location"][0]["adm2"] - city_id = geoapi_json["location"][0]["id"] - - # 请求现在天气api - conn_ssl = aiohttp.TCPConnector(verify_ssl=False) - now_weather_api_url = f'{self.api_domain}/v7/weather/now?key={self.api_key}&location={city_id}' - async with aiohttp.request('GET', url=now_weather_api_url, connector=conn_ssl) as response: - now_weather_api_json = await response.json() - await conn_ssl.close() - - # 请求预报天气api - conn_ssl = aiohttp.TCPConnector(verify_ssl=False) - weather_forecast_api_url = f'{self.api_domain}/v7/weather/7d?key={self.api_key}&location={city_id}' - async with aiohttp.request('GET', url=weather_forecast_api_url, connector=conn_ssl) as response: - weather_forecast_api_json = await response.json() - await conn_ssl.close() - - # 组合天气信息 - return self._compose_weather_message(country, adm1, adm2, now_weather_api_json, weather_forecast_api_json) - - except Exception as e: - self.LOG.error(f"获取天气信息出错: {e}") - return "" - - def _compose_weather_message(self, country, adm1, adm2, now_weather_api_json, weather_forecast_api_json): - """组合天气信息""" - update_time = now_weather_api_json['updateTime'] - now_temperature = now_weather_api_json['now']['temp'] - now_feelslike = now_weather_api_json['now']['feelsLike'] - now_weather = now_weather_api_json['now']['text'] - now_wind_direction = now_weather_api_json['now']['windDir'] - now_wind_scale = now_weather_api_json['now']['windScale'] - now_humidity = now_weather_api_json['now']['humidity'] - now_precip = now_weather_api_json['now']['precip'] - now_visibility = now_weather_api_json['now']['vis'] - now_uvindex = weather_forecast_api_json['daily'][0]['uvIndex'] - - message = ( - f"{country}{adm1}{adm2} 实时天气☁️\n" - f"⏰更新时间:{update_time}\n\n" - f"🌡️当前温度:{now_temperature}℃\n" - f"🌡️体感温度:{now_feelslike}℃\n" - f"☁️天气:{now_weather}\n" - f"☀️紫外线指数:{now_uvindex}\n" - f"🌬️风向:{now_wind_direction}\n" - f"🌬️风力:{now_wind_scale}级\n" - f"💦湿度:{now_humidity}%\n" - f"🌧️降水量:{now_precip}mm/h\n" - f"👀能见度:{now_visibility}km\n\n" - f"☁️未来3天 {adm2} 天气:\n" - ) - for day in weather_forecast_api_json['daily'][1:4]: - date = '.'.join([i.lstrip('0') for i in day['fxDate'].split('-')[1:]]) - weather = day['textDay'] - max_temp = day['tempMax'] - min_temp = day['tempMin'] - uv_index = day['uvIndex'] - message += f'{date} {weather} 最高🌡️{max_temp}℃ 最低🌡️{min_temp}℃ ☀️紫外线:{uv_index}\n' - - return message