Files
supervision/alerter.py
2026-04-20 16:54:55 +02:00

121 lines
4.4 KiB
Python

"""Envoi d'alertes par email via Brevo API ou SMTP."""
import json
import smtplib
import urllib.error
import urllib.request
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
class EmailAlerter:
def __init__(self, config_manager):
self.config = config_manager
def _get_smtp_config(self):
return self.config.get("smtp", {})
def is_configured(self):
smtp = self._get_smtp_config()
has_recipients = bool(smtp.get("from_email") and smtp.get("to_emails"))
if smtp.get("brevo_api_key"):
return has_recipients
return has_recipients and bool(smtp.get("server"))
def send_alert(self, subject, body):
"""Envoie un email d'alerte. Silencieux si non configure."""
if not self.is_configured():
return False, "Email non configure"
return self._send_email(subject, body)
def send_test(self):
"""Envoie un email de test pour valider la configuration."""
if not self.is_configured():
return False, "Configuration email incomplete"
subject = "[TEST] Supervision - Test de configuration email"
body = (
"Ceci est un email de test.\n\n"
"Si vous recevez ce message, la configuration est correcte.\n\n"
"-- Supervision"
)
return self._send_email(subject, body)
def _send_email(self, subject, body):
smtp_cfg = self._get_smtp_config()
if smtp_cfg.get("brevo_api_key"):
return self._send_via_brevo(subject, body, smtp_cfg)
return self._send_via_smtp(subject, body, smtp_cfg)
def _send_via_brevo(self, subject, body, smtp_cfg):
"""Envoi via l'API REST Brevo."""
data = {
"sender": {"email": smtp_cfg["from_email"]},
"to": [{"email": e} for e in smtp_cfg["to_emails"]],
"subject": subject,
"textContent": body,
}
req = urllib.request.Request(
"https://api.brevo.com/v3/smtp/email",
data=json.dumps(data).encode("utf-8"),
headers={
"Content-Type": "application/json",
"api-key": smtp_cfg["brevo_api_key"],
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=15):
return True, "Email envoye via Brevo"
except urllib.error.HTTPError as e:
return False, f"Erreur Brevo API: {e.code} {e.reason}"
except urllib.error.URLError as e:
return False, f"Impossible de joindre Brevo: {e.reason}"
except Exception as e:
return False, f"Erreur Brevo: {str(e)}"
def _send_via_smtp(self, subject, body, smtp_cfg):
"""Envoi via SMTP classique."""
try:
msg = MIMEMultipart()
msg["From"] = smtp_cfg["from_email"]
msg["To"] = ", ".join(smtp_cfg["to_emails"])
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain", "utf-8"))
server_addr = smtp_cfg["server"]
port = int(smtp_cfg.get("port", 587))
use_tls = smtp_cfg.get("use_tls", True)
if use_tls:
server = smtplib.SMTP(server_addr, port, timeout=15)
server.ehlo()
server.starttls()
server.ehlo()
else:
server = smtplib.SMTP(server_addr, port, timeout=15)
server.ehlo()
username = smtp_cfg.get("username", "")
password = smtp_cfg.get("password", "")
if username and password:
server.login(username, password)
server.sendmail(
smtp_cfg["from_email"],
smtp_cfg["to_emails"],
msg.as_string(),
)
server.quit()
return True, "Email envoye avec succes"
except smtplib.SMTPAuthenticationError:
return False, "Erreur d'authentification SMTP (identifiants incorrects)"
except smtplib.SMTPConnectError:
return False, f"Impossible de se connecter au serveur SMTP {server_addr}:{port}"
except smtplib.SMTPRecipientsRefused:
return False, "Destinataire(s) refuse(s) par le serveur"
except TimeoutError:
return False, f"Timeout de connexion vers {server_addr}:{port}"
except Exception as e:
return False, f"Erreur SMTP: {str(e)}"