天气查询插件

This commit is contained in:
liuwei
2025-05-12 16:09:37 +08:00
parent a452facb95
commit dcd8765232
4 changed files with 268 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
# 从当前包的main模块导入WeatherPlugin类
from .main import WeatherPlugin
# 提供get_plugin函数返回插件实例
def get_plugin():
"""获取插件实例"""
return WeatherPlugin()

View File

@@ -0,0 +1,10 @@
[Weather]
enable = true
command = ["天气"]
command-format = """
⚙️获取天气:
天气 城市名
天气城市名
城市名天气
城市名 天气
"""

View File

@@ -0,0 +1,3 @@
# 和风天气API配置
API_KEY: "32e692f94e364105808ea2c0302ef162" # 请填写您的和风天气API密钥
API_DOMAIN: "https://mx564thnfv.re.qweatherapi.com" # API域名请根据实际情况修改

248
plugins/weather/main.py Normal file
View File

@@ -0,0 +1,248 @@
from loguru import logger
import aiohttp
import jieba
import os
import yaml
from typing import Dict, Any, List, Optional, Tuple
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient
class WeatherPlugin(MessagePluginInterface):
"""天气查询插件"""
@property
def name(self) -> str:
return "天气查询"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供天气查询功能,支持查询城市实时天气和未来天气预报"
@property
def author(self) -> str:
return "fg"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.plugin_dir = os.path.dirname(os.path.abspath(__file__))
self.config_path = os.path.join(self.plugin_dir, "config.yaml")
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self._commands = self._config.get("Weather", {}).get("command", ["天气"])
self.command_format = self._config.get("Weather", {}).get("command-format", """⚙️获取天气:
天气 城市名
天气城市名
城市名天气
城市名 天气""")
self.enable = self._config.get("Weather", {}).get("enable", True)
# 加载API配置
self.load_config()
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def load_config(self):
"""加载配置"""
if os.path.exists(self.config_path):
with open(self.config_path, 'r', encoding='utf-8') as f:
self.weather_config = yaml.safe_load(f)
else:
# 默认配置
self.weather_config = {
"API_KEY": "",
"API_DOMAIN": ""
}
# 保存默认配置
with open(self.config_path, 'w', encoding='utf-8') as f:
yaml.dump(self.weather_config, f, default_flow_style=False, allow_unicode=True)
self.api_key = self.weather_config.get("API_KEY", "")
self.api_domain = self.weather_config.get("API_DOMAIN", "")
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
roomid = message.get("roomid", "")
# 只处理群消息且包含"天气"的消息
if not roomid or "天气" not in content:
return False
return True
@plugin_stats_decorator(plugin_name="天气查询")
@plugin_points_cost(1, "天气查询消耗积分", Feature.UTILITY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.UTILITY) == PermissionStatus.DISABLED:
return False, "没有权限"
# 处理消息内容
content = content.replace(" ", "")
command = list(jieba.cut(content))
if len(command) == 1:
await bot.send_text_message((roomid if roomid else sender), f"\n{self.command_format}", sender)
return False, "命令格式错误"
elif len(command) > 3:
return False, "命令格式错误"
# 配置密钥检查
if not self.api_key:
await bot.send_text_message((roomid if roomid else sender), "\n你还没配置天气API密钥", sender)
return False, "API密钥未配置"
try:
# 提取城市名
command.remove("天气")
request_loc = "".join(command)
# 获取天气信息
weather_info = await self._get_weather_info(request_loc)
if not weather_info:
await bot.send_text_message((roomid if roomid else sender), "\n⚠️查询天气失败!", sender)
return False, "查询天气失败"
# 发送天气信息
await bot.send_text_message((roomid if roomid else sender), weather_info, sender)
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理天气请求出错: {e}")
await bot.send_text_message((roomid if roomid else sender), f"\n⚠️处理出错: {str(e)}", sender)
return False, f"处理出错: {e}"
async def _get_weather_info(self, city_name: str) -> str:
"""获取天气信息"""
try:
# 查询城市信息
headers = {
"X-QW-Api-Key": f'{self.api_key}'
}
params = {
"location": city_name
}
geo_api_url = f"{self.api_domain}/geo/v2/city/lookup"
conn_ssl = aiohttp.TCPConnector(ssl=False)
async with aiohttp.request('GET', url=geo_api_url, connector=conn_ssl, headers=headers, params=params) as response:
geoapi_json = await response.json()
await conn_ssl.close()
if geoapi_json['code'] == '404':
return "\n⚠️查无此地!"
elif geoapi_json['code'] != '200':
return f"\n⚠️请求失败\n{geoapi_json}"
country = geoapi_json["location"][0]["country"]
adm1 = geoapi_json["location"][0]["adm1"]
adm2 = geoapi_json["location"][0]["adm2"]
city_id = geoapi_json["location"][0]["id"]
# 请求现在天气api
conn_ssl = aiohttp.TCPConnector(verify_ssl=False)
now_weather_api_url = f'{self.api_domain}/v7/weather/now?key={self.api_key}&location={city_id}'
async with aiohttp.request('GET', url=now_weather_api_url, connector=conn_ssl) as response:
now_weather_api_json = await response.json()
await conn_ssl.close()
# 请求预报天气api
conn_ssl = aiohttp.TCPConnector(verify_ssl=False)
weather_forecast_api_url = f'{self.api_domain}/v7/weather/7d?key={self.api_key}&location={city_id}'
async with aiohttp.request('GET', url=weather_forecast_api_url, connector=conn_ssl) as response:
weather_forecast_api_json = await response.json()
await conn_ssl.close()
# 组合天气信息
return self._compose_weather_message(country, adm1, adm2, now_weather_api_json, weather_forecast_api_json)
except Exception as e:
self.LOG.error(f"获取天气信息出错: {e}")
return f"\n⚠️获取天气信息出错: {str(e)}"
def _compose_weather_message(self, country, adm1, adm2, now_weather_api_json, weather_forecast_api_json):
"""组合天气信息"""
update_time = now_weather_api_json['updateTime']
now_temperature = now_weather_api_json['now']['temp']
now_feelslike = now_weather_api_json['now']['feelsLike']
now_weather = now_weather_api_json['now']['text']
now_wind_direction = now_weather_api_json['now']['windDir']
now_wind_scale = now_weather_api_json['now']['windScale']
now_humidity = now_weather_api_json['now']['humidity']
now_precip = now_weather_api_json['now']['precip']
now_visibility = now_weather_api_json['now']['vis']
now_uvindex = weather_forecast_api_json['daily'][0]['uvIndex']
message = (
f"-----fgwx-----\n"
f"{country}{adm1}{adm2} 实时天气☁️\n"
f"⏰更新时间:{update_time}\n\n"
f"🌡️当前温度:{now_temperature}\n"
f"🌡️体感温度:{now_feelslike}\n"
f"☁️天气:{now_weather}\n"
f"☀️紫外线指数:{now_uvindex}\n"
f"🌬️风向:{now_wind_direction}\n"
f"🌬️风力:{now_wind_scale}\n"
f"💦湿度:{now_humidity}%\n"
f"🌧️降水量:{now_precip}mm/h\n"
f"👀能见度:{now_visibility}km\n\n"
f"未来3天 {adm2} 天气:\n"
)
for day in weather_forecast_api_json['daily'][1:4]:
date = '.'.join([i.lstrip('0') for i in day['fxDate'].split('-')[1:]])
weather = day['textDay']
max_temp = day['tempMax']
min_temp = day['tempMin']
uv_index = day['uvIndex']
message += f'{date} {weather} 最高🌡️{max_temp}℃ 最低🌡️{min_temp}℃ ☀️紫外线:{uv_index}\n'
return message