from models import ParserAPI, HealthCheckConfig, HealthCheckLog from models import db from parsers.factory import ParserFactory from utils.email import EmailService from datetime import datetime import time class HealthChecker: """健康检查器""" @staticmethod def check_api(api: ParserAPI, test_url: str) -> dict: """检查单个API""" start_time = time.time() try: # 创建解析器 parser = ParserFactory.create_parser(api) # 执行解析 result = parser.parse(test_url) # 计算响应时间 response_time = int((time.time() - start_time) * 1000) # 检查结果是否有效 if result and result.get('video_url'): return { 'success': True, 'response_time': response_time, 'error': None } else: return { 'success': False, 'response_time': response_time, 'error': '解析结果无效' } except Exception as e: response_time = int((time.time() - start_time) * 1000) return { 'success': False, 'response_time': response_time, 'error': str(e) } @staticmethod def check_platform(platform: str): """检查指定平台的所有API""" # 获取健康检查配置 config = HealthCheckConfig.query.filter_by( platform=platform, is_enabled=True ).first() if not config: return # 获取该平台的所有API apis = ParserAPI.query.filter_by(platform=platform).all() failed_apis = [] for api in apis: # 执行健康检查 result = HealthChecker.check_api(api, config.test_url) # 记录日志 log = HealthCheckLog( parser_api_id=api.id, status='success' if result['success'] else 'failed', response_time=result['response_time'], error_message=result['error'] ) db.session.add(log) # 更新API状态 api.last_check_at = datetime.utcnow() if result['success']: api.health_status = True api.fail_count = 0 else: api.fail_count += 1 # 连续失败3次标记为不健康 if api.fail_count >= 3: api.health_status = False failed_apis.append({ 'name': api.name, 'error': result['error'] }) db.session.commit() # 发送告警邮件 if failed_apis and config.alert_email: HealthChecker.send_alert_email(platform, failed_apis, config.alert_email) @staticmethod def send_alert_email(platform: str, failed_apis: list, alert_email: str): """发送告警邮件""" subject = f"【短视频解析平台】{platform}接口健康检查告警" content = f"""
以下{platform}解析接口健康检查失败:
请及时检查并处理。
此邮件由系统自动发送,请勿回复。
""" try: EmailService.send_email(alert_email, subject, content, html=True) except Exception as e: print(f"发送告警邮件失败: {str(e)}") @staticmethod def check_all_platforms(): """检查所有平台""" configs = HealthCheckConfig.query.filter_by(is_enabled=True).all() for config in configs: try: HealthChecker.check_platform(config.platform) except Exception as e: print(f"检查{config.platform}平台失败: {str(e)}")