Files
abot/plugins/weather/main.py
2025-05-12 16:12:00 +08:00

247 lines
9.7 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from loguru import logger
import aiohttp
import jieba
import os
import yaml
from typing import Dict, Any, List, Optional, Tuple
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient
class WeatherPlugin(MessagePluginInterface):
"""天气查询插件"""
@property
def name(self) -> str:
return "天气查询"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供天气查询功能,支持查询城市实时天气和未来天气预报"
@property
def author(self) -> str:
return "fg"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.plugin_dir = os.path.dirname(os.path.abspath(__file__))
self.config_path = os.path.join(self.plugin_dir, "config.yaml")
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self._commands = self._config.get("Weather", {}).get("command", ["天气"])
self.command_format = self._config.get("Weather", {}).get("command-format", """⚙️获取天气:
天气 城市名
天气城市名
城市名天气
城市名 天气""")
self.enable = self._config.get("Weather", {}).get("enable", True)
# 加载API配置
self.load_config()
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def load_config(self):
"""加载配置"""
if os.path.exists(self.config_path):
with open(self.config_path, 'r', encoding='utf-8') as f:
self.weather_config = yaml.safe_load(f)
else:
# 默认配置
self.weather_config = {
"API_KEY": "",
"API_DOMAIN": ""
}
# 保存默认配置
with open(self.config_path, 'w', encoding='utf-8') as f:
yaml.dump(self.weather_config, f, default_flow_style=False, allow_unicode=True)
self.api_key = self.weather_config.get("API_KEY", "")
self.api_domain = self.weather_config.get("API_DOMAIN", "")
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
roomid = message.get("roomid", "")
# 只处理群消息且包含"天气"的消息
if not roomid or "天气" not in content:
return False
return True
@plugin_stats_decorator(plugin_name="天气查询")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.WEATHER) == PermissionStatus.DISABLED:
return False, "没有权限"
# 处理消息内容
content = content.replace(" ", "")
command = list(jieba.cut(content))
if len(command) == 1:
await bot.send_text_message((roomid if roomid else sender), f"\n{self.command_format}", sender)
return False, "命令格式错误"
elif len(command) > 3:
return False, "命令格式错误"
# 配置密钥检查
if not self.api_key:
await bot.send_text_message((roomid if roomid else sender), "\n你还没配置天气API密钥", sender)
return False, "API密钥未配置"
try:
# 提取城市名
command.remove("天气")
request_loc = "".join(command)
# 获取天气信息
weather_info = await self._get_weather_info(request_loc)
if not weather_info:
await bot.send_text_message((roomid if roomid else sender), "\n⚠️查询天气失败!", sender)
return False, "查询天气失败"
# 发送天气信息
await bot.send_text_message((roomid if roomid else sender), weather_info, sender)
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理天气请求出错: {e}")
await bot.send_text_message((roomid if roomid else sender), f"\n⚠️处理出错: {str(e)}", sender)
return False, f"处理出错: {e}"
async def _get_weather_info(self, city_name: str) -> str:
"""获取天气信息"""
try:
# 查询城市信息
headers = {
"X-QW-Api-Key": f'{self.api_key}'
}
params = {
"location": city_name
}
geo_api_url = f"{self.api_domain}/geo/v2/city/lookup"
conn_ssl = aiohttp.TCPConnector(ssl=False)
async with aiohttp.request('GET', url=geo_api_url, connector=conn_ssl, headers=headers, params=params) as response:
geoapi_json = await response.json()
await conn_ssl.close()
if geoapi_json['code'] == '404':
return "\n⚠️查无此地!"
elif geoapi_json['code'] != '200':
return f"\n⚠️请求失败\n{geoapi_json}"
country = geoapi_json["location"][0]["country"]
adm1 = geoapi_json["location"][0]["adm1"]
adm2 = geoapi_json["location"][0]["adm2"]
city_id = geoapi_json["location"][0]["id"]
# 请求现在天气api
conn_ssl = aiohttp.TCPConnector(verify_ssl=False)
now_weather_api_url = f'{self.api_domain}/v7/weather/now?key={self.api_key}&location={city_id}'
async with aiohttp.request('GET', url=now_weather_api_url, connector=conn_ssl) as response:
now_weather_api_json = await response.json()
await conn_ssl.close()
# 请求预报天气api
conn_ssl = aiohttp.TCPConnector(verify_ssl=False)
weather_forecast_api_url = f'{self.api_domain}/v7/weather/7d?key={self.api_key}&location={city_id}'
async with aiohttp.request('GET', url=weather_forecast_api_url, connector=conn_ssl) as response:
weather_forecast_api_json = await response.json()
await conn_ssl.close()
# 组合天气信息
return self._compose_weather_message(country, adm1, adm2, now_weather_api_json, weather_forecast_api_json)
except Exception as e:
self.LOG.error(f"获取天气信息出错: {e}")
return f"\n⚠️获取天气信息出错: {str(e)}"
def _compose_weather_message(self, country, adm1, adm2, now_weather_api_json, weather_forecast_api_json):
"""组合天气信息"""
update_time = now_weather_api_json['updateTime']
now_temperature = now_weather_api_json['now']['temp']
now_feelslike = now_weather_api_json['now']['feelsLike']
now_weather = now_weather_api_json['now']['text']
now_wind_direction = now_weather_api_json['now']['windDir']
now_wind_scale = now_weather_api_json['now']['windScale']
now_humidity = now_weather_api_json['now']['humidity']
now_precip = now_weather_api_json['now']['precip']
now_visibility = now_weather_api_json['now']['vis']
now_uvindex = weather_forecast_api_json['daily'][0]['uvIndex']
message = (
f"-----fgwx-----\n"
f"{country}{adm1}{adm2} 实时天气☁️\n"
f"⏰更新时间:{update_time}\n\n"
f"🌡️当前温度:{now_temperature}\n"
f"🌡️体感温度:{now_feelslike}\n"
f"☁️天气:{now_weather}\n"
f"☀️紫外线指数:{now_uvindex}\n"
f"🌬️风向:{now_wind_direction}\n"
f"🌬️风力:{now_wind_scale}\n"
f"💦湿度:{now_humidity}%\n"
f"🌧️降水量:{now_precip}mm/h\n"
f"👀能见度:{now_visibility}km\n\n"
f"未来3天 {adm2} 天气:\n"
)
for day in weather_forecast_api_json['daily'][1:4]:
date = '.'.join([i.lstrip('0') for i in day['fxDate'].split('-')[1:]])
weather = day['textDay']
max_temp = day['tempMax']
min_temp = day['tempMin']
uv_index = day['uvIndex']
message += f'{date} {weather} 最高🌡️{max_temp}℃ 最低🌡️{min_temp}℃ ☀️紫外线:{uv_index}\n'
return message