""" 联网搜索插件 支持命令触发和LLM工具调用 """ import asyncio import tomllib import aiohttp from pathlib import Path from typing import List, Optional from loguru import logger from utils.plugin_base import PluginBase from utils.decorators import on_text_message from WechatHook import WechatHookClient class WebSearch(PluginBase): """联网搜索插件""" description = "联网搜索插件 - 支持实时信息查询和LLM工具调用" author = "ShiHao" version = "1.0.0" def __init__(self): super().__init__() self.config = None async def async_init(self): """异步初始化""" config_path = Path(__file__).parent / "config.toml" with open(config_path, "rb") as f: self.config = tomllib.load(f) logger.success(f"联网搜索插件初始化完成") async def search(self, query: str, max_results: int = None) -> dict: """ 执行搜索 Args: query: 搜索关键词 max_results: 最大结果数量 Returns: {"success": bool, "results": List[dict], "message": str} """ api_config = self.config["api"] behavior_config = self.config["behavior"] if max_results is None: max_results = behavior_config["max_results"] try: url = f"{api_config['base_url']}/search" params = { "q": query, "format": "json", "language": api_config["language"] } logger.info(f"搜索请求: {query}") async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=api_config["timeout"])) as session: async with session.get(url, params=params) as response: if response.status == 200: data = await response.json() results = data.get("results", []) if not results: return { "success": False, "results": [], "message": "未找到相关结果" } # 限制结果数量 results = results[:max_results] logger.success(f"搜索成功,找到 {len(results)} 条结果") return { "success": True, "results": results, "message": f"找到 {len(results)} 条结果" } else: error_text = await response.text() logger.error(f"搜索API请求失败: {response.status}, {error_text[:200]}") return { "success": False, "results": [], "message": f"搜索失败: HTTP {response.status}" } except asyncio.TimeoutError: logger.warning(f"搜索请求超时") return { "success": False, "results": [], "message": "搜索超时,请稍后重试" } except Exception as e: logger.error(f"搜索异常: {e}") return { "success": False, "results": [], "message": f"搜索失败: {str(e)}" } def format_results(self, results: List[dict], max_results: int = 5) -> str: """ 格式化搜索结果为美化文本 Args: results: 搜索结果列表 max_results: 最大显示数量 Returns: 格式化后的文本 """ if not results: return "❌ 未找到相关结果" output = f"🔍 搜索结果(共 {len(results)} 条)\n" output += "━━━━━━━━━━━━━━━━━━━━\n\n" for i, result in enumerate(results[:max_results], 1): title = result.get("title", "无标题") url = result.get("url", "") content = result.get("content", "") # 截断过长的内容 if len(content) > 120: content = content[:120] + "..." output += f"📌 {i}. {title}\n" if content: output += f"💬 {content}\n" output += f"🔗 {url}\n" if i < min(len(results), max_results): output += "\n" + "─" * 30 + "\n\n" return output.strip() async def send_results_as_chat_record(self, bot: WechatHookClient, to_wxid: str, query: str, results: List[dict], max_results: int = 5): """ 以聊天记录格式发送搜索结果 Args: bot: 机器人客户端 to_wxid: 接收者 wxid query: 搜索关键词 results: 搜索结果列表 max_results: 最大发送数量 """ if not results: await bot.send_text(to_wxid, "❌ 未找到相关结果") return # 构建聊天记录内容(单个字符串) content_lines = [] for i, result in enumerate(results[:max_results], 1): title = result.get("title", "无标题") url = result.get("url", "") desc = result.get("content", "") # 截断过长的描述 if len(desc) > 150: desc = desc[:150] + "..." # 格式化每条结果 content_lines.append(f"📌 {i}. {title}") if desc: content_lines.append(f"💬 {desc}") content_lines.append(f"🔗 {url}") content_lines.append("") # 空行分隔 # 合并为单个字符串 content = "\n".join(content_lines) # 使用聊天记录格式发送 title = f"🔍 搜索:{query}" await self._send_chat_records(bot, to_wxid, title, content) async def _send_chat_records(self, bot, from_wxid: str, title: str, content: str): """发送聊天记录格式消息""" try: import uuid import time import hashlib import xml.etree.ElementTree as ET is_group = from_wxid.endswith("@chatroom") # 自动分割内容(与总结插件相同) max_length = 800 content_parts = [] if len(content) <= max_length: content_parts = [content] else: lines = content.split('\n') current_part = "" for line in lines: if len(current_part + line + '\n') > max_length: if current_part: content_parts.append(current_part.strip()) current_part = line + '\n' else: content_parts.append(line[:max_length]) current_part = line[max_length:] + '\n' else: current_part += line + '\n' if current_part.strip(): content_parts.append(current_part.strip()) recordinfo = ET.Element("recordinfo") info_el = ET.SubElement(recordinfo, "info") info_el.text = title is_group_el = ET.SubElement(recordinfo, "isChatRoom") is_group_el.text = "1" if is_group else "0" datalist = ET.SubElement(recordinfo, "datalist") datalist.set("count", str(len(content_parts))) desc_el = ET.SubElement(recordinfo, "desc") desc_el.text = title fromscene_el = ET.SubElement(recordinfo, "fromscene") fromscene_el.text = "3" for i, part in enumerate(content_parts): di = ET.SubElement(datalist, "dataitem") di.set("datatype", "1") di.set("dataid", uuid.uuid4().hex) src_local_id = str((int(time.time() * 1000) % 90000) + 10000) new_msg_id = str(int(time.time() * 1000) + i) create_time = str(int(time.time()) - len(content_parts) + i) ET.SubElement(di, "srcMsgLocalid").text = src_local_id ET.SubElement(di, "sourcetime").text = time.strftime("%Y-%m-%d %H:%M", time.localtime(int(create_time))) ET.SubElement(di, "fromnewmsgid").text = new_msg_id ET.SubElement(di, "srcMsgCreateTime").text = create_time ET.SubElement(di, "sourcename").text = "搜索助手" ET.SubElement(di, "sourceheadurl").text = "" ET.SubElement(di, "datatitle").text = part ET.SubElement(di, "datadesc").text = part ET.SubElement(di, "datafmt").text = "text" ET.SubElement(di, "ischatroom").text = "1" if is_group else "0" dataitemsource = ET.SubElement(di, "dataitemsource") ET.SubElement(dataitemsource, "hashusername").text = hashlib.sha256(from_wxid.encode("utf-8")).hexdigest() record_xml = ET.tostring(recordinfo, encoding="unicode") appmsg_parts = [ "", f"{title}", f"{title}", "19", "https://support.weixin.qq.com/cgi-bin/mmsupport-bin/readtemplate?t=page/favorite_record__w_unsupport", "", f"", "0", "" ] appmsg_xml = "".join(appmsg_parts) await bot._send_data_async(11214, {"to_wxid": from_wxid, "content": appmsg_xml}) logger.success(f"已发送聊天记录: {title}") except Exception as e: logger.error(f"发送聊天记录失败: {e}") @on_text_message(priority=70) async def handle_message(self, bot: WechatHookClient, message: dict): """处理文本消息""" if not self.config["behavior"]["enable_command"]: return True content = message.get("Content", "").strip() from_wxid = message.get("FromWxid", "") is_group = message.get("IsGroup", False) # 检查群聊/私聊开关 if is_group and not self.config["behavior"]["enable_group"]: return True if not is_group and not self.config["behavior"]["enable_private"]: return True # 检查是否是搜索命令(精确匹配命令+空格+关键词) keywords = self.config["behavior"]["command_keywords"] matched_keyword = None for keyword in keywords: if content.startswith(keyword + " "): matched_keyword = keyword break if not matched_keyword: return True # 提取搜索关键词 query = content[len(matched_keyword):].strip() if not query: await bot.send_text(from_wxid, "❌ 请提供搜索关键词\n用法: /搜 <关键词>") return False logger.info(f"收到搜索请求: {query}") # 发送处理中提示 await bot.send_text(from_wxid, "🔍 正在搜索,请稍候...") try: # 执行搜索 result = await self.search(query) if result["success"]: # 根据配置选择发送方式 send_as_cards = self.config["behavior"].get("send_as_cards", True) if send_as_cards: # 以聊天记录形式发送 await self.send_results_as_chat_record(bot, from_wxid, query, result["results"]) else: # 以格式化文本形式发送 formatted_text = self.format_results(result["results"]) await bot.send_text(from_wxid, formatted_text) logger.success(f"搜索成功,已发送结果") else: await bot.send_text(from_wxid, f"❌ {result['message']}") except Exception as e: logger.error(f"搜索处理失败: {e}") await bot.send_text(from_wxid, f"❌ 处理失败: {str(e)}") return False def get_llm_tools(self) -> List[dict]: """ 返回LLM工具定义 供AIChat插件调用 """ if not self.config["llm_tool"]["enabled"]: return [] return [{ "type": "function", "function": { "name": self.config["llm_tool"]["tool_name"], "description": self.config["llm_tool"]["tool_description"], "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "搜索关键词,描述想要查询的内容" }, "max_results": { "type": "integer", "description": "返回的最大结果数量,默认5条", "default": 5 } }, "required": ["query"] } } }] async def execute_llm_tool(self, tool_name: str, arguments: dict, bot: WechatHookClient, from_wxid: str) -> dict: """ 执行LLM工具调用 供AIChat插件调用 Returns: {"success": bool, "message": str, "results": List[dict]} """ expected_tool_name = self.config["llm_tool"]["tool_name"] logger.info(f"WebSearch工具检查: 收到={tool_name}, 期望={expected_tool_name}") if tool_name != expected_tool_name: return None # 不是本插件的工具,返回None让其他插件处理 try: query = arguments.get("query") if not query: return {"success": False, "message": "缺少搜索关键词参数"} max_results = arguments.get("max_results", 5) logger.info(f"LLM工具调用搜索: {query}") # 执行搜索 result = await self.search(query, max_results) if result["success"]: # 直接发送聊天记录,不经过 AI 总结 await self.send_results_as_chat_record(bot, from_wxid, query, result["results"], max_results) # 返回简短说明给 AI return { "success": True, "message": f"已为用户搜索「{query}」并发送了 {len(result['results'])} 条搜索结果的聊天记录卡片。", "no_reply": True # 不需要 AI 再回复 } else: return { "success": False, "message": result["message"] } except Exception as e: logger.error(f"LLM工具执行失败: {e}") return {"success": False, "message": f"执行失败: {str(e)}"}