Files
abot/plugins/global_news/main.py
liuwei 7b6bd19781 feat: 引入LLM场景路由与后台拓扑管理能力
变更项:

1. 新增 llm.scenes 场景路由层,支持 scene->backend 统一映射,并补充默认场景配置。

2. 扩展 LLMRegistry,新增 scene 解析逻辑;当声明 scene 时强制按场景路由结果生效,保持旧 backend 配置兼容。

3. 扩展后台 /api/system/llm_config 读写能力,支持 scenes 配置保存;新增插件 LLM 依赖扫描与拓扑数据输出。

4. 升级 system_llm 页面:新增场景路由管理区、插件依赖拓扑表,支持可视化查看 插件->scene->backend->provider。

5. 迁移核心插件配置到 scene 模式(保留兼容字段):dify/global_news/game_task/message_summary/ai_auto_response/member_context/douyu。

6. 调整部分插件初始化默认 llm_config,补充 scene 字段,确保后台场景切换可直接生效。
2026-04-20 14:36:56 +08:00

229 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import threading
import time # 添加这一行
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from utils.markdown_to_image import convert_md_str_to_image
from utils.ai.unified_llm import UnifiedLLMClient
from wechat_ipad import WechatAPIClient
# 导入新闻抓取函数
from .news_crawler import nbc, cnn, abc, fox, bbc
class GlobalNewsPlugin(MessagePluginInterface):
"""全球新闻插件"""
# 功能权限常量
FEATURE_KEY = "GLOBAL_NEWS"
FEATURE_DESCRIPTION = "🌍 全球新闻功能 [全球新闻, 国际新闻, 环球新闻, 政经新闻]"
@property
def name(self) -> str:
return "全球新闻"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供全球政治经济新闻查询功能"
@property
def author(self) -> str:
return "liu.wei"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.bot: WechatAPIClient = None
self._news_tasks = {} # 存储正在进行的新闻抓取任务
# 注册功能权限
self.feature = self.register_feature()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self._commands = self._config.get("GlobalNews", {}).get("command",
["全球新闻", "国际新闻", "环球新闻", "政经新闻"])
self.command_format = self._config.get("GlobalNews", {}).get("command-format",
"全球新闻 - 获取最新的全球政治经济新闻")
plugin_config = self._config.get("GlobalNews", {})
self.enable = plugin_config.get("enable", True)
llm_config = plugin_config.get("llm", {}) or {}
if not llm_config:
llm_config = {
# 场景路由优先,便于后台统一切换新闻分析后端。
"scene": plugin_config.get("scene", ""),
"backend": plugin_config.get("backend", ""),
"provider": "dify",
"mode": "chat",
"authorization": plugin_config.get("authorization", ""),
"url": plugin_config.get("url", ""),
"response_mode": "blocking",
}
self.llm_client = UnifiedLLMClient(llm_config)
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="全球新闻")
@plugin_points_cost(1, "全球新闻消耗积分", FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
# 生成唯一任务ID
task_id = f"{sender}_{roomid}_{int(time.time())}"
# 发送等待消息
await self.bot.send_text_message(
(roomid if roomid else sender), "🌍正在获取全球新闻,请稍候...", sender)
# 启动异步任务
self._start_news_task(task_id, sender, roomid)
return True, "新闻获取任务已启动"
def _start_news_task(self, task_id: str, sender: str, roomid: str):
"""启动异步新闻获取任务"""
thread = threading.Thread(
target=self._fetch_news_thread,
args=(task_id, sender, roomid)
)
thread.daemon = True
thread.start()
self._news_tasks[task_id] = thread
self.LOG.info(f"启动新闻获取任务: {task_id}")
async def _fetch_news_thread(self, task_id: str, sender: str, roomid: str):
"""在单独的线程中运行异步新闻获取任务"""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
news_result = loop.run_until_complete(self._fetch_news_async())
loop.close()
# 处理结果
if news_result:
# 发送新闻图片
receiver = roomid if roomid else sender
await self.bot.send_image_message(receiver, news_result)
await self.bot.send_text_message("🌍全球新闻获取完成!", receiver, sender)
else:
await self.bot.send_text_message(
(roomid if roomid else sender), "❌获取新闻失败,请稍后再试", sender)
except Exception as e:
self.LOG.error(f"新闻获取任务出错: {e}")
await self.bot.send_text_message((roomid if roomid else sender), f"❌获取新闻出错: {str(e)}",
sender)
finally:
# 清理任务
if task_id in self._news_tasks:
del self._news_tasks[task_id]
async def _fetch_news_async(self) -> str:
"""异步获取所有新闻源的新闻"""
try:
# 创建所有新闻源的任务
tasks = [
self._run_in_executor(nbc),
self._run_in_executor(cnn),
self._run_in_executor(abc),
self._run_in_executor(fox),
self._run_in_executor(bbc)
]
# 并行执行所有任务
results = await asyncio.gather(*tasks)
# 合并结果
news_titles = "\n".join(results)
# 使用AI分析新闻
markdown_news = await self._run_in_executor(self.analyze_news_titles, news_titles)
# 转换为图片
image_path = await self._run_in_executor(
convert_md_str_to_image, markdown_news, "news_output.png"
)
return image_path
except Exception as e:
self.LOG.error(f"异步获取新闻失败: {e}")
return ""
async def _run_in_executor(self, func, *args):
"""在线程池中运行同步函数"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, func, *args)
def analyze_news_titles(self, content: str) -> Optional[str]:
"""同步分析新闻标题,便于在线程池中复用。"""
response = self.llm_client.run(
prompt=content,
user="a-bot-global_news",
inputs={"query": content},
tag="global_news",
)
if not response:
self.LOG.error(f"新闻分析请求失败: {self.llm_client.last_error}")
return None
return response.get("text") or None