from loguru import logger
import aiohttp
import jieba
import os
import yaml
from typing import Dict, Any, List, Optional, Tuple

from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient


class WeatherPlugin(MessagePluginInterface):
    """Weather lookup plugin.

    Resolves a city name through the configured weather API's geo lookup
    endpoint, then fetches the current conditions and the daily forecast
    and replies with a formatted text message.
    """

    @property
    def name(self) -> str:
        """Display name of the plugin."""
        return "天气查询"

    @property
    def version(self) -> str:
        """Plugin version string."""
        return "1.0.0"

    @property
    def description(self) -> str:
        """Short user-facing description of the plugin."""
        return "提供天气查询功能,支持查询城市实时天气和未来天气预报"

    @property
    def author(self) -> str:
        """Plugin author."""
        return "fg"

    @property
    def command_prefix(self) -> Optional[str]:
        """Command prefix; empty because commands are matched directly."""
        return ""

    @property
    def commands(self) -> List[str]:
        """Trigger commands; populated by :meth:`initialize`."""
        return self._commands

    def __init__(self):
        super().__init__()
        # config.yaml lives next to this source file.
        self.plugin_dir = os.path.dirname(os.path.abspath(__file__))
        self.config_path = os.path.join(self.plugin_dir, "config.yaml")

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Initialise the plugin from the host-supplied context.

        Args:
            context: Host context; ``event_system`` and ``message_util``
                are read from it.

        Returns:
            Always ``True``.
        """
        self.LOG = logger
        self.LOG.info(f"正在初始化 {self.name} 插件...")

        # Keep references to the host services.
        self.event_system = context.get("event_system")
        self.message_util = context.get("message_util")

        # NOTE(review): self._config is not assigned anywhere in this class;
        # presumably the base class provides it - confirm.
        weather_section = self._config.get("Weather", {})
        self._commands = weather_section.get("command", ["天气"])
        # NOTE(review): the default help text below is reproduced as it
        # appears in the source; it may originally have been multi-line.
        self.command_format = weather_section.get(
            "command-format",
            """⚙️获取天气: 天气 城市名 天气城市名 城市名天气 城市名 天气""",
        )
        self.enable = weather_section.get("enable", True)

        # Load the API credentials.
        self.load_config()

        self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
        return True

    def load_config(self):
        """Load the API key and domain from ``config.yaml``.

        When the file does not exist, a template with empty values is
        written so the operator has something to fill in.
        """
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                loaded = yaml.safe_load(f)
            # safe_load returns None for an empty file and may return a
            # non-mapping for malformed content; fall back to an empty
            # mapping so the lookups below cannot fail.
            self.weather_config = loaded if isinstance(loaded, dict) else {}
        else:
            # Default configuration.
            self.weather_config = {
                "API_KEY": "",
                "API_DOMAIN": ""
            }
            # Persist the default configuration.
            with open(self.config_path, 'w', encoding='utf-8') as f:
                yaml.dump(self.weather_config, f, default_flow_style=False, allow_unicode=True)

        # `or ""` also covers keys that are present but null in the YAML.
        self.api_key = self.weather_config.get("API_KEY") or ""
        # Drop a trailing slash so joining with "/path" never yields "//".
        self.api_domain = str(self.weather_config.get("API_DOMAIN") or "").rstrip("/")

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return whether this plugin should handle ``message``.

        Only messages that carry a ``roomid`` and whose content contains
        "天气" are accepted, and only while the plugin is enabled.
        """
        if not self.enable:
            return False

        content = str(message.get("content", "")).strip()
        roomid = message.get("roomid", "")

        if not roomid or "天气" not in content:
            return False

        return True

    @plugin_stats_decorator(plugin_name="天气查询")
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle a weather request.

        Args:
            message: Incoming message dict; ``content``, ``sender``,
                ``roomid``, ``gbm`` and ``bot`` are read from it.

        Returns:
            ``(success, status_text)``.
        """
        content = str(message.get("content", "")).strip()
        self.LOG.info(f"插件执行: {self.name}:{content}")

        sender = message.get("sender")
        roomid = message.get("roomid", "")
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")

        # Permission check (group messages only).
        if roomid and gbm.get_group_permission(roomid, Feature.WEATHER) == PermissionStatus.DISABLED:
            return False, "没有权限"

        # Reply to the group when there is one, otherwise to the sender.
        target = roomid if roomid else sender

        # Tokenise the message with spaces removed.
        content = content.replace(" ", "")
        command = list(jieba.cut(content))

        if len(command) == 1:
            await bot.send_text_message(target, f"\n{self.command_format}", sender)
            return False, "命令格式错误"
        elif len(command) > 3:
            return False, "命令格式错误"

        # The segmenter may keep "天气" inside a longer token; such a message
        # is not a weather command, so reject it quietly instead of letting
        # list.remove() raise and posting a Python error into the chat.
        if "天气" not in command:
            return False, "命令格式错误"

        # API key check.
        if not self.api_key:
            await bot.send_text_message(target, "\n你还没配置天气API密钥!", sender)
            return False, "API密钥未配置"

        try:
            # Whatever remains after dropping the keyword is the city name.
            command.remove("天气")
            request_loc = "".join(command)

            weather_info = await self._get_weather_info(request_loc)
            if not weather_info:
                await bot.send_text_message(target, "\n⚠️查询天气失败!", sender)
                return False, "查询天气失败"

            await bot.send_text_message(target, weather_info, sender)
            return True, "发送成功"

        except Exception as e:
            self.LOG.error(f"处理天气请求出错: {e}")
            await bot.send_text_message(target, f"\n⚠️处理出错: {str(e)}", sender)
            return False, f"处理出错: {e}"

    async def _fetch_json(self, session, path: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """GET ``path`` on the configured API domain and decode the JSON body."""
        async with session.get(f"{self.api_domain}{path}", params=params) as response:
            return await response.json()

    async def _get_weather_info(self, city_name: str) -> str:
        """Look up ``city_name`` and return a formatted weather report.

        Returns:
            The report text, or a user-facing error message when the
            lookup or any request fails. Exceptions are not propagated.
        """
        try:
            # The key is sent as a header on every request so it never
            # appears in a URL, and therefore never in an exception message
            # that gets echoed back to the chat.
            headers = {
                "X-QW-Api-Key": f'{self.api_key}'
            }
            # NOTE(review): ssl=False disables TLS certificate verification;
            # kept from the original code - confirm it is really required.
            connector = aiohttp.TCPConnector(ssl=False)
            async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
                # Resolve the city.
                geoapi_json = await self._fetch_json(
                    session, "/geo/v2/city/lookup", {"location": city_name}
                )

                geo_code = geoapi_json.get('code')
                if geo_code == '404':
                    return "\n⚠️查无此地!"
                elif geo_code != '200':
                    return f"\n⚠️请求失败\n{geoapi_json}"

                location = geoapi_json["location"][0]
                country = location["country"]
                adm1 = location["adm1"]
                adm2 = location["adm2"]
                city_id = location["id"]

                # Current conditions.
                now_weather_api_json = await self._fetch_json(
                    session, "/v7/weather/now", {"location": city_id}
                )
                # Daily forecast.
                weather_forecast_api_json = await self._fetch_json(
                    session, "/v7/weather/7d", {"location": city_id}
                )

            # Report an API-level failure explicitly rather than letting a
            # missing field surface as an opaque KeyError further down.
            for payload in (now_weather_api_json, weather_forecast_api_json):
                if payload.get('code') != '200':
                    return f"\n⚠️请求失败\n{payload}"

            return self._compose_weather_message(
                country, adm1, adm2, now_weather_api_json, weather_forecast_api_json
            )

        except Exception as e:
            self.LOG.error(f"获取天气信息出错: {e}")
            return f"\n⚠️获取天气信息出错: {str(e)}"

    def _compose_weather_message(self, country, adm1, adm2, now_weather_api_json, weather_forecast_api_json):
        """Format the current conditions plus the next three forecast days.

        Args:
            country, adm1, adm2: Location fields from the geo lookup.
            now_weather_api_json: Decoded current-conditions response.
            weather_forecast_api_json: Decoded daily-forecast response.

        Returns:
            The message text.
        """
        now = now_weather_api_json['now']
        daily = weather_forecast_api_json['daily']

        update_time = now_weather_api_json['updateTime']
        now_temperature = now['temp']
        now_feelslike = now['feelsLike']
        now_weather = now['text']
        now_wind_direction = now['windDir']
        now_wind_scale = now['windScale']
        now_humidity = now['humidity']
        now_precip = now['precip']
        now_visibility = now['vis']
        # The UV index is taken from the first forecast entry.
        now_uvindex = daily[0]['uvIndex']

        parts = [
            f"-----fgwx-----\n"
            f"{country}{adm1}{adm2} 实时天气☁️\n"
            f"⏰更新时间:{update_time}\n\n"
            f"🌡️当前温度:{now_temperature}℃\n"
            f"🌡️体感温度:{now_feelslike}℃\n"
            f"☁️天气:{now_weather}\n"
            f"☀️紫外线指数:{now_uvindex}\n"
            f"🌬️风向:{now_wind_direction}\n"
            f"🌬️风力:{now_wind_scale}级\n"
            f"💦湿度:{now_humidity}%\n"
            f"🌧️降水量:{now_precip}mm/h\n"
            f"👀能见度:{now_visibility}km\n\n"
            f"☁️未来3天 {adm2} 天气:\n"
        ]

        # Entries 1..3 are the three days after the first one.
        for day in daily[1:4]:
            # "YYYY-MM-DD" -> "M.D" with leading zeros removed.
            date = '.'.join([i.lstrip('0') for i in day['fxDate'].split('-')[1:]])
            weather = day['textDay']
            max_temp = day['tempMax']
            min_temp = day['tempMin']
            uv_index = day['uvIndex']
            parts.append(
                f'{date} {weather} 最高🌡️{max_temp}℃ 最低🌡️{min_temp}℃ ☀️紫外线:{uv_index}\n'
            )

        return "".join(parts)