"""Supervision — system monitoring with a web interface."""

import socket
import sys
from datetime import timedelta
from functools import wraps

from flask import (
    Flask,
    render_template,
    request,
    redirect,
    url_for,
    flash,
    jsonify,
    session,
)
from flask_login import (
    LoginManager,
    UserMixin,
    login_user,
    logout_user,
    login_required,
    current_user,
)
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from werkzeug.security import check_password_hash, generate_password_hash

from config_manager import ConfigManager
from monitor import SystemMonitor
from alerter import EmailAlerter
from user_monitor import UserMonitor

# --- Init ---
config = ConfigManager()

app = Flask(__name__)
app.secret_key = config.get("secret_key")
# Permanent sessions (enabled at login time) expire after eight hours.
app.permanent_session_lifetime = timedelta(hours=8)


# Security headers
@app.after_request
def security_headers(response):
    """Attach the fixed set of hardening headers to every response.

    Registered as an ``after_request`` hook; each header is assigned
    unconditionally, so a value set earlier by a view is overwritten.
    """
    for header_name, header_value in (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("X-XSS-Protection", "1; mode=block"),
        ("Referrer-Policy", "strict-origin-when-cross-origin"),
    ):
        response.headers[header_name] = header_value
    return response


# Rate limiting: keyed on the client address, no global default limit
# (limits are declared per route), counters kept in process memory.
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=[],
    storage_uri="memory://",
)

# Flask-Login: unauthenticated requests are sent to the "login" endpoint.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"
login_manager.login_message = "Veuillez vous connecter."
# Services
alerter = EmailAlerter(config)
monitor = SystemMonitor(config, alerter)
user_monitor = UserMonitor(config)


class AdminUser(UserMixin):
    """Flask-Login user object representing the configured administrator."""

    def __init__(self, username):
        # The username doubles as the user identifier handed to load_user().
        self.id = username


@login_manager.user_loader
def load_user(user_id):
    """Rebuild the user object for a session.

    Returns an ``AdminUser`` when ``user_id`` matches the username stored
    under the ``admin`` config entry, otherwise ``None``.
    """
    admin = config.get("admin", {})
    if user_id == admin.get("username"):
        return AdminUser(user_id)
    return None


def is_default_password():
    """Return True when the stored admin hash still matches ``"admin"``."""
    admin = config.get("admin", {})
    return check_password_hash(admin.get("password_hash", ""), "admin")


def _is_safe_redirect_target(target):
    """Return True when *target* is a path-only URL on this site.

    Used to vet the user-controlled ``next`` query parameter so the login
    form cannot be abused as an open redirect to another host.
    """
    if not target:
        return False
    # Browsers treat "\" like "/" and silently drop control characters
    # (tab, CR, LF), so "/\evil.com" or "/<TAB>/evil.com" would otherwise
    # slip through as an apparently local path.
    if any(ch == "\\" or ord(ch) < 0x20 for ch in target):
        return False
    # Must be an absolute path; "//host" is a scheme-relative URL that
    # points at a different host.
    return target.startswith("/") and not target.startswith("//")


# --- Routes ---


@app.route("/login", methods=["GET", "POST"])
@limiter.limit("10 per minute")
def login():
    """Show the login form (GET) or authenticate the administrator (POST).

    On success the user is logged in with a permanent, remembered session
    and redirected to the ``next`` query parameter when it is a safe local
    path, otherwise to the dashboard. On failure an error is flashed and
    the form is rendered again.
    """
    if current_user.is_authenticated:
        return redirect(url_for("dashboard"))

    if request.method == "POST":
        username = request.form.get("username", "").strip()
        password = request.form.get("password", "")
        admin = config.get("admin", {})

        if username == admin.get("username") and check_password_hash(
            admin.get("password_hash", ""), password
        ):
            user = AdminUser(username)
            login_user(user, remember=True)
            session.permanent = True

            # ``next`` comes straight from the query string: only honor it
            # when it stays on this site, otherwise fall back to the
            # dashboard.
            next_page = request.args.get("next")
            if not _is_safe_redirect_target(next_page):
                next_page = None
            return redirect(next_page or url_for("dashboard"))

        flash("Identifiants incorrects.", "danger")

    return render_template("login.html")


@app.route("/logout")
@login_required
def logout():
    """Log the current user out and return to the login page."""
    logout_user()
    return redirect(url_for("login"))


@app.route("/")
@login_required
def dashboard():
    """Render the dashboard with the monitor's current metrics."""
    metrics = monitor.metrics
    default_pw = is_default_password()
    return render_template("dashboard.html", metrics=metrics, default_pw=default_pw)


@app.route("/api/metrics")
@login_required
def api_metrics():
    """Return the monitor's current metrics as JSON."""
    return jsonify(monitor.metrics)


@app.route("/settings", methods=["GET"])
@login_required
def settings():
    """Render the settings page from the current configuration."""
    cfg = config.config
    smtp = cfg.get("smtp", {})

    # Expose a masked placeholder for the SMTP password in the display copy.
    # NOTE(review): the copy still carries the real "password" key, and the
    # full config is also passed to the template — confirm the template
    # never renders either value.
    smtp_display = dict(smtp)
    smtp_display["password_masked"] = "*" * 8 if smtp_display.get("password") else ""

    return render_template(
        "settings.html",
        config=cfg,
        smtp=smtp_display,
        default_pw=is_default_password(),
    )
@app.route("/settings/thresholds", methods=["POST"])
@login_required
def update_thresholds():
    """Validate and store the CPU / RAM / disk alert thresholds.

    Each value must be an integer between 1 and 100 inclusive; a missing
    or non-numeric field is reported through a flash message.
    """
    try:
        thresholds = {
            "cpu_percent": int(request.form["cpu_percent"]),
            "ram_percent": int(request.form["ram_percent"]),
            "disk_percent": int(request.form["disk_percent"]),
        }
        # Validation: thresholds must lie between 1 and 100.
        for key, val in thresholds.items():
            if not 1 <= val <= 100:
                flash(f"Le seuil {key} doit etre entre 1 et 100.", "danger")
                return redirect(url_for("settings"))
        config.set("thresholds", thresholds)
        flash("Seuils mis a jour.", "success")
    except (ValueError, KeyError) as e:
        flash(f"Erreur de validation: {e}", "danger")
    return redirect(url_for("settings"))


@app.route("/settings/monitoring", methods=["POST"])
@login_required
def update_monitoring():
    """Validate and store the check interval and alert cooldown (minutes).

    Both values must be integers of at least 1.
    """
    try:
        interval = int(request.form["check_interval_minutes"])
        cooldown = int(request.form["alert_cooldown_minutes"])
        if interval < 1:
            flash("L'intervalle doit etre d'au moins 1 minute.", "danger")
            return redirect(url_for("settings"))
        if cooldown < 1:
            flash("Le cooldown doit etre d'au moins 1 minute.", "danger")
            return redirect(url_for("settings"))
        config.set("check_interval_minutes", interval)
        config.set("alert_cooldown_minutes", cooldown)
        flash("Parametres de monitoring mis a jour.", "success")
    except (ValueError, KeyError) as e:
        flash(f"Erreur: {e}", "danger")
    return redirect(url_for("settings"))


@app.route("/settings/smtp", methods=["POST"])
@login_required
def update_smtp():
    """Validate and store the SMTP configuration.

    The port must be a valid TCP port (1-65535). The stored password is
    only replaced when a new one is submitted; an empty password field
    keeps the previous value.
    """
    try:
        smtp = {
            "server": request.form["smtp_server"].strip(),
            "port": int(request.form["smtp_port"]),
            "use_tls": "smtp_tls" in request.form,
            "username": request.form["smtp_username"].strip(),
            "from_email": request.form["smtp_from"].strip(),
            "to_emails": [
                e.strip() for e in request.form["smtp_to"].split(",") if e.strip()
            ],
        }
        # Reject values that cannot be a TCP port instead of persisting a
        # configuration that can never connect.
        if not 1 <= smtp["port"] <= 65535:
            flash("Le port SMTP doit etre entre 1 et 65535.", "danger")
            return redirect(url_for("settings"))
        # Only update the password when one was provided.
        new_password = request.form.get("smtp_password", "")
        if new_password:
            smtp["password"] = new_password
        else:
            # Keep the previous password.
            old_smtp = config.get("smtp", {})
            smtp["password"] = old_smtp.get("password", "")
        config.set("smtp", smtp)
        flash("Configuration SMTP mise a jour.", "success")
    except (ValueError, KeyError) as e:
        flash(f"Erreur: {e}", "danger")
    return redirect(url_for("settings"))


@app.route("/settings/smtp/test", methods=["POST"])
@login_required
def test_smtp():
    """Trigger the alerter's test send and flash its outcome."""
    success, message = alerter.send_test()
    if success:
        flash(f"Test reussi : {message}", "success")
    else:
        flash(f"Test echoue : {message}", "danger")
    return redirect(url_for("settings"))


@app.route("/settings/processes", methods=["POST"])
@login_required
def update_processes():
    """Rebuild the list of watched processes from the parallel form lists.

    Rows with a blank name are skipped. A missing or empty memory
    threshold defaults to 0.
    """
    try:
        processes = []
        names = request.form.getlist("proc_name[]")
        patterns = request.form.getlist("proc_pattern[]")
        mem_thresholds = request.form.getlist("proc_mem_threshold[]")
        enableds = request.form.getlist("proc_enabled[]")
        alert_downs = request.form.getlist("proc_alert_down[]")

        for i, raw_name in enumerate(names):
            name = raw_name.strip()
            if not name:
                continue
            has_threshold = i < len(mem_thresholds) and mem_thresholds[i]
            processes.append({
                "name": name,
                "pattern": patterns[i].strip().lower() if i < len(patterns) else "",
                "memory_threshold_mb": int(mem_thresholds[i]) if has_threshold else 0,
                # A flag is on when the row index appears among the submitted
                # values — presumably each checkbox posts its row index;
                # verify against the settings template.
                "enabled": str(i) in enableds,
                "alert_on_down": str(i) in alert_downs,
            })

        config.set("processes", processes)
        flash("Processus surveilles mis a jour.", "success")
    except (ValueError, KeyError) as e:
        flash(f"Erreur: {e}", "danger")
    return redirect(url_for("settings"))


@app.route("/settings/password", methods=["POST"])
@login_required
def update_password():
    """Change the admin password.

    Requires the current password, a new password of at least 8
    characters, and a matching confirmation.
    """
    current_pw = request.form.get("current_password", "")
    new_pw = request.form.get("new_password", "")
    confirm_pw = request.form.get("confirm_password", "")

    admin = config.get("admin", {})
    if not check_password_hash(admin.get("password_hash", ""), current_pw):
        flash("Mot de passe actuel incorrect.", "danger")
        return redirect(url_for("settings"))

    if len(new_pw) < 8:
        flash("Le nouveau mot de passe doit faire au moins 8 caracteres.", "danger")
        return redirect(url_for("settings"))

    if new_pw != confirm_pw:
        flash("Les mots de passe ne correspondent pas.", "danger")
        return redirect(url_for("settings"))

    admin["password_hash"] = generate_password_hash(new_pw)
    config.set("admin", admin)
    flash("Mot de passe mis a jour.", "success")
    return redirect(url_for("settings"))


@app.route("/settings/port", methods=["POST"])
@login_required
def update_port():
    """Store a new listening port (1024-65535); it applies after a restart."""
    try:
        port = int(request.form["port"])
        if not 1024 <= port <= 65535:
            flash("Le port doit etre entre 1024 et 65535.", "danger")
            return redirect(url_for("settings"))
        config.set("port", port)
        flash(f"Port mis a jour a {port}. Redemarrez l'application pour appliquer.", "warning")
    except (ValueError, KeyError):
        flash("Port invalide.", "danger")
    return redirect(url_for("settings"))


@app.route("/alerts")
@login_required
def alerts():
    """Render the alert history page."""
    alert_list = config.load_alerts()
    return render_template("alerts.html", alerts=alert_list)


@app.route("/alerts/clear", methods=["POST"])
@login_required
def clear_alerts():
    """Clear the alert history and return to the alerts page."""
    config.clear_alerts()
    flash("Historique des alertes efface.", "success")
    return redirect(url_for("alerts"))


@app.route("/api/monitoring/toggle", methods=["POST"])
@login_required
def toggle_monitoring():
    """Stop the monitor when it is running, start it otherwise."""
    # NOTE(review): reads the monitor's private ``_running`` flag.
    if monitor._running:
        monitor.stop()
        flash("Monitoring arrete.", "warning")
    else:
        monitor.start()
        flash("Monitoring demarre.", "success")
    return redirect(url_for("dashboard"))


@app.route("/users")
@login_required
def users():
    """Render the users page."""
    return render_template("users.html")


@app.route("/api/users")
@login_required
def api_users():
    """Return the user monitor's cached data as JSON.

    Responds with ``{"error": ...}`` or ``{"no_files": true}`` when the
    cache carries one of those markers; otherwise with the per-user list
    (timestamps formatted as strings) and the ``hourly`` series.
    """
    cache = user_monitor.data
    if cache.get("error"):
        return jsonify({"error": cache["error"]})
    if cache.get("no_files"):
        return jsonify({"no_files": True})

    users_list = [
        {
            "login": u["login"],
            "status": u["status"],
            # Timestamps are optional; absent values are sent as null.
            "last_action_time": u["last_action_time"].strftime("%H:%M:%S")
            if u.get("last_action_time") else None,
            "last_action_label": u.get("last_action_label", ""),
            "action_count_24h": u.get("action_count_24h", 0),
            "connected_since": u["connected_since"].strftime("%H:%M")
            if u.get("connected_since") else None,
            "explicit_logout": u.get("explicit_logout", False),
        }
        for u in cache.get("users", {}).values()
    ]
    return jsonify({"users": users_list, "hourly": cache.get("hourly", [])})


@app.route("/api/users/activity/weekly")
@login_required
def api_users_weekly():
    """Return the user monitor's weekly activity as JSON."""
    return jsonify({"weekly": user_monitor.get_weekly_activity()})


@app.route("/settings/amadea-log-path", methods=["POST"])
@login_required
def update_amadea_log_path():
    """Store the Amadea log path; an empty value is rejected."""
    path = request.form.get("amadea_log_path", "").strip()
    if not path:
        flash("Le chemin ne peut pas etre vide.", "danger")
        return redirect(url_for("settings"))
    config.set("amadea_log_path", path)
    flash("Chemin des logs Amadea mis a jour.", "success")
    return redirect(url_for("settings"))


@app.route("/settings/user-thresholds", methods=["POST"])
@login_required
def update_user_thresholds():
    """Validate and store the user status thresholds (minutes).

    Both values must be at least 1 and ``active_minutes`` must be strictly
    lower than ``inactive_minutes``.
    """
    try:
        active = int(request.form["active_minutes"])
        inactive = int(request.form["inactive_minutes"])
        if active < 1 or inactive < 1:
            flash("Les seuils doivent etre d'au moins 1 minute.", "danger")
            return redirect(url_for("settings"))
        if active >= inactive:
            flash("Le seuil 'actif' doit etre inferieur au seuil 'inactif'.", "danger")
            return redirect(url_for("settings"))
        config.set("user_status_thresholds", {"active_minutes": active, "inactive_minutes": inactive})
        flash("Seuils utilisateurs mis a jour.", "success")
    except (ValueError, KeyError) as e:
        flash(f"Erreur: {e}", "danger")
    return redirect(url_for("settings"))


def check_port_available(port):
    """Return True when *port* can be bound on all IPv4 interfaces."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind(("0.0.0.0", port))
            return True
        except OSError:
            return False


def main():
    """Start the monitors and serve the web interface.

    Exits with status 1 when the configured port (default 5000) cannot be
    bound.
    """
    port = config.get("port", 5000)

    if not check_port_available(port):
        print(f"[ERREUR] Le port {port} est deja utilise.")
        print("Modifiez le port dans data/config.json ou liberez le port.")
        sys.exit(1)

    print(f"[Supervision] Demarrage sur le port {port}")
    print(f"[Supervision] Interface : http://localhost:{port}")

    if is_default_password():
        print("[ATTENTION] Le mot de passe admin est encore 'admin'. Changez-le immediatement !")

    # Start monitoring.
    monitor.start()
    # Initial collection.
    monitor.collect_metrics()

    user_monitor.start()
    user_monitor.parse_logs()

    print("[Supervision] Monitoring actif")

    app.run(host="0.0.0.0", port=port, debug=False)


if __name__ == "__main__":
    main()