import asyncio import threading import time from typing import Dict, Any, List, Optional, Tuple from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from utils.decorator.points_decorator import plugin_points_cost from utils.markdown_to_image import convert_md_str_to_image from utils.ai.unified_llm import UnifiedLLMClient from wechat_ipad import WechatAPIClient # 导入新闻抓取函数 from .news_crawler import nbc, cnn, abc, fox, bbc class GlobalNewsPlugin(MessagePluginInterface): """全球新闻插件""" # 功能权限常量 FEATURE_KEY = "GLOBAL_NEWS" FEATURE_DESCRIPTION = "🌍 全球新闻功能 [全球新闻, 国际新闻, 环球新闻, 政经新闻]" @property def name(self) -> str: return "全球新闻" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供全球政治经济新闻查询功能" @property def author(self) -> str: return "liu.wei" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.bot: WechatAPIClient = None self._news_tasks = {} # 存储正在进行的新闻抓取任务 # 注册功能权限 self.feature = self.register_feature() def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG.debug(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.event_system = context.get("event_system") self._commands = self._config.get("GlobalNews", {}).get("command", ["全球新闻", "国际新闻", "环球新闻", "政经新闻"]) self.command_format = self._config.get("GlobalNews", {}).get("command-format", "全球新闻 - 获取最新的全球政治经济新闻") plugin_config = self._config.get("GlobalNews", {}) self.enable = plugin_config.get("enable", True) llm_config = plugin_config.get("llm", {}) or {} if not llm_config: # 严格场景路由:仅由 scene 决定使用哪个后端。 llm_config = { "scene": plugin_config.get("scene", ""), } self.llm_client = UnifiedLLMClient(llm_config) self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: """启动插件""" self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands @plugin_stats_decorator(plugin_name="全球新闻") @plugin_points_cost(1, "全球新闻消耗积分", FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") # 检查权限 if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" # 生成唯一任务ID task_id = f"{sender}_{roomid}_{int(time.time())}" # 发送等待消息 await self.bot.send_text_message( (roomid if roomid else sender), "🌍正在获取全球新闻,请稍候...", sender) # 启动异步任务 self._start_news_task(task_id, sender, roomid) return True, "新闻获取任务已启动" def _start_news_task(self, task_id: str, sender: str, roomid: str): """启动异步新闻获取任务""" thread = threading.Thread( target=self._fetch_news_thread, args=(task_id, sender, roomid) ) thread.daemon = True thread.start() self._news_tasks[task_id] = thread self.LOG.info(f"启动新闻获取任务: {task_id}") def _fetch_news_thread(self, task_id: str, sender: str, roomid: str): """在单独线程里执行新闻抓取主流程。 这里必须保持为同步函数: 1. `threading.Thread(target=...)` 只能直接执行普通可调用对象; 2. 之前把协程函数直接塞给 `target`,线程里只会得到一个未执行的 coroutine,任务实际上不会跑; 3. 现在在线程内部显式创建事件循环,再把异步抓取和发消息协程跑完,才能真正脱离主链路执行。 """ loop = asyncio.new_event_loop() try: asyncio.set_event_loop(loop) news_result = loop.run_until_complete(self._fetch_news_async()) # 处理结果 receiver = roomid if roomid else sender if news_result: # 在线程自有事件循环里把图片和完成提示真正发出去, # 避免这里只拿到 coroutine 对象却没有执行。 loop.run_until_complete(self.bot.send_image_message(receiver, news_result)) loop.run_until_complete(self.bot.send_text_message(receiver, "🌍全球新闻获取完成!", sender)) else: loop.run_until_complete(self.bot.send_text_message(receiver, "❌获取新闻失败,请稍后再试", sender)) except Exception as e: self.LOG.error(f"新闻获取任务出错: {e}") try: receiver = roomid if roomid else sender loop.run_until_complete(self.bot.send_text_message(receiver, f"❌获取新闻出错: {str(e)}", sender)) except Exception as send_error: self.LOG.error(f"新闻获取失败后的通知发送异常: {send_error}") finally: # 清理任务 if task_id in self._news_tasks: del self._news_tasks[task_id] try: loop.close() except Exception: pass async def _fetch_news_async(self) -> str: """异步获取所有新闻源的新闻""" try: # 创建所有新闻源的任务 tasks = [ self._run_in_executor(nbc), self._run_in_executor(cnn), self._run_in_executor(abc), self._run_in_executor(fox), self._run_in_executor(bbc) ] # 并行执行所有任务 results = await asyncio.gather(*tasks) # 合并结果 news_titles = "\n".join(results) # 使用AI分析新闻 markdown_news = await self._run_in_executor(self.analyze_news_titles, news_titles) # 转换为图片 image_path = await self._run_in_executor( convert_md_str_to_image, markdown_news, "news_output.png" ) return image_path except Exception as e: self.LOG.error(f"异步获取新闻失败: {e}") return "" async def _run_in_executor(self, func, *args): """在线程池中运行同步函数""" loop = asyncio.get_event_loop() return await loop.run_in_executor(None, func, *args) def analyze_news_titles(self, content: str) -> Optional[str]: """同步分析新闻标题,便于在线程池中复用。""" response = self.llm_client.run( prompt=content, user="a-bot-global_news", inputs={"query": content}, tag="global_news", ) if not response: self.LOG.error(f"新闻分析请求失败: {self.llm_client.last_error}") return None return response.get("text") or None