"""Traffic alert checking service.

Compares each proxy machine's current-month traffic against its configured
limit and, when the limit is reached, pauses the machine's automatic job and
sends a notification e-mail.
"""

from __future__ import annotations

import logging
from datetime import datetime
from zoneinfo import ZoneInfo

from database import Config, ProxyMachine, get_session, ensure_singleton_config
from services.traffic_monitor import get_current_month_traffic
from services.email_service import send_traffic_alert_email
from services.scheduler import stop_machine_auto

logger = logging.getLogger(__name__)

SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")

# Number of bytes in one GB (binary, 1024**3); used to convert byte counters.
BYTES_PER_GB = 1024 * 1024 * 1024


def check_machine_traffic_alert(machine: ProxyMachine, config: Config) -> dict:
    """Check whether a single machine has reached its traffic limit.

    Args:
        machine: The machine to check. Its ``traffic_alert_enabled`` and
            ``traffic_alert_limit_gb`` attributes decide whether a check is
            performed at all.
        config: Global configuration providing the AWS credentials.

    Returns:
        A dict that always contains ``"triggered"`` (bool) and ``"message"``
        (str). When traffic could be fetched it also contains
        ``"current_gb"`` and ``"limit_gb"``; when the limit is reached it
        additionally contains ``"traffic_type"`` (``"total"`` or
        ``"upload"``).

    This function does not raise: any exception during the check is logged
    and reported as a non-triggered result.
    """
    if not machine.traffic_alert_enabled or not machine.traffic_alert_limit_gb:
        return {"triggered": False, "message": "未启用流量预警"}

    if not config.aws_access_key or not config.aws_secret_key:
        return {"triggered": False, "message": "AWS 凭证未配置"}

    try:
        # Fetch the traffic counters for the current month.
        traffic_data = get_current_month_traffic(
            aws_service=machine.aws_service,
            region=machine.aws_region,
            instance_id=machine.aws_instance_id,
            aws_access_key=config.aws_access_key,
            aws_secret_key=config.aws_secret_key,
        )

        if not traffic_data.get("ok"):
            return {
                "triggered": False,
                "message": f"获取流量失败: {traffic_data.get('message')}",
            }

        # The counter that matters depends on the service type.
        if machine.aws_service == "lightsail":
            # Lightsail: alert on total traffic.
            current_bytes = traffic_data["total"]
            traffic_type = "total"
        else:
            # EC2: alert on outbound (upload) traffic only.
            current_bytes = traffic_data["network_out"]
            traffic_type = "upload"

        current_gb = current_bytes / BYTES_PER_GB
        limit_gb = machine.traffic_alert_limit_gb

        if current_gb >= limit_gb:
            return {
                "triggered": True,
                "message": f"流量超限: {current_gb:.2f} GB / {limit_gb:.2f} GB",
                "current_gb": current_gb,
                "limit_gb": limit_gb,
                "traffic_type": traffic_type,
            }

        return {
            "triggered": False,
            "message": f"流量正常: {current_gb:.2f} GB / {limit_gb:.2f} GB",
            "current_gb": current_gb,
            "limit_gb": limit_gb,
        }

    except Exception as e:
        # Boundary handler: a failed check must never look like an alert.
        logger.exception("Failed to check traffic for %s", machine.name)
        return {"triggered": False, "message": f"检查失败: {e}"}


def handle_traffic_alert(machine: ProxyMachine, config: Config, alert_result: dict) -> None:
    """Handle a triggered alert: pause the machine's auto job and send e-mail.

    The machine is re-loaded in a fresh session, marked as triggered, and its
    automatic job is stopped if it was enabled. These changes are committed
    before the e-mail is sent, so a mail failure does not undo them.

    Args:
        machine: The machine whose alert fired; only ``machine.id`` is used
            to look the row up again.
        config: Global configuration providing the SMTP settings and the
            alert recipient.
        alert_result: The triggered result from
            ``check_machine_traffic_alert``; ``"current_gb"``, ``"limit_gb"``
            and ``"traffic_type"`` are read when an e-mail is sent.

    Raises:
        Any exception raised by the scheduler, the database commit, or the
        e-mail sender propagates to the caller.
    """
    session = get_session()
    try:
        # Re-fetch the machine so the object belongs to this session.
        db_machine = session.query(ProxyMachine).filter_by(id=machine.id).first()
        if not db_machine:
            return

        # Mark the alert as triggered.
        db_machine.traffic_alert_triggered = True
        db_machine.traffic_last_check_at = datetime.now(SHANGHAI_TZ)

        # Pause the automatic job.
        if db_machine.auto_enabled:
            db_machine.auto_enabled = False
            stop_machine_auto(db_machine.id)
            logger.warning("Machine %s auto job stopped due to traffic alert", db_machine.name)

        session.commit()

        # Send the notification e-mail.
        if config.smtp_host and config.alert_email:
            send_traffic_alert_email(
                smtp_host=config.smtp_host,
                smtp_port=config.smtp_port or 587,
                smtp_user=config.smtp_user,
                smtp_password=config.smtp_password,
                use_tls=config.smtp_use_tls,
                to_email=config.alert_email,
                machine_name=db_machine.name,
                aws_service=db_machine.aws_service,
                current_traffic_gb=alert_result["current_gb"],
                limit_gb=alert_result["limit_gb"],
                traffic_type=alert_result["traffic_type"],
            )
        else:
            logger.warning("Email not sent: SMTP or alert email not configured")

    finally:
        session.close()


def check_all_traffic_alerts() -> dict:
    """Check the traffic alert for every eligible machine.

    Eligible machines are enabled, have traffic alerts enabled, and have not
    already triggered an alert.

    Returns:
        On success ``{"ok": True, "checked": int, "triggered": int,
        "results": [...]}``, where each result holds ``"machine_id"``,
        ``"machine_name"`` and the keys returned by
        ``check_machine_traffic_alert``. If handling a triggered alert fails,
        that machine's result also carries ``"handle_error"`` (str).
        On an unexpected failure ``{"ok": False, "message": str,
        "checked": 0, "triggered": 0, "results": []}``.
    """
    session = get_session()
    try:
        config = ensure_singleton_config(session)

        # All machines with alerts enabled that have not triggered yet.
        machines = session.query(ProxyMachine).filter(
            ProxyMachine.traffic_alert_enabled == True,  # noqa: E712 - SQL expression
            ProxyMachine.traffic_alert_triggered == False,  # noqa: E712 - SQL expression
            ProxyMachine.enabled == True,  # noqa: E712 - SQL expression
        ).all()

        results = []
        triggered_count = 0

        for machine in machines:
            result = check_machine_traffic_alert(machine, config)
            entry = {
                "machine_id": machine.id,
                "machine_name": machine.name,
                **result,
            }
            results.append(entry)

            if result.get("triggered"):
                triggered_count += 1
                try:
                    handle_traffic_alert(machine, config, result)
                except Exception as e:
                    # One machine's failure (e.g. the e-mail could not be
                    # sent) must not abort the checks for the remaining
                    # machines or discard their check timestamps.
                    logger.exception(
                        "Failed to handle traffic alert for %s", machine.name
                    )
                    entry["handle_error"] = str(e)
                logger.warning(
                    "Traffic alert triggered for %s: %s", machine.name, result["message"]
                )

            # Record when this machine was last checked.
            machine.traffic_last_check_at = datetime.now(SHANGHAI_TZ)

        session.commit()

        return {
            "ok": True,
            "checked": len(machines),
            "triggered": triggered_count,
            "results": results,
        }

    except Exception as e:
        logger.exception("Failed to check traffic alerts")
        return {"ok": False, "message": str(e), "checked": 0, "triggered": 0, "results": []}
    finally:
        session.close()


def reset_machine_alert(machine_id: int) -> dict:
    """Reset a machine's alert state (manually clear a triggered alert).

    Args:
        machine_id: Primary key of the machine to reset.

    Returns:
        ``{"ok": True, "message": str}`` when the flag was cleared, or
        ``{"ok": False, "message": str}`` when no machine has that id.
    """
    session = get_session()
    try:
        machine = session.query(ProxyMachine).filter_by(id=machine_id).first()
        if not machine:
            return {"ok": False, "message": "机器不存在"}

        machine.traffic_alert_triggered = False
        session.commit()

        return {"ok": True, "message": f"已重置 {machine.name} 的预警状态"}
    finally:
        session.close()