Files
supervision/app.py
Dom 61d17968a0 feat: init projet supervision — monitoring systeme Windows
Interface web Flask securisee pour surveiller CPU, RAM, disques
et processus (JVM, Nginx, Amadea Web 8 x64).
Alertes email SMTP configurables, seuils reglables, compilation
PyInstaller en .exe, installation service Windows via NSSM.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 09:48:35 +01:00

351 lines
11 KiB
Python

"""Supervision — Monitoring systeme avec interface web."""
import socket
import sys
from datetime import timedelta
from functools import wraps
from flask import (
Flask, render_template, request, redirect, url_for,
flash, jsonify, session,
)
from flask_login import (
LoginManager, UserMixin, login_user, logout_user,
login_required, current_user,
)
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from werkzeug.security import check_password_hash, generate_password_hash
from config_manager import ConfigManager
from monitor import SystemMonitor
from alerter import EmailAlerter
# --- Init ---
config = ConfigManager()
app = Flask(__name__)
app.secret_key = config.get("secret_key")
app.permanent_session_lifetime = timedelta(hours=8)
# En-tetes de securite
@app.after_request
def security_headers(response):
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
return response
# Rate limiting
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=[],
storage_uri="memory://",
)
# Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"
login_manager.login_message = "Veuillez vous connecter."
# Services
alerter = EmailAlerter(config)
monitor = SystemMonitor(config, alerter)
class AdminUser(UserMixin):
def __init__(self, username):
self.id = username
@login_manager.user_loader
def load_user(user_id):
admin = config.get("admin", {})
if user_id == admin.get("username"):
return AdminUser(user_id)
return None
def is_default_password():
admin = config.get("admin", {})
return check_password_hash(admin.get("password_hash", ""), "admin")
# --- Routes ---
@app.route("/login", methods=["GET", "POST"])
@limiter.limit("10 per minute")
def login():
if current_user.is_authenticated:
return redirect(url_for("dashboard"))
if request.method == "POST":
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
admin = config.get("admin", {})
if username == admin.get("username") and check_password_hash(
admin.get("password_hash", ""), password
):
user = AdminUser(username)
login_user(user, remember=True)
session.permanent = True
next_page = request.args.get("next")
return redirect(next_page or url_for("dashboard"))
flash("Identifiants incorrects.", "danger")
return render_template("login.html")
@app.route("/logout")
@login_required
def logout():
logout_user()
return redirect(url_for("login"))
@app.route("/")
@login_required
def dashboard():
metrics = monitor.metrics
default_pw = is_default_password()
return render_template("dashboard.html", metrics=metrics, default_pw=default_pw)
@app.route("/api/metrics")
@login_required
def api_metrics():
return jsonify(monitor.metrics)
@app.route("/settings", methods=["GET"])
@login_required
def settings():
cfg = config.config
smtp = cfg.get("smtp", {})
# Masquer le mot de passe SMTP dans l'affichage
smtp_display = dict(smtp)
if smtp_display.get("password"):
smtp_display["password_masked"] = "*" * 8
else:
smtp_display["password_masked"] = ""
return render_template(
"settings.html",
config=cfg,
smtp=smtp_display,
default_pw=is_default_password(),
)
@app.route("/settings/thresholds", methods=["POST"])
@login_required
def update_thresholds():
try:
thresholds = {
"cpu_percent": int(request.form["cpu_percent"]),
"ram_percent": int(request.form["ram_percent"]),
"disk_percent": int(request.form["disk_percent"]),
}
# Validation: seuils entre 1 et 100
for key, val in thresholds.items():
if not 1 <= val <= 100:
flash(f"Le seuil {key} doit etre entre 1 et 100.", "danger")
return redirect(url_for("settings"))
config.set("thresholds", thresholds)
flash("Seuils mis a jour.", "success")
except (ValueError, KeyError) as e:
flash(f"Erreur de validation: {e}", "danger")
return redirect(url_for("settings"))
@app.route("/settings/monitoring", methods=["POST"])
@login_required
def update_monitoring():
try:
interval = int(request.form["check_interval_minutes"])
cooldown = int(request.form["alert_cooldown_minutes"])
if interval < 1:
flash("L'intervalle doit etre d'au moins 1 minute.", "danger")
return redirect(url_for("settings"))
if cooldown < 1:
flash("Le cooldown doit etre d'au moins 1 minute.", "danger")
return redirect(url_for("settings"))
config.set("check_interval_minutes", interval)
config.set("alert_cooldown_minutes", cooldown)
flash("Parametres de monitoring mis a jour.", "success")
except (ValueError, KeyError) as e:
flash(f"Erreur: {e}", "danger")
return redirect(url_for("settings"))
@app.route("/settings/smtp", methods=["POST"])
@login_required
def update_smtp():
try:
smtp = {
"server": request.form["smtp_server"].strip(),
"port": int(request.form["smtp_port"]),
"use_tls": "smtp_tls" in request.form,
"username": request.form["smtp_username"].strip(),
"from_email": request.form["smtp_from"].strip(),
"to_emails": [
e.strip() for e in request.form["smtp_to"].split(",") if e.strip()
],
}
# Ne mettre a jour le mot de passe que s'il est fourni
new_password = request.form.get("smtp_password", "")
if new_password:
smtp["password"] = new_password
else:
# Garder l'ancien mot de passe
old_smtp = config.get("smtp", {})
smtp["password"] = old_smtp.get("password", "")
config.set("smtp", smtp)
flash("Configuration SMTP mise a jour.", "success")
except (ValueError, KeyError) as e:
flash(f"Erreur: {e}", "danger")
return redirect(url_for("settings"))
@app.route("/settings/smtp/test", methods=["POST"])
@login_required
def test_smtp():
success, message = alerter.send_test()
if success:
flash(f"Test reussi : {message}", "success")
else:
flash(f"Test echoue : {message}", "danger")
return redirect(url_for("settings"))
@app.route("/settings/processes", methods=["POST"])
@login_required
def update_processes():
try:
processes = []
names = request.form.getlist("proc_name[]")
patterns = request.form.getlist("proc_pattern[]")
mem_thresholds = request.form.getlist("proc_mem_threshold[]")
enableds = request.form.getlist("proc_enabled[]")
alert_downs = request.form.getlist("proc_alert_down[]")
for i in range(len(names)):
if not names[i].strip():
continue
processes.append({
"name": names[i].strip(),
"pattern": patterns[i].strip().lower() if i < len(patterns) else "",
"memory_threshold_mb": int(mem_thresholds[i]) if i < len(mem_thresholds) and mem_thresholds[i] else 0,
"enabled": str(i) in enableds,
"alert_on_down": str(i) in alert_downs,
})
config.set("processes", processes)
flash("Processus surveilles mis a jour.", "success")
except (ValueError, KeyError) as e:
flash(f"Erreur: {e}", "danger")
return redirect(url_for("settings"))
@app.route("/settings/password", methods=["POST"])
@login_required
def update_password():
current_pw = request.form.get("current_password", "")
new_pw = request.form.get("new_password", "")
confirm_pw = request.form.get("confirm_password", "")
admin = config.get("admin", {})
if not check_password_hash(admin.get("password_hash", ""), current_pw):
flash("Mot de passe actuel incorrect.", "danger")
return redirect(url_for("settings"))
if len(new_pw) < 8:
flash("Le nouveau mot de passe doit faire au moins 8 caracteres.", "danger")
return redirect(url_for("settings"))
if new_pw != confirm_pw:
flash("Les mots de passe ne correspondent pas.", "danger")
return redirect(url_for("settings"))
admin["password_hash"] = generate_password_hash(new_pw)
config.set("admin", admin)
flash("Mot de passe mis a jour.", "success")
return redirect(url_for("settings"))
@app.route("/settings/port", methods=["POST"])
@login_required
def update_port():
try:
port = int(request.form["port"])
if not 1024 <= port <= 65535:
flash("Le port doit etre entre 1024 et 65535.", "danger")
return redirect(url_for("settings"))
config.set("port", port)
flash(f"Port mis a jour a {port}. Redemarrez l'application pour appliquer.", "warning")
except (ValueError, KeyError):
flash("Port invalide.", "danger")
return redirect(url_for("settings"))
@app.route("/alerts")
@login_required
def alerts():
alert_list = config.load_alerts()
return render_template("alerts.html", alerts=alert_list)
@app.route("/alerts/clear", methods=["POST"])
@login_required
def clear_alerts():
config.clear_alerts()
flash("Historique des alertes efface.", "success")
return redirect(url_for("alerts"))
@app.route("/api/monitoring/toggle", methods=["POST"])
@login_required
def toggle_monitoring():
if monitor._running:
monitor.stop()
flash("Monitoring arrete.", "warning")
else:
monitor.start()
flash("Monitoring demarre.", "success")
return redirect(url_for("dashboard"))
def check_port_available(port):
"""Verifie si un port est disponible."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind(("0.0.0.0", port))
return True
except OSError:
return False
def main():
port = config.get("port", 5000)
if not check_port_available(port):
print(f"[ERREUR] Le port {port} est deja utilise.")
print("Modifiez le port dans data/config.json ou liberez le port.")
sys.exit(1)
print(f"[Supervision] Demarrage sur le port {port}")
print(f"[Supervision] Interface : http://localhost:{port}")
if is_default_password():
print("[ATTENTION] Le mot de passe admin est encore 'admin'. Changez-le immediatement !")
# Demarrer le monitoring
monitor.start()
# Collecte initiale
monitor.collect_metrics()
print("[Supervision] Monitoring actif")
app.run(host="0.0.0.0", port=port, debug=False)
if __name__ == "__main__":
main()