194 lines
6.3 KiB
Python
194 lines
6.3 KiB
Python
"""流量预警检查服务"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from database import Config, ProxyMachine, get_session, ensure_singleton_config
|
|
from services.traffic_monitor import get_current_month_traffic
|
|
from services.email_service import send_traffic_alert_email
|
|
from services.scheduler import stop_machine_auto
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
|
|
|
|
# 字节转GB
|
|
BYTES_PER_GB = 1024 * 1024 * 1024
|
|
|
|
|
|
def check_machine_traffic_alert(machine: ProxyMachine, config: Config) -> dict:
|
|
"""
|
|
检查单台机器的流量预警
|
|
|
|
Returns:
|
|
{"triggered": bool, "message": str}
|
|
"""
|
|
if not machine.traffic_alert_enabled or not machine.traffic_alert_limit_gb:
|
|
return {"triggered": False, "message": "未启用流量预警"}
|
|
|
|
if not config.aws_access_key or not config.aws_secret_key:
|
|
return {"triggered": False, "message": "AWS 凭证未配置"}
|
|
|
|
try:
|
|
# 获取当月流量
|
|
traffic_data = get_current_month_traffic(
|
|
aws_service=machine.aws_service,
|
|
region=machine.aws_region,
|
|
instance_id=machine.aws_instance_id,
|
|
aws_access_key=config.aws_access_key,
|
|
aws_secret_key=config.aws_secret_key,
|
|
)
|
|
|
|
if not traffic_data.get("ok"):
|
|
return {"triggered": False, "message": f"获取流量失败: {traffic_data.get('message')}"}
|
|
|
|
# 根据服务类型判断预警条件
|
|
if machine.aws_service == "lightsail":
|
|
# Lightsail: 总流量预警
|
|
current_bytes = traffic_data["total"]
|
|
traffic_type = "total"
|
|
else:
|
|
# EC2: 上传流量预警
|
|
current_bytes = traffic_data["network_out"]
|
|
traffic_type = "upload"
|
|
|
|
current_gb = current_bytes / BYTES_PER_GB
|
|
limit_gb = machine.traffic_alert_limit_gb
|
|
|
|
if current_gb >= limit_gb:
|
|
return {
|
|
"triggered": True,
|
|
"message": f"流量超限: {current_gb:.2f} GB / {limit_gb:.2f} GB",
|
|
"current_gb": current_gb,
|
|
"limit_gb": limit_gb,
|
|
"traffic_type": traffic_type,
|
|
}
|
|
|
|
return {
|
|
"triggered": False,
|
|
"message": f"流量正常: {current_gb:.2f} GB / {limit_gb:.2f} GB",
|
|
"current_gb": current_gb,
|
|
"limit_gb": limit_gb,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.exception("Failed to check traffic for %s", machine.name)
|
|
return {"triggered": False, "message": f"检查失败: {str(e)}"}
|
|
|
|
|
|
def handle_traffic_alert(machine: ProxyMachine, config: Config, alert_result: dict) -> None:
|
|
"""
|
|
处理流量预警:暂停机器 + 发送邮件
|
|
"""
|
|
session = get_session()
|
|
try:
|
|
# 重新获取机器对象(确保在当前 session 中)
|
|
db_machine = session.query(ProxyMachine).filter_by(id=machine.id).first()
|
|
if not db_machine:
|
|
return
|
|
|
|
# 标记已触发预警
|
|
db_machine.traffic_alert_triggered = True
|
|
db_machine.traffic_last_check_at = datetime.now(SHANGHAI_TZ)
|
|
|
|
# 暂停自动任务
|
|
if db_machine.auto_enabled:
|
|
db_machine.auto_enabled = False
|
|
stop_machine_auto(db_machine.id)
|
|
logger.warning("Machine %s auto job stopped due to traffic alert", db_machine.name)
|
|
|
|
session.commit()
|
|
|
|
# 发送邮件通知
|
|
if config.smtp_host and config.alert_email:
|
|
send_traffic_alert_email(
|
|
smtp_host=config.smtp_host,
|
|
smtp_port=config.smtp_port or 587,
|
|
smtp_user=config.smtp_user,
|
|
smtp_password=config.smtp_password,
|
|
use_tls=config.smtp_use_tls,
|
|
to_email=config.alert_email,
|
|
machine_name=db_machine.name,
|
|
aws_service=db_machine.aws_service,
|
|
current_traffic_gb=alert_result["current_gb"],
|
|
limit_gb=alert_result["limit_gb"],
|
|
traffic_type=alert_result["traffic_type"],
|
|
)
|
|
else:
|
|
logger.warning("Email not sent: SMTP or alert email not configured")
|
|
|
|
finally:
|
|
session.close()
|
|
|
|
|
|
def check_all_traffic_alerts() -> dict:
|
|
"""
|
|
检查所有机器的流量预警
|
|
|
|
Returns:
|
|
{"checked": int, "triggered": int, "results": [...]}
|
|
"""
|
|
session = get_session()
|
|
try:
|
|
config = ensure_singleton_config(session)
|
|
|
|
# 获取所有启用了流量预警且未触发的机器
|
|
machines = session.query(ProxyMachine).filter(
|
|
ProxyMachine.traffic_alert_enabled == True,
|
|
ProxyMachine.traffic_alert_triggered == False,
|
|
ProxyMachine.enabled == True,
|
|
).all()
|
|
|
|
results = []
|
|
triggered_count = 0
|
|
|
|
for machine in machines:
|
|
result = check_machine_traffic_alert(machine, config)
|
|
results.append({
|
|
"machine_id": machine.id,
|
|
"machine_name": machine.name,
|
|
**result,
|
|
})
|
|
|
|
if result.get("triggered"):
|
|
triggered_count += 1
|
|
handle_traffic_alert(machine, config, result)
|
|
logger.warning("Traffic alert triggered for %s: %s", machine.name, result["message"])
|
|
|
|
# 更新检查时间
|
|
machine.traffic_last_check_at = datetime.now(SHANGHAI_TZ)
|
|
session.commit()
|
|
|
|
return {
|
|
"ok": True,
|
|
"checked": len(machines),
|
|
"triggered": triggered_count,
|
|
"results": results,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.exception("Failed to check traffic alerts")
|
|
return {"ok": False, "message": str(e), "checked": 0, "triggered": 0, "results": []}
|
|
finally:
|
|
session.close()
|
|
|
|
|
|
def reset_machine_alert(machine_id: int) -> dict:
|
|
"""
|
|
重置机器的预警状态(手动解除预警)
|
|
"""
|
|
session = get_session()
|
|
try:
|
|
machine = session.query(ProxyMachine).filter_by(id=machine_id).first()
|
|
if not machine:
|
|
return {"ok": False, "message": "机器不存在"}
|
|
|
|
machine.traffic_alert_triggered = False
|
|
session.commit()
|
|
|
|
return {"ok": True, "message": f"已重置 {machine.name} 的预警状态"}
|
|
finally:
|
|
session.close()
|