Files
supervision/user_monitor.py
2026-04-20 16:54:55 +02:00

314 lines
11 KiB
Python

"""Suivi des utilisateurs connectes a Amadea via parsing des logs."""
import gzip
import glob
import os
import re
import threading
import time
from datetime import datetime, timedelta
# Regexes compiled once at module level (hoisted out of the per-line parsing loops).
# Matches one awevents log line of the shape:
#   YYYY-mm-dd HH:MM:SS.mmm;<source>;;;;"login=<user>,action=<act>,Label=<text>"
# Group 1 = timestamp (fractional seconds dropped), 2 = login, 3 = action,
# 4 = label; the closing quote is optional and trailing whitespace is tolerated.
_AWEVENTS_RE = re.compile(
    r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\.\d+;[^;]*;;;;"login=([^,]+),action=([^,]+),Label=(.+?)"?\s*$'
)
# Matches an isoft log line that records a session opening (method=OpenUserSession).
# Group 1 = timestamp, group 2 = login (word characters only).
_ISOFT_LOGIN_RE = re.compile(
    r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*method=OpenUserSession.*login=([A-Za-z0-9_]+)'
)
def _read_log_file(filepath):
"""Lit un fichier log, supporte .log et .log.gz."""
try:
if filepath.endswith('.gz'):
with gzip.open(filepath, 'rt', encoding='utf-8', errors='ignore') as f:
return f.read()
else:
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
return f.read()
except (PermissionError, OSError):
return None
_DATE_IN_NAME_RE = re.compile(r'_\d{2}-\d{2}-\d{2}[_.]')
def _log_files_for_date(log_path, prefix, date_str):
"""Retourne les fichiers de logs pour un prefixe et une date donnes, tries par index.
Cherche les fichiers avec la date dans le nom (ex: awevents_26-04-13_1.log.gz).
Pour le jour courant uniquement, inclut aussi les fichiers sans date dans le nom
(ex: awevents.log, isoft.log) qui sont les logs actifs du jour.
"""
def is_valid(f):
return f.endswith('.log') or f.endswith('.log.gz')
def sort_key(f):
m = re.search(r'_(\d+)\.log(\.gz)?$', f)
return int(m.group(1)) if m else 0
# Fichiers avec la date dans le nom
pattern = os.path.join(log_path, f"{prefix}_{date_str}_*")
files = [f for f in glob.glob(pattern) if is_valid(f)]
# Pour le jour courant uniquement : inclure aussi les fichiers sans date dans le nom
today_str = datetime.now().strftime("%y-%m-%d")
if date_str == today_str:
active_pattern = os.path.join(log_path, f"{prefix}*")
for f in glob.glob(active_pattern):
fname = os.path.basename(f)
if is_valid(f) and not _DATE_IN_NAME_RE.search(fname) and f not in files:
files.append(f)
return sorted(files, key=sort_key)
class UserMonitor:
def __init__(self, config_manager):
self.config = config_manager
self._cache = {"users": {}, "hourly": [], "error": None, "no_files": False}
self._lock = threading.Lock()
self._running = False
self._thread = None
@property
def data(self):
with self._lock:
return dict(self._cache)
def start(self):
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
def stop(self):
self._running = False
    def _loop(self):
        """Background loop: re-parse the logs every `check_interval_minutes` minutes.

        Wakes up every 5 seconds so that both a `stop()` request and live
        changes to the configured interval are picked up quickly.
        """
        last_parse = 0
        while self._running:
            # Re-read the interval on every pass so config changes apply live.
            interval = self.config.get("check_interval_minutes", 1) * 60
            if time.time() - last_parse >= interval:
                try:
                    self.parse_logs()
                except Exception as e:
                    # Best effort: keep the loop alive even if one parse fails.
                    print(f"[UserMonitor] Erreur: {e}")
                last_parse = time.time()
            time.sleep(5)
    def parse_logs(self):
        """Parse today's awevents/isoft logs and refresh the cached user data.

        Replaces `self._cache` (under the lock) with:
        - "users": per-login info sorted by status then most recent action,
        - "hourly": distinct-login count per hour of the day,
        - "error" / "no_files": diagnostic flags for the UI.
        """
        log_path = self.config.get(
            "amadea_log_path",
            r"C:\ProgramData\ISoft\Amadea Web 8 x64\data\logs"
        )
        thresholds = self.config.get(
            "user_status_thresholds",
            {"active_minutes": 5, "inactive_minutes": 30}
        )
        if not os.path.isdir(log_path):
            with self._lock:
                self._cache = {
                    "error": f"Dossier de logs introuvable : {log_path}",
                    "users": {}, "hourly": [], "no_files": False,
                }
            return
        date_str = datetime.now().strftime("%y-%m-%d")
        awevents_files = _log_files_for_date(log_path, "awevents", date_str)
        if not awevents_files:
            with self._lock:
                self._cache = {"no_files": True, "error": None, "users": {}, "hourly": []}
            return
        now = datetime.now()
        # NOTE(review): only today's files are parsed, so "24h" counts are in
        # practice "since midnight"; the cutoff mostly exists for the shared
        # _parse_awevents_line signature — confirm intent.
        cutoff_24h = now - timedelta(hours=24)
        users = {}
        hourly = {h: set() for h in range(24)}  # hour -> set of distinct logins
        for filepath in awevents_files:
            content = _read_log_file(filepath)
            if content:
                for line in content.splitlines():
                    self._parse_awevents_line(line, users, cutoff_24h, hourly)
        # isoft logs provide the session-open events (connected_since).
        isoft_files = _log_files_for_date(log_path, "isoft", date_str)
        for filepath in isoft_files:
            content = _read_log_file(filepath)
            if content:
                for line in content.splitlines():
                    self._parse_isoft_line(line, users)
        self._compute_statuses(users, thresholds, now)
        # Sort order: active first, then inactive, then disconnected;
        # within a status, most recent action first.
        status_order = {"actif": 0, "inactif": 1, "deconnecte": 2}
        sorted_users = dict(
            sorted(users.items(), key=lambda x: (
                status_order.get(x[1]["status"], 3),
                -(x[1]["last_action_time"].timestamp() if x[1].get("last_action_time") else 0),
            ))
        )
        hourly_data = [{"hour": h, "count": len(logins)} for h, logins in sorted(hourly.items())]
        with self._lock:
            self._cache = {
                "error": None,
                "no_files": False,
                "users": sorted_users,
                "hourly": hourly_data,
            }
def _parse_awevents_line(self, line, users, cutoff_24h, hourly):
m = _AWEVENTS_RE.match(line)
if not m:
return
ts_str, login, action, label = m.group(1), m.group(2), m.group(3), m.group(4)
try:
ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
except ValueError:
return
login = login.strip()
if not login:
return
is_logout = "se deconnecter" in label.lower()
if login not in users:
users[login] = {
"login": login,
"last_action_time": ts,
"last_action_label": label[:60],
"action_count_24h": 0,
"status": "deconnecte",
"explicit_logout": is_logout,
"logout_time": ts if is_logout else None,
"connected_since": ts,
}
else:
user = users[login]
if ts > user["last_action_time"]:
user["last_action_time"] = ts
user["last_action_label"] = label[:60]
if is_logout:
user["explicit_logout"] = True
user["logout_time"] = ts
elif user["explicit_logout"] and user.get("logout_time") and ts > user["logout_time"]:
# Activite apres deconnexion explicite = reconnexion
user["explicit_logout"] = False
user["logout_time"] = None
if ts >= cutoff_24h:
users[login]["action_count_24h"] += 1
hourly[ts.hour].add(login)
def _parse_isoft_line(self, line, users):
m = _ISOFT_LOGIN_RE.match(line)
if not m:
return
ts_str, login = m.group(1), m.group(2)
try:
ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
except ValueError:
return
if login in users and users[login]["connected_since"] is None:
users[login]["connected_since"] = ts
def _compute_statuses(self, users, thresholds, now):
active_min = thresholds.get("active_minutes", 5)
inactive_min = thresholds.get("inactive_minutes", 30)
for user in users.values():
delta = (now - user["last_action_time"]).total_seconds() / 60
if user.get("explicit_logout"):
user["status"] = "deconnecte"
elif delta > inactive_min:
user["status"] = "deconnecte"
elif delta > active_min:
user["status"] = "inactif"
else:
user["status"] = "actif"
def get_users_for_date(self, date):
"""Retourne la liste des utilisateurs ayant agi a une date donnee, tries par nb d'actions."""
log_path = self.config.get(
"amadea_log_path",
r"C:\ProgramData\ISoft\Amadea Web 8 x64\data\logs"
)
if not os.path.isdir(log_path):
return []
date_str = date.strftime("%y-%m-%d")
cutoff = datetime(date.year, date.month, date.day)
files = _log_files_for_date(log_path, "awevents", date_str)
if not files:
return []
users = {}
hourly = {h: set() for h in range(24)}
for filepath in files:
content = _read_log_file(filepath)
if content:
for line in content.splitlines():
self._parse_awevents_line(line, users, cutoff, hourly)
result = sorted(users.values(), key=lambda u: -u.get("action_count_24h", 0))
output = []
for u in result:
duration = None
if u.get("connected_since") and u.get("last_action_time"):
mins = int((u["last_action_time"] - u["connected_since"]).total_seconds() / 60)
if mins >= 60:
duration = f"{mins // 60}h{mins % 60:02d}"
else:
duration = f"{mins}min"
output.append({
"login": u["login"],
"last_action_time": u["last_action_time"].strftime("%H:%M:%S") if u.get("last_action_time") else None,
"last_action_label": u.get("last_action_label", ""),
"action_count": u.get("action_count_24h", 0),
"first_action_time": u["connected_since"].strftime("%H:%M") if u.get("connected_since") else None,
"duration": duration,
})
return output
def get_weekly_activity(self):
"""Retourne le nombre max d'utilisateurs actifs simultanes par jour (7 derniers jours)."""
log_path = self.config.get(
"amadea_log_path",
r"C:\ProgramData\ISoft\Amadea Web 8 x64\data\logs"
)
if not os.path.isdir(log_path):
return []
result = []
today = datetime.now().date()
for delta in range(6, -1, -1):
day = today - timedelta(days=delta)
date_str = day.strftime("%y-%m-%d")
files = _log_files_for_date(log_path, "awevents", date_str)
if not files:
result.append({"date": day.isoformat(), "count": None})
continue
hourly = {h: set() for h in range(24)}
for filepath in files:
content = _read_log_file(filepath)
if not content:
continue
for line in content.splitlines():
m = re.match(
r'^(\d{4}-\d{2}-\d{2} (\d{2}):\d{2}:\d{2}).*login=([^,]+),',
line
)
if m:
hour = int(m.group(2))
login = m.group(3).strip()
if login:
hourly[hour].add(login)
max_concurrent = max((len(v) for v in hourly.values()), default=0)
result.append({"date": day.isoformat(), "count": max_concurrent})
return result