"""Send alert emails through the Brevo REST API or a classic SMTP server."""

import json
import smtplib
import urllib.error
import urllib.request
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


class EmailAlerter:
    """Deliver alert emails using the settings stored under the "smtp" key.

    The configuration object only needs a ``get(key, default)`` method.
    When the "smtp" section holds a ``brevo_api_key`` the Brevo HTTP API is
    used; otherwise the message goes through the configured SMTP server.

    Every sending method returns a ``(success, message)`` tuple instead of
    raising, so callers can surface the message directly.
    """

    def __init__(self, config_manager):
        """Keep a reference to the configuration provider."""
        self.config = config_manager

    def _get_smtp_config(self):
        """Return the "smtp" configuration section, or an empty dict."""
        return self.config.get("smtp", {})

    @staticmethod
    def _normalize_recipients(value):
        """Return the configured recipients as a list of addresses.

        A list (or other iterable) is copied as-is.  A single string is
        accepted too and split on commas, because iterating a string would
        otherwise yield one "recipient" per character.
        """
        if not value:
            return []
        if isinstance(value, str):
            return [part.strip() for part in value.split(",") if part.strip()]
        return list(value)

    @staticmethod
    def _close_smtp(server):
        """End the SMTP session without letting cleanup errors escape."""
        try:
            server.quit()
        except (smtplib.SMTPException, OSError):
            # The session is already broken; just release the socket.
            server.close()

    def is_configured(self):
        """Tell whether enough settings are present to send an email.

        A sender and at least one recipient are always required.  The SMTP
        server is additionally required when no Brevo API key is set.
        """
        smtp = self._get_smtp_config()
        has_recipients = bool(
            smtp.get("from_email")
            and self._normalize_recipients(smtp.get("to_emails"))
        )
        if smtp.get("brevo_api_key"):
            return has_recipients
        return has_recipients and bool(smtp.get("server"))

    def send_alert(self, subject, body):
        """Send an alert email; do nothing when email is not configured.

        Returns:
            A ``(success, message)`` tuple.
        """
        if not self.is_configured():
            return False, "Email non configure"
        return self._send_email(subject, body)

    def send_test(self):
        """Send a fixed test email to validate the configuration.

        Returns:
            A ``(success, message)`` tuple.
        """
        if not self.is_configured():
            return False, "Configuration email incomplete"

        subject = "[TEST] Supervision - Test de configuration email"
        body = (
            "Ceci est un email de test.\n\n"
            "Si vous recevez ce message, la configuration est correcte.\n\n"
            "-- Supervision"
        )
        return self._send_email(subject, body)

    def _send_email(self, subject, body):
        """Dispatch to Brevo when an API key is set, otherwise to SMTP."""
        smtp_cfg = self._get_smtp_config()
        if smtp_cfg.get("brevo_api_key"):
            return self._send_via_brevo(subject, body, smtp_cfg)
        return self._send_via_smtp(subject, body, smtp_cfg)

    def _send_via_brevo(self, subject, body, smtp_cfg):
        """Send the message as plain text through the Brevo REST API.

        Returns:
            A ``(success, message)`` tuple; HTTP and network errors are
            reported in the message rather than raised.
        """
        recipients = self._normalize_recipients(smtp_cfg["to_emails"])
        data = {
            "sender": {"email": smtp_cfg["from_email"]},
            "to": [{"email": address} for address in recipients],
            "subject": subject,
            "textContent": body,
        }
        req = urllib.request.Request(
            "https://api.brevo.com/v3/smtp/email",
            data=json.dumps(data).encode("utf-8"),
            headers={
                "Content-Type": "application/json",
                "api-key": smtp_cfg["brevo_api_key"],
            },
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=15):
                return True, "Email envoye via Brevo"
        except urllib.error.HTTPError as e:
            # HTTPError must be handled before URLError (it is a subclass).
            return False, f"Erreur Brevo API: {e.code} {e.reason}"
        except urllib.error.URLError as e:
            return False, f"Impossible de joindre Brevo: {e.reason}"
        except Exception as e:
            return False, f"Erreur Brevo: {e}"

    def _send_via_smtp(self, subject, body, smtp_cfg):
        """Send the message through a classic SMTP server.

        STARTTLS is used unless ``use_tls`` is falsy, and the client logs in
        only when both a username and a password are configured.  The
        connection is always closed, including when a step fails.

        Returns:
            A ``(success, message)`` tuple; SMTP and network errors are
            reported in the message rather than raised.
        """
        try:
            recipients = self._normalize_recipients(smtp_cfg["to_emails"])

            msg = MIMEMultipart()
            msg["From"] = smtp_cfg["from_email"]
            msg["To"] = ", ".join(recipients)
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "plain", "utf-8"))

            server_addr = smtp_cfg["server"]
            port = int(smtp_cfg.get("port", 587))

            server = smtplib.SMTP(server_addr, port, timeout=15)
            try:
                server.ehlo()
                if smtp_cfg.get("use_tls", True):
                    server.starttls()
                    # Re-identify after the TLS upgrade.
                    server.ehlo()

                username = smtp_cfg.get("username", "")
                password = smtp_cfg.get("password", "")
                if username and password:
                    server.login(username, password)

                server.sendmail(
                    smtp_cfg["from_email"],
                    recipients,
                    msg.as_string(),
                )
            finally:
                # Close on every path so a failed login or send does not
                # leave the connection open.
                self._close_smtp(server)

            return True, "Email envoye avec succes"

        except smtplib.SMTPAuthenticationError:
            return False, "Erreur d'authentification SMTP (identifiants incorrects)"
        except smtplib.SMTPConnectError:
            return False, f"Impossible de se connecter au serveur SMTP {server_addr}:{port}"
        except smtplib.SMTPRecipientsRefused:
            return False, "Destinataire(s) refuse(s) par le serveur"
        except TimeoutError:
            return False, f"Timeout de connexion vers {server_addr}:{port}"
        except Exception as e:
            return False, f"Erreur SMTP: {e}"