Files
2025-12-03 15:48:44 +08:00

417 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
联网搜索插件
支持命令触发和LLM工具调用
"""
import asyncio
import tomllib
import aiohttp
from pathlib import Path
from typing import List, Optional
from loguru import logger
from utils.plugin_base import PluginBase
from utils.decorators import on_text_message
from WechatHook import WechatHookClient
class WebSearch(PluginBase):
"""联网搜索插件"""
description = "联网搜索插件 - 支持实时信息查询和LLM工具调用"
author = "ShiHao"
version = "1.0.0"
def __init__(self):
super().__init__()
self.config = None
async def async_init(self):
"""异步初始化"""
config_path = Path(__file__).parent / "config.toml"
with open(config_path, "rb") as f:
self.config = tomllib.load(f)
logger.success(f"联网搜索插件初始化完成")
async def search(self, query: str, max_results: int = None) -> dict:
"""
执行搜索
Args:
query: 搜索关键词
max_results: 最大结果数量
Returns:
{"success": bool, "results": List[dict], "message": str}
"""
api_config = self.config["api"]
behavior_config = self.config["behavior"]
if max_results is None:
max_results = behavior_config["max_results"]
try:
url = f"{api_config['base_url']}/search"
params = {
"q": query,
"format": "json",
"language": api_config["language"]
}
logger.info(f"搜索请求: {query}")
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=api_config["timeout"])) as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
results = data.get("results", [])
if not results:
return {
"success": False,
"results": [],
"message": "未找到相关结果"
}
# 限制结果数量
results = results[:max_results]
logger.success(f"搜索成功,找到 {len(results)} 条结果")
return {
"success": True,
"results": results,
"message": f"找到 {len(results)} 条结果"
}
else:
error_text = await response.text()
logger.error(f"搜索API请求失败: {response.status}, {error_text[:200]}")
return {
"success": False,
"results": [],
"message": f"搜索失败: HTTP {response.status}"
}
except asyncio.TimeoutError:
logger.warning(f"搜索请求超时")
return {
"success": False,
"results": [],
"message": "搜索超时,请稍后重试"
}
except Exception as e:
logger.error(f"搜索异常: {e}")
return {
"success": False,
"results": [],
"message": f"搜索失败: {str(e)}"
}
def format_results(self, results: List[dict], max_results: int = 5) -> str:
"""
格式化搜索结果为美化文本
Args:
results: 搜索结果列表
max_results: 最大显示数量
Returns:
格式化后的文本
"""
if not results:
return "❌ 未找到相关结果"
output = f"🔍 搜索结果(共 {len(results)} 条)\n"
output += "━━━━━━━━━━━━━━━━━━━━\n\n"
for i, result in enumerate(results[:max_results], 1):
title = result.get("title", "无标题")
url = result.get("url", "")
content = result.get("content", "")
# 截断过长的内容
if len(content) > 120:
content = content[:120] + "..."
output += f"📌 {i}. {title}\n"
if content:
output += f"💬 {content}\n"
output += f"🔗 {url}\n"
if i < min(len(results), max_results):
output += "\n" + "" * 30 + "\n\n"
return output.strip()
async def send_results_as_chat_record(self, bot: WechatHookClient, to_wxid: str, query: str, results: List[dict], max_results: int = 5):
"""
以聊天记录格式发送搜索结果
Args:
bot: 机器人客户端
to_wxid: 接收者 wxid
query: 搜索关键词
results: 搜索结果列表
max_results: 最大发送数量
"""
if not results:
await bot.send_text(to_wxid, "❌ 未找到相关结果")
return
# 构建聊天记录内容(单个字符串)
content_lines = []
for i, result in enumerate(results[:max_results], 1):
title = result.get("title", "无标题")
url = result.get("url", "")
desc = result.get("content", "")
# 截断过长的描述
if len(desc) > 150:
desc = desc[:150] + "..."
# 格式化每条结果
content_lines.append(f"📌 {i}. {title}")
if desc:
content_lines.append(f"💬 {desc}")
content_lines.append(f"🔗 {url}")
content_lines.append("") # 空行分隔
# 合并为单个字符串
content = "\n".join(content_lines)
# 使用聊天记录格式发送
title = f"🔍 搜索:{query}"
await self._send_chat_records(bot, to_wxid, title, content)
async def _send_chat_records(self, bot, from_wxid: str, title: str, content: str):
"""发送聊天记录格式消息"""
try:
import uuid
import time
import hashlib
import xml.etree.ElementTree as ET
is_group = from_wxid.endswith("@chatroom")
# 自动分割内容(与总结插件相同)
max_length = 800
content_parts = []
if len(content) <= max_length:
content_parts = [content]
else:
lines = content.split('\n')
current_part = ""
for line in lines:
if len(current_part + line + '\n') > max_length:
if current_part:
content_parts.append(current_part.strip())
current_part = line + '\n'
else:
content_parts.append(line[:max_length])
current_part = line[max_length:] + '\n'
else:
current_part += line + '\n'
if current_part.strip():
content_parts.append(current_part.strip())
recordinfo = ET.Element("recordinfo")
info_el = ET.SubElement(recordinfo, "info")
info_el.text = title
is_group_el = ET.SubElement(recordinfo, "isChatRoom")
is_group_el.text = "1" if is_group else "0"
datalist = ET.SubElement(recordinfo, "datalist")
datalist.set("count", str(len(content_parts)))
desc_el = ET.SubElement(recordinfo, "desc")
desc_el.text = title
fromscene_el = ET.SubElement(recordinfo, "fromscene")
fromscene_el.text = "3"
for i, part in enumerate(content_parts):
di = ET.SubElement(datalist, "dataitem")
di.set("datatype", "1")
di.set("dataid", uuid.uuid4().hex)
src_local_id = str((int(time.time() * 1000) % 90000) + 10000)
new_msg_id = str(int(time.time() * 1000) + i)
create_time = str(int(time.time()) - len(content_parts) + i)
ET.SubElement(di, "srcMsgLocalid").text = src_local_id
ET.SubElement(di, "sourcetime").text = time.strftime("%Y-%m-%d %H:%M", time.localtime(int(create_time)))
ET.SubElement(di, "fromnewmsgid").text = new_msg_id
ET.SubElement(di, "srcMsgCreateTime").text = create_time
ET.SubElement(di, "sourcename").text = "搜索助手"
ET.SubElement(di, "sourceheadurl").text = ""
ET.SubElement(di, "datatitle").text = part
ET.SubElement(di, "datadesc").text = part
ET.SubElement(di, "datafmt").text = "text"
ET.SubElement(di, "ischatroom").text = "1" if is_group else "0"
dataitemsource = ET.SubElement(di, "dataitemsource")
ET.SubElement(dataitemsource, "hashusername").text = hashlib.sha256(from_wxid.encode("utf-8")).hexdigest()
record_xml = ET.tostring(recordinfo, encoding="unicode")
appmsg_parts = [
"<appmsg appid=\"\" sdkver=\"0\">",
f"<title>{title}</title>",
f"<des>{title}</des>",
"<type>19</type>",
"<url>https://support.weixin.qq.com/cgi-bin/mmsupport-bin/readtemplate?t=page/favorite_record__w_unsupport</url>",
"<appattach><cdnthumbaeskey></cdnthumbaeskey><aeskey></aeskey></appattach>",
f"<recorditem><![CDATA[{record_xml}]]></recorditem>",
"<percent>0</percent>",
"</appmsg>"
]
appmsg_xml = "".join(appmsg_parts)
await bot._send_data_async(11214, {"to_wxid": from_wxid, "content": appmsg_xml})
logger.success(f"已发送聊天记录: {title}")
except Exception as e:
logger.error(f"发送聊天记录失败: {e}")
@on_text_message(priority=70)
async def handle_message(self, bot: WechatHookClient, message: dict):
"""处理文本消息"""
if not self.config["behavior"]["enable_command"]:
return True
content = message.get("Content", "").strip()
from_wxid = message.get("FromWxid", "")
is_group = message.get("IsGroup", False)
# 检查群聊/私聊开关
if is_group and not self.config["behavior"]["enable_group"]:
return True
if not is_group and not self.config["behavior"]["enable_private"]:
return True
# 检查是否是搜索命令(精确匹配命令+空格+关键词)
keywords = self.config["behavior"]["command_keywords"]
matched_keyword = None
for keyword in keywords:
if content.startswith(keyword + " "):
matched_keyword = keyword
break
if not matched_keyword:
return True
# 提取搜索关键词
query = content[len(matched_keyword):].strip()
if not query:
await bot.send_text(from_wxid, "❌ 请提供搜索关键词\n用法: /搜 <关键词>")
return False
logger.info(f"收到搜索请求: {query}")
# 发送处理中提示
await bot.send_text(from_wxid, "🔍 正在搜索,请稍候...")
try:
# 执行搜索
result = await self.search(query)
if result["success"]:
# 根据配置选择发送方式
send_as_cards = self.config["behavior"].get("send_as_cards", True)
if send_as_cards:
# 以聊天记录形式发送
await self.send_results_as_chat_record(bot, from_wxid, query, result["results"])
else:
# 以格式化文本形式发送
formatted_text = self.format_results(result["results"])
await bot.send_text(from_wxid, formatted_text)
logger.success(f"搜索成功,已发送结果")
else:
await bot.send_text(from_wxid, f"{result['message']}")
except Exception as e:
logger.error(f"搜索处理失败: {e}")
await bot.send_text(from_wxid, f"❌ 处理失败: {str(e)}")
return False
def get_llm_tools(self) -> List[dict]:
"""
返回LLM工具定义
供AIChat插件调用
"""
if not self.config["llm_tool"]["enabled"]:
return []
return [{
"type": "function",
"function": {
"name": self.config["llm_tool"]["tool_name"],
"description": self.config["llm_tool"]["tool_description"],
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词,描述想要查询的内容"
},
"max_results": {
"type": "integer",
"description": "返回的最大结果数量默认5条",
"default": 5
}
},
"required": ["query"]
}
}
}]
async def execute_llm_tool(self, tool_name: str, arguments: dict, bot: WechatHookClient, from_wxid: str) -> dict:
"""
执行LLM工具调用
供AIChat插件调用
Returns:
{"success": bool, "message": str, "results": List[dict]}
"""
expected_tool_name = self.config["llm_tool"]["tool_name"]
logger.info(f"WebSearch工具检查: 收到={tool_name}, 期望={expected_tool_name}")
if tool_name != expected_tool_name:
return None # 不是本插件的工具返回None让其他插件处理
try:
query = arguments.get("query")
if not query:
return {"success": False, "message": "缺少搜索关键词参数"}
max_results = arguments.get("max_results", 5)
logger.info(f"LLM工具调用搜索: {query}")
# 执行搜索
result = await self.search(query, max_results)
if result["success"]:
# 直接发送聊天记录,不经过 AI 总结
await self.send_results_as_chat_record(bot, from_wxid, query, result["results"], max_results)
# 返回简短说明给 AI
return {
"success": True,
"message": f"已为用户搜索「{query}」并发送了 {len(result['results'])} 条搜索结果的聊天记录卡片。",
"no_reply": True # 不需要 AI 再回复
}
else:
return {
"success": False,
"message": result["message"]
}
except Exception as e:
logger.error(f"LLM工具执行失败: {e}")
return {"success": False, "message": f"执行失败: {str(e)}"}