from flask import Blueprint, render_template, jsonify, request, send_from_directory, current_app, Response from .auth import login_required from loguru import logger import os import time import subprocess from datetime import datetime import platform import psutil from collections import deque import gzip import json import yaml from utils.markdown_to_image import get_md2img_health_snapshot, warmup_md2img_browser_sync # 创建系统信息蓝图 system_bp = Blueprint('system', __name__) # 记录应用启动时间 APP_START_TIME = time.time() def _system_config_path() -> str: return os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'config.yaml')) def _load_system_yaml() -> dict: config_path = _system_config_path() if not os.path.exists(config_path): return {} with open(config_path, 'r', encoding='utf-8') as f: return yaml.safe_load(f) or {} def _save_system_yaml(config_obj: dict) -> None: config_path = _system_config_path() with open(config_path, 'w', encoding='utf-8') as f: yaml.safe_dump(config_obj, f, allow_unicode=True, sort_keys=False) @system_bp.route('/api_docs') @login_required def api_docs(): src = request.args.get('src') if not src: try: server = current_app.dashboard_server cfg = getattr(server.robot, "ipad_config", {}) or {} src = cfg.get("server_url", "http://127.0.0.1:8059/") except Exception: src = "http://127.0.0.1:8059/" return render_template('api_docs.html', src_url=src) @system_bp.route('/system_status') @login_required def system_status(): src = request.args.get('src') if not src: try: server = current_app.dashboard_server glances = getattr(server.robot, "config").glances if hasattr(server.robot, "config") else {} host = glances.get("host", "127.0.0.1") port = glances.get("port", 61208) src = f"http://{host}:{port}/" except Exception: src = "http://127.0.0.1:61208/" return render_template('system_status.html', src_url=src) @system_bp.route('/system_llm') @login_required def system_llm(): return render_template('system_llm.html') # 页面路由 @system_bp.route('/wx_logs') @login_required def wx_logs(): return render_template('wx_logs.html') # API路由 @system_bp.route('/api/system_info') @login_required def api_system_info(): try: # 获取系统信息 system_info = { "os": platform.system(), "os_version": platform.version(), "python_version": platform.python_version(), "cpu_usage": psutil.cpu_percent(), "memory_usage": psutil.virtual_memory().percent, "disk_usage": psutil.disk_usage('/').percent, "uptime": time.time() - APP_START_TIME, # 使用应用启动时间计算运行时长 "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "open_files": len(psutil.Process(os.getpid()).open_files()) } return jsonify({"success": True, "data": system_info}) except Exception as e: logger.error(f"获取系统信息失败: {e}") return jsonify({"success": False, "error": str(e)}), 500 @system_bp.route('/api/wx_logs') @login_required def api_wx_logs(): try: log_type = request.args.get('type', 'info') # 默认显示info日志 lines = request.args.get('lines', 100, type=int) # 默认显示最后100行 # 修正日志文件路径计算,获取项目根目录 project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..','logs')) if log_type == 'error': log_file = os.path.join(project_root, 'wx_error.log') elif log_type == 'debug': log_file = os.path.join(project_root, 'wx_debug.log') else: log_file = os.path.join(project_root, 'wx_info.log') log_content = [] if os.path.exists(log_file): try: chunk_size = 8192 with open(log_file, 'rb') as f: f.seek(0, os.SEEK_END) size = f.tell() buffer = b"" pos = size while pos > 0 and buffer.count(b'\n') <= lines: read_size = chunk_size if pos >= chunk_size else pos pos -= read_size f.seek(pos) buffer = f.read(read_size) + buffer log_content = [b.decode('utf-8', errors='ignore') for b in buffer.splitlines()[-lines:]] except Exception as e: logger.error(f"高效读取日志失败,回退到常规方式: {e}") with open(log_file, 'r', encoding='utf-8', errors='ignore') as f: log_content = list(deque(f, lines)) else: logger.warning(f"日志文件不存在: {log_file}") # 尝试列出项目根目录下的所有日志文件,帮助调试 try: all_files = [f for f in os.listdir(project_root) if f.endswith('.log')] logger.info(f"项目根目录下的日志文件: {all_files}") except Exception as e: logger.error(f"列出目录文件失败: {e}") payload = { "success": True, "data": { "log_type": log_type, "log_file": log_file, "content": log_content, "lines": len(log_content) } } accept = request.headers.get('Accept-Encoding', '') if 'gzip' in accept.lower(): body = json.dumps(payload, ensure_ascii=False).encode('utf-8') gz = gzip.compress(body, compresslevel=6) resp = Response(gz, mimetype='application/json') resp.headers['Content-Encoding'] = 'gzip' return resp return jsonify(payload) except Exception as e: logger.error(f"获取微信日志失败: {e}") return jsonify({"success": False, "error": str(e)}), 500 # 在现有路由下添加 @system_bp.route('/api/current_user_info', methods=['GET']) @login_required def get_current_user_info(): """获取当前登录的微信用户信息""" dashboard_server = current_app.dashboard_server result = dashboard_server.get_current_user_info() return jsonify(result) @system_bp.route('/api/system/config/raw', methods=['GET']) @login_required def get_system_config_raw(): try: server = current_app.dashboard_server config_path = _system_config_path() with open(config_path, 'r', encoding='utf-8') as f: config_text = f.read() robot_config = getattr(getattr(server, "robot", None), "config", None) llm_config = getattr(robot_config, "llm", {}) if robot_config else {} llm_backends = (llm_config or {}).get("backends", {}) return jsonify({ "success": True, "data": config_text, "path": config_path, "llm_backends": list((llm_backends or {}).keys()), }) except Exception as e: logger.error(f"读取系统配置失败: {e}") return jsonify({"success": False, "message": str(e)}), 500 @system_bp.route('/api/system/config/update', methods=['POST']) @login_required def update_system_config(): try: server = current_app.dashboard_server data = request.get_json() or {} config_text = data.get("config_text") if config_text is None: return jsonify({"success": False, "message": "缺少配置内容"}), 400 yaml.safe_load(config_text) config_path = _system_config_path() with open(config_path, 'w', encoding='utf-8') as f: f.write(config_text) if getattr(server, "robot", None) and getattr(server.robot, "config", None): server.robot.config.reload() return jsonify({"success": True, "message": "全局配置已保存"}) except Exception as e: logger.error(f"保存系统配置失败: {e}") return jsonify({"success": False, "message": str(e)}), 500 @system_bp.route('/api/system/llm_config', methods=['GET']) @login_required def get_system_llm_config(): try: config_obj = _load_system_yaml() llm_config = config_obj.get("llm", {}) or {} backends = llm_config.get("backends", {}) or {} backend_list = [] for name, backend in backends.items(): if not isinstance(backend, dict): continue item = dict(backend) item["name"] = name backend_list.append(item) backend_list.sort(key=lambda item: item.get("name", "")) return jsonify({ "success": True, "data": { "default_backend": llm_config.get("default_backend", ""), "backends": backend_list, "config_path": _system_config_path(), } }) except Exception as e: logger.error(f"读取全局 LLM 配置失败: {e}") return jsonify({"success": False, "message": str(e)}), 500 @system_bp.route('/api/system/llm_config', methods=['POST']) @login_required def update_system_llm_config(): try: server = current_app.dashboard_server data = request.get_json() or {} default_backend = str(data.get("default_backend") or "").strip() backend_list = data.get("backends", []) or [] if not isinstance(backend_list, list): return jsonify({"success": False, "message": "backends 格式不正确"}), 400 normalized_backends = {} for raw in backend_list: if not isinstance(raw, dict): continue name = str(raw.get("name") or "").strip() if not name: continue item = {} for key, value in raw.items(): if key == "name": continue if value is None: continue if isinstance(value, str): value = value.strip() if value == "": continue item[key] = value normalized_backends[name] = item if default_backend and default_backend not in normalized_backends: return jsonify({"success": False, "message": "默认后端不存在"}), 400 config_obj = _load_system_yaml() config_obj["llm"] = { "default_backend": default_backend, "backends": normalized_backends, } _save_system_yaml(config_obj) if getattr(server, "robot", None) and getattr(server.robot, "config", None): server.robot.config.reload() return jsonify({"success": True, "message": "全局 LLM 配置已保存"}) except Exception as e: logger.error(f"保存全局 LLM 配置失败: {e}") return jsonify({"success": False, "message": str(e)}), 500 @system_bp.route('/api/system/md2img_health', methods=['GET']) @login_required def get_md2img_health(): """查询 Markdown 转图运行时健康状态。""" try: # 默认只读取状态,不主动拉起 runtime。 # 当后台希望“刷新并顺便拉起”时,可传 ensure_runtime=true。 ensure_runtime = str(request.args.get('ensure_runtime', 'false')).strip().lower() in {'1', 'true', 'yes', 'on'} data = get_md2img_health_snapshot(ensure_runtime=ensure_runtime) return jsonify({"success": True, "data": data}) except Exception as e: logger.error(f"获取 md2img 健康状态失败: {e}") return jsonify({"success": False, "message": str(e)}), 500 @system_bp.route('/api/system/md2img_warmup', methods=['POST']) @login_required def trigger_md2img_warmup(): """手动触发 Markdown 转图浏览器预热。""" try: payload = request.get_json(silent=True) or {} timeout_seconds = int(payload.get('timeout_seconds', 45) or 45) timeout_seconds = max(10, min(timeout_seconds, 180)) ok = warmup_md2img_browser_sync(timeout_seconds=timeout_seconds) data = get_md2img_health_snapshot(ensure_runtime=False) if ok: return jsonify({ "success": True, "message": f"预热完成(timeout={timeout_seconds}s)", "data": data, }) return jsonify({ "success": False, "message": f"预热失败(timeout={timeout_seconds}s),请查看运行日志", "data": data, }), 500 except Exception as e: logger.error(f"触发 md2img 预热失败: {e}") return jsonify({"success": False, "message": str(e)}), 500 @system_bp.route('/api/restart_service', methods=['POST']) @login_required def restart_service(): """调用项目根目录下的 restart.sh 重启服务""" try: project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) script_path = os.path.join(project_root, 'restart.sh') if not os.path.exists(script_path): return jsonify({"success": False, "message": f"未找到脚本: {script_path}"}), 404 subprocess.Popen( ['bash', script_path], cwd=project_root, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True ) logger.warning(f"后台触发服务重启脚本: {script_path}") return jsonify({ "success": True, "message": "已触发重启脚本,服务将在短时间内重启" }) except Exception as e: logger.error(f"触发服务重启失败: {e}") return jsonify({"success": False, "message": str(e)}), 500