Files
2025-12-03 15:48:44 +08:00

360 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
天气预报插件
支持三种触发方式:
1. 直接发送"天气",使用用户在签到插件中设置的城市
2. 发送"天气 城市名",查询指定城市
3. AI聊天插件通过函数调用触发
"""
import tomllib
import aiohttp
from pathlib import Path
from typing import Optional, List, Dict
from loguru import logger
from utils.plugin_base import PluginBase
from utils.decorators import on_text_message
from WechatHook import WechatHookClient
class WeatherPlugin(PluginBase):
"""天气预报插件"""
description = "天气预报插件 - 查询城市天气信息"
author = "ShiHao"
version = "1.0.0"
def __init__(self):
super().__init__()
self.config = None
self.signin_plugin = None # 签到插件引用
async def async_init(self):
"""异步初始化"""
# 读取配置
config_path = Path(__file__).parent / "config.toml"
with open(config_path, "rb") as f:
self.config = tomllib.load(f)
# 获取签到插件引用(用于获取用户城市信息)
try:
from utils.plugin_manager import PluginManager
plugins = PluginManager().plugins
if "SignInPlugin" in plugins:
self.signin_plugin = plugins["SignInPlugin"]
logger.success("天气插件已关联签到插件")
else:
logger.warning("未找到签到插件,将无法使用用户设置的城市")
except Exception as e:
logger.error(f"获取签到插件失败: {e}")
logger.success("天气预报插件初始化完成")
def get_user_city(self, wxid: str) -> Optional[str]:
"""从签到插件获取用户城市信息"""
if not self.signin_plugin:
logger.warning(f"签到插件未关联,无法获取用户 {wxid} 的城市信息")
return None
try:
logger.info(f"正在从签到插件获取用户 {wxid} 的城市信息")
user_info = self.signin_plugin.get_user_info(wxid)
logger.info(f"获取到用户信息: {user_info}")
if user_info and user_info.get("city"):
city = user_info["city"]
logger.success(f"成功获取用户 {wxid} 的城市: {city}")
return city
else:
logger.warning(f"用户 {wxid} 的城市信息为空")
return None
except Exception as e:
logger.error(f"获取用户城市信息失败: {e}")
import traceback
logger.error(f"详细错误: {traceback.format_exc()}")
return None
async def fetch_weather(self, city: str) -> Optional[dict]:
"""
调用天气API获取天气信息
Args:
city: 城市名称
Returns:
天气数据字典失败返回None
"""
weather_config = self.config["weather"]
api_url = weather_config["api_url"]
api_key = weather_config["api_key"]
timeout = weather_config.get("timeout", 10)
params = {
"city": city,
"key": api_key
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(
api_url,
params=params,
timeout=aiohttp.ClientTimeout(total=timeout)
) as resp:
if resp.status != 200:
logger.error(f"天气API返回错误状态码: {resp.status}")
return None
result = await resp.json()
if result.get("code") != 200:
logger.error(f"天气API返回错误: {result.get('msg')}")
return None
return result.get("data")
except aiohttp.ClientError as e:
logger.error(f"天气API请求失败: {e}")
return None
except Exception as e:
logger.error(f"获取天气信息异常: {e}")
return None
def format_weather_message(self, weather_data: dict) -> list:
"""
格式化天气信息为聊天记录格式的多条消息
Args:
weather_data: 天气数据
Returns:
格式化后的消息列表,每条消息对应一天的天气
"""
city = weather_data.get("city", "未知")
data_list = weather_data.get("data", [])
if not data_list:
return [{"title": f"🌤️ {city}天气预报", "content": "❌ 天气数据为空"}]
# 格式化所有天气数据为独立的消息
weather_messages = []
for day_data in data_list:
day_date = day_data.get("date", "")
day_temp = day_data.get("temperature", "")
day_weather = day_data.get("weather", "")
day_wind = day_data.get("wind", "")
day_air = day_data.get("air_quality", "")
# 每一天作为一条独立的消息
content = (
f"🌡️ 温度:{day_temp}\n"
f"☁️ 天气:{day_weather}\n"
f"💨 风力:{day_wind}\n"
f"🌫️ 空气质量:{day_air}"
)
weather_messages.append({
"title": f"📅 {day_date}",
"content": content
})
return weather_messages
async def _send_weather_as_chat_record(self, bot, from_wxid: str, city: str, weather_data: dict):
"""将天气预报以聊天记录格式发送"""
try:
import uuid
import time
import hashlib
import xml.etree.ElementTree as ET
is_group = from_wxid.endswith("@chatroom")
# 格式化天气数据
weather_messages = self.format_weather_message(weather_data)
recordinfo = ET.Element("recordinfo")
info_el = ET.SubElement(recordinfo, "info")
info_el.text = f"🌤️ {city}天气预报"
is_group_el = ET.SubElement(recordinfo, "isChatRoom")
is_group_el.text = "1" if is_group else "0"
datalist = ET.SubElement(recordinfo, "datalist")
datalist.set("count", str(len(weather_messages)))
desc_el = ET.SubElement(recordinfo, "desc")
desc_el.text = f"{city}天气预报"
fromscene_el = ET.SubElement(recordinfo, "fromscene")
fromscene_el.text = "3"
for i, msg in enumerate(weather_messages):
di = ET.SubElement(datalist, "dataitem")
di.set("datatype", "1")
di.set("dataid", uuid.uuid4().hex)
src_local_id = str((int(time.time() * 1000) % 90000) + 10000)
new_msg_id = str(int(time.time() * 1000) + i)
create_time = str(int(time.time()) - len(weather_messages) + i)
ET.SubElement(di, "srcMsgLocalid").text = src_local_id
ET.SubElement(di, "sourcetime").text = time.strftime("%Y-%m-%d %H:%M", time.localtime(int(create_time)))
ET.SubElement(di, "fromnewmsgid").text = new_msg_id
ET.SubElement(di, "srcMsgCreateTime").text = create_time
ET.SubElement(di, "sourcename").text = msg["title"]
ET.SubElement(di, "sourceheadurl").text = ""
ET.SubElement(di, "datatitle").text = msg["content"]
ET.SubElement(di, "datadesc").text = msg["content"]
ET.SubElement(di, "datafmt").text = "text"
ET.SubElement(di, "ischatroom").text = "1" if is_group else "0"
dataitemsource = ET.SubElement(di, "dataitemsource")
ET.SubElement(dataitemsource, "hashusername").text = hashlib.sha256(from_wxid.encode("utf-8")).hexdigest()
record_xml = ET.tostring(recordinfo, encoding="unicode")
appmsg_parts = [
"<appmsg appid=\"\" sdkver=\"0\">",
f"<title>🌤️ {city}天气预报</title>",
f"<des>{city}天气预报</des>",
"<type>19</type>",
"<url>https://support.weixin.qq.com/cgi-bin/mmsupport-bin/readtemplate?t=page/favorite_record__w_unsupport</url>",
"<appattach><cdnthumbaeskey></cdnthumbaeskey><aeskey></aeskey></appattach>",
f"<recorditem><![CDATA[{record_xml}]]></recorditem>",
"<percent>0</percent>",
"</appmsg>"
]
appmsg_xml = "".join(appmsg_parts)
await bot._send_data_async(11214, {"to_wxid": from_wxid, "content": appmsg_xml})
logger.success(f"已发送天气预报聊天记录: {city}")
except Exception as e:
logger.error(f"发送天气预报聊天记录失败: {e}")
@on_text_message(priority=55)
async def handle_weather_query(self, bot: WechatHookClient, message: dict):
"""处理天气查询消息"""
content = message.get("Content", "").strip()
from_wxid = message.get("FromWxid", "")
sender_wxid = message.get("SenderWxid", "")
is_group = message.get("IsGroup", False)
# 获取实际发送者
user_wxid = sender_wxid if is_group else from_wxid
# 检查是否是天气查询关键词
keywords = self.config["weather"]["keywords"]
# 精确匹配关键词或"关键词 城市名"格式
is_weather_query = False
specified_city = None
for keyword in keywords:
if content == keyword:
# 精确匹配关键词
is_weather_query = True
break
elif content.startswith(f"{keyword} "):
# "关键词 城市名"格式
is_weather_query = True
parts = content.split(maxsplit=1)
if len(parts) == 2:
specified_city = parts[1].strip()
break
if not is_weather_query:
return True # 不是天气查询,继续传递
logger.info(f"用户 {user_wxid} 查询天气,指定城市: {specified_city}")
try:
# 确定要查询的城市
city = None
if specified_city:
# 用户指定了城市
city = specified_city
else:
# 尝试从签到插件获取用户城市
city = self.get_user_city(user_wxid)
if not city:
# 用户未设置城市,提示设置
await bot.send_text(from_wxid, self.config["messages"]["no_city"])
return False
# 获取天气信息
weather_data = await self.fetch_weather(city)
if not weather_data:
await bot.send_text(from_wxid, self.config["messages"]["api_error"])
return False
# 以聊天记录格式发送天气信息
await self._send_weather_as_chat_record(bot, from_wxid, city, weather_data)
logger.success(f"已发送天气信息: {city}")
except Exception as e:
logger.error(f"处理天气查询失败: {e}")
await bot.send_text(from_wxid, self.config["messages"]["api_error"])
return False # 已处理,停止传播
def get_llm_tools(self) -> List[dict]:
"""返回LLM工具定义供AIChat插件调用"""
return [
{
"type": "function",
"function": {
"name": "query_weather",
"description": "查询天气预报信息,包括温度、天气状况、风力和空气质量。当用户询问天气、气温、会不会下雨等天气相关问题时,应该调用此函数。如果用户没有指定城市,函数会自动使用用户之前设置的城市;如果用户指定了城市名称,则查询该城市的天气。",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "要查询的城市名称,例如:北京、上海、广州。如果用户没有明确指定城市,可以不传此参数,系统会自动使用用户设置的默认城市。"
}
},
"required": [] # city不是必需的可以使用用户设置的城市
}
}
}
]
async def execute_llm_tool(self, tool_name: str, arguments: dict, bot, from_wxid: str) -> dict:
"""执行LLM工具调用供AIChat插件调用"""
try:
if tool_name != "query_weather":
return {"success": False, "message": "未知的工具名称"}
# 从 arguments 中获取用户信息
user_wxid = arguments.get("user_wxid", from_wxid)
is_group = arguments.get("is_group", from_wxid.endswith("@chatroom"))
# 获取城市参数
city = arguments.get("city")
# 如果没有指定城市,尝试从签到插件获取
if not city:
city = self.get_user_city(user_wxid)
if not city:
# 用户未设置城市,提醒注册
await bot.send_text(from_wxid, self.config["messages"]["no_city"])
logger.info(f"用户 {user_wxid} 未设置城市,已提醒注册")
return {"success": True, "message": "已提醒用户设置城市"} # 工具执行成功,只是用户需要先设置城市
logger.info(f"AI调用天气查询: city={city}, user={user_wxid}")
# 获取天气信息
weather_data = await self.fetch_weather(city)
if not weather_data:
await bot.send_text(from_wxid, self.config["messages"]["api_error"])
return {"success": False, "message": "获取天气信息失败"}
# 以聊天记录格式发送天气信息
await self._send_weather_as_chat_record(bot, from_wxid, city, weather_data)
logger.success(f"AI工具调用成功已发送天气信息: {city}")
return {"success": True, "message": f"已查询{city}的天气信息"}
except Exception as e:
logger.error(f"LLM工具执行失败: {e}")
return {"success": False, "message": f"执行失败: {str(e)}"}