Files
rpa_vision_v3/tests/unit/test_extraction_engine.py
Dom cf495dd82f feat: chat unifié, GestureCatalog, Copilot, Léa UI, extraction données, vérification replay
Refonte majeure du système Agent Chat et ajout de nombreux modules :

- Chat unifié : suppression du dual Workflows/Agent Libre, tout passe par /api/chat
  avec résolution en 3 niveaux (workflow → geste → "montre-moi")
- GestureCatalog : 38 raccourcis clavier universels Windows avec matching sémantique,
  substitution automatique dans les replays, et endpoint /api/gestures
- Mode Copilot : exécution pas-à-pas des workflows avec validation humaine via WebSocket
  (approve/skip/abort) avant chaque action
- Léa UI (agent_v0/lea_ui/) : interface PyQt5 pour Windows avec overlay transparent
  pour feedback visuel pendant le replay
- Data Extraction (core/extraction/) : moteur d'extraction visuelle de données
  (OCR + VLM → SQLite), avec schémas YAML et export CSV/Excel
- ReplayVerifier (agent_v0/server_v1/) : vérification post-action par comparaison
  de screenshots, avec logique de retry (max 3)
- IntentParser durci : meilleur fallback regex, type GREETING, patterns améliorés
- Dashboard : nouvelles pages gestures, streaming, extractions
- Tests : 63 tests GestureCatalog, 47 tests extraction, corrections tests existants
- Dépréciation : /api/agent/plan et /api/agent/execute retournent HTTP 410,
  suppression du code hardcodé _plan_to_replay_actions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 10:02:09 +01:00

544 lines
20 KiB
Python

"""
Tests unitaires pour le moteur d'extraction de donnees.
Couvre : ExtractionSchema, ExtractionField, DataStore, FieldExtractor,
IterationController, ExtractionEngine.
"""
import json
import os
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
import yaml
from core.extraction import (
DataStore,
ExtractionEngine,
ExtractionField,
ExtractionSchema,
FieldExtractor,
IterationController,
)
# ======================================================================
# Fixtures
# ======================================================================
@pytest.fixture
def sample_schema():
    """Return the minimal patient-style schema shared by the tests.

    The schema declares four required fields (``nom``, ``prenom``,
    ``date_naissance``, ``ipp``) and one optional numeric field (``age``).
    Navigation is manual, capped at 5 records, with a 0 ms delay.
    """
    last_name = ExtractionField(
        name="nom", description="Nom du patient", field_type="text", required=True
    )
    first_name = ExtractionField(
        name="prenom", description="Prenom", field_type="text", required=True
    )
    # The only field carrying an explicit validation pattern (dd/mm/yyyy).
    birth_date = ExtractionField(
        name="date_naissance",
        description="Date de naissance",
        field_type="date",
        required=True,
        validation_regex=r"\d{2}/\d{2}/\d{4}",
    )
    patient_id = ExtractionField(
        name="ipp", description="IPP", field_type="text", required=True
    )
    age = ExtractionField(
        name="age", description="Age", field_type="number", required=False
    )

    navigation_settings = {"type": "manual", "max_records": 5, "delay_ms": 0}
    return ExtractionSchema(
        name="test_patient",
        description="Schema de test",
        fields=[last_name, first_name, birth_date, patient_id, age],
        navigation=navigation_settings,
    )
@pytest.fixture
def tmp_db(tmp_path):
    """Return the path (as ``str``) of a SQLite file inside ``tmp_path``."""
    db_file = tmp_path / "test_store.db"
    return str(db_file)
@pytest.fixture
def data_store(tmp_db):
    """Return a ``DataStore`` backed by the temporary database path."""
    store = DataStore(db_path=tmp_db)
    return store
@pytest.fixture
def yaml_path(tmp_path, sample_schema):
    """Write ``sample_schema`` to a temporary YAML file and return its path."""
    destination = tmp_path / "test_schema.yaml"
    serialized_path = str(destination)
    sample_schema.to_yaml(serialized_path)
    return serialized_path
# ======================================================================
# ExtractionField
# ======================================================================
class TestExtractionField:
    """Checks of ``ExtractionField.validate_value`` for each field type."""

    def test_validate_required_present(self):
        """A required text field accepts a non-empty value."""
        field = ExtractionField(
            name="nom", description="Nom", field_type="text", required=True
        )
        verdict = field.validate_value("DUPONT")
        assert verdict is True

    def test_validate_required_missing(self):
        """A required text field rejects ``None`` and the empty string."""
        field = ExtractionField(
            name="nom", description="Nom", field_type="text", required=True
        )
        for absent in (None, ""):
            assert field.validate_value(absent) is False

    def test_validate_optional_missing(self):
        """An optional text field accepts ``None`` and the empty string."""
        field = ExtractionField(
            name="note", description="Note", field_type="text", required=False
        )
        for absent in (None, ""):
            assert field.validate_value(absent) is True

    def test_validate_number(self):
        """Number fields accept integers and comma decimals, reject letters."""
        field = ExtractionField(name="age", description="Age", field_type="number")
        # "3,14" uses the French decimal comma.
        expectations = [("42", True), ("3,14", True), ("abc", False)]
        for raw, expected in expectations:
            assert field.validate_value(raw) is expected

    def test_validate_boolean(self):
        """Boolean fields accept French/English keywords, reject others."""
        field = ExtractionField(
            name="actif", description="Actif", field_type="boolean"
        )
        expectations = [
            ("oui", True),
            ("true", True),
            ("faux", True),
            ("maybe", False),
        ]
        for raw, expected in expectations:
            assert field.validate_value(raw) is expected

    def test_validate_date(self):
        """Date fields accept dd/mm/yyyy and ISO dates, reject free text."""
        field = ExtractionField(name="date", description="Date", field_type="date")
        expectations = [
            ("15/03/1965", True),
            ("2024-01-15", True),
            ("invalid", False),
        ]
        for raw, expected in expectations:
            assert field.validate_value(raw) is expected

    def test_validate_regex(self):
        """A ``validation_regex`` of six digits constrains accepted values."""
        field = ExtractionField(
            name="ipp",
            description="IPP",
            field_type="text",
            validation_regex=r"\d{6}",
        )
        expectations = [("123456", True), ("12345", False), ("abcdef", False)]
        for raw, expected in expectations:
            assert field.validate_value(raw) is expected
# ======================================================================
# ExtractionSchema
# ======================================================================
class TestExtractionSchema:
    """Tests for ``ExtractionSchema``: (de)serialization, lookups, validation."""

    def test_from_dict(self, sample_schema):
        """``to_dict`` followed by ``from_dict`` keeps name and fields."""
        data = sample_schema.to_dict()
        rebuilt = ExtractionSchema.from_dict(data)
        assert rebuilt.name == sample_schema.name
        assert len(rebuilt.fields) == len(sample_schema.fields)
        assert rebuilt.fields[0].name == "nom"

    def test_yaml_roundtrip(self, tmp_path, sample_schema):
        """``to_yaml`` followed by ``from_yaml`` keeps name, fields, navigation."""
        yaml_file = str(tmp_path / "schema.yaml")
        sample_schema.to_yaml(yaml_file)
        loaded = ExtractionSchema.from_yaml(yaml_file)
        assert loaded.name == sample_schema.name
        assert len(loaded.fields) == len(sample_schema.fields)
        assert loaded.navigation == sample_schema.navigation

    def test_from_yaml_not_found(self, tmp_path):
        """Loading a missing YAML file raises ``FileNotFoundError``."""
        # A path inside the per-test temp dir is guaranteed not to exist and
        # stays portable, unlike a hard-coded /tmp location.
        missing = tmp_path / "nonexistent_schema.yaml"
        with pytest.raises(FileNotFoundError):
            ExtractionSchema.from_yaml(str(missing))

    def test_required_fields(self, sample_schema):
        """``required_fields`` includes ``nom`` and excludes optional ``age``."""
        names = [f.name for f in sample_schema.required_fields]
        assert "nom" in names
        assert "age" not in names

    def test_field_names(self, sample_schema):
        """``field_names`` lists the names in declaration order."""
        names = sample_schema.field_names
        assert names == ["nom", "prenom", "date_naissance", "ipp", "age"]

    def test_get_field(self, sample_schema):
        """``get_field`` returns the field by name, or ``None`` if unknown."""
        f = sample_schema.get_field("ipp")
        assert f is not None
        assert f.field_type == "text"
        assert sample_schema.get_field("inconnu") is None

    def test_validate_record_valid(self, sample_schema):
        """A fully populated, well-formed record is valid and complete."""
        record = {
            "nom": "DUPONT",
            "prenom": "Jean",
            "date_naissance": "15/03/1965",
            "ipp": "123456",
            "age": "58",
        }
        result = sample_schema.validate_record(record)
        assert result["valid"] is True
        assert result["errors"] == []
        assert result["completeness"] == 1.0

    def test_validate_record_missing_required(self, sample_schema):
        """An empty required field makes the record invalid with errors."""
        record = {
            "nom": "DUPONT",
            "prenom": "",
            "date_naissance": "15/03/1965",
            "ipp": "123456",
        }
        result = sample_schema.validate_record(record)
        assert result["valid"] is False
        assert len(result["errors"]) > 0

    def test_validate_record_invalid_format(self, sample_schema):
        """A badly formatted date makes the record invalid."""
        record = {
            "nom": "DUPONT",
            "prenom": "Jean",
            "date_naissance": "invalid_date",
            "ipp": "123456",
        }
        result = sample_schema.validate_record(record)
        assert result["valid"] is False

    def test_load_example_yaml(self):
        """Load the example ``dossier_patient.yaml`` schema file.

        The test is reported as skipped when the example file is absent,
        instead of passing without asserting anything.
        """
        base_dir = Path(__file__).parent.parent.parent
        yaml_path = base_dir / "data" / "extraction_schemas" / "dossier_patient.yaml"
        if not yaml_path.exists():
            pytest.skip(f"example schema not found: {yaml_path}")
        schema = ExtractionSchema.from_yaml(str(yaml_path))
        assert schema.name == "dossier_patient"
        assert len(schema.fields) >= 4
        assert schema.navigation["type"] == "list_detail"
# ======================================================================
# DataStore
# ======================================================================
class TestDataStore:
    """Tests for ``DataStore``: extraction lifecycle, records, export, stats."""

    def test_create_extraction(self, data_store, sample_schema):
        """``create_extraction`` returns a 36-character identifier."""
        eid = data_store.create_extraction(sample_schema)
        assert eid is not None
        assert len(eid) == 36  # length of a canonical UUID string

    def test_get_extraction(self, data_store, sample_schema):
        """A new extraction is retrievable and starts as ``in_progress``."""
        eid = data_store.create_extraction(sample_schema)
        ext = data_store.get_extraction(eid)
        assert ext is not None
        assert ext["schema_name"] == "test_patient"
        assert ext["status"] == "in_progress"

    def test_add_and_get_records(self, data_store, sample_schema):
        """Records come back in insertion order with data and confidence."""
        eid = data_store.create_extraction(sample_schema)
        data_store.add_record(
            extraction_id=eid,
            data={"nom": "DUPONT", "prenom": "Jean"},
            confidence=0.85,
        )
        data_store.add_record(
            extraction_id=eid,
            data={"nom": "MARTIN", "prenom": "Marie"},
            confidence=0.92,
        )
        records = data_store.get_records(eid)
        assert len(records) == 2
        assert records[0]["data"]["nom"] == "DUPONT"
        # Tolerant comparison: the value is a float that went through storage.
        assert records[1]["confidence"] == pytest.approx(0.92)

    def test_finish_extraction(self, data_store, sample_schema):
        """``finish_extraction`` updates the stored status."""
        eid = data_store.create_extraction(sample_schema)
        data_store.finish_extraction(eid, status="completed")
        ext = data_store.get_extraction(eid)
        assert ext["status"] == "completed"

    def test_list_extractions(self, data_store, sample_schema):
        """``list_extractions`` returns every created extraction."""
        data_store.create_extraction(sample_schema)
        data_store.create_extraction(sample_schema)
        extractions = data_store.list_extractions()
        assert len(extractions) == 2

    def test_export_csv(self, data_store, sample_schema, tmp_path):
        """The CSV export contains the record values and a header row."""
        eid = data_store.create_extraction(sample_schema)
        data_store.add_record(eid, {"nom": "DUPONT", "prenom": "Jean"}, confidence=0.9)
        data_store.add_record(eid, {"nom": "MARTIN", "prenom": "Marie"}, confidence=0.8)
        csv_path = str(tmp_path / "export.csv")
        data_store.export_csv(eid, csv_path)
        # "utf-8-sig" drops a leading BOM if the export wrote one.
        content = Path(csv_path).read_text(encoding="utf-8-sig")
        assert "DUPONT" in content
        assert "MARTIN" in content
        # Check the header row; splitlines() copes with both LF and CRLF.
        header = content.strip().splitlines()[0]
        assert "nom" in header
        assert "prenom" in header

    def test_export_csv_empty(self, data_store, sample_schema, tmp_path):
        """Exporting an extraction without records raises ``ValueError``."""
        eid = data_store.create_extraction(sample_schema)
        # Target a path in the per-test temp dir so nothing can be left
        # behind in a shared, platform-specific location.
        target = str(tmp_path / "empty.csv")
        with pytest.raises(ValueError, match="Aucun enregistrement"):
            data_store.export_csv(eid, target)

    def test_get_stats(self, data_store, sample_schema):
        """Stats report the record count, mean confidence and field coverage."""
        eid = data_store.create_extraction(sample_schema)
        data_store.add_record(
            eid, {"nom": "DUPONT", "prenom": "Jean", "ipp": "123"}, confidence=0.9
        )
        data_store.add_record(
            eid, {"nom": "MARTIN", "prenom": None, "ipp": "456"}, confidence=0.7
        )
        stats = data_store.get_stats(eid)
        assert stats["record_count"] == 2
        # Mean of 0.9 and 0.7; compared with a tolerance because exact float
        # equality depends on how the average is computed.
        assert stats["avg_confidence"] == pytest.approx(0.8)
        assert "field_coverage" in stats

    def test_delete_extraction(self, data_store, sample_schema):
        """Deleting an extraction removes it together with its records."""
        eid = data_store.create_extraction(sample_schema)
        data_store.add_record(eid, {"nom": "TEST"}, confidence=0.5)
        assert data_store.delete_extraction(eid) is True
        assert data_store.get_extraction(eid) is None
        assert data_store.get_records(eid) == []

    def test_record_count_updated(self, data_store, sample_schema):
        """``record_count`` on the extraction tracks added records."""
        eid = data_store.create_extraction(sample_schema)
        data_store.add_record(eid, {"nom": "A"}, confidence=0.5)
        data_store.add_record(eid, {"nom": "B"}, confidence=0.6)
        ext = data_store.get_extraction(eid)
        assert ext["record_count"] == 2
# ======================================================================
# FieldExtractor (mock VLM)
# ======================================================================
class TestFieldExtractor:
    """Tests for ``FieldExtractor`` with the VLM HTTP call mocked out."""

    # PNG signature followed by padding: enough for a file to exist on disk;
    # it is not a decodable image.
    _FAKE_PNG = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100

    def test_extract_file_not_found(self, sample_schema, tmp_path):
        """A missing screenshot yields zero confidence and at least one error."""
        # A path inside the per-test temp dir cannot collide with a real file.
        missing = tmp_path / "nonexistent.png"
        extractor = FieldExtractor()
        result = extractor.extract_fields(str(missing), sample_schema)
        assert result["confidence"] == 0.0
        assert len(result["errors"]) > 0

    def test_parse_vlm_response_valid_json(self):
        """A bare JSON object is parsed into a dict."""
        extractor = FieldExtractor()
        data = extractor._parse_vlm_response('{"nom": "DUPONT", "prenom": "Jean"}')
        assert data == {"nom": "DUPONT", "prenom": "Jean"}

    def test_parse_vlm_response_json_in_text(self):
        """A JSON object surrounded by free text is still extracted."""
        extractor = FieldExtractor()
        text = 'Voici les resultats:\n{"nom": "DUPONT", "prenom": "Jean"}\nFin.'
        data = extractor._parse_vlm_response(text)
        assert data is not None
        assert data["nom"] == "DUPONT"

    def test_parse_vlm_response_markdown_json(self):
        """A JSON object inside a Markdown code fence is extracted."""
        extractor = FieldExtractor()
        text = '```json\n{"nom": "DUPONT"}\n```'
        data = extractor._parse_vlm_response(text)
        assert data is not None
        assert data["nom"] == "DUPONT"

    def test_parse_vlm_response_invalid(self):
        """Text without any JSON yields ``None``."""
        extractor = FieldExtractor()
        data = extractor._parse_vlm_response("pas du json du tout")
        assert data is None

    def test_parse_vlm_response_empty(self):
        """Empty string and ``None`` both yield ``None``."""
        extractor = FieldExtractor()
        assert extractor._parse_vlm_response("") is None
        assert extractor._parse_vlm_response(None) is None

    def test_build_extraction_prompt(self, sample_schema):
        """The prompt mentions field names, the required marker and JSON."""
        extractor = FieldExtractor()
        prompt = extractor._build_extraction_prompt(sample_schema.fields)
        assert "nom" in prompt
        assert "prenom" in prompt
        assert "OBLIGATOIRE" in prompt
        assert "JSON" in prompt

    @patch("core.extraction.field_extractor.requests.post")
    def test_extract_via_vlm_success(self, mock_post, sample_schema, tmp_path):
        """A successful mocked VLM reply populates data without errors."""
        # Create a fake screenshot on disk.
        img_path = tmp_path / "test.png"
        img_path.write_bytes(self._FAKE_PNG)
        # Mock the HTTP reply: status 200 with the JSON payload under "response".
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "response": json.dumps({
                "nom": "DUPONT",
                "prenom": "Jean",
                "date_naissance": "15/03/1965",
                "ipp": "123456",
                "age": "58",
            })
        }
        mock_post.return_value = mock_response
        extractor = FieldExtractor()
        result = extractor.extract_fields(str(img_path), sample_schema)
        assert result["data"]["nom"] == "DUPONT"
        assert result["data"]["prenom"] == "Jean"
        assert result["confidence"] > 0.0
        assert len(result["errors"]) == 0

    @patch("core.extraction.field_extractor.requests.post")
    def test_extract_via_vlm_connection_error(self, mock_post, sample_schema, tmp_path):
        """VLM unavailable -> a result is still returned, with zero confidence."""
        img_path = tmp_path / "test.png"
        img_path.write_bytes(self._FAKE_PNG)
        import requests as req

        mock_post.side_effect = req.exceptions.ConnectionError("Connection refused")
        extractor = FieldExtractor()
        result = extractor.extract_fields(str(img_path), sample_schema)
        # Must return a result (even an empty one) without raising.
        assert "data" in result
        assert result["confidence"] == 0.0

    def test_check_vlm_available_down(self):
        """``check_vlm_available`` reports ``False`` for an unreachable URL."""
        # NOTE(review): 99999 is above the maximum TCP port (65535), so this
        # presumably exercises URL rejection rather than a refused
        # connection - confirm that is the intended scenario.
        extractor = FieldExtractor(ollama_url="http://localhost:99999")
        assert extractor.check_vlm_available() is False
# ======================================================================
# IterationController
# ======================================================================
class TestIterationController:
    """Tests for ``IterationController`` state and manual navigation."""

    def test_has_next(self, sample_schema):
        """A fresh controller reports that a next record is available."""
        controller = IterationController(sample_schema)
        more_available = controller.has_next()
        assert more_available is True

    def test_max_records(self, sample_schema):
        """``max_records`` matches the schema's navigation setting (5)."""
        controller = IterationController(sample_schema)
        limit = controller.max_records
        assert limit == 5

    def test_mark_finished(self, sample_schema):
        """``mark_finished`` flips ``has_next`` from True to False."""
        controller = IterationController(sample_schema)
        before = controller.has_next()
        assert before is True
        controller.mark_finished()
        after = controller.has_next()
        assert after is False

    def test_reset(self, sample_schema):
        """``reset`` rewinds the index and re-enables iteration."""
        controller = IterationController(sample_schema)
        controller.current_index = 3
        controller.mark_finished()
        controller.reset()
        assert controller.current_index == 0
        assert controller.has_next() is True

    def test_progress(self, sample_schema):
        """``progress`` reports index, maximum and percentage."""
        controller = IterationController(sample_schema)
        controller.current_index = 2
        snapshot = controller.progress
        expected_items = [
            ("current_index", 2),
            ("max_records", 5),
            ("progress_pct", 40.0),
        ]
        for key, expected in expected_items:
            assert snapshot[key] == expected

    @patch("core.extraction.iteration_controller.time.sleep")
    def test_navigate_manual(self, mock_sleep, sample_schema):
        """Manual navigation (sleep patched out) succeeds and advances by one."""
        controller = IterationController(sample_schema)
        moved = controller.navigate_to_next("test-session")
        assert moved is True
        assert controller.current_index == 1
# ======================================================================
# ExtractionEngine (integration avec mocks)
# ======================================================================
class TestExtractionEngine:
    """Integration-style tests for ``ExtractionEngine`` using mocks."""

    def test_extract_current_screen_mock(self, sample_schema, tmp_path):
        """One-off extraction of a screenshot with a mocked field extractor."""
        # Fake screenshot: PNG signature plus padding.
        screenshot = tmp_path / "screen.png"
        screenshot.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 100)

        # Canned result returned by the mocked FieldExtractor.
        canned_result = {
            "data": {"nom": "DUPONT", "prenom": "Jean", "date_naissance": "15/03/1965", "ipp": "123"},
            "confidence": 0.9,
            "errors": [],
            "raw_response": "{}",
        }
        fake_extractor = MagicMock()
        fake_extractor.extract_fields.return_value = canned_result

        db_file = str(tmp_path / "test.db")
        engine = ExtractionEngine(
            schema=sample_schema,
            store=DataStore(db_path=db_file),
            field_extractor=fake_extractor,
        )

        outcome = engine.extract_current_screen(str(screenshot))
        assert outcome["data"]["nom"] == "DUPONT"
        assert outcome["confidence"] == 0.9
        assert "validation" in outcome

    def test_extract_from_file(self, sample_schema, tmp_path):
        """``extract_from_file`` extracts the data and stores one record."""
        screenshot = tmp_path / "screen.png"
        screenshot.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 100)

        canned_result = {
            "data": {"nom": "MARTIN", "prenom": "Marie", "date_naissance": "01/01/1980", "ipp": "456"},
            "confidence": 0.85,
            "errors": [],
            "raw_response": "{}",
        }
        fake_extractor = MagicMock()
        fake_extractor.extract_fields.return_value = canned_result

        db_file = str(tmp_path / "test.db")
        backing_store = DataStore(db_path=db_file)
        engine = ExtractionEngine(
            schema=sample_schema,
            store=backing_store,
            field_extractor=fake_extractor,
        )

        outcome = engine.extract_from_file(str(screenshot))
        assert outcome["data"]["nom"] == "MARTIN"
        for expected_key in ("record_id", "extraction_id"):
            assert expected_key in outcome

        # Check that the record was persisted.
        stored = backing_store.get_records(outcome["extraction_id"])
        assert len(stored) == 1

    def test_get_progress_not_running(self, sample_schema, tmp_path):
        """An engine that was never started reports ``is_running`` False."""
        db_file = str(tmp_path / "test.db")
        engine = ExtractionEngine(
            schema=sample_schema,
            store=DataStore(db_path=db_file),
        )
        status = engine.get_progress()
        assert status["is_running"] is False
        assert status["schema_name"] == "test_patient"
# ======================================================================
# Import smoke test
# ======================================================================
class TestImports:
    """Import smoke test for the ``core.extraction`` package."""

    def test_import_all(self):
        """Check that every public name imports and is not ``None``."""
        from core.extraction import (
            ExtractionEngine,
            ExtractionSchema,
            ExtractionField,
            FieldExtractor,
            DataStore,
            IterationController,
        )

        imported_symbols = (
            ExtractionEngine,
            ExtractionSchema,
            ExtractionField,
            FieldExtractor,
            DataStore,
            IterationController,
        )
        for symbol in imported_symbols:
            assert symbol is not None