Files
abot/plugins/weather/main.py

241 lines
9.6 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from loguru import logger
import aiohttp
import os
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient
class WeatherPlugin(MessagePluginInterface):
"""天气查询插件"""
@property
def name(self) -> str:
return "天气查询"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供天气查询功能,支持查询城市实时天气和未来天气预报"
@property
def author(self) -> str:
return "fg"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.plugin_dir = os.path.dirname(os.path.abspath(__file__))
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 从TOML配置文件加载配置
self._commands = self._config.get("Weather", {}).get("command", ["天气"])
self.command_format = self._config.get("Weather", {}).get("command-format", """⚙️获取天气:
天气 城市名
天气城市名
城市名天气
城市名 天气""")
self.enable = self._config.get("Weather", {}).get("enable", True)
# 加载API配置
self.api_key = self._config.get("Weather", {}).get("API_KEY", "")
self.api_domain = self._config.get("Weather", {}).get("API_DOMAIN", "")
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
if content.startswith("天气") or content.endswith("天气"):
return True
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="天气查询")
@plugin_points_cost(2, "天气查询消耗积分", Feature.WEATHER)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and GroupBotManager.get_group_permission(roomid, Feature.WEATHER) == PermissionStatus.DISABLED:
return False, "没有权限"
# 处理消息内容 - 不再使用jieba分词
city_name = self._extract_city_name(content)
self.LOG.debug(f"城市名称:{city_name}")
if not city_name:
# await bot.send_text_message((roomid if roomid else sender), f"\n{self.command_format}", sender)
return False, "命令格式错误"
# 配置密钥检查
if not self.api_key:
await bot.send_text_message((roomid if roomid else sender),
"\n你还没配置天气API密钥请在config.toml中配置Weather.API_KEY", sender)
return False, "API密钥未配置"
try:
# 获取天气信息
weather_info = await self._get_weather_info(city_name)
if not weather_info:
self.LOG.debug("\n⚠️查询天气失败!")
return False, "查询天气失败"
# 发送天气信息
await bot.send_text_message((roomid if roomid else sender), weather_info, sender)
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理天气请求出错: {e}")
# await bot.send_text_message((roomid if roomid else sender), f"\n⚠处理出错: {str(e)}", sender)
return False, f"处理出错: {e}"
def _extract_city_name(self, content: str) -> str:
"""提取城市名称替代jieba分词"""
# 去除空格
content = content.replace(" ", "")
# 处理几种常见格式
if content.startswith("天气"):
# 格式: "天气北京"
return content[2:]
elif content.endswith("天气"):
# 格式: "北京天气"
return content[:-2]
# else:
# # 尝试分离"天气"和城市名
# parts = content.split("天气")
# if len(parts) == 2:
# # 选择非空的部分作为城市名
# return parts[0] if parts[0] else parts[1]
#
return ""
async def _get_weather_info(self, city_name: str) -> str:
"""获取天气信息"""
try:
# 查询城市信息
headers = {
"X-QW-Api-Key": f'{self.api_key}'
}
params = {
"location": city_name
}
geo_api_url = f"{self.api_domain}/geo/v2/city/lookup"
conn_ssl = aiohttp.TCPConnector(ssl=False)
async with aiohttp.request('GET', url=geo_api_url, connector=conn_ssl, headers=headers,
params=params) as response:
geoapi_json = await response.json()
await conn_ssl.close()
if 'code' not in geoapi_json:
self.LOG.debug(f"未查询到城市信息:{city_name},{geoapi_json}")
return ""
if geoapi_json['code'] == '404':
return ""
elif geoapi_json['code'] != '200':
return ""
country = geoapi_json["location"][0]["country"]
adm1 = geoapi_json["location"][0]["adm1"]
adm2 = geoapi_json["location"][0]["adm2"]
city_id = geoapi_json["location"][0]["id"]
# 请求现在天气api
conn_ssl = aiohttp.TCPConnector(verify_ssl=False)
now_weather_api_url = f'{self.api_domain}/v7/weather/now?key={self.api_key}&location={city_id}'
async with aiohttp.request('GET', url=now_weather_api_url, connector=conn_ssl) as response:
now_weather_api_json = await response.json()
await conn_ssl.close()
# 请求预报天气api
conn_ssl = aiohttp.TCPConnector(verify_ssl=False)
weather_forecast_api_url = f'{self.api_domain}/v7/weather/7d?key={self.api_key}&location={city_id}'
async with aiohttp.request('GET', url=weather_forecast_api_url, connector=conn_ssl) as response:
weather_forecast_api_json = await response.json()
await conn_ssl.close()
# 组合天气信息
return self._compose_weather_message(country, adm1, adm2, now_weather_api_json, weather_forecast_api_json)
except Exception as e:
self.LOG.error(f"获取天气信息出错: {e}")
return ""
def _compose_weather_message(self, country, adm1, adm2, now_weather_api_json, weather_forecast_api_json):
"""组合天气信息"""
update_time = now_weather_api_json['updateTime']
now_temperature = now_weather_api_json['now']['temp']
now_feelslike = now_weather_api_json['now']['feelsLike']
now_weather = now_weather_api_json['now']['text']
now_wind_direction = now_weather_api_json['now']['windDir']
now_wind_scale = now_weather_api_json['now']['windScale']
now_humidity = now_weather_api_json['now']['humidity']
now_precip = now_weather_api_json['now']['precip']
now_visibility = now_weather_api_json['now']['vis']
now_uvindex = weather_forecast_api_json['daily'][0]['uvIndex']
message = (
f"{country}{adm1}{adm2} 实时天气☁️\n"
f"⏰更新时间:{update_time}\n\n"
f"🌡️当前温度:{now_temperature}\n"
f"🌡️体感温度:{now_feelslike}\n"
f"☁️天气:{now_weather}\n"
f"☀️紫外线指数:{now_uvindex}\n"
f"🌬️风向:{now_wind_direction}\n"
f"🌬️风力:{now_wind_scale}\n"
f"💦湿度:{now_humidity}%\n"
f"🌧️降水量:{now_precip}mm/h\n"
f"👀能见度:{now_visibility}km\n\n"
f"未来3天 {adm2} 天气:\n"
)
for day in weather_forecast_api_json['daily'][1:4]:
date = '.'.join([i.lstrip('0') for i in day['fxDate'].split('-')[1:]])
weather = day['textDay']
max_temp = day['tempMax']
min_temp = day['tempMin']
uv_index = day['uvIndex']
message += f'{date} {weather} 最高🌡️{max_temp}℃ 最低🌡️{min_temp}℃ ☀️紫外线:{uv_index}\n'
return message