import os
from typing import Dict, Any, List, Optional, Tuple

import aiohttp
from loguru import logger

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient


class WeatherPlugin(MessagePluginInterface):
    """Weather lookup plugin.

    Recognises messages that start or end with "天气" (or whose first
    space-separated token is one of the configured commands), resolves the
    city through the configured weather API and replies with the current
    conditions plus a three-day forecast.
    """

    # Feature-permission constants
    FEATURE_KEY = "WEATHER"
    FEATURE_DESCRIPTION = "🌤️ 天气查询 [上海天气, 天气上海]"

    @property
    def name(self) -> str:
        return "天气查询"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def description(self) -> str:
        return "提供天气查询功能,支持查询城市实时天气和未来天气预报"

    @property
    def author(self) -> str:
        return "fg"

    @property
    def command_prefix(self) -> Optional[str]:
        # No prefix is required; commands are matched directly.
        return ""

    @property
    def commands(self) -> List[str]:
        return self._commands

    @property
    def feature_key(self) -> Optional[str]:
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        super().__init__()
        self.plugin_dir = os.path.dirname(os.path.abspath(__file__))
        # Register the feature permission
        self.feature = self.register_feature()

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Load the plugin settings from the ``Weather`` config section.

        Args:
            context: Plugin context supplied by the host; not read here.

        Returns:
            Always ``True``.
        """
        self.LOG = logger
        self.LOG.info(f"正在初始化 {self.name} 插件...")

        # Look the section up once instead of once per setting.
        weather_cfg = self._config.get("Weather", {})

        self._commands = weather_cfg.get("command", ["天气"])
        # NOTE(review): line breaks inside this default literal may have been
        # lost when the file was flattened - confirm against the original.
        self.command_format = weather_cfg.get("command-format", """⚙️获取天气: 天气 城市名 天气城市名 城市名天气 城市名 天气""")
        self.enable = weather_cfg.get("enable", True)

        # API settings
        self.api_key = weather_cfg.get("API_KEY", "")
        self.api_domain = weather_cfg.get("API_DOMAIN", "")

        self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
        return True

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return whether this plugin should handle ``message``.

        A message qualifies when the plugin is enabled and its stripped
        content starts or ends with "天气", or its first space-separated
        token is one of the configured commands.
        """
        if not self.enable:
            return False

        content = str(message.get("content", "")).strip()
        if content.startswith("天气") or content.endswith("天气"):
            return True

        command = content.split(" ")[0]
        return command in self._commands

    @plugin_stats_decorator(plugin_name="天气查询")
    @plugin_points_cost(1, "天气查询消耗积分", FEATURE_KEY)
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle a weather query and reply with the weather report.

        Args:
            message: Incoming message dict; the keys read here are
                ``content``, ``sender``, ``roomid``, ``gbm`` and ``bot``.

        Returns:
            ``(success, status_text)``. ``success`` is ``True`` only when the
            weather report was sent.
        """
        content = str(message.get("content", "")).strip()
        self.LOG.debug(f"插件执行: {self.name}:{content}")

        sender = message.get("sender")
        roomid = message.get("roomid", "")
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")

        # Group messages are subject to the per-group feature permission.
        if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
            return False, "没有权限"

        city_name = self._extract_city_name(content)
        self.LOG.debug(f"城市名称:{city_name}")
        if not city_name:
            # Malformed command: report it to the caller without replying.
            return False, "命令格式错误"

        # Reply to the group when there is one, otherwise to the sender.
        target = roomid if roomid else sender

        # The API key must be configured before any request is made.
        if not self.api_key:
            await bot.send_text_message(target, "\n你还没配置天气API密钥!请在config.toml中配置Weather.API_KEY", sender)
            return False, "API密钥未配置"

        try:
            weather_info = await self._get_weather_info(city_name)
            if not weather_info:
                self.LOG.debug("\n⚠️查询天气失败!")
                return False, "查询天气失败"

            await bot.send_text_message(target, weather_info, sender)
            return True, "发送成功"
        except Exception as e:
            # Top-level boundary for this handler: log and report the failure.
            self.LOG.error(f"处理天气请求出错: {e}")
            return False, f"处理出错: {e}"

    def _extract_city_name(self, content: str) -> str:
        """Extract the city name from a "天气<city>" / "<city>天气" message.

        All whitespace (including full-width spaces) is removed first, so
        "天气 北京" and "北京 天气" are accepted as well.

        Returns:
            The city name, or ``""`` when the text neither starts nor ends
            with "天气" (or contains nothing besides it).
        """
        keyword = "天气"
        compact = "".join(content.split())

        if compact.startswith(keyword):
            # Form: "天气北京"
            return compact[len(keyword):]
        if compact.endswith(keyword):
            # Form: "北京天气"
            return compact[:-len(keyword)]
        # Always return a str, as the annotation promises.
        return ""

    @staticmethod
    async def _fetch_json(session: "aiohttp.ClientSession", url: str, params: Dict[str, Any]) -> Any:
        """GET ``url`` with ``params`` on ``session`` and decode the JSON body."""
        async with session.get(url, params=params) as response:
            return await response.json()

    async def _get_weather_info(self, city_name: str) -> str:
        """Query the weather API and build the reply text for ``city_name``.

        Performs three requests against ``self.api_domain``: city lookup,
        current weather, and the 7-day forecast.

        Returns:
            The formatted weather message, or ``""`` when the city is not
            found, the API reports an error, or any request fails.
        """
        # The key travels in the request header for every call, so it never
        # appears in a URL.
        headers = {"X-QW-Api-Key": f"{self.api_key}"}

        try:
            # NOTE(review): ssl=False disables TLS certificate verification
            # (kept from the original); consider enabling verification.
            connector = aiohttp.TCPConnector(ssl=False)

            # One session for all requests; leaving the block closes the
            # session and its connector even if a request raises.
            async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
                # Resolve the city
                geoapi_json = await self._fetch_json(
                    session,
                    f"{self.api_domain}/geo/v2/city/lookup",
                    {"location": city_name},
                )
                if "code" not in geoapi_json:
                    self.LOG.debug(f"未查询到城市信息:{city_name},{geoapi_json}")
                    return ""
                if geoapi_json["code"] != "200" or not geoapi_json.get("location"):
                    return ""

                # The first match is used.
                place = geoapi_json["location"][0]
                country = place["country"]
                adm1 = place["adm1"]
                adm2 = place["adm2"]
                city_id = place["id"]

                # Current weather
                now_weather_api_json = await self._fetch_json(
                    session,
                    f"{self.api_domain}/v7/weather/now",
                    {"location": city_id},
                )
                # Forecast
                weather_forecast_api_json = await self._fetch_json(
                    session,
                    f"{self.api_domain}/v7/weather/7d",
                    {"location": city_id},
                )

            # Fail explicitly on an API-level error instead of relying on a
            # KeyError while formatting.
            for payload in (now_weather_api_json, weather_forecast_api_json):
                if payload.get("code") != "200":
                    self.LOG.debug(f"天气接口返回异常:{city_name},{payload}")
                    return ""

            return self._compose_weather_message(
                country, adm1, adm2, now_weather_api_json, weather_forecast_api_json
            )
        except Exception as e:
            # Best-effort lookup: any network/parse error yields an empty result.
            self.LOG.error(f"获取天气信息出错: {e}")
            return ""

    def _compose_weather_message(
        self,
        country: str,
        adm1: str,
        adm2: str,
        now_weather_api_json: Dict[str, Any],
        weather_forecast_api_json: Dict[str, Any],
    ) -> str:
        """Format the current conditions and the next three days as text.

        Args:
            country, adm1, adm2: Location labels from the city lookup.
            now_weather_api_json: Response containing ``updateTime`` and ``now``.
            weather_forecast_api_json: Response containing the ``daily`` list;
                entry 0 supplies today's UV index and entries 1-3 the forecast.

        Returns:
            The multi-line message; every forecast line ends with a newline.
        """
        now = now_weather_api_json["now"]
        daily = weather_forecast_api_json["daily"]

        update_time = now_weather_api_json["updateTime"]
        now_temperature = now["temp"]
        now_feelslike = now["feelsLike"]
        now_weather = now["text"]
        now_wind_direction = now["windDir"]
        now_wind_scale = now["windScale"]
        now_humidity = now["humidity"]
        now_precip = now["precip"]
        now_visibility = now["vis"]
        now_uvindex = daily[0]["uvIndex"]

        header = (
            f"{country}{adm1}{adm2} 实时天气☁️\n"
            f"⏰更新时间:{update_time}\n\n"
            f"🌡️当前温度:{now_temperature}℃\n"
            f"🌡️体感温度:{now_feelslike}℃\n"
            f"☁️天气:{now_weather}\n"
            f"☀️紫外线指数:{now_uvindex}\n"
            f"🌬️风向:{now_wind_direction}\n"
            f"🌬️风力:{now_wind_scale}级\n"
            f"💦湿度:{now_humidity}%\n"
            f"🌧️降水量:{now_precip}mm/h\n"
            f"👀能见度:{now_visibility}km\n\n"
            f"☁️未来3天 {adm2} 天气:\n"
        )

        forecast_lines = []
        for day in daily[1:4]:
            # "2024-05-07" -> "5.7": drop the year and the leading zeros.
            date = ".".join(part.lstrip("0") for part in day["fxDate"].split("-")[1:])
            weather = day["textDay"]
            max_temp = day["tempMax"]
            min_temp = day["tempMin"]
            uv_index = day["uvIndex"]
            forecast_lines.append(
                f"{date} {weather} 最高🌡️{max_temp}℃ 最低🌡️{min_temp}℃ ☀️紫外线:{uv_index}\n"
            )

        return header + "".join(forecast_lines)