Files
JieXi/utils/health_check.py
2025-11-28 21:20:40 +08:00

140 lines
4.3 KiB
Python

import html
import time
from datetime import datetime

from models import ParserAPI, HealthCheckConfig, HealthCheckLog
from models import db
from parsers.factory import ParserFactory
from utils.email import EmailService
class HealthChecker:
    """Health checker for parser APIs.

    Runs a test parse against every API of a platform, records the outcome
    as a ``HealthCheckLog`` row, updates the API's health bookkeeping and
    e-mails an alert when APIs have failed repeatedly.
    """

    # Consecutive failed checks after which an API is marked unhealthy.
    FAIL_THRESHOLD = 3

    @staticmethod
    def check_api(api: "ParserAPI", test_url: str) -> dict:
        """Check a single API by parsing ``test_url`` with it.

        Args:
            api: API record handed to ``ParserFactory.create_parser``.
            test_url: URL passed to the parser's ``parse`` method.

        Returns:
            A dict with the keys ``success`` (bool), ``response_time``
            (elapsed milliseconds, int) and ``error`` (message string, or
            ``None`` when the check succeeded).
        """
        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted.
        started = time.perf_counter()
        error = None
        try:
            parser = ParserFactory.create_parser(api)
            result = parser.parse(test_url)
            # A result only counts as valid when it carries a video URL.
            if not (result and result.get('video_url')):
                error = '解析结果无效'
        except Exception as e:
            # Any failure while creating the parser or parsing is reported
            # as a failed check instead of propagating to the caller.
            error = str(e)

        response_time = int((time.perf_counter() - started) * 1000)
        return {
            'success': error is None,
            'response_time': response_time,
            'error': error,
        }

    @staticmethod
    def check_platform(platform: str):
        """Check every API of ``platform`` and persist the results.

        Does nothing when the platform has no enabled health-check config.
        Otherwise each API gets a ``HealthCheckLog`` row and updated
        ``last_check_at`` / ``health_status`` / ``fail_count`` values, all
        committed together. APIs whose consecutive failure count has reached
        ``FAIL_THRESHOLD`` are reported by e-mail if the config has an
        ``alert_email``.

        Args:
            platform: Platform identifier used to look up config and APIs.

        Raises:
            Exception: Whatever the database layer raises; a failed commit
                is rolled back before being re-raised.
        """
        config = HealthCheckConfig.query.filter_by(
            platform=platform,
            is_enabled=True
        ).first()
        if not config:
            return

        apis = ParserAPI.query.filter_by(platform=platform).all()
        failed_apis = []
        for api in apis:
            result = HealthChecker.check_api(api, config.test_url)

            # Record the outcome of this check.
            log = HealthCheckLog(
                parser_api_id=api.id,
                status='success' if result['success'] else 'failed',
                response_time=result['response_time'],
                error_message=result['error']
            )
            db.session.add(log)

            # Update the API's health bookkeeping (naive UTC timestamp).
            api.last_check_at = datetime.utcnow()
            if result['success']:
                api.health_status = True
                api.fail_count = 0
                continue

            # Treat a missing counter (None) as zero rather than crashing.
            api.fail_count = (api.fail_count or 0) + 1
            if api.fail_count >= HealthChecker.FAIL_THRESHOLD:
                api.health_status = False
                failed_apis.append({
                    'name': api.name,
                    'error': result['error']
                })

        try:
            db.session.commit()
        except Exception:
            # Leave the shared session usable for whoever runs next.
            db.session.rollback()
            raise

        if failed_apis and config.alert_email:
            HealthChecker.send_alert_email(platform, failed_apis, config.alert_email)

    @staticmethod
    def _build_alert_html(platform: str, failed_apis: list) -> str:
        """Render the HTML body of an alert mail.

        ``platform`` and each entry's ``name`` / ``error`` are HTML-escaped,
        because error messages come from exceptions and may contain markup.
        """
        items = "".join(
            f"<li><strong>{html.escape(str(api['name']))}</strong>: "
            f"{html.escape(str(api['error']))}</li>"
            for api in failed_apis
        )
        return f"""
<html>
<body style="font-family: Arial, sans-serif; padding: 20px;">
<h2>接口健康检查告警</h2>
<p>以下{html.escape(str(platform))}解析接口健康检查失败:</p>
<ul>
{items}
</ul>
<p>请及时检查并处理。</p>
<hr>
<p style="color: #999; font-size: 12px;">此邮件由系统自动发送,请勿回复。</p>
</body>
</html>
"""

    @staticmethod
    def send_alert_email(platform: str, failed_apis: list, alert_email: str):
        """Send an alert mail listing the failed APIs of ``platform``.

        Sending is best-effort: a failure is printed and not re-raised.

        Args:
            platform: Platform name shown in the subject and body.
            failed_apis: Dicts with the keys ``name`` and ``error``.
            alert_email: Recipient address.
        """
        subject = f"【短视频解析平台】{platform}接口健康检查告警"
        try:
            content = HealthChecker._build_alert_html(platform, failed_apis)
            EmailService.send_email(alert_email, subject, content, html=True)
        except Exception as e:
            print(f"发送告警邮件失败: {str(e)}")

    @staticmethod
    def check_all_platforms():
        """Check every platform that has an enabled health-check config.

        A failure on one platform is printed and does not stop the others.
        """
        configs = HealthCheckConfig.query.filter_by(is_enabled=True).all()
        for config in configs:
            try:
                HealthChecker.check_platform(config.platform)
            except Exception as e:
                print(f"检查{config.platform}平台失败: {str(e)}")
                # Discard the failed platform's pending state so the next
                # platform starts from a clean session.
                db.session.rollback()