""" Tests du bridge Process Mining (PM4Py) pour rpa_vision_v3. Couvre : - Conversion sessions JSONL -> event log PM4Py - Conversion workflow core -> event log PM4Py - Decouverte BPMN (Inductive Miner) - Calcul de KPIs - Test avec donnees reelles (marque @slow) """ import json import os import shutil import tempfile from datetime import datetime, timezone from pathlib import Path import pandas as pd import pytest from core.analytics.process_mining_bridge import ( PM4PY_AVAILABLE, _build_activity_label, _extract_timestamp, compute_kpis, discover_bpmn, load_jsonl_session, sessions_to_event_log, workflow_to_event_log, ) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- SAMPLE_EVENTS = [ { "session_id": "sess_test_001", "timestamp": 1776062946.0, "event": { "type": "window_focus_change", "from": None, "to": {"title": "Bureau", "app_name": "explorer.exe"}, "timestamp": 1776062946.0, "window": {"title": "Bureau", "app_name": "explorer.exe"}, }, }, { "session_id": "sess_test_001", "timestamp": 1776062948.0, "event": { "type": "mouse_click", "button": "left", "pos": [500, 300], "timestamp": 1776062948.0, "window": {"title": "Bloc-notes", "app_name": "Notepad.exe"}, }, }, { "session_id": "sess_test_001", "timestamp": 1776062950.0, "event": { "type": "text_input", "text": "Bonjour Dom", "timestamp": 1776062950.0, "window": {"title": "Bloc-notes", "app_name": "Notepad.exe"}, }, }, { "session_id": "sess_test_001", "timestamp": 1776062952.0, "event": { "type": "key_combo", "keys": ["ctrl", "s"], "timestamp": 1776062952.0, "window": {"title": "Bloc-notes", "app_name": "Notepad.exe"}, }, }, # Deuxieme session (meme pattern) { "session_id": "sess_test_002", "timestamp": 1776063000.0, "event": { "type": "window_focus_change", "from": None, "to": {"title": "Bureau", "app_name": "explorer.exe"}, "timestamp": 1776063000.0, "window": {"title": "Bureau", "app_name": 
"explorer.exe"}, }, }, { "session_id": "sess_test_002", "timestamp": 1776063002.0, "event": { "type": "mouse_click", "button": "left", "pos": [500, 300], "timestamp": 1776063002.0, "window": {"title": "Bloc-notes", "app_name": "Notepad.exe"}, }, }, { "session_id": "sess_test_002", "timestamp": 1776063005.0, "event": { "type": "text_input", "text": "Bonjour Claude", "timestamp": 1776063005.0, "window": {"title": "Bloc-notes", "app_name": "Notepad.exe"}, }, }, { "session_id": "sess_test_002", "timestamp": 1776063007.0, "event": { "type": "key_combo", "keys": ["ctrl", "s"], "timestamp": 1776063007.0, "window": {"title": "Bloc-notes", "app_name": "Notepad.exe"}, }, }, # Evenements de bruit (doivent etre filtres) { "session_id": "sess_test_001", "timestamp": 1776062947.0, "event": { "type": "heartbeat", "image": "shots/heartbeat.png", "timestamp": 1776062947.0, }, }, { "session_id": "sess_test_001", "timestamp": 1776062949.0, "event": { "type": "action_result", "base_shot_id": "shot_0001", "image": "", }, }, ] SAMPLE_WORKFLOW = { "workflow_id": "wf_test_001", "name": "Ouvrir Bloc-notes et saisir texte", "created_at": "2026-04-13T08:49:06+00:00", "entry_nodes": ["n1"], "end_nodes": ["n4"], "nodes": [ {"node_id": "n1", "name": "Bureau Windows", "description": "Bureau"}, {"node_id": "n2", "name": "Recherche Windows", "description": "Barre de recherche"}, {"node_id": "n3", "name": "Bloc-notes ouvert", "description": "Fenetre Notepad"}, {"node_id": "n4", "name": "Texte saisi", "description": "Texte ecrit dans Notepad"}, ], "edges": [ { "edge_id": "e1", "from_node": "n1", "to_node": "n2", "action": {"type": "mouse_click"}, "stats": {"execution_count": 5, "avg_duration": 1.5}, }, { "edge_id": "e2", "from_node": "n2", "to_node": "n3", "action": {"type": "text_input"}, "stats": {"execution_count": 5, "avg_duration": 3.0}, }, { "edge_id": "e3", "from_node": "n3", "to_node": "n4", "action": {"type": "text_input"}, "stats": {"execution_count": 5, "avg_duration": 5.0}, }, ], } 
@pytest.fixture
def sample_events():
    """Provide the module-level list of raw sample session events."""
    return SAMPLE_EVENTS


@pytest.fixture
def sample_workflow():
    """Provide the module-level sample workflow dictionary."""
    return SAMPLE_WORKFLOW


@pytest.fixture
def output_dir():
    """Yield the path of a temporary output directory, removed afterwards."""
    tmp_dir = tempfile.mkdtemp(prefix="pm_test_")
    yield tmp_dir
    # Best-effort cleanup: a failure to delete must not fail the test.
    shutil.rmtree(tmp_dir, ignore_errors=True)


@pytest.fixture
def sample_jsonl_file(tmp_path):
    """Write SAMPLE_EVENTS to a temporary JSONL file and return its path."""
    jsonl_file = tmp_path / "live_events.jsonl"
    with open(jsonl_file, "w", encoding="utf-8") as f:
        # One JSON document per line.
        f.writelines(
            json.dumps(event, ensure_ascii=False) + "\n" for event in SAMPLE_EVENTS
        )
    return str(jsonl_file)


# ===========================================================================
# Unit tests: internal functions
# ===========================================================================


class TestBuildActivityLabel:
    """Tests for the construction of activity labels."""

    def test_mouse_click(self):
        event = {
            "event": {
                "type": "mouse_click",
                "window": {"title": "Bloc-notes", "app_name": "Notepad.exe"},
            }
        }
        label = _build_activity_label(event)
        assert label is not None
        assert "Clic" in label
        assert "Notepad.exe" in label
        assert "Bloc-notes" in label

    def test_text_input(self):
        event = {
            "event": {
                "type": "text_input",
                "text": "Bonjour",
                "window": {"title": "Bloc-notes", "app_name": "Notepad.exe"},
            }
        }
        label = _build_activity_label(event)
        assert label is not None
        assert "Saisie" in label
        assert "Bonjour" in label

    def test_text_input_truncation(self):
        """A 50-character text must show up with an ellipsis in the label."""
        event = {
            "event": {
                "type": "text_input",
                "text": "A" * 50,
                "window": {"title": "X", "app_name": "X.exe"},
            }
        }
        label = _build_activity_label(event)
        # Guard first so a None label fails the assertion instead of raising
        # TypeError on the membership test below.
        assert label is not None
        assert "..." in label

    def test_key_combo(self):
        event = {
            "event": {
                "type": "key_combo",
                "keys": ["ctrl", "s"],
                "window": {"title": "Bloc-notes", "app_name": "Notepad.exe"},
            }
        }
        label = _build_activity_label(event)
        assert label is not None
        assert "Raccourci" in label
        assert "ctrl+s" in label

    def test_window_focus_change(self):
        event = {
            "event": {
                "type": "window_focus_change",
                "to": {"title": "Chrome", "app_name": "chrome.exe"},
                "window": {"title": "Chrome", "app_name": "chrome.exe"},
            }
        }
        label = _build_activity_label(event)
        assert label is not None
        assert "Fenetre" in label
        assert "Chrome" in label

    def test_heartbeat_filtered(self):
        """Heartbeat events yield no label."""
        event = {
            "event": {
                "type": "heartbeat",
                "image": "something.png",
            }
        }
        assert _build_activity_label(event) is None

    def test_action_result_filtered(self):
        """Action-result events yield no label."""
        event = {
            "event": {
                "type": "action_result",
                "base_shot_id": "shot_0001",
            }
        }
        assert _build_activity_label(event) is None


class TestExtractTimestamp:
    """Tests for timestamp extraction."""

    def test_from_event_timestamp(self):
        event = {"event": {"timestamp": 1776062946.0}}
        assert _extract_timestamp(event) == 1776062946.0

    def test_from_root_timestamp(self):
        event = {"timestamp": 1776062946.0}
        assert _extract_timestamp(event) == 1776062946.0

    def test_from_t_field(self):
        event = {"t": 1712345678.123}
        assert _extract_timestamp(event) == pytest.approx(1712345678.123)

    def test_missing_timestamp(self):
        event = {"event": {"type": "unknown"}}
        assert _extract_timestamp(event) is None


# ===========================================================================
# Tests: sessions -> event log conversion
# ===========================================================================


class TestSessionsToEventLog:
    """Tests for the JSONL sessions -> PM4Py event log conversion."""

    def test_basic_conversion(self, sample_events):
        df = sessions_to_event_log(sample_events)
        assert not df.empty
        assert "case:concept:name" in df.columns
        assert "concept:name" in df.columns
        assert "time:timestamp" in df.columns

    def test_correct_case_ids(self, sample_events):
        df = sessions_to_event_log(sample_events)
        case_ids = df["case:concept:name"].unique()
        assert "sess_test_001" in case_ids
        assert "sess_test_002" in case_ids

    def test_noise_filtered(self, sample_events):
        df = sessions_to_event_log(sample_events)
        # heartbeat and action_result events must not appear
        event_types = df["event_type"].unique()
        assert "heartbeat" not in event_types
        assert "action_result" not in event_types

    def test_timestamps_ordered(self, sample_events):
        """Within each case, timestamps never decrease from row to row."""
        df = sessions_to_event_log(sample_events)
        for case_id, group in df.groupby("case:concept:name"):
            timestamps = group["time:timestamp"].values
            # Compare each timestamp with its successor.
            for earlier, later in zip(timestamps, timestamps[1:]):
                assert earlier <= later, f"unordered timestamps in case {case_id!r}"

    def test_window_deduplication(self):
        """Consecutive identical window_focus_change events are deduplicated."""
        events = [
            {
                "session_id": "s1",
                "timestamp": 1.0,
                "event": {
                    "type": "window_focus_change",
                    "to": {"title": "A", "app_name": "a.exe"},
                    "timestamp": 1.0,
                    "window": {"title": "A", "app_name": "a.exe"},
                },
            },
            {
                "session_id": "s1",
                "timestamp": 2.0,
                "event": {
                    "type": "window_focus_change",
                    "to": {"title": "A", "app_name": "a.exe"},
                    "timestamp": 2.0,
                    "window": {"title": "A", "app_name": "a.exe"},
                },
            },
            {
                "session_id": "s1",
                "timestamp": 3.0,
                "event": {
                    "type": "window_focus_change",
                    "to": {"title": "B", "app_name": "b.exe"},
                    "timestamp": 3.0,
                    "window": {"title": "B", "app_name": "b.exe"},
                },
            },
        ]
        df = sessions_to_event_log(events, deduplicate_windows=True)
        # Only 2 rows: A then B (the second A is a duplicate)
        assert len(df) == 2

    def test_empty_input(self):
        """An empty event list gives an empty frame that still has the case column."""
        df = sessions_to_event_log([])
        assert df.empty
        assert "case:concept:name" in df.columns

    def test_events_count(self, sample_events):
        df = sessions_to_event_log(sample_events)
        # 2 sessions x 4 relevant events = 8 rows
        assert len(df) == 8


# ===========================================================================
# Tests: workflow -> event log conversion
=========================================================================== class TestWorkflowToEventLog: """Tests de la conversion workflow core -> event log PM4Py.""" def test_basic_conversion(self, sample_workflow): df = workflow_to_event_log(sample_workflow) assert not df.empty assert "case:concept:name" in df.columns assert "concept:name" in df.columns def test_path_traversal(self, sample_workflow): df = workflow_to_event_log(sample_workflow) # Le workflow n1->n2->n3->n4 est lineaire, 1 seul chemin assert df["case:concept:name"].nunique() == 1 # 4 nodes dans le chemin assert len(df) == 4 def test_node_names(self, sample_workflow): df = workflow_to_event_log(sample_workflow) activities = df["concept:name"].tolist() assert "Bureau Windows" in activities assert "Recherche Windows" in activities assert "Bloc-notes ouvert" in activities assert "Texte saisi" in activities def test_empty_workflow(self): df = workflow_to_event_log({"workflow_id": "empty", "nodes": [], "edges": []}) assert df.empty def test_branching_workflow(self): """Un workflow avec branches produit plusieurs chemins.""" wf = { "workflow_id": "wf_branch", "created_at": "2026-01-01T00:00:00+00:00", "entry_nodes": ["n1"], "end_nodes": ["n3", "n4"], "nodes": [ {"node_id": "n1", "name": "Start"}, {"node_id": "n2", "name": "Step A"}, {"node_id": "n3", "name": "End A"}, {"node_id": "n4", "name": "End B"}, ], "edges": [ {"edge_id": "e1", "from_node": "n1", "to_node": "n2"}, {"edge_id": "e2", "from_node": "n1", "to_node": "n4"}, {"edge_id": "e3", "from_node": "n2", "to_node": "n3"}, ], } df = workflow_to_event_log(wf) # 2 chemins : n1->n2->n3 et n1->n4 assert df["case:concept:name"].nunique() == 2 # =========================================================================== # Tests : decouverte BPMN # =========================================================================== @pytest.mark.skipif(not PM4PY_AVAILABLE, reason="pm4py non installe") class TestDiscoverBpmn: """Tests de la decouverte BPMN.""" def 
test_produces_files(self, sample_events, output_dir): df = sessions_to_event_log(sample_events) result = discover_bpmn(df, output_dir=output_dir, name="test") # Verifier que le BPMN XML existe assert result["bpmn_xml_path"] is not None assert Path(result["bpmn_xml_path"]).exists() assert Path(result["bpmn_xml_path"]).suffix == ".bpmn" # Verifier le contenu XML xml_content = Path(result["bpmn_xml_path"]).read_text() assert "bpmn" in xml_content.lower() or "definitions" in xml_content.lower() def test_produces_png(self, sample_events, output_dir): df = sessions_to_event_log(sample_events) result = discover_bpmn(df, output_dir=output_dir, name="test") if result["bpmn_image_path"]: assert Path(result["bpmn_image_path"]).exists() # Verifier que c'est un PNG (magic bytes) with open(result["bpmn_image_path"], "rb") as f: header = f.read(4) assert header[:4] == b"\x89PNG" def test_stats_populated(self, sample_events, output_dir): df = sessions_to_event_log(sample_events) result = discover_bpmn(df, output_dir=output_dir, name="test") stats = result["stats"] assert stats["activities"] > 0 assert stats["cases"] == 2 assert stats["variants"] >= 1 def test_empty_raises(self, output_dir): df = pd.DataFrame(columns=["case:concept:name", "concept:name", "time:timestamp"]) with pytest.raises(ValueError, match="vide"): discover_bpmn(df, output_dir=output_dir) def test_dfg_image_produced(self, sample_events, output_dir): df = sessions_to_event_log(sample_events) result = discover_bpmn(df, output_dir=output_dir, name="test") if result["dfg_image_path"]: assert Path(result["dfg_image_path"]).exists() # =========================================================================== # Tests : KPIs # =========================================================================== class TestComputeKpis: """Tests du calcul de KPIs.""" def test_returns_expected_keys(self, sample_events): df = sessions_to_event_log(sample_events) kpis = compute_kpis(df) expected_keys = { "total_cases", "total_events", 
"unique_activities", "variants_count", "variants_top5", "avg_case_duration_seconds", "median_case_duration_seconds", "avg_events_per_case", "activity_stats", "bottlenecks", "app_distribution", } assert expected_keys.issubset(set(kpis.keys())) def test_case_count(self, sample_events): df = sessions_to_event_log(sample_events) kpis = compute_kpis(df) assert kpis["total_cases"] == 2 def test_events_count(self, sample_events): df = sessions_to_event_log(sample_events) kpis = compute_kpis(df) assert kpis["total_events"] == 8 def test_activity_stats_populated(self, sample_events): df = sessions_to_event_log(sample_events) kpis = compute_kpis(df) assert len(kpis["activity_stats"]) > 0 # Chaque activite doit avoir les cles attendues for activity, stats in kpis["activity_stats"].items(): assert "count" in stats assert "avg_duration_seconds" in stats assert "min_duration_seconds" in stats assert "max_duration_seconds" in stats def test_bottlenecks_sorted(self, sample_events): df = sessions_to_event_log(sample_events) kpis = compute_kpis(df) bottlenecks = kpis["bottlenecks"] # Verifier l'ordre decroissant for i in range(len(bottlenecks) - 1): assert ( bottlenecks[i]["avg_duration_seconds"] >= bottlenecks[i + 1]["avg_duration_seconds"] ) def test_app_distribution(self, sample_events): df = sessions_to_event_log(sample_events) kpis = compute_kpis(df) assert "app_distribution" in kpis assert "Notepad.exe" in kpis["app_distribution"] def test_empty_kpis(self): df = pd.DataFrame(columns=["case:concept:name", "concept:name", "time:timestamp"]) kpis = compute_kpis(df) assert kpis["total_cases"] == 0 assert kpis["total_events"] == 0 def test_duration_positive(self, sample_events): df = sessions_to_event_log(sample_events) kpis = compute_kpis(df) assert kpis["avg_case_duration_seconds"] > 0 @pytest.mark.skipif(not PM4PY_AVAILABLE, reason="pm4py non installe") def test_variants_detected(self, sample_events): df = sessions_to_event_log(sample_events) kpis = compute_kpis(df) assert 
kpis["variants_count"] >= 1 assert len(kpis["variants_top5"]) >= 1 # =========================================================================== # Tests : chargement JSONL # =========================================================================== class TestLoadJsonlSession: """Tests du chargement de fichiers JSONL.""" def test_load_basic(self, sample_jsonl_file): events = load_jsonl_session(sample_jsonl_file) assert len(events) == len(SAMPLE_EVENTS) def test_load_nonexistent(self): with pytest.raises(FileNotFoundError): load_jsonl_session("/tmp/nonexistent_file.jsonl") def test_load_with_blank_lines(self, tmp_path): jsonl_file = tmp_path / "with_blanks.jsonl" with open(jsonl_file, "w") as f: f.write('{"session_id": "s1", "timestamp": 1.0, "event": {"type": "mouse_click", "timestamp": 1.0, "window": {"title": "X", "app_name": "x.exe"}}}\n') f.write("\n") f.write('{"session_id": "s1", "timestamp": 2.0, "event": {"type": "mouse_click", "timestamp": 2.0, "window": {"title": "X", "app_name": "x.exe"}}}\n') events = load_jsonl_session(str(jsonl_file)) assert len(events) == 2 def test_load_with_invalid_line(self, tmp_path): jsonl_file = tmp_path / "with_invalid.jsonl" with open(jsonl_file, "w") as f: f.write('{"valid": true}\n') f.write("this is not json\n") f.write('{"also_valid": true}\n') events = load_jsonl_session(str(jsonl_file)) assert len(events) == 2 # =========================================================================== # Test avec donnees reelles # =========================================================================== # Chercher une session reelle disponible _REAL_SESSION_DIRS = [ "/home/dom/ai/rpa_vision_v3/data/training/live_sessions/DESKTOP-ST3VBSD_windows/sess_20260413T084906_748092", "/home/dom/ai/rpa_vision_v3/data/training/live_sessions/sess_20260314T102557_dada53", ] _REAL_SESSION = None for d in _REAL_SESSION_DIRS: jsonl = Path(d) / "live_events.jsonl" if jsonl.exists(): _REAL_SESSION = str(jsonl) break @pytest.mark.slow 
@pytest.mark.skipif(_REAL_SESSION is None, reason="Pas de session reelle disponible")
@pytest.mark.skipif(not PM4PY_AVAILABLE, reason="pm4py non installe")
class TestWithRealSessionData:
    """End-to-end test on a real recorded session.

    Skipped when no real session file was found or when pm4py is unavailable.
    """

    def test_full_pipeline(self):
        """Load -> convert -> BPMN -> KPIs on real data."""
        # 1. Load
        events = load_jsonl_session(_REAL_SESSION)
        assert len(events) > 0, f"Session vide : {_REAL_SESSION}"

        # 2. Convert to an event log
        df = sessions_to_event_log(events)
        assert not df.empty
        assert df["case:concept:name"].nunique() >= 1

        # 3. Discover the BPMN model
        with tempfile.TemporaryDirectory(prefix="pm_real_") as tmpdir:
            result = discover_bpmn(df, output_dir=tmpdir, name="real_session")

            bpmn_path = Path(result["bpmn_xml_path"])
            assert bpmn_path.exists()
            # Decode explicitly: real sessions may carry non-ASCII text and
            # the platform locale encoding is not guaranteed to match.
            # NOTE(review): assumes the exporter writes UTF-8 -- confirm.
            xml_content = bpmn_path.read_text(encoding="utf-8")
            assert len(xml_content) > 100

            # Check the image when one was generated
            if result["bpmn_image_path"]:
                assert Path(result["bpmn_image_path"]).exists()

        # 4. Compute KPIs
        kpis = compute_kpis(df)
        assert kpis["total_events"] > 0
        assert kpis["unique_activities"] > 0

        # 5. Print a summary (visible in stdout with pytest -s)
        print("\n=== Process Mining - Session reelle ===")
        print(f"Fichier : {_REAL_SESSION}")
        print(f"Events bruts : {len(events)}")
        print(f"Events pertinents : {kpis['total_events']}")
        print(f"Activites uniques : {kpis['unique_activities']}")
        print(f"Variantes : {kpis['variants_count']}")
        print(f"Duree moyenne : {kpis['avg_case_duration_seconds']:.1f}s")
        print(f"Top variantes : {kpis['variants_top5'][:3]}")
        print(f"Goulots : {kpis['bottlenecks']}")
        print(f"Apps : {kpis['app_distribution']}")