"""流量监控服务 - 获取 EC2/Lightsail 流量数据""" from __future__ import annotations import logging from datetime import datetime, timedelta from typing import Any from zoneinfo import ZoneInfo import boto3 logger = logging.getLogger(__name__) SHANGHAI_TZ = ZoneInfo("Asia/Shanghai") def create_cloudwatch_client(region: str, aws_access_key: str, aws_secret_key: str): """创建 CloudWatch 客户端""" return boto3.client( "cloudwatch", region_name=region, aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key, ) def get_ec2_traffic( region: str, instance_id: str, aws_access_key: str, aws_secret_key: str, start_time: datetime, end_time: datetime, period: int = 3600, ) -> dict[str, Any]: """ 获取 EC2 实例的流量数据 Args: region: AWS 区域 instance_id: EC2 实例 ID aws_access_key: AWS Access Key aws_secret_key: AWS Secret Key start_time: 开始时间 end_time: 结束时间 period: 数据点间隔(秒),默认1小时 Returns: { "network_in": 下载流量(字节), "network_out": 上传流量(字节), "data_points": 详细数据点列表 } """ try: cloudwatch = create_cloudwatch_client(region, aws_access_key, aws_secret_key) # 获取 NetworkIn(下载) network_in_response = cloudwatch.get_metric_statistics( Namespace="AWS/EC2", MetricName="NetworkIn", Dimensions=[{"Name": "InstanceId", "Value": instance_id}], StartTime=start_time, EndTime=end_time, Period=period, Statistics=["Sum"], ) # 获取 NetworkOut(上传) network_out_response = cloudwatch.get_metric_statistics( Namespace="AWS/EC2", MetricName="NetworkOut", Dimensions=[{"Name": "InstanceId", "Value": instance_id}], StartTime=start_time, EndTime=end_time, Period=period, Statistics=["Sum"], ) # 计算总流量 network_in_total = sum(dp["Sum"] for dp in network_in_response.get("Datapoints", [])) network_out_total = sum(dp["Sum"] for dp in network_out_response.get("Datapoints", [])) # 合并数据点用于图表 data_points = [] in_points = {dp["Timestamp"]: dp["Sum"] for dp in network_in_response.get("Datapoints", [])} out_points = {dp["Timestamp"]: dp["Sum"] for dp in network_out_response.get("Datapoints", [])} all_timestamps = sorted(set(in_points.keys()) | set(out_points.keys())) for ts in all_timestamps: data_points.append({ "timestamp": ts.astimezone(SHANGHAI_TZ).isoformat(), "network_in": in_points.get(ts, 0), "network_out": out_points.get(ts, 0), }) return { "ok": True, "network_in": network_in_total, "network_out": network_out_total, "total": network_in_total + network_out_total, "data_points": data_points, } except Exception as e: logger.exception("Failed to get EC2 traffic for %s", instance_id) return {"ok": False, "message": str(e), "network_in": 0, "network_out": 0, "total": 0, "data_points": []} def get_lightsail_traffic( region: str, instance_name: str, aws_access_key: str, aws_secret_key: str, start_time: datetime, end_time: datetime, period: int = 3600, ) -> dict[str, Any]: """ 获取 Lightsail 实例的流量数据 Args: region: AWS 区域 instance_name: Lightsail 实例名称 aws_access_key: AWS Access Key aws_secret_key: AWS Secret Key start_time: 开始时间 end_time: 结束时间 period: 数据点间隔(秒),默认1小时 Returns: { "network_in": 下载流量(字节), "network_out": 上传流量(字节), "data_points": 详细数据点列表 } """ try: lightsail = boto3.client( "lightsail", region_name=region, aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key, ) # 获取 NetworkIn(下载) network_in_response = lightsail.get_instance_metric_data( instanceName=instance_name, metricName="NetworkIn", period=period, startTime=start_time, endTime=end_time, unit="Bytes", statistics=["Sum"], ) # 获取 NetworkOut(上传) network_out_response = lightsail.get_instance_metric_data( instanceName=instance_name, metricName="NetworkOut", period=period, startTime=start_time, endTime=end_time, unit="Bytes", statistics=["Sum"], ) # 计算总流量 in_data = network_in_response.get("metricData", []) out_data = network_out_response.get("metricData", []) network_in_total = sum(dp.get("sum", 0) for dp in in_data) network_out_total = sum(dp.get("sum", 0) for dp in out_data) # 合并数据点 data_points = [] in_points = {dp["timestamp"]: dp.get("sum", 0) for dp in in_data} out_points = {dp["timestamp"]: dp.get("sum", 0) for dp in out_data} all_timestamps = sorted(set(in_points.keys()) | set(out_points.keys())) for ts in all_timestamps: data_points.append({ "timestamp": ts.astimezone(SHANGHAI_TZ).isoformat(), "network_in": in_points.get(ts, 0), "network_out": out_points.get(ts, 0), }) return { "ok": True, "network_in": network_in_total, "network_out": network_out_total, "total": network_in_total + network_out_total, "data_points": data_points, } except Exception as e: logger.exception("Failed to get Lightsail traffic for %s", instance_name) return {"ok": False, "message": str(e), "network_in": 0, "network_out": 0, "total": 0, "data_points": []} def get_machine_traffic( aws_service: str, region: str, instance_id: str, aws_access_key: str, aws_secret_key: str, start_time: datetime, end_time: datetime, period: int = 3600, ) -> dict[str, Any]: """ 统一接口:根据服务类型获取流量数据 """ if aws_service == "lightsail": return get_lightsail_traffic( region=region, instance_name=instance_id, aws_access_key=aws_access_key, aws_secret_key=aws_secret_key, start_time=start_time, end_time=end_time, period=period, ) else: return get_ec2_traffic( region=region, instance_id=instance_id, aws_access_key=aws_access_key, aws_secret_key=aws_secret_key, start_time=start_time, end_time=end_time, period=period, ) def get_current_month_traffic( aws_service: str, region: str, instance_id: str, aws_access_key: str, aws_secret_key: str, ) -> dict[str, Any]: """获取当月流量数据""" now = datetime.now(SHANGHAI_TZ) start_of_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) return get_machine_traffic( aws_service=aws_service, region=region, instance_id=instance_id, aws_access_key=aws_access_key, aws_secret_key=aws_secret_key, start_time=start_of_month, end_time=now, period=3600, # 每小时一个数据点 ) def get_current_day_traffic( aws_service: str, region: str, instance_id: str, aws_access_key: str, aws_secret_key: str, ) -> dict[str, Any]: """获取当日流量数据""" now = datetime.now(SHANGHAI_TZ) start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0) return get_machine_traffic( aws_service=aws_service, region=region, instance_id=instance_id, aws_access_key=aws_access_key, aws_secret_key=aws_secret_key, start_time=start_of_day, end_time=now, period=300, # 每5分钟一个数据点 ) def get_all_time_traffic( aws_service: str, region: str, instance_id: str, aws_access_key: str, aws_secret_key: str, created_at: datetime = None, ) -> dict[str, Any]: """ 获取建站至今的总流量数据 注意: CloudWatch 数据保留期限有限: - 小于60秒的数据点保留3小时 - 60秒(1分钟)的数据点保留15天 - 300秒(5分钟)的数据点保留63天 - 3600秒(1小时)的数据点保留455天(约15个月) 因此这里只能获取最近约15个月的数据 """ now = datetime.now(SHANGHAI_TZ) # 如果提供了创建时间,使用它;否则使用15个月前 if created_at: # 确保时区正确 if created_at.tzinfo is None: start_time = created_at.replace(tzinfo=SHANGHAI_TZ) else: start_time = created_at.astimezone(SHANGHAI_TZ) else: # CloudWatch 最多保留约15个月的小时级数据 start_time = now - timedelta(days=455) return get_machine_traffic( aws_service=aws_service, region=region, instance_id=instance_id, aws_access_key=aws_access_key, aws_secret_key=aws_secret_key, start_time=start_time, end_time=now, period=86400, # 每天一个数据点 ) def format_bytes(bytes_value: float) -> str: """格式化字节数为可读格式""" if bytes_value < 0: return "0 B" units = ["B", "KB", "MB", "GB", "TB"] unit_index = 0 value = float(bytes_value) while value >= 1024 and unit_index < len(units) - 1: value /= 1024 unit_index += 1 if unit_index == 0: return f"{int(value)} {units[unit_index]}" return f"{value:.2f} {units[unit_index]}"