from loguru import logger import aiohttp import os import json import asyncio import datetime from typing import Dict, Any, List, Optional, Set, Tuple from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from db.connection import DBConnectionManager from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from utils.decorator.points_decorator import plugin_points_cost from wechat_ipad import WechatAPIClient # ================= Redis 管理器 ================= class WeatherRedisManager: """处理 Redis 数据存取""" def __init__(self, db_manager): self.redis = db_manager.get_redis_connection() self.prefix = "bot:weather:" def add_subscription(self, key: str, data: dict): """添加订阅 (存入标准城市名和ID)""" redis_key = f"{self.prefix}subs" # 存入 JSON 字符串 self.redis.hset(redis_key, key, json.dumps(data, ensure_ascii=False)) def remove_subscription(self, key: str): """取消订阅""" redis_key = f"{self.prefix}subs" self.redis.hdel(redis_key, key) def get_all_subscriptions(self) -> Dict[str, dict]: """获取所有订阅数据""" redis_key = f"{self.prefix}subs" try: raw_data = self.redis.hgetall(redis_key) result = {} if raw_data: for k, v in raw_data.items(): k_str = k.decode('utf-8') if isinstance(k, bytes) else k v_str = v.decode('utf-8') if isinstance(v, bytes) else v result[k_str] = json.loads(v_str) return result except Exception as e: logger.error(f"Redis 读取订阅失败: {e}") return {} def get_history(self, city_id: str) -> Optional[dict]: """获取某城市(ID)昨天的历史数据""" # 注意:这里改用 city_id 作为 Key,确保唯一性 redis_key = f"{self.prefix}history:{city_id}" data = self.redis.get(redis_key) if data: return json.loads(data) return None def save_history(self, city_id: str, data: dict): """保存今天的核心数据 (Key 为 city_id)""" redis_key = f"{self.prefix}history:{city_id}" # 有效期 48 小时 self.redis.set(redis_key, json.dumps(data, ensure_ascii=False), ex=172800) # ================= 插件主体类 ================= class WeatherPlugin(MessagePluginInterface): """天气查询插件 (v3.0 - 城市ID精准订阅版)""" FEATURE_KEY = "WEATHER" FEATURE_DESCRIPTION = "🌤️ 天气查询与订阅 [天气 长沙, 订阅天气 长沙]" @property def name(self) -> str: return "天气查询" @property def version(self) -> str: return "3.0.0" @property def description(self) -> str: return "提供精准的天气订阅服务,支持城市模糊搜索自动纠正。" @property def author(self) -> str: return "fg" @property def command_prefix(self) -> Optional[str]: return "" @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.plugin_dir = os.path.dirname(os.path.abspath(__file__)) self.feature = self.register_feature() self.redis_manager = None self._config = {} self.bot: WechatAPIClient = None def initialize(self, context: Dict[str, Any]) -> bool: """初始化""" self.LOG = logger self.LOG.debug(f"正在初始化 {self.name} 插件...") # Redis 初始化 self.db_manager = DBConnectionManager.get_instance() self.redis_manager = WeatherRedisManager(self.db_manager) self._commands = self._config.get("Weather", {}).get("command", ["天气", "订阅天气", "取消订阅"]) self.enable = self._config.get("Weather", {}).get("enable", True) self.api_key = self._config.get("Weather", {}).get("API_KEY", "") self.api_domain = self._config.get("Weather", {}).get("API_DOMAIN", "https://api.qweather.com") return True def start(self) -> bool: self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: if not self.enable: return False content = str(message.get("content", "")).strip() for cmd in ["天气", "订阅天气", "取消订阅"]: if content.startswith(cmd): return True return content.endswith("天气") @plugin_stats_decorator(plugin_name="天气查询") @plugin_points_cost(10, "天气服务消耗积分", FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: content = str(message.get("content", "")).strip() sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") self.bot: WechatAPIClient = message.get("bot") if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" if not self.api_key: return False, "API密钥未配置" # --- 业务分流 --- # 1. 订阅逻辑 (核心修改点) if content.startswith("订阅天气"): if not self.redis_manager: return False, "数据库未连接" input_city = content.replace("订阅天气", "").strip() if not input_city: return False, "请指定城市" # Step A: 模糊查询获取 唯一ID 和 标准名 city_info = await self._lookup_city_info(input_city) if not city_info: return False, f"找不到城市 [{input_city}],请检查输入。" std_city_name = city_info['name'] # 例如 "长沙" city_id = city_info['id'] # 例如 "101250101" # Step B: 存入 Redis (存储标准信息) unique_id = f"{roomid}_{sender}" if roomid else f"private_{sender}" sub_data = { "city_id": city_id, # 核心:以 ID 为准 "city_name": std_city_name, # 存标准名用于展示 "input_name": input_city, # 用户原始输入(可选) "room_id": roomid, "sender_id": sender, "created_at": str(datetime.datetime.now()) } self.redis_manager.add_subscription(unique_id, sub_data) # 反馈时告诉用户标准名 msg = f"✅ 订阅成功!\n已锁定城市:{city_info['adm1']} - {std_city_name}\n每天 08:00 推送早报。" await self.bot.send_text_message(roomid or sender, msg, sender) # Step C: 顺便存历史 (用于明天对比) # 使用 ID 获取天气,避免再次 Geo 查询 weather_data = await self._fetch_weather_by_id(city_id) if weather_data: self._save_today_as_history(city_id, weather_data) return True, "订阅成功" # 2. 取消订阅 elif content.startswith("取消订阅"): if not self.redis_manager: return False, "数据库不可用" unique_id = f"{roomid}_{sender}" if roomid else f"private_{sender}" self.redis_manager.remove_subscription(unique_id) await self.bot.send_text_message(roomid or sender, "✅ 已取消天气订阅。", sender) return True, "取消成功" # 3. 普通查询 (保持原逻辑,但也建议先 lookup 再查,或者复用 lookup) else: input_city = self._extract_city_name(content) if not input_city: return False, "无法识别城市" # 这里为了简单,依然使用旧的 Name -> Search -> Weather 流程 # 或者你可以复用 _lookup_city_info + _fetch_weather_by_id 组合 weather_text = await self._get_weather_text_response(input_city) if not weather_text: return False, "查询失败" await self.bot.send_text_message(roomid or sender, weather_text, sender) return True, "发送成功" def get_schedule_actions(self) -> List[Dict[str, Any]]: """声明天气插件支持的可调度动作。""" return [ { "action_key": "daily_push", "name": "天气订阅日报推送", "description": "按订阅关系推送天气日报,可限定目标群范围", "trigger_type": "at_times", "trigger_config": {"time_list": ["08:00"]}, "target_scope": "all_enabled_groups", "target_config": {}, "payload": {}, # 该任务原先是硬编码默认启用,这里保持兼容。 "default_enabled": True, } ] async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]: """执行后台调度动作并返回结构化结果。""" if action_key != "daily_push": return { "success": False, "summary": f"不支持的动作: {action_key}", "detail": {"action_key": action_key}, } # 调度中心会按 target_scope 解析目标群,这里仅做最终过滤。 target_groups = context.get("target_groups") or [] target_group_set = {str(g).strip() for g in target_groups if str(g).strip()} or None result = await self._execute_daily_push(allowed_group_ids=target_group_set) return { "success": bool(result.get("failed_targets", 0) == 0), "summary": ( f"天气推送完成: 目标{result.get('total_targets', 0)}," f"成功{result.get('success_targets', 0)},失败{result.get('failed_targets', 0)}" ), "detail": result, } # ================= 定时任务系统 ================= async def _execute_daily_push(self, allowed_group_ids: Optional[Set[str]] = None) -> Dict[str, int]: """执行全量推送 (基于 ID 聚合)。 Args: allowed_group_ids: 允许发送的群ID集合;为 None 时表示不过滤群范围。 Returns: dict: 推送统计信息,便于后台调度日志展示。 """ self.LOG.info("🚀 [Weather] 开始执行每日推送任务...") if not self.bot: self.LOG.warning("⚠️ [Weather] Bot 实例未初始化,无法发送消息!(若本次推送失败,请先在群里发送任意消息激活机器人)") subs = self.redis_manager.get_all_subscriptions() self.LOG.info(f"📋 [Weather] 共获取到 {len(subs)} 条订阅记录") if not subs: return {"cities": 0, "total_targets": 0, "success_targets": 0, "failed_targets": 0} # 1. 按 [city_id] 聚合 (真正的去重) # 结构: {"101250101": {"name": "长沙", "users": [...]}, ...} agg_map = {} for key, sub in subs.items(): cid = sub.get('city_id') cname = sub.get('city_name') if not cid: self.LOG.warning(f"⚠️ [Weather] 订阅记录 {key} 缺少 city_id,已跳过") continue # 兼容旧数据或错误数据 if cid not in agg_map: agg_map[cid] = {"name": cname, "users": []} agg_map[cid]["users"].append(sub) self.LOG.info(f"🏙️ [Weather] 聚合为 {len(agg_map)} 个城市待处理: {[info['name'] for info in agg_map.values()]}") # 2. 遍历 ID 获取天气 total_targets = 0 success_targets = 0 failed_targets = 0 for city_id, info in agg_map.items(): try: city_name = info["name"] user_list = info["users"] self.LOG.info(f"🔄 [Weather] 正在处理城市: {city_name} (ID: {city_id}), 关联订阅: {len(user_list)} 个") # 直接用 ID 查天气 (无需 Geo API,极速) api_data = await self._fetch_weather_by_id(city_id) if not api_data: self.LOG.error(f"❌ [Weather] 获取城市 {city_name} (ID: {city_id}) 天气数据失败,跳过此城市推送") continue # 获取 Redis 历史 (Key 也是 ID) history_data = self.redis_manager.get_history(city_id) # 生成文案 push_content = self._analyze_weather_change(city_name, api_data, history_data) if push_content: today_str = datetime.datetime.now().strftime("%m月%d日") final_msg = f"📅 {today_str} 天气早报\n{push_content}" # 聚合到目标ID维度:同一群只发一次,并@所有订阅者 target_map = {} for user in user_list: room_id = user.get('room_id') sender_id = user.get('sender_id') # 若指定了目标群范围,仅向范围内群聊推送;私聊订阅在此模式下不发送。 if allowed_group_ids is not None: if not room_id: continue if room_id not in allowed_group_ids: continue target_id = room_id if room_id else sender_id if target_id not in target_map: target_map[target_id] = { 'mentions': set(), 'is_group': bool(room_id) } # 仅在群聊中@订阅者;私聊无需@列表 if room_id: target_map[target_id]['mentions'].add(sender_id) self.LOG.info(f"📤 [Weather] 准备向 {len(target_map)} 个目标(群/人) 发送推送") total_targets += len(target_map) for target_id, info in target_map.items(): await asyncio.sleep(0.5) try: if self.bot: mentions = list(info['mentions']) if info['is_group'] else [] if mentions: await self.bot.send_at_message(target_id, final_msg, mentions) else: # 私聊:直接发送文本 await self.bot.send_text_message(target_id, final_msg) self.LOG.info(f"✅ [Weather] 推送成功 -> {target_id}") success_targets += 1 else: self.LOG.error(f"❌ [Weather] Bot未就绪,无法推送给 -> {target_id}") failed_targets += 1 except Exception as send_e: self.LOG.error(f"❌ [Weather] 推送给 {target_id} 失败: {send_e}") failed_targets += 1 else: self.LOG.warning(f"⚠️ [Weather] 城市 {city_name} 分析后无推送内容(可能是数据缺失)") # 存档 self._save_today_as_history(city_id, api_data) except Exception as e: self.LOG.error(f"❌ [Weather] 处理城市ID {city_id} 发生异常: {e}") return { "cities": len(agg_map), "total_targets": total_targets, "success_targets": success_targets, "failed_targets": failed_targets, } # ================= 核心分析算法 (逻辑不变) ================= def _analyze_weather_change(self, city_name: str, api_data: dict, history_data: Optional[dict]) -> str: """分析天气变化""" daily_forecast = api_data.get('daily_forecast', []) if not daily_forecast: return "" today = daily_forecast[0] t_max = int(today['tempMax']) t_min = int(today['tempMin']) text_day = today['textDay'] alerts = self._check_alerts_and_history(today, history_data) life_tips = self._generate_life_tips(today) # astronomy = self._get_astronomy_info(today) # 整合进 life_tips 了 clothing = self._get_clothing_advice(t_min, t_max) msg = f"📍 {city_name} | {text_day} | {t_min}°C ~ {t_max}°C\n" if alerts: msg += "\n⚠️ **特别关注**\n" + "\n".join(alerts) + "\n" if life_tips: msg += "\n💡 **温馨提示**\n" + "\n".join(life_tips) + "\n" msg += f"\n👔 **穿衣**:{clothing}" # 增加一些趣味数据 msg += self._get_fun_stats(today) msg += "\n\n(回复 '取消订阅' 退订)" return msg def _check_alerts_and_history(self, today: dict, history_data: Optional[dict]) -> List[str]: alerts = [] t_max = int(today['tempMax']) t_min = int(today['tempMin']) text_day = today['textDay'] wind_scale = today.get('windScaleDay', '0') precip = float(today.get('precip', 0.0)) # 1. 温差提醒 diff_day = t_max - t_min if diff_day >= 12: alerts.append(f"📉 **昼夜温差大**:相差 {diff_day}°C,早晚多穿件外套。") # 2. 变温提醒 (对比昨天) if history_data: last_max = int(history_data.get('tempMax', t_max)) diff_yesterday = t_max - last_max if diff_yesterday <= -6: alerts.append(f"🥶 **气温骤降**:比昨天冷了 {abs(diff_yesterday)}°C!") elif diff_yesterday >= 6: alerts.append(f"🥵 **气温飙升**:比昨天热了 {diff_yesterday}°C。") # 3. 极端天气关键词 if "暴雨" in text_day: alerts.append("🌧️ **暴雨预警**:非必要不外出,注意防汛。") elif "大雨" in text_day: alerts.append("🌧️ **大雨倾盆**:鞋子容易湿,建议穿雨靴。") elif "雪" in text_day: alerts.append("❄️ **冰雪天气**:路面湿滑,注意行车安全。") elif "冰雹" in text_day: alerts.append("☄️ **冰雹警报**:赶紧把车停到室内!") # 4. 风力 try: # windScaleDay 可能是 "1-2" 或 "3" max_wind = int(wind_scale.split('-')[-1]) wind_dir = today.get('windDirDay', '') if max_wind >= 7: alerts.append(f"🌪️ **狂风大作**:{wind_dir}{wind_scale}级,广告牌下别站人!") elif max_wind >= 5: alerts.append(f"🌬️ **大风警报**:{wind_dir}{wind_scale}级,发型要乱啦。") except: pass return alerts def _generate_life_tips(self, data: dict) -> List[str]: tips = [] # 1. 降水 (precip) precip = float(data.get('precip', 0.0)) text_day = data.get('textDay', '') if precip > 0: if precip < 10: tips.append(f"☔️ 今天有雨({precip}mm),出门记得带伞。") else: tips.append(f"🌧️ 雨量较大({precip}mm),外出注意防雨防滑。") elif "阴" in text_day: tips.append("☁️ 天空阴沉,虽然没雨,但也别指望晒被子啦。") elif "晴" in text_day: tips.append("☀️ 阳光明媚,心情也会变好哦。") # 2. 紫外线 (uvIndex) uv = int(data.get('uvIndex', 0)) if uv >= 10: tips.append("☠️ **紫外线爆表**:尽量留在室内,出门必须全副武装!") elif uv >= 6: tips.append("☂️ **紫外线强**:涂好防晒霜,戴上墨镜。") # 3. 湿度 (humidity) hum = int(data.get('humidity', 0)) if hum >= 90: tips.append("💧 空气像是能拧出水(湿度>90%),注意防潮除湿。") elif hum <= 20: tips.append("🌵 空气很干燥(湿度<20%),多喝水,小心静电。") # 4. 气压 (pressure) pres = int(data.get('pressure', 1000)) if pres < 995: tips.append("🌬️ 气压较低,可能会感到有些闷热或不适。") # 5. 能见度 (vis) vis = int(data.get('vis', 25)) if vis < 1: tips.append("🌫️ **大雾弥漫**:能见度不足1公里,开车务必慢行!") elif vis < 5: tips.append("👀 轻雾缭绕,远处朦朦胧胧。") # 6. 舒适度/活动建议 cloud = int(data.get('cloud', 50)) sunset = data.get('sunset', '') if cloud < 20 and "晴" in text_day: tips.append(f"🌄 傍晚{sunset}左右可能有美丽的晚霞哦。") return tips def _get_fun_stats(self, data: dict) -> str: """获取一些有趣的数据统计""" sunrise = data.get('sunrise', '-') sunset = data.get('sunset', '-') moon_phase = data.get('moonPhase', '') moon_icon = data.get('moonPhaseIcon', '') # 假设API有这个,或者直接用 phase stats = [] # 计算白昼时长 try: sr_h, sr_m = map(int, sunrise.split(':')) ss_h, ss_m = map(int, sunset.split(':')) day_len_m = (ss_h * 60 + ss_m) - (sr_h * 60 + sr_m) hours = day_len_m // 60 minutes = day_len_m % 60 stats.append(f"\n☀️ **白昼**:{hours}小时{minutes}分 (日出{sunrise}|日落{sunset})") except: pass if moon_phase: stats.append(f"🌙 **月相**:{moon_phase}") return "".join(stats) def _get_clothing_advice(self, t_min: int, t_max: int) -> str: """根据最低和最高温综合给出穿衣建议""" avg_temp = (t_min + t_max) / 2 if t_min < -5: return "极寒!羽绒服+保暖内衣+围巾手套缺一不可。" elif t_min < 5: return "很冷,建议穿棉衣、羽绒服,里面穿毛衣。" elif t_min < 12: return "较冷,大衣或厚外套是标配。" if avg_temp < 18: return "凉爽,建议风衣、卫衣或薄外套,早晚保暖。" elif avg_temp < 24: return "舒适,衬衫、T恤外搭薄外套即可。" elif avg_temp < 30: return "微热,短袖、裙子,透气衣物。" else: return "炎热!穿得越少越凉快(注意防晒)。" # ================= 新版数据获取 (分离 ID 和 搜索) ================= async def _lookup_city_info(self, city_name: str) -> Optional[Dict]: """[Step 1] 通过名称查找城市 ID ( Geo API )""" try: headers = {"X-QW-Api-Key": f'{self.api_key}'} params = {"location": city_name} geo_url = f"{self.api_domain}/geo/v2/city/lookup" conn = aiohttp.TCPConnector(ssl=False) async with aiohttp.request('GET', geo_url, connector=conn, headers=headers, params=params) as resp: geo_json = await resp.json() if geo_json.get('code') != '200' or not geo_json.get('location'): return None # 返回最匹配的第一个结果 return geo_json["location"][0] except Exception as e: self.LOG.error(f"GeoAPI 失败: {e}") return None async def _fetch_weather_by_id(self, city_id: str) -> Optional[Dict]: """[Step 2] 通过 ID 直接获取天气 ( Now + 7D ) - 极速模式""" try: # 获取实时天气 conn = aiohttp.TCPConnector(ssl=False) now_url = f'{self.api_domain}/v7/weather/now?key={self.api_key}&location={city_id}' async with aiohttp.request('GET', now_url, connector=conn) as resp: now_json = await resp.json() # 获取预报 conn = aiohttp.TCPConnector(ssl=False) daily_url = f'{self.api_domain}/v7/weather/7d?key={self.api_key}&location={city_id}' async with aiohttp.request('GET', daily_url, connector=conn) as resp: daily_json = await resp.json() if now_json.get('code') != '200' or daily_json.get('code') != '200': return None return { "now": now_json.get('now'), "daily_forecast": daily_json.get('daily'), "updateTime": now_json.get('updateTime') # 注意:这里没有 Geo 信息了,因为是直接用 ID 查的 } except Exception as e: self.LOG.error(f"WeatherAPI(ID) 失败: {e}") return None def _save_today_as_history(self, city_id: str, api_data: dict): """保存历史 (Key使用 ID)""" if not self.redis_manager: return try: today_fc = api_data.get('daily_forecast', [])[0] # 保存完整的接口数据 data = today_fc.copy() # 补充 date 字段,方便后续处理 data["date"] = datetime.datetime.now().strftime("%Y-%m-%d") self.redis_manager.save_history(city_id, data) except Exception as e: self.LOG.error(f"存档失败: {e}") # --- 旧版兼容方法,用于普通查询 --- async def _get_weather_text_response(self, city_name: str) -> str: city_info = await self._lookup_city_info(city_name) if not city_info: return "" cid = city_info.get('id') raw = await self._fetch_weather_by_id(cid) if not raw: return "" now = raw.get('now', {}) daily = raw.get('daily_forecast', []) today = daily[0] if daily else {} history = self.redis_manager.get_history(cid) if self.redis_manager else None update_hm = str(raw.get('updateTime', ''))[11:16] geo_str = f"{city_info.get('country','')}{city_info.get('adm1','')}{city_info.get('adm2','')}" title = f"{geo_str} | {city_info.get('name','')}" t_now = now.get('temp', '-') feels = now.get('feelsLike', '-') text_now = now.get('text', '') wind_dir = now.get('windDir', '') wind_scale = now.get('windScale', '') humidity = now.get('humidity', '') vis = now.get('vis', '') t_min = str(today.get('tempMin', '')) t_max = str(today.get('tempMax', '')) text_day = today.get('textDay', '') uv = str(today.get('uvIndex', '')) precip = str(today.get('precip', '')) pressure = str(today.get('pressure', '')) sunrise = today.get('sunrise', '') sunset = today.get('sunset', '') alerts = self._check_alerts_and_history(today, history) if today else [] tips = self._generate_life_tips(today) if today else [] clothing = self._get_clothing_advice(int(t_min) if t_min.isdigit() else 0, int(t_max) if t_max.isdigit() else 0) if today else "" lines = [] lines.append(f"📍 {title}") lines.append(f"⏰ {update_hm}") lines.append("") lines.append(f"🌡️ {t_now}℃ (体感{feels}℃) | {text_now}") lines.append(f"💧 湿度{humidity}% | 🌬️ {wind_dir} {wind_scale}级 | 👀 能见度{vis}km") if today: lines.append(f"🗓️ 今日 {text_day} | {t_min}~{t_max}℃") lines.append(f"☀️ 日出{sunrise} | 🌄 日落{sunset}") lines.append(f"🔆 紫外线{uv} | 🌧️ 降水{precip}mm | ⏱️ 气压{pressure}hPa") if alerts: lines.append("") lines.append("⚠️ 特别关注") for a in alerts[:3]: lines.append(f"- {a}") if tips: lines.append("") lines.append("😅 温馨提示") for t in tips[:3]: lines.append(f"- {t}") if clothing: lines.append("") lines.append(f"👔 穿衣:{clothing}") if daily: lines.append("") lines.append("📈 未来3天") for day in daily[0:3]: d = str(day.get('fxDate',''))[5:] lines.append(f"- {d} {day.get('textDay','')} | {day.get('tempMin','')}~{day.get('tempMax','')}℃") lines.append("") return "\n".join(lines) def _extract_city_name(self, content: str) -> str: content = content.replace(" ", "") if content.startswith("天气"): return content[2:] elif content.endswith("天气"): return content[:-2] return ""