"""Suivi des utilisateurs connectes a Amadea via parsing des logs.""" import glob import os import re import threading import time from datetime import datetime, timedelta # Regex compilees au niveau module (performance) _AWEVENTS_RE = re.compile( r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\.\d+;[^;]*;;;;"login=([^,]+),action=([^,]+),Label=(.+?)"?\s*$' ) _ISOFT_LOGIN_RE = re.compile( r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*method=OpenUserSession.*login=([A-Za-z0-9_]+)' ) def _log_files_for_date(log_path, prefix, date_str): """Retourne les fichiers de logs pour un prefixe et une date donnes, tries par index.""" pattern = os.path.join(log_path, f"{prefix}_{date_str}_*") files = [f for f in glob.glob(pattern) if not f.endswith('.zip')] return sorted(files, key=lambda f: int(re.search(r'_(\d+)\.[^.]+$', f).group(1))) class UserMonitor: def __init__(self, config_manager): self.config = config_manager self._cache = {"users": {}, "hourly": [], "error": None, "no_files": False} self._lock = threading.Lock() self._running = False self._thread = None @property def data(self): with self._lock: return dict(self._cache) def start(self): if self._running: return self._running = True self._thread = threading.Thread(target=self._loop, daemon=True) self._thread.start() def stop(self): self._running = False def _loop(self): last_parse = 0 while self._running: interval = self.config.get("check_interval_minutes", 1) * 60 if time.time() - last_parse >= interval: try: self.parse_logs() except Exception as e: print(f"[UserMonitor] Erreur: {e}") last_parse = time.time() time.sleep(5) def parse_logs(self): log_path = self.config.get( "amadea_log_path", r"C:\ProgramData\ISoft\Amadea Web 8 x64\data\logs" ) thresholds = self.config.get( "user_status_thresholds", {"active_minutes": 5, "inactive_minutes": 30} ) if not os.path.isdir(log_path): with self._lock: self._cache = { "error": f"Dossier de logs introuvable : {log_path}", "users": {}, "hourly": [], "no_files": False, } return date_str = datetime.now().strftime("%y-%m-%d") awevents_files = _log_files_for_date(log_path, "awevents", date_str) if not awevents_files: with self._lock: self._cache = {"no_files": True, "error": None, "users": {}, "hourly": []} return now = datetime.now() cutoff_24h = now - timedelta(hours=24) users = {} hourly = {h: set() for h in range(24)} for filepath in awevents_files: try: with open(filepath, "r", encoding="utf-8", errors="ignore") as f: for line in f: self._parse_awevents_line(line, users, cutoff_24h, hourly) except (PermissionError, OSError): continue isoft_files = _log_files_for_date(log_path, "isoft", date_str) for filepath in isoft_files: try: with open(filepath, "r", encoding="utf-8", errors="ignore") as f: for line in f: self._parse_isoft_line(line, users) except (PermissionError, OSError): continue self._compute_statuses(users, thresholds, now) status_order = {"actif": 0, "inactif": 1, "deconnecte": 2} sorted_users = dict( sorted(users.items(), key=lambda x: status_order.get(x[1]["status"], 3)) ) hourly_data = [{"hour": h, "count": len(logins)} for h, logins in sorted(hourly.items())] with self._lock: self._cache = { "error": None, "no_files": False, "users": sorted_users, "hourly": hourly_data, } def _parse_awevents_line(self, line, users, cutoff_24h, hourly): m = _AWEVENTS_RE.match(line) if not m: return ts_str, login, action, label = m.group(1), m.group(2), m.group(3), m.group(4) try: ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S") except ValueError: return login = login.strip() if not login: return is_logout = "se deconnecter" in label.lower() if login not in users: users[login] = { "login": login, "last_action_time": ts, "last_action_label": label[:60], "action_count_24h": 0, "status": "deconnecte", "explicit_logout": is_logout, "logout_time": ts if is_logout else None, "connected_since": ts, } else: user = users[login] if ts > user["last_action_time"]: user["last_action_time"] = ts user["last_action_label"] = label[:60] if is_logout: user["explicit_logout"] = True user["logout_time"] = ts elif user["explicit_logout"] and user.get("logout_time") and ts > user["logout_time"]: # Activite apres deconnexion explicite = reconnexion user["explicit_logout"] = False user["logout_time"] = None if ts >= cutoff_24h: users[login]["action_count_24h"] += 1 hourly[ts.hour].add(login) def _parse_isoft_line(self, line, users): m = _ISOFT_LOGIN_RE.match(line) if not m: return ts_str, login = m.group(1), m.group(2) try: ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S") except ValueError: return if login in users and users[login]["connected_since"] is None: users[login]["connected_since"] = ts def _compute_statuses(self, users, thresholds, now): active_min = thresholds.get("active_minutes", 5) inactive_min = thresholds.get("inactive_minutes", 30) for user in users.values(): delta = (now - user["last_action_time"]).total_seconds() / 60 if user.get("explicit_logout"): user["status"] = "deconnecte" elif delta > inactive_min: user["status"] = "deconnecte" elif delta > active_min: user["status"] = "inactif" else: user["status"] = "actif" def get_weekly_activity(self): """Retourne le nombre max d'utilisateurs actifs simultanes par jour (7 derniers jours).""" log_path = self.config.get( "amadea_log_path", r"C:\ProgramData\ISoft\Amadea Web 8 x64\data\logs" ) if not os.path.isdir(log_path): return [] result = [] today = datetime.now().date() for delta in range(6, -1, -1): day = today - timedelta(days=delta) date_str = day.strftime("%y-%m-%d") files = _log_files_for_date(log_path, "awevents", date_str) if not files: result.append({"date": day.isoformat(), "count": None}) continue hourly = {h: set() for h in range(24)} for filepath in files: try: with open(filepath, "r", encoding="utf-8", errors="ignore") as f: for line in f: m = re.match( r'^(\d{4}-\d{2}-\d{2} (\d{2}):\d{2}:\d{2}).*login=([^,]+),', line ) if m: hour = int(m.group(2)) login = m.group(3).strip() if login: hourly[hour].add(login) except (PermissionError, OSError): continue max_concurrent = max((len(v) for v in hourly.values()), default=0) result.append({"date": day.isoformat(), "count": max_concurrent}) return result