Refonte majeure du système Agent Chat et ajout de nombreux modules : - Chat unifié : suppression du dual Workflows/Agent Libre, tout passe par /api/chat avec résolution en 3 niveaux (workflow → geste → "montre-moi") - GestureCatalog : 38 raccourcis clavier universels Windows avec matching sémantique, substitution automatique dans les replays, et endpoint /api/gestures - Mode Copilot : exécution pas-à-pas des workflows avec validation humaine via WebSocket (approve/skip/abort) avant chaque action - Léa UI (agent_v0/lea_ui/) : interface PyQt5 pour Windows avec overlay transparent pour feedback visuel pendant le replay - Data Extraction (core/extraction/) : moteur d'extraction visuelle de données (OCR + VLM → SQLite), avec schémas YAML et export CSV/Excel - ReplayVerifier (agent_v0/server_v1/) : vérification post-action par comparaison de screenshots, avec logique de retry (max 3) - IntentParser durci : meilleur fallback regex, type GREETING, patterns améliorés - Dashboard : nouvelles pages gestures, streaming, extractions - Tests : 63 tests GestureCatalog, 47 tests extraction, corrections tests existants - Dépréciation : /api/agent/plan et /api/agent/execute retournent HTTP 410, suppression du code hardcodé _plan_to_replay_actions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
544 lines
20 KiB
Python
544 lines
20 KiB
Python
"""
|
|
Unit tests for the data extraction engine.

Covers: ExtractionSchema, ExtractionField, DataStore, FieldExtractor,
IterationController, ExtractionEngine.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
import yaml
|
|
|
|
from core.extraction import (
|
|
DataStore,
|
|
ExtractionEngine,
|
|
ExtractionField,
|
|
ExtractionSchema,
|
|
FieldExtractor,
|
|
IterationController,
|
|
)
|
|
|
|
|
|
# ======================================================================
|
|
# Fixtures
|
|
# ======================================================================
|
|
|
|
@pytest.fixture
def sample_schema():
    """Return the minimal patient schema shared by the tests.

    The schema declares four required fields (``nom``, ``prenom``,
    ``date_naissance``, ``ipp``), one optional numeric field (``age``) and a
    ``manual`` navigation block limited to 5 records with no delay.
    """
    nom = ExtractionField(name="nom", description="Nom du patient", field_type="text", required=True)
    prenom = ExtractionField(name="prenom", description="Prenom", field_type="text", required=True)
    # The birth date is the only field that carries an explicit regex.
    date_naissance = ExtractionField(
        name="date_naissance",
        description="Date de naissance",
        field_type="date",
        required=True,
        validation_regex=r"\d{2}/\d{2}/\d{4}",
    )
    ipp = ExtractionField(name="ipp", description="IPP", field_type="text", required=True)
    age = ExtractionField(name="age", description="Age", field_type="number", required=False)

    navigation = dict(type="manual", max_records=5, delay_ms=0)
    return ExtractionSchema(
        name="test_patient",
        description="Schema de test",
        fields=[nom, prenom, date_naissance, ipp, age],
        navigation=navigation,
    )
|
|
|
|
|
|
@pytest.fixture
def tmp_db(tmp_path):
    """Return the path (as ``str``) of a SQLite file inside pytest's temp dir."""
    db_file = tmp_path.joinpath("test_store.db")
    return str(db_file)
|
|
|
|
|
|
@pytest.fixture
def data_store(tmp_db):
    """Return a ``DataStore`` backed by the temporary database path."""
    store = DataStore(db_path=tmp_db)
    return store
|
|
|
|
|
|
@pytest.fixture
def yaml_path(tmp_path, sample_schema):
    """Dump the sample schema to a temporary YAML file and return its path."""
    target = str(tmp_path.joinpath("test_schema.yaml"))
    sample_schema.to_yaml(target)
    return target
|
|
|
|
|
|
# ======================================================================
|
|
# ExtractionField
|
|
# ======================================================================
|
|
|
|
class TestExtractionField:
    """Checks on ``ExtractionField.validate_value`` for each field type."""

    def test_validate_required_present(self):
        """A required text field accepts a non-empty value."""
        field = ExtractionField(name="nom", description="Nom", field_type="text", required=True)
        outcome = field.validate_value("DUPONT")
        assert outcome is True

    def test_validate_required_missing(self):
        """A required field rejects ``None`` and the empty string."""
        field = ExtractionField(name="nom", description="Nom", field_type="text", required=True)
        for missing in (None, ""):
            assert field.validate_value(missing) is False

    def test_validate_optional_missing(self):
        """An optional field accepts ``None`` and the empty string."""
        field = ExtractionField(name="note", description="Note", field_type="text", required=False)
        for missing in (None, ""):
            assert field.validate_value(missing) is True

    def test_validate_number(self):
        """Number fields accept integers and comma decimals, reject letters."""
        field = ExtractionField(name="age", description="Age", field_type="number")
        cases = [
            ("42", True),
            ("3,14", True),  # comma as decimal separator (French format)
            ("abc", False),
        ]
        for raw, expected in cases:
            assert field.validate_value(raw) is expected

    def test_validate_boolean(self):
        """Boolean fields accept French and English literals only."""
        field = ExtractionField(name="actif", description="Actif", field_type="boolean")
        cases = [
            ("oui", True),
            ("true", True),
            ("faux", True),
            ("maybe", False),
        ]
        for raw, expected in cases:
            assert field.validate_value(raw) is expected

    def test_validate_date(self):
        """Date fields accept dd/mm/yyyy and ISO forms, reject free text."""
        field = ExtractionField(name="date", description="Date", field_type="date")
        cases = [
            ("15/03/1965", True),
            ("2024-01-15", True),
            ("invalid", False),
        ]
        for raw, expected in cases:
            assert field.validate_value(raw) is expected

    def test_validate_regex(self):
        """A custom ``validation_regex`` is enforced on text fields."""
        six_digits = r"\d{6}"
        field = ExtractionField(
            name="ipp",
            description="IPP",
            field_type="text",
            validation_regex=six_digits,
        )
        cases = [
            ("123456", True),
            ("12345", False),
            ("abcdef", False),
        ]
        for raw, expected in cases:
            assert field.validate_value(raw) is expected
|
|
|
|
|
|
# ======================================================================
|
|
# ExtractionSchema
|
|
# ======================================================================
|
|
|
|
class TestExtractionSchema:
    """Serialisation, lookup helpers and record validation of ``ExtractionSchema``."""

    def test_from_dict(self, sample_schema):
        """``to_dict`` followed by ``from_dict`` preserves name and fields."""
        data = sample_schema.to_dict()
        rebuilt = ExtractionSchema.from_dict(data)
        assert rebuilt.name == sample_schema.name
        assert len(rebuilt.fields) == len(sample_schema.fields)
        assert rebuilt.fields[0].name == "nom"

    def test_yaml_roundtrip(self, tmp_path, sample_schema):
        """A schema written to YAML loads back with the same content."""
        yaml_file = str(tmp_path / "schema.yaml")
        sample_schema.to_yaml(yaml_file)

        loaded = ExtractionSchema.from_yaml(yaml_file)
        assert loaded.name == sample_schema.name
        assert len(loaded.fields) == len(sample_schema.fields)
        assert loaded.navigation == sample_schema.navigation

    def test_from_yaml_not_found(self, tmp_path):
        """Loading a missing file raises ``FileNotFoundError``."""
        # A path inside the per-test temp dir is guaranteed not to exist and
        # works on every platform, unlike a hard-coded /tmp location.
        missing = tmp_path / "nonexistent_schema.yaml"
        with pytest.raises(FileNotFoundError):
            ExtractionSchema.from_yaml(str(missing))

    def test_required_fields(self, sample_schema):
        """``required_fields`` includes required fields and omits optional ones."""
        names = [f.name for f in sample_schema.required_fields]
        assert "nom" in names
        assert "age" not in names

    def test_field_names(self, sample_schema):
        """``field_names`` lists the field names in declaration order."""
        names = sample_schema.field_names
        assert names == ["nom", "prenom", "date_naissance", "ipp", "age"]

    def test_get_field(self, sample_schema):
        """``get_field`` returns the field by name, or ``None`` when unknown."""
        f = sample_schema.get_field("ipp")
        assert f is not None
        assert f.field_type == "text"
        assert sample_schema.get_field("inconnu") is None

    def test_validate_record_valid(self, sample_schema):
        """A fully populated, well-formed record validates without errors."""
        record = {
            "nom": "DUPONT",
            "prenom": "Jean",
            "date_naissance": "15/03/1965",
            "ipp": "123456",
            "age": "58",
        }
        result = sample_schema.validate_record(record)
        assert result["valid"] is True
        assert result["errors"] == []
        assert result["completeness"] == 1.0

    def test_validate_record_missing_required(self, sample_schema):
        """An empty required field makes the record invalid and reports errors."""
        record = {
            "nom": "DUPONT",
            "prenom": "",
            "date_naissance": "15/03/1965",
            "ipp": "123456",
        }
        result = sample_schema.validate_record(record)
        assert result["valid"] is False
        assert len(result["errors"]) > 0

    def test_validate_record_invalid_format(self, sample_schema):
        """A value that fails its field format makes the record invalid."""
        record = {
            "nom": "DUPONT",
            "prenom": "Jean",
            "date_naissance": "invalid_date",
            "ipp": "123456",
        }
        result = sample_schema.validate_record(record)
        assert result["valid"] is False

    def test_load_example_yaml(self):
        """Load the bundled ``dossier_patient.yaml`` example schema."""
        example = (
            Path(__file__).parent.parent.parent
            / "data"
            / "extraction_schemas"
            / "dossier_patient.yaml"
        )
        # Report a skip instead of silently passing when the example file is
        # not shipped with the checkout.
        if not example.exists():
            pytest.skip(f"example schema not found: {example}")

        schema = ExtractionSchema.from_yaml(str(example))
        assert schema.name == "dossier_patient"
        assert len(schema.fields) >= 4
        assert schema.navigation["type"] == "list_detail"
|
|
|
|
|
|
# ======================================================================
|
|
# DataStore
|
|
# ======================================================================
|
|
|
|
class TestDataStore:
    """Persistence tests for ``DataStore`` using a temporary SQLite file."""

    def test_create_extraction(self, data_store, sample_schema):
        """Creating an extraction returns a 36-character identifier."""
        eid = data_store.create_extraction(sample_schema)
        assert eid is not None
        assert len(eid) == 36  # UUID format

    def test_get_extraction(self, data_store, sample_schema):
        """A new extraction is retrievable and starts ``in_progress``."""
        eid = data_store.create_extraction(sample_schema)
        ext = data_store.get_extraction(eid)
        assert ext is not None
        assert ext["schema_name"] == "test_patient"
        assert ext["status"] == "in_progress"

    def test_add_and_get_records(self, data_store, sample_schema):
        """Records come back in insertion order with data and confidence."""
        eid = data_store.create_extraction(sample_schema)

        data_store.add_record(
            extraction_id=eid,
            data={"nom": "DUPONT", "prenom": "Jean"},
            confidence=0.85,
        )
        data_store.add_record(
            extraction_id=eid,
            data={"nom": "MARTIN", "prenom": "Marie"},
            confidence=0.92,
        )

        records = data_store.get_records(eid)
        assert len(records) == 2
        assert records[0]["data"]["nom"] == "DUPONT"
        assert records[1]["confidence"] == 0.92

    def test_finish_extraction(self, data_store, sample_schema):
        """``finish_extraction`` updates the stored status."""
        eid = data_store.create_extraction(sample_schema)
        data_store.finish_extraction(eid, status="completed")
        ext = data_store.get_extraction(eid)
        assert ext["status"] == "completed"

    def test_list_extractions(self, data_store, sample_schema):
        """``list_extractions`` returns every created extraction."""
        data_store.create_extraction(sample_schema)
        data_store.create_extraction(sample_schema)
        extractions = data_store.list_extractions()
        assert len(extractions) == 2

    def test_export_csv(self, data_store, sample_schema, tmp_path):
        """The CSV export contains the record values and a header row."""
        eid = data_store.create_extraction(sample_schema)
        data_store.add_record(eid, {"nom": "DUPONT", "prenom": "Jean"}, confidence=0.9)
        data_store.add_record(eid, {"nom": "MARTIN", "prenom": "Marie"}, confidence=0.8)

        csv_path = str(tmp_path / "export.csv")
        data_store.export_csv(eid, csv_path)

        # utf-8-sig strips a leading BOM if the export wrote one.
        content = Path(csv_path).read_text(encoding="utf-8-sig")
        assert "DUPONT" in content
        assert "MARTIN" in content

        # Check the header row; splitlines() copes with both LF and CRLF.
        lines = content.strip().splitlines()
        assert "nom" in lines[0]
        assert "prenom" in lines[0]

    def test_export_csv_empty(self, data_store, sample_schema, tmp_path):
        """Exporting an extraction without records raises ``ValueError``."""
        eid = data_store.create_extraction(sample_schema)
        # Target the per-test temp dir so nothing is written to a shared
        # /tmp location if the export unexpectedly succeeds.
        target = str(tmp_path / "empty.csv")
        with pytest.raises(ValueError, match="Aucun enregistrement"):
            data_store.export_csv(eid, target)

    def test_get_stats(self, data_store, sample_schema):
        """Stats report the record count, mean confidence and field coverage."""
        eid = data_store.create_extraction(sample_schema)
        data_store.add_record(eid, {"nom": "DUPONT", "prenom": "Jean", "ipp": "123"}, confidence=0.9)
        data_store.add_record(eid, {"nom": "MARTIN", "prenom": None, "ipp": "456"}, confidence=0.7)

        stats = data_store.get_stats(eid)
        assert stats["record_count"] == 2
        # The average is a computed float: compare with a tolerance rather
        # than relying on exact binary equality.
        assert stats["avg_confidence"] == pytest.approx(0.8)
        assert "field_coverage" in stats

    def test_delete_extraction(self, data_store, sample_schema):
        """Deleting an extraction removes it together with its records."""
        eid = data_store.create_extraction(sample_schema)
        data_store.add_record(eid, {"nom": "TEST"}, confidence=0.5)

        assert data_store.delete_extraction(eid) is True
        assert data_store.get_extraction(eid) is None
        assert data_store.get_records(eid) == []

    def test_record_count_updated(self, data_store, sample_schema):
        """``record_count`` on the extraction tracks the records added."""
        eid = data_store.create_extraction(sample_schema)
        data_store.add_record(eid, {"nom": "A"}, confidence=0.5)
        data_store.add_record(eid, {"nom": "B"}, confidence=0.6)

        ext = data_store.get_extraction(eid)
        assert ext["record_count"] == 2
|
|
|
|
|
|
# ======================================================================
|
|
# FieldExtractor (mock VLM)
|
|
# ======================================================================
|
|
|
|
class TestFieldExtractor:
    """Tests for ``FieldExtractor`` with the VLM HTTP call mocked out."""

    @staticmethod
    def _write_fake_png(directory):
        """Write a stub file starting with the PNG signature; return its path."""
        image = directory / "test.png"
        image.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 100)
        return image

    def test_extract_file_not_found(self, sample_schema, tmp_path):
        """A missing screenshot yields zero confidence and at least one error."""
        # A path inside the per-test temp dir is guaranteed not to exist and
        # is portable, unlike a hard-coded /tmp location.
        missing = tmp_path / "nonexistent.png"
        extractor = FieldExtractor()
        result = extractor.extract_fields(str(missing), sample_schema)
        assert result["confidence"] == 0.0
        assert len(result["errors"]) > 0

    def test_parse_vlm_response_valid_json(self):
        """A bare JSON object is parsed as-is."""
        extractor = FieldExtractor()
        data = extractor._parse_vlm_response('{"nom": "DUPONT", "prenom": "Jean"}')
        assert data == {"nom": "DUPONT", "prenom": "Jean"}

    def test_parse_vlm_response_json_in_text(self):
        """A JSON object surrounded by free text is still extracted."""
        extractor = FieldExtractor()
        text = 'Voici les resultats:\n{"nom": "DUPONT", "prenom": "Jean"}\nFin.'
        data = extractor._parse_vlm_response(text)
        assert data is not None
        assert data["nom"] == "DUPONT"

    def test_parse_vlm_response_markdown_json(self):
        """A JSON object inside a Markdown ```json fence is extracted."""
        extractor = FieldExtractor()
        text = '```json\n{"nom": "DUPONT"}\n```'
        data = extractor._parse_vlm_response(text)
        assert data is not None
        assert data["nom"] == "DUPONT"

    def test_parse_vlm_response_invalid(self):
        """Text without any JSON parses to ``None``."""
        extractor = FieldExtractor()
        data = extractor._parse_vlm_response("pas du json du tout")
        assert data is None

    def test_parse_vlm_response_empty(self):
        """Empty and ``None`` responses parse to ``None``."""
        extractor = FieldExtractor()
        assert extractor._parse_vlm_response("") is None
        assert extractor._parse_vlm_response(None) is None

    def test_build_extraction_prompt(self, sample_schema):
        """The prompt mentions field names, the required marker and JSON."""
        extractor = FieldExtractor()
        prompt = extractor._build_extraction_prompt(sample_schema.fields)
        assert "nom" in prompt
        assert "prenom" in prompt
        assert "OBLIGATOIRE" in prompt
        assert "JSON" in prompt

    @patch("core.extraction.field_extractor.requests.post")
    def test_extract_via_vlm_success(self, mock_post, sample_schema, tmp_path):
        """A 200 response carrying JSON yields populated data and no errors."""
        img_path = self._write_fake_png(tmp_path)

        # Mock the Ollama HTTP response: the payload is a JSON string stored
        # under the "response" key.
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "response": json.dumps({
                "nom": "DUPONT",
                "prenom": "Jean",
                "date_naissance": "15/03/1965",
                "ipp": "123456",
                "age": "58",
            })
        }
        mock_post.return_value = mock_response

        extractor = FieldExtractor()
        result = extractor.extract_fields(str(img_path), sample_schema)

        assert result["data"]["nom"] == "DUPONT"
        assert result["data"]["prenom"] == "Jean"
        assert result["confidence"] > 0.0
        assert len(result["errors"]) == 0

    @patch("core.extraction.field_extractor.requests.post")
    def test_extract_via_vlm_connection_error(self, mock_post, sample_schema, tmp_path):
        """An unreachable VLM yields a zero-confidence result, not an exception."""
        img_path = self._write_fake_png(tmp_path)

        # Imported locally: this module does not import requests at top level.
        import requests as req
        mock_post.side_effect = req.exceptions.ConnectionError("Connection refused")

        extractor = FieldExtractor()
        result = extractor.extract_fields(str(img_path), sample_schema)

        # A result must be returned (even if empty) without raising.
        assert "data" in result
        assert result["confidence"] == 0.0

    def test_check_vlm_available_down(self):
        """``check_vlm_available`` returns ``False`` for an unusable URL."""
        # NOTE(review): 99999 is above the valid TCP port range (65535), so
        # this presumably exercises an invalid-URL path rather than a refused
        # connection -- confirm against check_vlm_available.
        extractor = FieldExtractor(ollama_url="http://localhost:99999")
        assert extractor.check_vlm_available() is False
|
|
|
|
|
|
# ======================================================================
|
|
# IterationController
|
|
# ======================================================================
|
|
|
|
class TestIterationController:
    """State handling of ``IterationController`` built from the sample schema."""

    def test_has_next(self, sample_schema):
        """A fresh controller reports that a next record is available."""
        controller = IterationController(sample_schema)
        available = controller.has_next()
        assert available is True

    def test_max_records(self, sample_schema):
        """``max_records`` matches the schema's navigation setting (5)."""
        controller = IterationController(sample_schema)
        limit = controller.max_records
        assert limit == 5

    def test_mark_finished(self, sample_schema):
        """``mark_finished`` flips ``has_next`` from True to False."""
        controller = IterationController(sample_schema)
        before = controller.has_next()
        assert before is True
        controller.mark_finished()
        after = controller.has_next()
        assert after is False

    def test_reset(self, sample_schema):
        """``reset`` rewinds the index and clears the finished state."""
        controller = IterationController(sample_schema)
        controller.current_index = 3
        controller.mark_finished()

        controller.reset()

        assert controller.current_index == 0
        assert controller.has_next() is True

    def test_progress(self, sample_schema):
        """``progress`` exposes index, limit and percentage."""
        controller = IterationController(sample_schema)
        controller.current_index = 2
        snapshot = controller.progress
        expected = {
            "current_index": 2,
            "max_records": 5,
            "progress_pct": 40.0,
        }
        for key, value in expected.items():
            assert snapshot[key] == value

    @patch("core.extraction.iteration_controller.time.sleep")
    def test_navigate_manual(self, mock_sleep, sample_schema):
        """Manual navigation succeeds and advances the index by one.

        ``time.sleep`` is patched so the test does not actually wait.
        """
        controller = IterationController(sample_schema)
        moved = controller.navigate_to_next("test-session")
        assert moved is True
        assert controller.current_index == 1
|
|
|
|
|
|
# ======================================================================
|
|
# ExtractionEngine (integration avec mocks)
|
|
# ======================================================================
|
|
|
|
class TestExtractionEngine:
    """Integration-style tests of ``ExtractionEngine`` with a mocked extractor."""

    def test_extract_current_screen_mock(self, sample_schema, tmp_path):
        """One-off extraction returns the extractor output plus validation."""
        # Stub screenshot: PNG signature followed by zero padding.
        screenshot = tmp_path / "screen.png"
        screenshot.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 100)

        # Canned answer returned by the mocked field extractor.
        canned = {
            "data": {"nom": "DUPONT", "prenom": "Jean", "date_naissance": "15/03/1965", "ipp": "123"},
            "confidence": 0.9,
            "errors": [],
            "raw_response": "{}",
        }
        stub_extractor = MagicMock()
        stub_extractor.extract_fields.return_value = canned

        engine = ExtractionEngine(
            schema=sample_schema,
            store=DataStore(db_path=str(tmp_path / "test.db")),
            field_extractor=stub_extractor,
        )

        outcome = engine.extract_current_screen(str(screenshot))
        assert outcome["data"]["nom"] == "DUPONT"
        assert outcome["confidence"] == 0.9
        assert "validation" in outcome

    def test_extract_from_file(self, sample_schema, tmp_path):
        """``extract_from_file`` extracts the data and stores one record."""
        screenshot = tmp_path / "screen.png"
        screenshot.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 100)

        canned = {
            "data": {"nom": "MARTIN", "prenom": "Marie", "date_naissance": "01/01/1980", "ipp": "456"},
            "confidence": 0.85,
            "errors": [],
            "raw_response": "{}",
        }
        stub_extractor = MagicMock()
        stub_extractor.extract_fields.return_value = canned

        backing_store = DataStore(db_path=str(tmp_path / "test.db"))
        engine = ExtractionEngine(
            schema=sample_schema,
            store=backing_store,
            field_extractor=stub_extractor,
        )

        outcome = engine.extract_from_file(str(screenshot))
        assert outcome["data"]["nom"] == "MARTIN"
        for key in ("record_id", "extraction_id"):
            assert key in outcome

        # The record must have been persisted under the returned extraction id.
        stored = backing_store.get_records(outcome["extraction_id"])
        assert len(stored) == 1

    def test_get_progress_not_running(self, sample_schema, tmp_path):
        """An idle engine reports ``is_running`` False and its schema name."""
        backing_store = DataStore(db_path=str(tmp_path / "test.db"))
        engine = ExtractionEngine(schema=sample_schema, store=backing_store)
        status = engine.get_progress()
        assert status["is_running"] is False
        assert status["schema_name"] == "test_patient"
|
|
|
|
|
|
# ======================================================================
|
|
# Import smoke test
|
|
# ======================================================================
|
|
|
|
class TestImports:
    """Smoke test for the public names exported by ``core.extraction``."""

    def test_import_all(self):
        """Every public symbol can be imported and is not ``None``."""
        from core.extraction import (
            ExtractionEngine,
            ExtractionSchema,
            ExtractionField,
            FieldExtractor,
            DataStore,
            IterationController,
        )

        exported = (
            ExtractionEngine,
            ExtractionSchema,
            ExtractionField,
            FieldExtractor,
            DataStore,
            IterationController,
        )
        for symbol in exported:
            assert symbol is not None
|