feat: user_monitor - parsing logs Amadea
This commit is contained in:
@@ -1 +1,366 @@
|
||||
// UserMonitor module — Task 5
|
||||
use chrono::{Duration, Local, NaiveDateTime};
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::Mutex as AsyncMutex;
|
||||
|
||||
use crate::config::ConfigManager;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct UserEntry {
|
||||
pub login: String,
|
||||
pub last_action_time: NaiveDateTime,
|
||||
pub last_action_label: String,
|
||||
pub action_count_24h: u32,
|
||||
pub status: String,
|
||||
pub explicit_logout: bool,
|
||||
pub logout_time: Option<NaiveDateTime>,
|
||||
pub connected_since: Option<NaiveDateTime>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct HourlyCount {
|
||||
pub hour: u32,
|
||||
pub count: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct UserData {
|
||||
pub users: Vec<UserEntry>,
|
||||
pub hourly: Vec<HourlyCount>,
|
||||
pub error: Option<String>,
|
||||
pub no_files: bool,
|
||||
}
|
||||
|
||||
fn log_files_for_date(log_path: &Path, prefix: &str, date_str: &str) -> Vec<std::path::PathBuf> {
|
||||
let pattern = format!("{}/{}_{}_*", log_path.to_string_lossy(), prefix, date_str);
|
||||
let re = Regex::new(r"_(\d+)\.[^.]+$").unwrap();
|
||||
let mut files: Vec<_> = glob::glob(&pattern)
|
||||
.unwrap_or_else(|_| glob::glob("__nonexistent__").unwrap())
|
||||
.filter_map(|f| f.ok())
|
||||
.filter(|f| !f.to_string_lossy().ends_with(".zip"))
|
||||
.collect();
|
||||
files.sort_by_key(|f| {
|
||||
re.captures(&f.to_string_lossy())
|
||||
.and_then(|c| c.get(1))
|
||||
.and_then(|m| m.as_str().parse::<u32>().ok())
|
||||
.unwrap_or(0)
|
||||
});
|
||||
files
|
||||
}
|
||||
|
||||
pub fn parse_awevents_line(
|
||||
line: &str,
|
||||
users: &mut HashMap<String, UserEntry>,
|
||||
cutoff_24h: NaiveDateTime,
|
||||
hourly: &mut HashMap<u32, HashSet<String>>,
|
||||
) {
|
||||
let re = Regex::new(
|
||||
r#"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\.\d+;[^;]*;;;;"login=([^,]+),action=([^,]+),Label=(.+?)"?\s*$"#,
|
||||
)
|
||||
.unwrap();
|
||||
let m = match re.captures(line) {
|
||||
Some(m) => m,
|
||||
None => return,
|
||||
};
|
||||
let ts_str = &m[1];
|
||||
let login = m[2].trim().to_string();
|
||||
let label = m[4].to_string();
|
||||
|
||||
if login.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let ts = match NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%d %H:%M:%S") {
|
||||
Ok(t) => t,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
let is_logout = label.to_lowercase().contains("se deconnecter");
|
||||
|
||||
let entry = users.entry(login.clone()).or_insert_with(|| UserEntry {
|
||||
login: login.clone(),
|
||||
last_action_time: ts,
|
||||
last_action_label: label.chars().take(60).collect(),
|
||||
action_count_24h: 0,
|
||||
status: "deconnecte".into(),
|
||||
explicit_logout: is_logout,
|
||||
logout_time: if is_logout { Some(ts) } else { None },
|
||||
connected_since: Some(ts),
|
||||
});
|
||||
|
||||
if ts > entry.last_action_time {
|
||||
entry.last_action_time = ts;
|
||||
entry.last_action_label = label.chars().take(60).collect();
|
||||
}
|
||||
if is_logout {
|
||||
entry.explicit_logout = true;
|
||||
entry.logout_time = Some(ts);
|
||||
} else if entry.explicit_logout {
|
||||
if let Some(lt) = entry.logout_time {
|
||||
if ts > lt {
|
||||
entry.explicit_logout = false;
|
||||
entry.logout_time = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ts >= cutoff_24h {
|
||||
entry.action_count_24h += 1;
|
||||
}
|
||||
|
||||
hourly.entry(ts.hour() as u32).or_default().insert(login);
|
||||
}
|
||||
|
||||
pub fn compute_statuses(
|
||||
users: &mut HashMap<String, UserEntry>,
|
||||
active_min: u64,
|
||||
inactive_min: u64,
|
||||
now: NaiveDateTime,
|
||||
) {
|
||||
for user in users.values_mut() {
|
||||
let delta = (now - user.last_action_time).num_minutes().max(0) as u64;
|
||||
user.status = if user.explicit_logout {
|
||||
"deconnecte".into()
|
||||
} else if delta > inactive_min {
|
||||
"deconnecte".into()
|
||||
} else if delta > active_min {
|
||||
"inactif".into()
|
||||
} else {
|
||||
"actif".into()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UserMonitor {
|
||||
config_manager: Arc<AsyncMutex<ConfigManager>>,
|
||||
pub data: Arc<Mutex<UserData>>,
|
||||
running: Arc<std::sync::atomic::AtomicBool>,
|
||||
}
|
||||
|
||||
impl UserMonitor {
|
||||
pub fn new(config_manager: Arc<AsyncMutex<ConfigManager>>) -> Self {
|
||||
UserMonitor {
|
||||
config_manager,
|
||||
data: Arc::new(Mutex::new(UserData::default())),
|
||||
running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn parse_logs(&self) {
|
||||
let (log_path, active_min, inactive_min) = {
|
||||
let cm = self.config_manager.lock().await;
|
||||
(
|
||||
cm.config.amadea_log_path.clone(),
|
||||
cm.config.user_status_thresholds.active_minutes,
|
||||
cm.config.user_status_thresholds.inactive_minutes,
|
||||
)
|
||||
};
|
||||
|
||||
let log_dir = Path::new(&log_path);
|
||||
if !log_dir.is_dir() {
|
||||
let mut d = self.data.lock().unwrap();
|
||||
*d = UserData {
|
||||
error: Some(format!("Dossier de logs introuvable : {}", log_path)),
|
||||
..Default::default()
|
||||
};
|
||||
return;
|
||||
}
|
||||
|
||||
let now = Local::now().naive_local();
|
||||
let date_str = Local::now().format("%y-%m-%d").to_string();
|
||||
let cutoff_24h = now - Duration::hours(24);
|
||||
let awevents_files = log_files_for_date(log_dir, "awevents", &date_str);
|
||||
|
||||
if awevents_files.is_empty() {
|
||||
let mut d = self.data.lock().unwrap();
|
||||
*d = UserData {
|
||||
no_files: true,
|
||||
..Default::default()
|
||||
};
|
||||
return;
|
||||
}
|
||||
|
||||
let mut users: HashMap<String, UserEntry> = HashMap::new();
|
||||
let mut hourly: HashMap<u32, HashSet<String>> =
|
||||
(0..24).map(|h| (h, HashSet::new())).collect();
|
||||
|
||||
for file in &awevents_files {
|
||||
if let Ok(content) = fs::read_to_string(file) {
|
||||
for line in content.lines() {
|
||||
parse_awevents_line(line, &mut users, cutoff_24h, &mut hourly);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let re_isoft = Regex::new(
|
||||
r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*method=OpenUserSession.*login=([A-Za-z0-9_]+)",
|
||||
)
|
||||
.unwrap();
|
||||
for file in log_files_for_date(log_dir, "isoft", &date_str) {
|
||||
if let Ok(content) = fs::read_to_string(file) {
|
||||
for line in content.lines() {
|
||||
if let Some(m) = re_isoft.captures(line) {
|
||||
let login = m[2].to_string();
|
||||
if let Ok(ts) =
|
||||
NaiveDateTime::parse_from_str(&m[1], "%Y-%m-%d %H:%M:%S")
|
||||
{
|
||||
if let Some(u) = users.get_mut(&login) {
|
||||
if u.connected_since.is_none() {
|
||||
u.connected_since = Some(ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
compute_statuses(&mut users, active_min, inactive_min, now);
|
||||
|
||||
let status_order = |s: &str| match s {
|
||||
"actif" => 0,
|
||||
"inactif" => 1,
|
||||
_ => 2,
|
||||
};
|
||||
let mut sorted: Vec<UserEntry> = users.into_values().collect();
|
||||
sorted.sort_by_key(|u| status_order(&u.status));
|
||||
|
||||
let hourly_data: Vec<HourlyCount> = {
|
||||
let mut v: Vec<_> = hourly.iter().collect();
|
||||
v.sort_by_key(|(h, _)| *h);
|
||||
v.iter()
|
||||
.map(|(h, s)| HourlyCount {
|
||||
hour: **h,
|
||||
count: s.len(),
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
let mut d = self.data.lock().unwrap();
|
||||
*d = UserData {
|
||||
users: sorted,
|
||||
hourly: hourly_data,
|
||||
error: None,
|
||||
no_files: false,
|
||||
};
|
||||
}
|
||||
|
||||
pub async fn get_weekly_activity(&self) -> Vec<serde_json::Value> {
|
||||
let log_path = {
|
||||
let cm = self.config_manager.lock().await;
|
||||
cm.config.amadea_log_path.clone()
|
||||
};
|
||||
let log_dir = Path::new(&log_path);
|
||||
if !log_dir.is_dir() {
|
||||
return vec![];
|
||||
}
|
||||
|
||||
let today = Local::now().date_naive();
|
||||
let mut result = Vec::new();
|
||||
let re = Regex::new(
|
||||
r"^(\d{4}-\d{2}-\d{2} (\d{2}):\d{2}:\d{2}).*login=([^,]+),",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
for delta in (0..=6i64).rev() {
|
||||
let day = today - Duration::days(delta);
|
||||
let date_str = day.format("%y-%m-%d").to_string();
|
||||
let files = log_files_for_date(log_dir, "awevents", &date_str);
|
||||
if files.is_empty() {
|
||||
result.push(serde_json::json!({ "date": day.to_string(), "count": null }));
|
||||
continue;
|
||||
}
|
||||
let mut hourly: HashMap<u32, HashSet<String>> =
|
||||
(0..24u32).map(|h| (h, HashSet::new())).collect();
|
||||
for file in &files {
|
||||
if let Ok(content) = fs::read_to_string(file) {
|
||||
for line in content.lines() {
|
||||
if let Some(m) = re.captures(line) {
|
||||
let hour: u32 = m[2].parse().unwrap_or(0);
|
||||
let login = m[3].trim().to_string();
|
||||
if !login.is_empty() {
|
||||
hourly.entry(hour).or_default().insert(login);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let max_concurrent = hourly.values().map(|s| s.len()).max().unwrap_or(0);
|
||||
result.push(
|
||||
serde_json::json!({ "date": day.to_string(), "count": max_concurrent }),
|
||||
);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
pub async fn start(self: Arc<Self>) {
|
||||
self.running
|
||||
.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
let um = self.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if !um.running.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
um.parse_logs().await;
|
||||
let interval = {
|
||||
let cm = um.config_manager.lock().await;
|
||||
cm.config.check_interval_minutes
|
||||
};
|
||||
tokio::time::sleep(std::time::Duration::from_secs(interval * 60)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_awevents_line_extracts_user_and_action() {
|
||||
let line =
|
||||
r#"2026-04-07 14:23:45.123;server;;;;"login=jdupont,action=consulter,Label=Consulter dossier""#;
|
||||
let mut users = HashMap::new();
|
||||
let cutoff = chrono::Local::now().naive_local()
|
||||
- chrono::Duration::hours(25);
|
||||
let mut hourly = (0..24u32).map(|h| (h, HashSet::new())).collect();
|
||||
parse_awevents_line(line, &mut users, cutoff, &mut hourly);
|
||||
assert!(users.contains_key("jdupont"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_awevents_line_ignores_malformed() {
|
||||
let line = "not a valid log line";
|
||||
let mut users = HashMap::new();
|
||||
let cutoff = chrono::Local::now().naive_local();
|
||||
let mut hourly = (0..24u32).map(|h| (h, HashSet::new())).collect();
|
||||
parse_awevents_line(line, &mut users, cutoff, &mut hourly);
|
||||
assert!(users.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compute_statuses_marks_recent_as_active() {
|
||||
let now = chrono::Local::now().naive_local();
|
||||
let mut users = HashMap::new();
|
||||
users.insert(
|
||||
"alice".into(),
|
||||
UserEntry {
|
||||
login: "alice".into(),
|
||||
last_action_time: now,
|
||||
last_action_label: "test".into(),
|
||||
action_count_24h: 1,
|
||||
status: "deconnecte".into(),
|
||||
explicit_logout: false,
|
||||
logout_time: None,
|
||||
connected_since: Some(now),
|
||||
},
|
||||
);
|
||||
compute_statuses(&mut users, 5, 30, now);
|
||||
assert_eq!(users["alice"].status, "actif");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user