from loguru import logger import aiohttp import os import json import asyncio import datetime from typing import Dict, Any, List, Optional, Tuple from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from db.connection import DBConnectionManager from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from utils.decorator.points_decorator import plugin_points_cost from wechat_ipad import WechatAPIClient # ================= Redis 管理器 ================= class WeatherRedisManager: """处理 Redis 数据存取""" def __init__(self, db_manager): self.redis = db_manager.get_redis_connection() self.prefix = "bot:weather:" def add_subscription(self, key: str, data: dict): """添加订阅 (存入标准城市名和ID)""" redis_key = f"{self.prefix}subs" # 存入 JSON 字符串 self.redis.hset(redis_key, key, json.dumps(data, ensure_ascii=False)) def remove_subscription(self, key: str): """取消订阅""" redis_key = f"{self.prefix}subs" self.redis.hdel(redis_key, key) def get_all_subscriptions(self) -> Dict[str, dict]: """获取所有订阅数据""" redis_key = f"{self.prefix}subs" try: raw_data = self.redis.hgetall(redis_key) result = {} if raw_data: for k, v in raw_data.items(): k_str = k.decode('utf-8') if isinstance(k, bytes) else k v_str = v.decode('utf-8') if isinstance(v, bytes) else v result[k_str] = json.loads(v_str) return result except Exception as e: logger.error(f"Redis 读取订阅失败: {e}") return {} def get_history(self, city_id: str) -> Optional[dict]: """获取某城市(ID)昨天的历史数据""" # 注意:这里改用 city_id 作为 Key,确保唯一性 redis_key = f"{self.prefix}history:{city_id}" data = self.redis.get(redis_key) if data: return json.loads(data) return None def save_history(self, city_id: str, data: dict): """保存今天的核心数据 (Key 为 city_id)""" redis_key = f"{self.prefix}history:{city_id}" # 有效期 48 小时 self.redis.set(redis_key, json.dumps(data, ensure_ascii=False), ex=172800) # ================= 插件主体类 ================= class WeatherPlugin(MessagePluginInterface): """天气查询插件 (v3.0 - 城市ID精准订阅版)""" FEATURE_KEY = "WEATHER" FEATURE_DESCRIPTION = "🌤️ 天气查询与订阅 [天气 长沙, 订阅天气 长沙]" @property def name(self) -> str: return "天气查询" @property def version(self) -> str: return "3.0.0" @property def description(self) -> str: return "提供精准的天气订阅服务,支持城市模糊搜索自动纠正。" @property def author(self) -> str: return "fg" @property def command_prefix(self) -> Optional[str]: return "" @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.plugin_dir = os.path.dirname(os.path.abspath(__file__)) self.feature = self.register_feature() self.redis_manager = None self._config = {} self.bot_client = None self.task = None def initialize(self, context: Dict[str, Any]) -> bool: """初始化""" self.LOG = logger self.LOG.info(f"正在初始化 {self.name} 插件...") # Redis 初始化 self.db_manager = DBConnectionManager.get_instance() self.redis_manager = WeatherRedisManager(self.db_manager) self._commands = self._config.get("Weather", {}).get("command", ["天气", "订阅天气", "取消订阅"]) self.enable = self._config.get("Weather", {}).get("enable", True) self.api_key = self._config.get("Weather", {}).get("API_KEY", "") self.api_domain = self._config.get("Weather", {}).get("API_DOMAIN", "https://api.qweather.com") return True def start(self) -> bool: self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING self.task = asyncio.create_task(self._daily_scheduler()) return True def stop(self) -> bool: self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED if self.task: self.task.cancel() return True def can_process(self, message: Dict[str, Any]) -> bool: if not self.enable: return False content = str(message.get("content", "")).strip() for cmd in ["天气", "订阅天气", "取消订阅"]: if content.startswith(cmd): return True return content.endswith("天气") @plugin_stats_decorator(plugin_name="天气查询") @plugin_points_cost(10, "天气服务消耗积分", FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: content = str(message.get("content", "")).strip() sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") if not self.bot_client: self.bot_client = bot if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" if not self.api_key: return False, "API密钥未配置" # --- 业务分流 --- # 1. 订阅逻辑 (核心修改点) if content.startswith("订阅天气"): if not self.redis_manager: return False, "数据库未连接" input_city = content.replace("订阅天气", "").strip() if not input_city: return False, "请指定城市" # Step A: 模糊查询获取 唯一ID 和 标准名 city_info = await self._lookup_city_info(input_city) if not city_info: return False, f"找不到城市 [{input_city}],请检查输入。" std_city_name = city_info['name'] # 例如 "长沙" city_id = city_info['id'] # 例如 "101250101" # Step B: 存入 Redis (存储标准信息) unique_id = f"{roomid}_{sender}" if roomid else f"private_{sender}" sub_data = { "city_id": city_id, # 核心:以 ID 为准 "city_name": std_city_name, # 存标准名用于展示 "input_name": input_city, # 用户原始输入(可选) "room_id": roomid, "sender_id": sender, "created_at": str(datetime.datetime.now()) } self.redis_manager.add_subscription(unique_id, sub_data) # 反馈时告诉用户标准名 msg = f"✅ 订阅成功!\n已锁定城市:{city_info['adm1']} - {std_city_name}\n每天 08:00 推送早报。" await bot.send_text_message(roomid or sender, msg, sender) # Step C: 顺便存历史 (用于明天对比) # 使用 ID 获取天气,避免再次 Geo 查询 weather_data = await self._fetch_weather_by_id(city_id) if weather_data: self._save_today_as_history(city_id, weather_data) return True, "订阅成功" # 2. 取消订阅 elif content.startswith("取消订阅"): if not self.redis_manager: return False, "数据库不可用" unique_id = f"{roomid}_{sender}" if roomid else f"private_{sender}" self.redis_manager.remove_subscription(unique_id) await bot.send_text_message(roomid or sender, "✅ 已取消天气订阅。", sender) return True, "取消成功" # 3. 普通查询 (保持原逻辑,但也建议先 lookup 再查,或者复用 lookup) else: input_city = self._extract_city_name(content) if not input_city: return False, "无法识别城市" # 这里为了简单,依然使用旧的 Name -> Search -> Weather 流程 # 或者你可以复用 _lookup_city_info + _fetch_weather_by_id 组合 weather_text = await self._get_weather_text_response(input_city) if not weather_text: return False, "查询失败" await bot.send_text_message(roomid or sender, weather_text, sender) return True, "发送成功" # ================= 定时任务系统 ================= async def _daily_scheduler(self): """每日 08:00 调度器""" self.LOG.info("⏰ 天气订阅定时任务已启动...") while self.status == PluginStatus.RUNNING: try: now = datetime.datetime.now() target_time = now.replace(hour=8, minute=0, second=0, microsecond=0) if now > target_time: target_time += datetime.timedelta(days=1) wait_seconds = (target_time - now).total_seconds() self.LOG.info(f"⏳ 下一次天气推送将在 {wait_seconds:.0f} 秒后") await asyncio.sleep(wait_seconds) if self.bot_client and self.redis_manager: await self._execute_daily_push() await asyncio.sleep(60) except asyncio.CancelledError: break except Exception as e: self.LOG.error(f"定时任务异常: {e}") await asyncio.sleep(60) async def _execute_daily_push(self): """执行全量推送 (基于 ID 聚合)""" self.LOG.info("🚀 执行每日推送...") subs = self.redis_manager.get_all_subscriptions() if not subs: return # 1. 按 [city_id] 聚合 (真正的去重) # 结构: {"101250101": {"name": "长沙", "users": [...]}, ...} agg_map = {} for key, sub in subs.items(): cid = sub.get('city_id') cname = sub.get('city_name') if not cid: continue # 兼容旧数据或错误数据 if cid not in agg_map: agg_map[cid] = {"name": cname, "users": []} agg_map[cid]["users"].append(sub) # 2. 遍历 ID 获取天气 for city_id, info in agg_map.items(): try: city_name = info["name"] user_list = info["users"] # 直接用 ID 查天气 (无需 Geo API,极速) api_data = await self._fetch_weather_by_id(city_id) if not api_data: continue # 获取 Redis 历史 (Key 也是 ID) history_data = self.redis_manager.get_history(city_id) # 生成文案 push_content = self._analyze_weather_change(city_name, api_data, history_data) if push_content: today_str = datetime.datetime.now().strftime("%m月%d日") final_msg = f"📅 {today_str} 天气早报\n{push_content}" for user in user_list: room_id = user.get('room_id') sender_id = user.get('sender_id') target_id = room_id if room_id else sender_id await asyncio.sleep(0.5) try: if self.bot_client: await self.bot_client.send_text_message(target_id, final_msg, sender_id if room_id else None) except Exception as send_e: self.LOG.error(f"推送给 {target_id} 失败: {send_e}") # 存档 self._save_today_as_history(city_id, api_data) except Exception as e: self.LOG.error(f"处理城市ID {city_id} 推送失败: {e}") # ================= 核心分析算法 (逻辑不变) ================= def _analyze_weather_change(self, city_name: str, api_data: dict, history_data: Optional[dict]) -> str: """分析天气变化""" daily_forecast = api_data.get('daily_forecast', []) if not daily_forecast: return "" today = daily_forecast[0] t_max = int(today['tempMax']) t_min = int(today['tempMin']) text_day = today['textDay'] alerts = self._check_alerts_and_history(today, history_data) life_tips = self._generate_life_tips(today) # astronomy = self._get_astronomy_info(today) # 整合进 life_tips 了 clothing = self._get_clothing_advice(t_min, t_max) msg = f"📍 {city_name} | {text_day} | {t_min}°C ~ {t_max}°C\n" if alerts: msg += "\n⚠️ **特别关注**\n" + "\n".join(alerts) + "\n" if life_tips: msg += "\n💡 **温馨提示**\n" + "\n".join(life_tips) + "\n" msg += f"\n👔 **穿衣**:{clothing}" # 增加一些趣味数据 msg += self._get_fun_stats(today) msg += "\n\n(回复 '取消订阅' 退订)" return msg def _check_alerts_and_history(self, today: dict, history_data: Optional[dict]) -> List[str]: alerts = [] t_max = int(today['tempMax']) t_min = int(today['tempMin']) text_day = today['textDay'] wind_scale = today.get('windScaleDay', '0') precip = float(today.get('precip', 0.0)) # 1. 温差提醒 diff_day = t_max - t_min if diff_day >= 12: alerts.append(f"📉 **昼夜温差大**:相差 {diff_day}°C,早晚多穿件外套。") # 2. 变温提醒 (对比昨天) if history_data: last_max = int(history_data.get('tempMax', t_max)) diff_yesterday = t_max - last_max if diff_yesterday <= -6: alerts.append(f"🥶 **气温骤降**:比昨天冷了 {abs(diff_yesterday)}°C!") elif diff_yesterday >= 6: alerts.append(f"🥵 **气温飙升**:比昨天热了 {diff_yesterday}°C。") # 3. 极端天气关键词 if "暴雨" in text_day: alerts.append("🌧️ **暴雨预警**:非必要不外出,注意防汛。") elif "大雨" in text_day: alerts.append("🌧️ **大雨倾盆**:鞋子容易湿,建议穿雨靴。") elif "雪" in text_day: alerts.append("❄️ **冰雪天气**:路面湿滑,注意行车安全。") elif "冰雹" in text_day: alerts.append("☄️ **冰雹警报**:赶紧把车停到室内!") # 4. 风力 try: # windScaleDay 可能是 "1-2" 或 "3" max_wind = int(wind_scale.split('-')[-1]) wind_dir = today.get('windDirDay', '') if max_wind >= 7: alerts.append(f"🌪️ **狂风大作**:{wind_dir}{wind_scale}级,广告牌下别站人!") elif max_wind >= 5: alerts.append(f"🌬️ **大风警报**:{wind_dir}{wind_scale}级,发型要乱啦。") except: pass return alerts def _generate_life_tips(self, data: dict) -> List[str]: tips = [] # 1. 降水 (precip) precip = float(data.get('precip', 0.0)) text_day = data.get('textDay', '') if precip > 0: if precip < 10: tips.append(f"☔️ 今天有雨({precip}mm),出门记得带伞。") else: tips.append(f"🌧️ 雨量较大({precip}mm),外出注意防雨防滑。") elif "阴" in text_day: tips.append("☁️ 天空阴沉,虽然没雨,但也别指望晒被子啦。") elif "晴" in text_day: tips.append("☀️ 阳光明媚,心情也会变好哦。") # 2. 紫外线 (uvIndex) uv = int(data.get('uvIndex', 0)) if uv >= 10: tips.append("☠️ **紫外线爆表**:尽量留在室内,出门必须全副武装!") elif uv >= 6: tips.append("☂️ **紫外线强**:涂好防晒霜,戴上墨镜。") # 3. 湿度 (humidity) hum = int(data.get('humidity', 0)) if hum >= 90: tips.append("💧 空气像是能拧出水(湿度>90%),注意防潮除湿。") elif hum <= 20: tips.append("🌵 空气很干燥(湿度<20%),多喝水,小心静电。") # 4. 气压 (pressure) pres = int(data.get('pressure', 1000)) if pres < 995: tips.append("🌬️ 气压较低,可能会感到有些闷热或不适。") # 5. 能见度 (vis) vis = int(data.get('vis', 25)) if vis < 1: tips.append("🌫️ **大雾弥漫**:能见度不足1公里,开车务必慢行!") elif vis < 5: tips.append("👀 轻雾缭绕,远处朦朦胧胧。") # 6. 舒适度/活动建议 cloud = int(data.get('cloud', 50)) sunset = data.get('sunset', '') if cloud < 20 and "晴" in text_day: tips.append(f"� 傍晚{sunset}左右可能有美丽的晚霞哦。") return tips def _get_fun_stats(self, data: dict) -> str: """获取一些有趣的数据统计""" sunrise = data.get('sunrise', '-') sunset = data.get('sunset', '-') moon_phase = data.get('moonPhase', '') moon_icon = data.get('moonPhaseIcon', '') # 假设API有这个,或者直接用 phase stats = [] # 计算白昼时长 try: sr_h, sr_m = map(int, sunrise.split(':')) ss_h, ss_m = map(int, sunset.split(':')) day_len_m = (ss_h * 60 + ss_m) - (sr_h * 60 + sr_m) hours = day_len_m // 60 minutes = day_len_m % 60 stats.append(f"\n☀️ **白昼**:{hours}小时{minutes}分 (日出{sunrise}|日落{sunset})") except: pass if moon_phase: stats.append(f"🌙 **月相**:{moon_phase}") return "".join(stats) def _get_clothing_advice(self, t_min: int, t_max: int) -> str: """根据最低和最高温综合给出穿衣建议""" avg_temp = (t_min + t_max) / 2 if t_min < -5: return "极寒!羽绒服+保暖内衣+围巾手套缺一不可。" elif t_min < 5: return "很冷,建议穿棉衣、羽绒服,里面穿毛衣。" elif t_min < 12: return "较冷,大衣或厚外套是标配。" if avg_temp < 18: return "凉爽,建议风衣、卫衣或薄外套,早晚保暖。" elif avg_temp < 24: return "舒适,衬衫、T恤外搭薄外套即可。" elif avg_temp < 30: return "微热,短袖、裙子,透气衣物。" else: return "炎热!穿得越少越凉快(注意防晒)。" # ================= 新版数据获取 (分离 ID 和 搜索) ================= async def _lookup_city_info(self, city_name: str) -> Optional[Dict]: """[Step 1] 通过名称查找城市 ID ( Geo API )""" try: headers = {"X-QW-Api-Key": f'{self.api_key}'} params = {"location": city_name} geo_url = f"{self.api_domain}/geo/v2/city/lookup" conn = aiohttp.TCPConnector(ssl=False) async with aiohttp.request('GET', geo_url, connector=conn, headers=headers, params=params) as resp: geo_json = await resp.json() if geo_json.get('code') != '200' or not geo_json.get('location'): return None # 返回最匹配的第一个结果 return geo_json["location"][0] except Exception as e: self.LOG.error(f"GeoAPI 失败: {e}") return None async def _fetch_weather_by_id(self, city_id: str) -> Optional[Dict]: """[Step 2] 通过 ID 直接获取天气 ( Now + 7D ) - 极速模式""" try: # 获取实时天气 conn = aiohttp.TCPConnector(ssl=False) now_url = f'{self.api_domain}/v7/weather/now?key={self.api_key}&location={city_id}' async with aiohttp.request('GET', now_url, connector=conn) as resp: now_json = await resp.json() # 获取预报 conn = aiohttp.TCPConnector(ssl=False) daily_url = f'{self.api_domain}/v7/weather/7d?key={self.api_key}&location={city_id}' async with aiohttp.request('GET', daily_url, connector=conn) as resp: daily_json = await resp.json() if now_json.get('code') != '200' or daily_json.get('code') != '200': return None return { "now": now_json.get('now'), "daily_forecast": daily_json.get('daily'), "updateTime": now_json.get('updateTime') # 注意:这里没有 Geo 信息了,因为是直接用 ID 查的 } except Exception as e: self.LOG.error(f"WeatherAPI(ID) 失败: {e}") return None def _save_today_as_history(self, city_id: str, api_data: dict): """保存历史 (Key使用 ID)""" if not self.redis_manager: return try: today_fc = api_data.get('daily_forecast', [])[0] # 保存完整的接口数据 data = today_fc.copy() # 补充 date 字段,方便后续处理 data["date"] = datetime.datetime.now().strftime("%Y-%m-%d") self.redis_manager.save_history(city_id, data) except Exception as e: self.LOG.error(f"存档失败: {e}") # --- 旧版兼容方法,用于普通查询 --- async def _get_weather_text_response(self, city_name: str) -> str: """普通查询流程:查找 -> 获取 -> 拼接""" # 1. 查 ID city_info = await self._lookup_city_info(city_name) if not city_info: return "" cid = city_info['id'] # 2. 查天气 raw = await self._fetch_weather_by_id(cid) if not raw: return "" # 3. 拼接 geo_str = f"{city_info['country']}{city_info['adm1']}{city_info['adm2']}" now = raw['now'] daily = raw['daily_forecast'] msg = ( f"{geo_str} 实时天气☁️\n" f"⏰ {raw['updateTime'][11:16]}\n\n" f"🌡️ {now['temp']}℃ (体感{now['feelsLike']})\n" f"☁️ {now['text']}\n" f"🌬️ {now['windDir']} {now['windScale']}级\n" f"💦 湿度{now['humidity']}%\n" f"👀 能见度{now['vis']}km\n\n" f"☁️ 未来预报:\n" ) for day in daily[0:3]: d = day['fxDate'][5:] msg += f"{d} {day['textDay']}|{day['tempMin']}~{day['tempMax']}℃\n" return msg def _extract_city_name(self, content: str) -> str: content = content.replace(" ", "") if content.startswith("天气"): return content[2:] elif content.endswith("天气"): return content[:-2] return ""