""" 天气预报插件 支持三种触发方式: 1. 直接发送"天气",使用用户在签到插件中设置的城市 2. 发送"天气 城市名",查询指定城市 3. AI聊天插件通过函数调用触发 """ import tomllib import aiohttp from pathlib import Path from typing import Optional, List, Dict from loguru import logger from utils.plugin_base import PluginBase from utils.decorators import on_text_message from WechatHook import WechatHookClient class WeatherPlugin(PluginBase): """天气预报插件""" description = "天气预报插件 - 查询城市天气信息" author = "ShiHao" version = "1.0.0" def __init__(self): super().__init__() self.config = None self.signin_plugin = None # 签到插件引用 async def async_init(self): """异步初始化""" # 读取配置 config_path = Path(__file__).parent / "config.toml" with open(config_path, "rb") as f: self.config = tomllib.load(f) # 获取签到插件引用(用于获取用户城市信息) try: from utils.plugin_manager import PluginManager plugins = PluginManager().plugins if "SignInPlugin" in plugins: self.signin_plugin = plugins["SignInPlugin"] logger.success("天气插件已关联签到插件") else: logger.warning("未找到签到插件,将无法使用用户设置的城市") except Exception as e: logger.error(f"获取签到插件失败: {e}") logger.success("天气预报插件初始化完成") def get_user_city(self, wxid: str) -> Optional[str]: """从签到插件获取用户城市信息""" if not self.signin_plugin: logger.warning(f"签到插件未关联,无法获取用户 {wxid} 的城市信息") return None try: logger.info(f"正在从签到插件获取用户 {wxid} 的城市信息") user_info = self.signin_plugin.get_user_info(wxid) logger.info(f"获取到用户信息: {user_info}") if user_info and user_info.get("city"): city = user_info["city"] logger.success(f"成功获取用户 {wxid} 的城市: {city}") return city else: logger.warning(f"用户 {wxid} 的城市信息为空") return None except Exception as e: logger.error(f"获取用户城市信息失败: {e}") import traceback logger.error(f"详细错误: {traceback.format_exc()}") return None async def fetch_weather(self, city: str) -> Optional[dict]: """ 调用天气API获取天气信息 Args: city: 城市名称 Returns: 天气数据字典,失败返回None """ weather_config = self.config["weather"] api_url = weather_config["api_url"] api_key = weather_config["api_key"] timeout = weather_config.get("timeout", 10) params = { "city": city, "key": api_key } try: async with aiohttp.ClientSession() as session: async with session.get( api_url, params=params, timeout=aiohttp.ClientTimeout(total=timeout) ) as resp: if resp.status != 200: logger.error(f"天气API返回错误状态码: {resp.status}") return None result = await resp.json() if result.get("code") != 200: logger.error(f"天气API返回错误: {result.get('msg')}") return None return result.get("data") except aiohttp.ClientError as e: logger.error(f"天气API请求失败: {e}") return None except Exception as e: logger.error(f"获取天气信息异常: {e}") return None def format_weather_message(self, weather_data: dict) -> list: """ 格式化天气信息为聊天记录格式的多条消息 Args: weather_data: 天气数据 Returns: 格式化后的消息列表,每条消息对应一天的天气 """ city = weather_data.get("city", "未知") data_list = weather_data.get("data", []) if not data_list: return [{"title": f"🌤️ {city}天气预报", "content": "❌ 天气数据为空"}] # 格式化所有天气数据为独立的消息 weather_messages = [] for day_data in data_list: day_date = day_data.get("date", "") day_temp = day_data.get("temperature", "") day_weather = day_data.get("weather", "") day_wind = day_data.get("wind", "") day_air = day_data.get("air_quality", "") # 每一天作为一条独立的消息 content = ( f"🌡️ 温度:{day_temp}\n" f"☁️ 天气:{day_weather}\n" f"💨 风力:{day_wind}\n" f"🌫️ 空气质量:{day_air}" ) weather_messages.append({ "title": f"📅 {day_date}", "content": content }) return weather_messages async def _send_weather_as_chat_record(self, bot, from_wxid: str, city: str, weather_data: dict): """将天气预报以聊天记录格式发送""" try: import uuid import time import hashlib import xml.etree.ElementTree as ET is_group = from_wxid.endswith("@chatroom") # 格式化天气数据 weather_messages = self.format_weather_message(weather_data) recordinfo = ET.Element("recordinfo") info_el = ET.SubElement(recordinfo, "info") info_el.text = f"🌤️ {city}天气预报" is_group_el = ET.SubElement(recordinfo, "isChatRoom") is_group_el.text = "1" if is_group else "0" datalist = ET.SubElement(recordinfo, "datalist") datalist.set("count", str(len(weather_messages))) desc_el = ET.SubElement(recordinfo, "desc") desc_el.text = f"{city}天气预报" fromscene_el = ET.SubElement(recordinfo, "fromscene") fromscene_el.text = "3" for i, msg in enumerate(weather_messages): di = ET.SubElement(datalist, "dataitem") di.set("datatype", "1") di.set("dataid", uuid.uuid4().hex) src_local_id = str((int(time.time() * 1000) % 90000) + 10000) new_msg_id = str(int(time.time() * 1000) + i) create_time = str(int(time.time()) - len(weather_messages) + i) ET.SubElement(di, "srcMsgLocalid").text = src_local_id ET.SubElement(di, "sourcetime").text = time.strftime("%Y-%m-%d %H:%M", time.localtime(int(create_time))) ET.SubElement(di, "fromnewmsgid").text = new_msg_id ET.SubElement(di, "srcMsgCreateTime").text = create_time ET.SubElement(di, "sourcename").text = msg["title"] ET.SubElement(di, "sourceheadurl").text = "" ET.SubElement(di, "datatitle").text = msg["content"] ET.SubElement(di, "datadesc").text = msg["content"] ET.SubElement(di, "datafmt").text = "text" ET.SubElement(di, "ischatroom").text = "1" if is_group else "0" dataitemsource = ET.SubElement(di, "dataitemsource") ET.SubElement(dataitemsource, "hashusername").text = hashlib.sha256(from_wxid.encode("utf-8")).hexdigest() record_xml = ET.tostring(recordinfo, encoding="unicode") appmsg_parts = [ "", f"🌤️ {city}天气预报", f"{city}天气预报", "19", "https://support.weixin.qq.com/cgi-bin/mmsupport-bin/readtemplate?t=page/favorite_record__w_unsupport", "", f"", "0", "" ] appmsg_xml = "".join(appmsg_parts) await bot._send_data_async(11214, {"to_wxid": from_wxid, "content": appmsg_xml}) logger.success(f"已发送天气预报聊天记录: {city}") except Exception as e: logger.error(f"发送天气预报聊天记录失败: {e}") @on_text_message(priority=55) async def handle_weather_query(self, bot: WechatHookClient, message: dict): """处理天气查询消息""" content = message.get("Content", "").strip() from_wxid = message.get("FromWxid", "") sender_wxid = message.get("SenderWxid", "") is_group = message.get("IsGroup", False) # 获取实际发送者 user_wxid = sender_wxid if is_group else from_wxid # 检查是否是天气查询关键词 keywords = self.config["weather"]["keywords"] # 精确匹配关键词或"关键词 城市名"格式 is_weather_query = False specified_city = None for keyword in keywords: if content == keyword: # 精确匹配关键词 is_weather_query = True break elif content.startswith(f"{keyword} "): # "关键词 城市名"格式 is_weather_query = True parts = content.split(maxsplit=1) if len(parts) == 2: specified_city = parts[1].strip() break if not is_weather_query: return True # 不是天气查询,继续传递 logger.info(f"用户 {user_wxid} 查询天气,指定城市: {specified_city}") try: # 确定要查询的城市 city = None if specified_city: # 用户指定了城市 city = specified_city else: # 尝试从签到插件获取用户城市 city = self.get_user_city(user_wxid) if not city: # 用户未设置城市,提示设置 await bot.send_text(from_wxid, self.config["messages"]["no_city"]) return False # 获取天气信息 weather_data = await self.fetch_weather(city) if not weather_data: await bot.send_text(from_wxid, self.config["messages"]["api_error"]) return False # 以聊天记录格式发送天气信息 await self._send_weather_as_chat_record(bot, from_wxid, city, weather_data) logger.success(f"已发送天气信息: {city}") except Exception as e: logger.error(f"处理天气查询失败: {e}") await bot.send_text(from_wxid, self.config["messages"]["api_error"]) return False # 已处理,停止传播 def get_llm_tools(self) -> List[dict]: """返回LLM工具定义,供AIChat插件调用""" return [ { "type": "function", "function": { "name": "query_weather", "description": "查询天气预报信息,包括温度、天气状况、风力和空气质量。当用户询问天气、气温、会不会下雨等天气相关问题时,应该调用此函数。如果用户没有指定城市,函数会自动使用用户之前设置的城市;如果用户指定了城市名称,则查询该城市的天气。", "parameters": { "type": "object", "properties": { "city": { "type": "string", "description": "要查询的城市名称,例如:北京、上海、广州。如果用户没有明确指定城市,可以不传此参数,系统会自动使用用户设置的默认城市。" } }, "required": [] # city不是必需的,可以使用用户设置的城市 } } } ] async def execute_llm_tool(self, tool_name: str, arguments: dict, bot, from_wxid: str) -> dict: """执行LLM工具调用,供AIChat插件调用""" try: if tool_name != "query_weather": return {"success": False, "message": "未知的工具名称"} # 从 arguments 中获取用户信息 user_wxid = arguments.get("user_wxid", from_wxid) is_group = arguments.get("is_group", from_wxid.endswith("@chatroom")) # 获取城市参数 city = arguments.get("city") # 如果没有指定城市,尝试从签到插件获取 if not city: city = self.get_user_city(user_wxid) if not city: # 用户未设置城市,提醒注册 await bot.send_text(from_wxid, self.config["messages"]["no_city"]) logger.info(f"用户 {user_wxid} 未设置城市,已提醒注册") return {"success": True, "message": "已提醒用户设置城市"} # 工具执行成功,只是用户需要先设置城市 logger.info(f"AI调用天气查询: city={city}, user={user_wxid}") # 获取天气信息 weather_data = await self.fetch_weather(city) if not weather_data: await bot.send_text(from_wxid, self.config["messages"]["api_error"]) return {"success": False, "message": "获取天气信息失败"} # 以聊天记录格式发送天气信息 await self._send_weather_as_chat_record(bot, from_wxid, city, weather_data) logger.success(f"AI工具调用成功,已发送天气信息: {city}") return {"success": True, "message": f"已查询{city}的天气信息"} except Exception as e: logger.error(f"LLM工具执行失败: {e}") return {"success": False, "message": f"执行失败: {str(e)}"}