"""
|
||
Tests unitaires pour core.federation.learning_pack
|
||
|
||
Vérifie :
|
||
- Export d'un workflow simple → pas de screenshots/OCR dans le pack
|
||
- Merge de 2 packs → déduplication correcte des prototypes
|
||
- Sérialisation / désérialisation JSON round-trip
|
||
- Anonymisation du client_id (SHA-256, pas en clair)
|
||
- Filtrage des données sensibles (textes OCR longs, métadonnées)
|
||
- Index FAISS global (construction, recherche, persistance)
|
||
"""
|
||
|
||
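# Run with pytest, e.g. (the test file path is an assumption about this
# repo's layout; adjust as needed):
#   pytest tests/core/federation/test_learning_pack.py -v
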
import hashlib
import json
import tempfile
from datetime import datetime
from pathlib import Path
from typing import List, Optional

import numpy as np
import pytest

from core.federation.learning_pack import (
    DEDUP_COSINE_THRESHOLD,
    LEARNING_PACK_VERSION,
    AppSignature,
    EdgeStatistic,
    ErrorPattern,
    LearningPack,
    LearningPackExporter,
    LearningPackMerger,
    ScreenPrototype,
    UIPattern,
    WorkflowSkeleton,
    _hash_client_id,
    _sanitize_text,
)
from core.models.workflow_graph import (
    Action,
    EdgeConstraints,
    EdgeStats,
    EmbeddingPrototype,
    PostConditionCheck,
    PostConditions,
    ScreenTemplate,
    TargetSpec,
    TextConstraint,
    UIConstraint,
    WindowConstraint,
    Workflow,
    WorkflowEdge,
    WorkflowNode,
)


# ============================================================================
# Helpers — building test workflows
# ============================================================================

def _make_node(
    node_id: str,
    name: str,
    process_name: str = "Notepad.exe",
    title_pattern: str = ".*Sans titre.*",
    required_roles: Optional[List[str]] = None,
    prototype_vector: Optional[List[float]] = None,
) -> WorkflowNode:
    """Create a minimal WorkflowNode for tests."""
    window = WindowConstraint(
        title_pattern=title_pattern,
        process_name=process_name,
    )
    text = TextConstraint(
        required_texts=["Fichier", "Edition"],
        forbidden_texts=["Erreur critique"],
    )
    ui = UIConstraint(
        required_roles=required_roles or ["button", "textfield"],
    )
    embedding = EmbeddingPrototype(
        provider="openclip_ViT-B-32",
        vector_id="",
        min_cosine_similarity=0.85,
        sample_count=5,
    )
    template = ScreenTemplate(window=window, text=text, ui=ui, embedding=embedding)

    metadata = {}
    if prototype_vector is not None:
        metadata["_prototype_vector"] = prototype_vector

    return WorkflowNode(
        node_id=node_id,
        name=name,
        description=f"Node de test : {name}",
        template=template,
        metadata=metadata,
    )


def _make_edge(
    edge_id: str,
    from_node: str,
    to_node: str,
    action_type: str = "mouse_click",
    target_role: str = "button",
    fail_fast_texts: Optional[List[str]] = None,
) -> WorkflowEdge:
    """Create a minimal WorkflowEdge for tests."""
    target = TargetSpec(by_role=target_role)
    action = Action(type=action_type, target=target)
    constraints = EdgeConstraints()

    fail_fast = []
    for txt in (fail_fast_texts or []):
        fail_fast.append(PostConditionCheck(kind="text_present", value=txt))

    post_conditions = PostConditions(fail_fast=fail_fast)
    stats = EdgeStats(execution_count=10, success_count=9, avg_execution_time_ms=150.0)

    return WorkflowEdge(
        edge_id=edge_id,
        from_node=from_node,
        to_node=to_node,
        action=action,
        constraints=constraints,
        post_conditions=post_conditions,
        stats=stats,
    )


def _make_workflow(
    workflow_id: str = "wf_test_001",
    name: str = "Workflow Test",
    with_vectors: bool = True,
) -> Workflow:
    """Create a minimal but complete Workflow for tests."""
    vec_a = np.random.randn(512).tolist() if with_vectors else None
    vec_b = np.random.randn(512).tolist() if with_vectors else None

    node_a = _make_node("node_a", "Écran principal", prototype_vector=vec_a)
    node_b = _make_node(
        "node_b", "Dialogue Enregistrer",
        process_name="Notepad.exe",
        title_pattern=".*Enregistrer.*",
        prototype_vector=vec_b,
    )

    edge_ab = _make_edge(
        "edge_ab", "node_a", "node_b",
        fail_fast_texts=["Accès refusé", "Fichier introuvable"],
    )

    now = datetime.now()
    # Build one throwaway workflow to obtain default safety_rules/stats/learning
    # objects, instead of round-tripping through Workflow.from_dict three times.
    defaults = Workflow.from_dict({
        "workflow_id": "tmp", "name": "tmp", "nodes": [], "edges": [],
        "safety_rules": {}, "stats": {}, "learning": {},
        "entry_nodes": [], "end_nodes": [], "created_at": now.isoformat(),
        "updated_at": now.isoformat(),
    })
    return Workflow(
        workflow_id=workflow_id,
        name=name,
        description="Workflow de test pour Learning Pack",
        version=1,
        learning_state="COACHING",
        created_at=now,
        updated_at=now,
        entry_nodes=["node_a"],
        end_nodes=["node_b"],
        nodes=[node_a, node_b],
        edges=[edge_ab],
        safety_rules=defaults.safety_rules,
        stats=defaults.stats,
        learning=defaults.learning,
    )


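# The helpers above build this minimal two-node graph, which every test below
# exports, merges, or indexes:
#
#   [node_a "Écran principal"]
#       | edge_ab: mouse_click on a "button" target
#       | fail-fast texts: "Accès refusé", "Fichier introuvable"
#       v
#   [node_b "Dialogue Enregistrer"]

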
# ============================================================================
# Tests — Anonymization
# ============================================================================

class TestAnonymisation:
    """Verify that anonymization works correctly."""

    def test_client_id_est_hashe(self):
        """The client_id must NOT appear in cleartext in the pack."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="CHU-Lyon-001")

        pack_json = json.dumps(pack.to_dict())
        assert "CHU-Lyon-001" not in pack_json, \
            "The client_id appears in cleartext in the pack!"

    def test_source_hash_est_sha256(self):
        """source_hash must be the SHA-256 hash of the client_id."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="CHU-Lyon-001")

        expected_hash = hashlib.sha256(b"CHU-Lyon-001").hexdigest()
        assert pack.source_hash == expected_hash

    def test_hash_client_id_deterministe(self):
        """The same client_id must always produce the same hash."""
        h1 = _hash_client_id("Clinique-Pasteur")
        h2 = _hash_client_id("Clinique-Pasteur")
        assert h1 == h2

    def test_hash_client_id_differents(self):
        """Two different client_ids must produce different hashes."""
        h1 = _hash_client_id("CHU-Lyon")
        h2 = _hash_client_id("CHU-Marseille")
        assert h1 != h2

    def test_pas_de_screenshots_dans_pack(self):
        """The pack must not contain any screenshot path."""
        wf = _make_workflow()
        # Plant a screenshot path and raw OCR text in the node metadata
        wf.nodes[0].metadata["screenshot_path"] = "/tmp/capture_001.png"
        wf.nodes[0].metadata["ocr_text"] = "Texte OCR brut avec données patient"

        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="test")

        pack_json = json.dumps(pack.to_dict())
        assert "/tmp/capture_001.png" not in pack_json
        assert "données patient" not in pack_json

    def test_texte_ocr_long_filtre(self):
        """Long OCR texts (> 120 chars) must be filtered out."""
        assert _sanitize_text("OK") == "OK"
        assert _sanitize_text("x" * 200) is None
        assert _sanitize_text("") is None

    def test_texte_patient_filtre(self):
        """Texts containing patient identifiers must be filtered out."""
        assert _sanitize_text("patient Dupont") is None
        assert _sanitize_text("NIP: 123456") is None
        assert _sanitize_text("Dossier n°789") is None

    def test_texte_court_et_sur_passe(self):
        """Short, non-sensitive texts must pass through unchanged."""
        assert _sanitize_text("Enregistrer") == "Enregistrer"
        assert _sanitize_text("Fichier") == "Fichier"
        assert _sanitize_text("Erreur de connexion") == "Erreur de connexion"


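# Contract of _sanitize_text as pinned down by the tests above (a summary;
# the exact length limit and keyword list live in core.federation.learning_pack):
#   _sanitize_text("Enregistrer")    -> "Enregistrer"  (short, non-sensitive)
#   _sanitize_text("x" * 200)        -> None           (too long, > 120 chars)
#   _sanitize_text("patient Dupont") -> None           (patient identifier)

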
# ============================================================================
# Tests — Export
# ============================================================================

class TestExport:
    """Verify exporting workflows into a Learning Pack."""

    def test_export_basique(self):
        """Exporting a simple workflow must produce a valid pack."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="test_client")

        assert pack.version == LEARNING_PACK_VERSION
        assert pack.pack_id.startswith("lp_")
        assert pack.source_hash  # Non-empty
        assert pack.created_at  # Non-empty

    def test_export_stats(self):
        """The pack stats must reflect its contents."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="test")

        assert pack.stats["workflows_count"] == 1
        assert pack.stats["total_nodes"] == 2
        assert pack.stats["total_edges"] == 1
        assert "Notepad.exe" in pack.stats["apps_seen"]

    def test_export_prototypes_avec_vecteurs(self):
        """The prototypes must carry the 512-d vectors."""
        wf = _make_workflow(with_vectors=True)
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="test")

        assert len(pack.screen_prototypes) == 2
        for proto in pack.screen_prototypes:
            assert proto.vector is not None
            assert len(proto.vector) == 512

    def test_export_prototypes_sans_vecteurs(self):
        """Export must work even without prototype vectors."""
        wf = _make_workflow(with_vectors=False)
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="test")

        # The prototypes are exported, but without a vector
        assert len(pack.screen_prototypes) == 2
        for proto in pack.screen_prototypes:
            assert proto.vector is None

    def test_export_app_signatures(self):
        """Application signatures must be collected."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="test")

        app_names = [sig.app_name for sig in pack.app_signatures]
        assert "Notepad.exe" in app_names

    def test_export_error_patterns(self):
        """Error patterns from the PostConditions must be extracted."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="test")

        error_texts = [ep.error_text for ep in pack.error_patterns]
        assert "Accès refusé" in error_texts
        assert "Fichier introuvable" in error_texts

    def test_export_edge_statistics(self):
        """Edge statistics must be exported."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="test")

        assert len(pack.edge_statistics) == 1
        stat = pack.edge_statistics[0]
        assert stat.action_type == "mouse_click"
        assert stat.execution_count == 10
        assert stat.success_rate == 0.9

    def test_export_workflow_skeleton(self):
        """The workflow skeleton must mirror the graph structure."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="test")

        assert len(pack.workflow_skeletons) == 1
        skel = pack.workflow_skeletons[0]
        assert skel.node_count == 2
        assert skel.edge_count == 1
        assert "Écran principal" in skel.node_names
        assert skel.learning_state == "COACHING"

    def test_export_action_sans_texte_saisi(self):
        """Export must NOT include typed text (text_input actions)."""
        wf = _make_workflow()
        # Add a text_input edge carrying a sensitive text
        edge_text = _make_edge(
            "edge_text", "node_a", "node_b",
            action_type="text_input", target_role="textfield",
        )
        edge_text.action.parameters["text"] = "mot_de_passe_secret_123"
        wf.edges.append(edge_text)

        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="test")

        pack_json = json.dumps(pack.to_dict())
        assert "mot_de_passe_secret_123" not in pack_json


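# Approximate shape of an exported pack, reconstructed from the assertions
# above (field order and any unlisted fields are not guaranteed):
#
#   {
#     "version": LEARNING_PACK_VERSION,
#     "pack_id": "lp_...",
#     "source_hash": "<sha256(client_id)>",
#     "created_at": "<timestamp>",
#     "stats": {"workflows_count": ..., "total_nodes": ..., "total_edges": ...,
#               "apps_seen": [...]},
#     "screen_prototypes": [...],   # 512-d vectors, or null without embeddings
#     "app_signatures": [...],
#     "error_patterns": [...],
#     "edge_statistics": [...],
#     "workflow_skeletons": [...]
#   }

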
# ============================================================================
# Tests — Serialization
# ============================================================================

class TestSerialisation:
    """Verify the JSON round-trip (to_dict → from_dict)."""

    def test_round_trip_learning_pack(self):
        """Serialization → deserialization must be lossless."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="round_trip_test")

        # Serialize, then deserialize
        data = pack.to_dict()
        restored = LearningPack.from_dict(data)

        assert restored.version == pack.version
        assert restored.source_hash == pack.source_hash
        assert restored.pack_id == pack.pack_id
        assert len(restored.screen_prototypes) == len(pack.screen_prototypes)
        assert len(restored.workflow_skeletons) == len(pack.workflow_skeletons)
        assert len(restored.error_patterns) == len(pack.error_patterns)
        assert len(restored.edge_statistics) == len(pack.edge_statistics)

    def test_round_trip_json_string(self):
        """The JSON must be parseable and reproducible."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="json_test")

        json_str = json.dumps(pack.to_dict(), sort_keys=True)
        data = json.loads(json_str)
        restored = LearningPack.from_dict(data)

        assert json.dumps(restored.to_dict(), sort_keys=True) == json_str

    def test_save_load_fichier(self, tmp_path):
        """Saving then loading a file must be lossless."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="file_test")

        filepath = tmp_path / "test_pack.json"
        pack.save(filepath)

        loaded = LearningPack.load(filepath)
        assert loaded.pack_id == pack.pack_id
        assert loaded.source_hash == pack.source_hash
        assert len(loaded.screen_prototypes) == len(pack.screen_prototypes)

    def test_all_sub_dataclasses_round_trip(self):
        """Every sub-structure must support the round-trip."""
        sig = AppSignature(app_name="Chrome.exe", version="120.0", observation_count=5)
        assert AppSignature.from_dict(sig.to_dict()).app_name == "Chrome.exe"

        proto = ScreenPrototype(
            prototype_id="test",
            vector=[1.0, 2.0, 3.0],
            provider="test_provider",
        )
        restored = ScreenPrototype.from_dict(proto.to_dict())
        assert restored.vector == [1.0, 2.0, 3.0]

        skel = WorkflowSkeleton(
            skeleton_id="sk1", name="Test", description="",
            learning_state="OBSERVATION", node_names=["A", "B"],
            edge_summaries=[], entry_nodes=["A"], end_nodes=["B"],
        )
        assert WorkflowSkeleton.from_dict(skel.to_dict()).name == "Test"

        err = ErrorPattern(pattern_id="e1", error_text="Timeout")
        assert ErrorPattern.from_dict(err.to_dict()).error_text == "Timeout"


# ============================================================================
# Tests — Merge
# ============================================================================

class TestMerge:
    """Verify merging several Learning Packs."""

    def test_merge_deux_packs(self):
        """Merging 2 packs must produce a combined pack."""
        wf1 = _make_workflow("wf_1", "Workflow A")
        wf2 = _make_workflow("wf_2", "Workflow B")

        exporter = LearningPackExporter()
        pack_a = exporter.export([wf1], client_id="Client-A")
        pack_b = exporter.export([wf2], client_id="Client-B")

        merger = LearningPackMerger()
        merged = merger.merge([pack_a, pack_b])

        assert merged.stats["workflows_count"] == 2
        assert merged.stats["source_packs_count"] == 2
        assert merged.pack_id.startswith("lp_merged_")

    def test_merge_deduplication_prototypes_identiques(self):
        """Two prototypes with the same vector must be merged."""
        # Use one fixed vector shared by both packs
        fixed_vec = np.random.randn(512).tolist()

        wf1 = _make_workflow("wf_same_1")
        wf1.nodes[0].metadata["_prototype_vector"] = fixed_vec
        wf2 = _make_workflow("wf_same_2")
        wf2.nodes[0].metadata["_prototype_vector"] = fixed_vec

        exporter = LearningPackExporter()
        pack_a = exporter.export([wf1], client_id="A")
        pack_b = exporter.export([wf2], client_id="B")

        # Before the merge: 2 prototypes with the same vector for node_a
        total_before = len(pack_a.screen_prototypes) + len(pack_b.screen_prototypes)
        assert total_before == 4  # 2 nodes × 2 packs

        merger = LearningPackMerger()
        merged = merger.merge([pack_a, pack_b])

        # After the merge, identical prototypes (node_a) must be deduplicated.
        # node_b has different (random) vectors, so no dedup there;
        # node_a is identical in both packs → collapsed into one.
        # Expected result: between 2 and 3 prototypes (1 deduplicated + 2 distinct)
        assert len(merged.screen_prototypes) < total_before

    def test_merge_prototypes_differents_conserves(self):
        """Two very different prototypes must NOT be merged."""
        # Two orthogonal vectors: cosine similarity 0.0, far below
        # DEDUP_COSINE_THRESHOLD, so both must survive the merge
        vec_a = np.zeros(512, dtype=np.float32)
        vec_a[0] = 1.0
        vec_b = np.zeros(512, dtype=np.float32)
        vec_b[1] = 1.0

        wf1 = _make_workflow("wf_diff_1")
        wf1.nodes[0].metadata["_prototype_vector"] = vec_a.tolist()
        # Drop node_b to keep things simple
        wf1.nodes = [wf1.nodes[0]]
        wf1.edges = []

        wf2 = _make_workflow("wf_diff_2")
        wf2.nodes[0].metadata["_prototype_vector"] = vec_b.tolist()
        wf2.nodes = [wf2.nodes[0]]
        wf2.edges = []

        exporter = LearningPackExporter()
        pack_a = exporter.export([wf1], client_id="A")
        pack_b = exporter.export([wf2], client_id="B")

        merger = LearningPackMerger()
        merged = merger.merge([pack_a, pack_b])

        # The two prototypes are very different → no dedup
        assert len(merged.screen_prototypes) == 2

    def test_merge_error_patterns_cross_clients(self):
        """Error patterns seen by several clients get cross_client_count > 1."""
        # The same errors appear in both packs
        wf1 = _make_workflow("wf_err_1")
        wf2 = _make_workflow("wf_err_2")

        exporter = LearningPackExporter()
        pack_a = exporter.export([wf1], client_id="Hôpital-A")
        pack_b = exporter.export([wf2], client_id="Hôpital-B")

        merger = LearningPackMerger()
        merged = merger.merge([pack_a, pack_b])

        # "Accès refusé" and "Fichier introuvable" appear in both packs.
        # The for/else runs the else block only when the loop never breaks.
        for ep in merged.error_patterns:
            if ep.error_text == "Accès refusé":
                assert ep.cross_client_count == 2
                assert ep.observation_count == 2  # 1 per pack
                break
        else:
            pytest.fail("Pattern 'Accès refusé' not found in the merged pack")

    def test_merge_app_signatures_union(self):
        """Application signatures must be the union across packs."""
        wf1 = _make_workflow("wf_app_1")
        wf2 = _make_workflow("wf_app_2")
        # Change the second workflow's application
        wf2.nodes[0].template.window.process_name = "Chrome.exe"

        exporter = LearningPackExporter()
        pack_a = exporter.export([wf1], client_id="A")
        pack_b = exporter.export([wf2], client_id="B")

        merger = LearningPackMerger()
        merged = merger.merge([pack_a, pack_b])

        app_names = {sig.app_name for sig in merged.app_signatures}
        assert "Notepad.exe" in app_names
        assert "Chrome.exe" in app_names

    def test_merge_liste_vide(self):
        """Merging an empty list returns an empty pack."""
        merger = LearningPackMerger()
        merged = merger.merge([])
        assert merged.pack_id.startswith("lp_merged_")
        assert len(merged.screen_prototypes) == 0

    def test_merge_un_seul_pack(self):
        """Merging a single pack returns it with a new pack_id."""
        wf = _make_workflow()
        exporter = LearningPackExporter()
        pack = exporter.export([wf], client_id="solo")

        merger = LearningPackMerger()
        merged = merger.merge([pack])

        assert merged.pack_id != pack.pack_id
        assert merged.pack_id.startswith("lp_merged_")
        assert len(merged.screen_prototypes) == len(pack.screen_prototypes)

    def test_merge_edge_statistics_moyennes(self):
        """Edge statistics must be combined as a weighted average."""
        wf1 = _make_workflow("wf_stat_1")
        wf2 = _make_workflow("wf_stat_2")

        exporter = LearningPackExporter()
        pack_a = exporter.export([wf1], client_id="A")
        pack_b = exporter.export([wf2], client_id="B")

        merger = LearningPackMerger()
        merged = merger.merge([pack_a, pack_b])

        # The edges share the same node names → they get merged
        for stat in merged.edge_statistics:
            if stat.from_node_name == "Écran principal":
                # 10 executions per pack → 20 in total
                assert stat.execution_count == 20
                # success_rate = 0.9 in both → average = 0.9
                assert abs(stat.success_rate - 0.9) < 0.01
                break


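# The weighted-average expectation in test_merge_edge_statistics_moyennes,
# spelled out (inferred from the assertions, not from the merger internals):
#   execution_count = 10 + 10 = 20
#   success_rate    = (0.9 * 10 + 0.9 * 10) / 20 = 0.9

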
# ============================================================================
# Tests — Global FAISS index
# ============================================================================

class TestGlobalFAISSIndex:
    """Tests for the global FAISS index (requires faiss-cpu)."""

    @pytest.fixture
    def sample_packs(self):
        """Create two test packs carrying vectors."""
        wf1 = _make_workflow("wf_faiss_1", "Workflow FAISS A")
        wf2 = _make_workflow("wf_faiss_2", "Workflow FAISS B")

        exporter = LearningPackExporter()
        pack_a = exporter.export([wf1], client_id="Client-FAISS-A")
        pack_b = exporter.export([wf2], client_id="Client-FAISS-B")
        return [pack_a, pack_b]

    def test_build_from_packs(self, sample_packs):
        """Build the index from the packs."""
        try:
            from core.federation.faiss_global import GlobalFAISSIndex
        except ImportError:
            pytest.skip("FAISS not installed")

        index = GlobalFAISSIndex(dimensions=512)
        count = index.build_from_packs(sample_packs)

        assert count > 0
        assert index.total_vectors == count

    def test_search(self, sample_packs):
        """Search the global index."""
        try:
            from core.federation.faiss_global import GlobalFAISSIndex
        except ImportError:
            pytest.skip("FAISS not installed")

        index = GlobalFAISSIndex(dimensions=512)
        index.build_from_packs(sample_packs)

        # Query with a random vector
        query = np.random.randn(512).astype(np.float32)
        results = index.search(query, k=3)

        assert len(results) > 0
        assert len(results) <= 3
        for r in results:
            assert r.prototype_id
            assert r.pack_source_hash
            assert -1.0 <= r.similarity <= 1.0

    def test_search_index_vide(self):
        """Searching an empty index returns an empty list."""
        try:
            from core.federation.faiss_global import GlobalFAISSIndex
        except ImportError:
            pytest.skip("FAISS not installed")

        index = GlobalFAISSIndex(dimensions=512)
        results = index.search(np.random.randn(512).astype(np.float32))
        assert results == []

    def test_add_pack_incremental(self, sample_packs):
        """Add packs to the index incrementally."""
        try:
            from core.federation.faiss_global import GlobalFAISSIndex
        except ImportError:
            pytest.skip("FAISS not installed")

        index = GlobalFAISSIndex(dimensions=512)
        count1 = index.add_pack(sample_packs[0])
        count2 = index.add_pack(sample_packs[1])

        assert count1 > 0
        assert count2 > 0
        assert index.total_vectors == count1 + count2

    def test_save_load(self, sample_packs, tmp_path):
        """Save then reload the index."""
        try:
            from core.federation.faiss_global import GlobalFAISSIndex
        except ImportError:
            pytest.skip("FAISS not installed")

        index = GlobalFAISSIndex(dimensions=512)
        index.build_from_packs(sample_packs)

        base_path = tmp_path / "global_index"
        index.save(base_path)

        loaded = GlobalFAISSIndex.load(base_path)
        assert loaded.total_vectors == index.total_vectors
        assert loaded.dimensions == index.dimensions

        # Make sure search still works on the reloaded index
        query = np.random.randn(512).astype(np.float32)
        results = loaded.search(query, k=2)
        assert len(results) > 0

    def test_get_stats(self, sample_packs):
        """Stats of the global index."""
        try:
            from core.federation.faiss_global import GlobalFAISSIndex
        except ImportError:
            pytest.skip("FAISS not installed")

        index = GlobalFAISSIndex(dimensions=512)
        index.build_from_packs(sample_packs)

        stats = index.get_stats()
        assert stats["dimensions"] == 512
        assert stats["total_vectors"] > 0
        assert stats["unique_sources"] >= 1
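

# ----------------------------------------------------------------------------
# Illustrative federation round-trip (not a test), using only the APIs
# exercised above; the paths and the `local_workflows` / `query_vector`
# variables are hypothetical:
#
#   exporter = LearningPackExporter()
#   pack = exporter.export(local_workflows, client_id="CHU-Lyon-001")
#   pack.save(Path("packs/chu_lyon.json"))      # anonymized: no screenshots/OCR
#
#   packs = [LearningPack.load(p) for p in Path("packs").glob("*.json")]
#   merged = LearningPackMerger().merge(packs)  # prototypes deduplicated
#
#   from core.federation.faiss_global import GlobalFAISSIndex
#   index = GlobalFAISSIndex(dimensions=512)
#   index.build_from_packs([merged])
#   results = index.search(query_vector, k=5)   # similarity in [-1, 1]
# ----------------------------------------------------------------------------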