Files
supervision/app.py
2026-04-20 16:54:55 +02:00

431 lines
14 KiB
Python

"""Supervision — Monitoring systeme avec interface web."""
import socket
import sys
from datetime import timedelta
from functools import wraps
from flask import (
Flask, render_template, request, redirect, url_for,
flash, jsonify, session,
)
from flask_login import (
LoginManager, UserMixin, login_user, logout_user,
login_required, current_user,
)
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from werkzeug.security import check_password_hash, generate_password_hash
from config_manager import ConfigManager
from monitor import SystemMonitor
from alerter import EmailAlerter
from user_monitor import UserMonitor
# --- Init ---
# Settings are loaded first because the Flask app below reads its secret key from them.
config = ConfigManager()
app = Flask(__name__)
# Secret key comes from the "secret_key" configuration entry.
app.secret_key = config.get("secret_key")
# Lifetime of sessions flagged as permanent (login() sets session.permanent = True).
app.permanent_session_lifetime = timedelta(hours=8)
# Security headers
@app.after_request
def security_headers(response):
    """Attach a fixed set of hardening headers to every outgoing response.

    Args:
        response: The response object handed over by Flask.

    Returns:
        The same response object, with the headers below set (overwriting
        any value a view may have put there).
    """
    hardening = (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("X-XSS-Protection", "1; mode=block"),
        ("Referrer-Policy", "strict-origin-when-cross-origin"),
    )
    for header_name, header_value in hardening:
        response.headers[header_name] = header_value
    return response
# Rate limiting
# Requests are keyed with get_remote_address. default_limits is empty, so only
# routes carrying an explicit @limiter.limit decorator are throttled.
# Counters are kept under the "memory://" storage URI.
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=[],
    storage_uri="memory://",
)
# Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
# Name of the view (the login() route) used when authentication is required.
login_manager.login_view = "login"
# Message associated with that redirection (French: "Please log in.").
login_manager.login_message = "Veuillez vous connecter."
# Services
# The alerter is built first because the system monitor receives it as an argument.
alerter = EmailAlerter(config)
monitor = SystemMonitor(config, alerter)
user_monitor = UserMonitor(config)
class AdminUser(UserMixin):
    """Flask-Login user object for the administrator account."""

    def __init__(self, username):
        """Store *username* as the identifier exposed through ``self.id``."""
        self.id = username
@login_manager.user_loader
def load_user(user_id):
    """Rebuild the user object for a stored session identifier.

    Args:
        user_id: Identifier previously stored by Flask-Login.

    Returns:
        An ``AdminUser`` when *user_id* equals the configured admin
        username, otherwise ``None``.
    """
    configured_name = config.get("admin", {}).get("username")
    return AdminUser(user_id) if user_id == configured_name else None
def is_default_password():
    """Return True when the stored admin hash still matches the password "admin"."""
    stored_hash = config.get("admin", {}).get("password_hash", "")
    return check_password_hash(stored_hash, "admin")
# --- Routes ---
@app.route("/login", methods=["GET", "POST"])
@limiter.limit("10 per minute")
def login():
    """Display the login form and authenticate the administrator.

    GET renders ``login.html``. POST compares the submitted credentials with
    the configured admin username and password hash. On success the user is
    logged in with a permanent session and redirected to the ``next`` query
    parameter when it is a local path, otherwise to the dashboard. On failure
    an error is flashed and the form is rendered again.

    Returns:
        A redirect response or the rendered login template.
    """
    # Function-scope import: only this view needs URL parsing.
    from urllib.parse import urlparse

    if current_user.is_authenticated:
        return redirect(url_for("dashboard"))

    if request.method == "POST":
        username = request.form.get("username", "").strip()
        password = request.form.get("password", "")
        admin = config.get("admin", {})
        if username == admin.get("username") and check_password_hash(
            admin.get("password_hash", ""), password
        ):
            login_user(AdminUser(username), remember=True)
            session.permanent = True

            # "next" is attacker-controllable (it comes from the query string).
            # Only follow it when it is a path on this site; absolute URLs,
            # scheme-relative ("//host") and backslash variants ("/\host")
            # would send the freshly authenticated user to another origin.
            next_page = request.args.get("next", "")
            parsed = urlparse(next_page)
            is_local_path = (
                next_page.startswith("/")
                and not next_page.startswith(("//", "/\\"))
                and not parsed.scheme
                and not parsed.netloc
            )
            return redirect(next_page if is_local_path else url_for("dashboard"))
        flash("Identifiants incorrects.", "danger")

    return render_template("login.html")
@app.route("/logout")
@login_required
def logout():
    """Log the current user out and send them back to the login page."""
    logout_user()
    login_url = url_for("login")
    return redirect(login_url)
@app.route("/")
@login_required
def dashboard():
    """Render the dashboard with the monitor's metrics and the default-password flag."""
    return render_template(
        "dashboard.html",
        metrics=monitor.metrics,
        default_pw=is_default_password(),
    )
@app.route("/api/metrics")
@login_required
def api_metrics():
    """Return the monitor's current metrics as a JSON response."""
    current_metrics = monitor.metrics
    return jsonify(current_metrics)
@app.route("/settings", methods=["GET"])
@login_required
def settings():
    """Render the settings page.

    The template receives the full configuration, a copy of the SMTP section
    extended with masked placeholders for the password and the Brevo API key,
    and the default-password flag.
    """
    cfg = config.config

    # Copy of the SMTP section with two extra "*_masked" display fields.
    # NOTE(review): the copy still carries the raw "password" and
    # "brevo_api_key" entries, and the full config is passed as well —
    # confirm the template never renders them.
    smtp_display = dict(cfg.get("smtp", {}))
    mask = "*" * 8
    smtp_display["password_masked"] = mask if smtp_display.get("password") else ""
    smtp_display["brevo_api_key_masked"] = (
        mask if smtp_display.get("brevo_api_key") else ""
    )

    return render_template(
        "settings.html",
        config=cfg,
        smtp=smtp_display,
        default_pw=is_default_password(),
    )
@app.route("/settings/thresholds", methods=["POST"])
@login_required
def update_thresholds():
    """Store the CPU, RAM and disk alert thresholds submitted by the form.

    Each value must parse as an integer between 1 and 100 inclusive. Any
    missing or invalid field results in a flashed error and nothing is
    saved. The response is always a redirect to the settings page.
    """
    try:
        thresholds = {
            field: int(request.form[field])
            for field in ("cpu_percent", "ram_percent", "disk_percent")
        }
        # Validation: report the first threshold outside 1..100.
        offending = next(
            (field for field, value in thresholds.items() if not 1 <= value <= 100),
            None,
        )
        if offending is not None:
            flash(f"Le seuil {offending} doit etre entre 1 et 100.", "danger")
            return redirect(url_for("settings"))
        config.set("thresholds", thresholds)
        flash("Seuils mis a jour.", "success")
    except (ValueError, KeyError) as exc:
        flash(f"Erreur de validation: {exc}", "danger")
    return redirect(url_for("settings"))
@app.route("/settings/monitoring", methods=["POST"])
@login_required
def update_monitoring():
    """Store the check interval and alert cooldown (both in minutes).

    Both form values must parse as integers of at least 1; otherwise an
    error is flashed and nothing is saved. Always redirects to the settings
    page.
    """
    try:
        interval = int(request.form["check_interval_minutes"])
        cooldown = int(request.form["alert_cooldown_minutes"])
        # Checked in this order; the first failing rule decides the message.
        rules = (
            (interval, "L'intervalle doit etre d'au moins 1 minute."),
            (cooldown, "Le cooldown doit etre d'au moins 1 minute."),
        )
        for minutes, complaint in rules:
            if minutes < 1:
                flash(complaint, "danger")
                return redirect(url_for("settings"))
        config.set("check_interval_minutes", interval)
        config.set("alert_cooldown_minutes", cooldown)
        flash("Parametres de monitoring mis a jour.", "success")
    except (ValueError, KeyError) as exc:
        flash(f"Erreur: {exc}", "danger")
    return redirect(url_for("settings"))
@app.route("/settings/smtp", methods=["POST"])
@login_required
def update_smtp():
    """Store the SMTP settings submitted by the form.

    The recipient list is split on commas with blank entries dropped. The
    port must be an integer between 1 and 65535. An empty password or Brevo
    API key field keeps the previously stored value. Any missing or invalid
    field results in a flashed error and nothing is saved. Always redirects
    to the settings page.
    """
    try:
        old_smtp = config.get("smtp", {})
        form = request.form
        smtp = {
            "server": form["smtp_server"].strip(),
            "port": int(form["smtp_port"]),
            "use_tls": "smtp_tls" in form,
            "username": form["smtp_username"].strip(),
            "from_email": form["smtp_from"].strip(),
            "to_emails": [
                addr.strip() for addr in form["smtp_to"].split(",") if addr.strip()
            ],
        }
        # Reject values that cannot be a TCP port before anything is saved.
        if not 1 <= smtp["port"] <= 65535:
            flash("Le port SMTP doit etre entre 1 et 65535.", "danger")
            return redirect(url_for("settings"))
        # Keep the stored password when the field is left empty.
        new_password = form.get("smtp_password", "")
        smtp["password"] = new_password or old_smtp.get("password", "")
        # Keep the stored Brevo key when the field is left empty.
        new_brevo_key = form.get("brevo_api_key", "").strip()
        smtp["brevo_api_key"] = new_brevo_key or old_smtp.get("brevo_api_key", "")
        config.set("smtp", smtp)
        flash("Configuration SMTP mise a jour.", "success")
    except (ValueError, KeyError) as e:
        flash(f"Erreur: {e}", "danger")
    return redirect(url_for("settings"))
@app.route("/settings/smtp/test", methods=["POST"])
@login_required
def test_smtp():
    """Trigger the alerter's test send and flash its outcome.

    ``alerter.send_test()`` returns a ``(success, message)`` pair. The message
    is flashed as a success or an error, then the user is redirected to the
    settings page.
    """
    ok, detail = alerter.send_test()
    if not ok:
        flash(f"Test echoue : {detail}", "danger")
    else:
        flash(f"Test reussi : {detail}", "success")
    return redirect(url_for("settings"))
@app.route("/settings/processes", methods=["POST"])
@login_required
def update_processes():
    """Store the list of watched processes built from parallel form lists.

    Rows are matched by position across ``proc_name[]``, ``proc_pattern[]``
    and ``proc_mem_threshold[]``; rows with a blank name are skipped. A row is
    enabled, or alerts when down, if its position (as a string) appears in
    ``proc_enabled[]`` or ``proc_alert_down[]`` respectively. A non-integer
    memory threshold flashes an error and nothing is saved. Always redirects
    to the settings page.
    """
    try:
        form = request.form
        names = form.getlist("proc_name[]")
        patterns = form.getlist("proc_pattern[]")
        mem_thresholds = form.getlist("proc_mem_threshold[]")
        enableds = form.getlist("proc_enabled[]")
        alert_downs = form.getlist("proc_alert_down[]")

        processes = []
        for idx, raw_name in enumerate(names):
            name = raw_name.strip()
            if not name:
                continue
            pattern = patterns[idx].strip().lower() if idx < len(patterns) else ""
            # A missing or empty threshold field means 0.
            raw_mem = mem_thresholds[idx] if idx < len(mem_thresholds) else ""
            row_key = str(idx)
            processes.append({
                "name": name,
                "pattern": pattern,
                "memory_threshold_mb": int(raw_mem) if raw_mem else 0,
                "enabled": row_key in enableds,
                "alert_on_down": row_key in alert_downs,
            })

        config.set("processes", processes)
        flash("Processus surveilles mis a jour.", "success")
    except (ValueError, KeyError) as exc:
        flash(f"Erreur: {exc}", "danger")
    return redirect(url_for("settings"))
@app.route("/settings/password", methods=["POST"])
@login_required
def update_password():
    """Change the admin password.

    The current password must match the stored hash, and the new password
    must be at least 8 characters long and equal to its confirmation. The
    first failing rule is flashed as an error; otherwise the new hash is
    saved. Always redirects to the settings page.
    """
    form = request.form
    current_pw = form.get("current_password", "")
    new_pw = form.get("new_password", "")
    confirm_pw = form.get("confirm_password", "")
    admin = config.get("admin", {})

    # Rules are evaluated in order; only the first failure is reported.
    if not check_password_hash(admin.get("password_hash", ""), current_pw):
        problem = "Mot de passe actuel incorrect."
    elif len(new_pw) < 8:
        problem = "Le nouveau mot de passe doit faire au moins 8 caracteres."
    elif new_pw != confirm_pw:
        problem = "Les mots de passe ne correspondent pas."
    else:
        problem = None

    if problem is None:
        admin["password_hash"] = generate_password_hash(new_pw)
        config.set("admin", admin)
        flash("Mot de passe mis a jour.", "success")
    else:
        flash(problem, "danger")
    return redirect(url_for("settings"))
@app.route("/settings/port", methods=["POST"])
@login_required
def update_port():
    """Store the listening port used by ``main()`` at the next start.

    The port must parse as an integer between 1024 and 65535 inclusive.
    Always redirects to the settings page.
    """
    try:
        port = int(request.form["port"])
        if port < 1024 or port > 65535:
            flash("Le port doit etre entre 1024 et 65535.", "danger")
            return redirect(url_for("settings"))
        config.set("port", port)
        flash(
            f"Port mis a jour a {port}. Redemarrez l'application pour appliquer.",
            "warning",
        )
    except (ValueError, KeyError):
        flash("Port invalide.", "danger")
    return redirect(url_for("settings"))
@app.route("/alerts")
@login_required
def alerts():
    """Render the alert history page from ``config.load_alerts()``."""
    return render_template("alerts.html", alerts=config.load_alerts())
@app.route("/alerts/clear", methods=["POST"])
@login_required
def clear_alerts():
    """Clear the alert history, flash a confirmation and return to the alerts page."""
    config.clear_alerts()
    flash("Historique des alertes efface.", "success")
    alerts_url = url_for("alerts")
    return redirect(alerts_url)
@app.route("/api/monitoring/toggle", methods=["POST"])
@login_required
def toggle_monitoring():
    """Start the monitor when it is stopped, stop it when it is running.

    The running state is read from ``monitor._running``. A message describing
    the new state is flashed and the user is redirected to the dashboard.
    """
    if not monitor._running:
        monitor.start()
        flash("Monitoring demarre.", "success")
    else:
        monitor.stop()
        flash("Monitoring arrete.", "warning")
    return redirect(url_for("dashboard"))
@app.route("/users")
@login_required
def users():
    """Render the users page (``users.html``)."""
    page = render_template("users.html")
    return page
@app.route("/api/users")
@login_required
def api_users():
    """Return the user monitor's cached data as JSON.

    Returns ``{"error": ...}`` when the cache carries an error,
    ``{"no_files": True}`` when it is flagged as having no files, and
    otherwise ``{"users": [...], "hourly": [...]}`` with timestamps rendered
    as strings.
    """
    cache = user_monitor.data
    if cache.get("error"):
        return jsonify({"error": cache["error"]})
    if cache.get("no_files"):
        return jsonify({"no_files": True})

    def _clock(moment, pattern):
        # Format a timestamp, or give None when it is missing/falsy.
        return moment.strftime(pattern) if moment else None

    users_list = []
    for entry in cache.get("users", {}).values():
        users_list.append({
            "login": entry["login"],
            "status": entry["status"],
            "last_action_time": _clock(entry.get("last_action_time"), "%H:%M:%S"),
            "last_action_label": entry.get("last_action_label", ""),
            "action_count_24h": entry.get("action_count_24h", 0),
            "connected_since": _clock(entry.get("connected_since"), "%H:%M"),
            "explicit_logout": entry.get("explicit_logout", False),
        })
    return jsonify({"users": users_list, "hourly": cache.get("hourly", [])})
@app.route("/api/users/activity/weekly")
@login_required
def api_users_weekly():
    """Return ``user_monitor.get_weekly_activity()`` under the "weekly" JSON key."""
    weekly = user_monitor.get_weekly_activity()
    return jsonify({"weekly": weekly})
@app.route("/api/users/day/<date_str>")
@login_required
def api_users_day(date_str):
    """Return the users recorded for one day as JSON.

    Args:
        date_str: Day in ``YYYY-MM-DD`` form, taken from the URL.

    Returns:
        ``{"users": ..., "date": date_str}``, or a 400 response with an
        error message when *date_str* does not parse.
    """
    from datetime import datetime as dt

    try:
        parsed = dt.strptime(date_str, "%Y-%m-%d")
    except ValueError:
        return jsonify({"error": "Date invalide"}), 400
    day_users = user_monitor.get_users_for_date(parsed.date())
    return jsonify({"users": day_users, "date": date_str})
@app.route("/settings/amadea-log-path", methods=["POST"])
@login_required
def update_amadea_log_path():
    """Store the Amadea log path submitted by the form.

    The value is stripped of surrounding whitespace; an empty result is
    rejected with a flashed error. Always redirects to the settings page.
    """
    path = request.form.get("amadea_log_path", "").strip()
    if path:
        config.set("amadea_log_path", path)
        flash("Chemin des logs Amadea mis a jour.", "success")
    else:
        flash("Le chemin ne peut pas etre vide.", "danger")
    return redirect(url_for("settings"))
@app.route("/settings/user-thresholds", methods=["POST"])
@login_required
def update_user_thresholds():
    """Store the "active" and "inactive" user-status thresholds (minutes).

    Both values must parse as integers of at least 1, and the active
    threshold must be strictly lower than the inactive one. Otherwise an
    error is flashed and nothing is saved. Always redirects to the settings
    page.
    """
    try:
        active = int(request.form["active_minutes"])
        inactive = int(request.form["inactive_minutes"])
        if min(active, inactive) < 1:
            flash("Les seuils doivent etre d'au moins 1 minute.", "danger")
            return redirect(url_for("settings"))
        if not active < inactive:
            flash("Le seuil 'actif' doit etre inferieur au seuil 'inactif'.", "danger")
            return redirect(url_for("settings"))
        new_thresholds = {"active_minutes": active, "inactive_minutes": inactive}
        config.set("user_status_thresholds", new_thresholds)
        flash("Seuils utilisateurs mis a jour.", "success")
    except (ValueError, KeyError) as exc:
        flash(f"Erreur: {exc}", "danger")
    return redirect(url_for("settings"))
def check_port_available(port):
    """Tell whether a TCP port can be bound on all IPv4 interfaces.

    Args:
        port: Port number to probe.

    Returns:
        True when binding ``0.0.0.0:port`` succeeds, False when the bind
        raises ``OSError``. The probe socket is always closed.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("0.0.0.0", port))
    except OSError:
        return False
    else:
        return True
    finally:
        probe.close()
def main():
    """Start the monitors and serve the web interface.

    Exits with status 1 when the configured port (default 5000) cannot be
    bound. Otherwise prints the startup banner, warns when the admin password
    is still the default, starts both monitors with an initial collection,
    and runs the Flask server on all interfaces.
    """
    port = config.get("port", 5000)

    if not check_port_available(port):
        for line in (
            f"[ERREUR] Le port {port} est deja utilise.",
            "Modifiez le port dans data/config.json ou liberez le port.",
        ):
            print(line)
        sys.exit(1)

    print(f"[Supervision] Demarrage sur le port {port}")
    print(f"[Supervision] Interface : http://localhost:{port}")

    if is_default_password():
        print("[ATTENTION] Le mot de passe admin est encore 'admin'. Changez-le immediatement !")

    # Start system monitoring, then take an initial reading right away.
    monitor.start()
    monitor.collect_metrics()

    # Same for the user monitor: start it, then parse the logs once.
    user_monitor.start()
    user_monitor.parse_logs()

    print("[Supervision] Monitoring actif")
    app.run(host="0.0.0.0", port=port, debug=False)


if __name__ == "__main__":
    main()