import smtplib import ssl from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.utils import formataddr from email.header import Header from models import SMTPConfig import random class EmailService: """邮件服务类""" @staticmethod def get_smtp_config(): """获取可用的SMTP配置(负载均衡)""" configs = SMTPConfig.query.filter_by(is_enabled=True).all() if not configs: raise Exception("没有可用的SMTP配置") # 加权随机选择 total_weight = sum(c.weight for c in configs) if total_weight == 0: return random.choice(configs) rand = random.uniform(0, total_weight) current = 0 for config in configs: current += config.weight if rand <= current: return config return configs[-1] @staticmethod def _do_send(smtp_config, to_email: str, subject: str, content: str, html: bool = True): """实际发送邮件的内部方法""" server = None try: print(f"[SMTP调试] 开始发送邮件") print(f"[SMTP调试] 服务器: {smtp_config.host}:{smtp_config.port}") print(f"[SMTP调试] 用户名: {smtp_config.username}") print(f"[SMTP调试] 使用TLS: {smtp_config.use_tls}") print(f"[SMTP调试] 发件人: {smtp_config.from_email}") msg = MIMEMultipart('alternative') from_email = smtp_config.from_email or smtp_config.username from_name = smtp_config.from_name or smtp_config.username msg['From'] = formataddr((str(Header(from_name, 'utf-8')), from_email)) msg['To'] = to_email msg['Subject'] = Header(subject, 'utf-8') if html: msg.attach(MIMEText(content, 'html', 'utf-8')) else: msg.attach(MIMEText(content, 'plain', 'utf-8')) print(f"[SMTP调试] 开始连接服务器...") if smtp_config.port == 465: print(f"[SMTP调试] 使用 SSL 模式(端口 465)") server = smtplib.SMTP_SSL(smtp_config.host, smtp_config.port, timeout=30) elif smtp_config.use_tls: print(f"[SMTP调试] 使用 STARTTLS 模式(端口 {smtp_config.port})") server = smtplib.SMTP(smtp_config.host, smtp_config.port, timeout=30) server.ehlo() context = ssl.create_default_context() server.starttls(context=context) server.ehlo() else: print(f"[SMTP调试] 使用普通 SMTP 模式(端口 {smtp_config.port})") server = smtplib.SMTP(smtp_config.host, smtp_config.port, timeout=30) print(f"[SMTP调试] 连接成功,开始登录...") server.login(smtp_config.username, smtp_config.password) print(f"[SMTP调试] 登录成功,开始发送邮件...") server.sendmail(from_email, [to_email], msg.as_string()) print(f"[SMTP调试] 邮件发送成功!") # 更新发送统计 smtp_config.send_count += 1 from models import db db.session.commit() return True finally: if server: try: server.quit() except: pass @staticmethod def send_email(to_email: str, subject: str, content: str, html: bool = True): """发送邮件(支持故障自动转移)""" configs = SMTPConfig.query.filter_by(is_enabled=True).all() if not configs: raise Exception("没有可用的SMTP配置") # 按权重排序,优先尝试高权重的配置 configs.sort(key=lambda x: x.weight, reverse=True) last_error = None tried_ids = [] for smtp_config in configs: tried_ids.append(smtp_config.id) try: return EmailService._do_send(smtp_config, to_email, subject, content, html) except Exception as e: last_error = str(e) print(f"[SMTP调试] {smtp_config.name} 发送失败: {last_error}") # 更新失败统计 smtp_config.fail_count += 1 from models import db db.session.commit() # 如果还有其他配置,继续尝试 remaining = len(configs) - len(tried_ids) if remaining > 0: print(f"[SMTP调试] 尝试下一个SMTP配置,剩余 {remaining} 个") continue # 所有配置都失败了 raise Exception(f"所有SMTP配置均发送失败,最后错误: {last_error}") @staticmethod def send_verification_code(to_email: str, code: str, purpose: str): """发送验证码邮件""" purpose_text = { 'register': '注册账号', 'reset_password': '重置密码', 'forgot_password': '找回密码' }.get(purpose, '验证') subject = f"【短视频解析平台】{purpose_text}验证码" content = f"""
您正在进行{purpose_text}操作,验证码为:
验证码有效期为10分钟,请勿泄露给他人。
如果这不是您的操作,请忽略此邮件。
""" return EmailService.send_email(to_email, subject, content, html=True)