From c0616e6c9f9e37d839aa4bc2e77a4cc32bbc0db4 Mon Sep 17 00:00:00 2001 From: dom Date: Thu, 5 Mar 2026 00:37:34 +0100 Subject: [PATCH] chore: add .gitignore --- .gitignore | 78 ++++++- t2a-extractor/.gitignore | 6 + t2a-extractor/README.md | 63 +++++ t2a-extractor/architecture.html | 115 ++++++++++ t2a-extractor/config.py | 44 ++++ t2a-extractor/extractor/__init__.py | 6 + t2a-extractor/extractor/exporter.py | 157 +++++++++++++ t2a-extractor/extractor/llm_extractor.py | 279 +++++++++++++++++++++++ t2a-extractor/extractor/normalizer.py | 248 ++++++++++++++++++++ t2a-extractor/extractor/pdf_reader.py | 169 ++++++++++++++ t2a-extractor/extractor/segmenter.py | 279 +++++++++++++++++++++++ t2a-extractor/extractor/validator.py | 148 ++++++++++++ t2a-extractor/main.py | 205 +++++++++++++++++ t2a-extractor/requirements.txt | 17 ++ t2a-extractor/setup.sh | 78 +++++++ 15 files changed, 1888 insertions(+), 4 deletions(-) create mode 100644 t2a-extractor/.gitignore create mode 100644 t2a-extractor/README.md create mode 100644 t2a-extractor/architecture.html create mode 100644 t2a-extractor/config.py create mode 100644 t2a-extractor/extractor/__init__.py create mode 100644 t2a-extractor/extractor/exporter.py create mode 100644 t2a-extractor/extractor/llm_extractor.py create mode 100644 t2a-extractor/extractor/normalizer.py create mode 100644 t2a-extractor/extractor/pdf_reader.py create mode 100644 t2a-extractor/extractor/segmenter.py create mode 100644 t2a-extractor/extractor/validator.py create mode 100644 t2a-extractor/main.py create mode 100644 t2a-extractor/requirements.txt create mode 100755 t2a-extractor/setup.sh diff --git a/.gitignore b/.gitignore index 0344e59..7cc8b7e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,76 @@ -.venv/ +# === Python === __pycache__/ -*.pyc -output/ +*.py[cod] +*.pyo +*.egg-info/ +*.egg +dist/ +build/ +*.whl + +# === Virtual environments === +.venv/ +venv/ +venv_*/ +env/ + +# === ML Models & Data === +*.pt +*.pth +*.onnx +*.bin +*.safetensors +*.h5 +*.hdf5 +*.pkl +*.pickle +*.npy +*.npz +*.faiss +models/ +*.tar.gz +*.zip + +# === Documents & Media === *.pdf -files.zip +*.docx +*.xlsx +*.csv +*.png +*.jpg +*.jpeg +*.gif +*.mp3 +*.wav +*.mp4 + +# === IDE === +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# === OS === +.DS_Store +Thumbs.db +.~lock.* + +# === Secrets === +.env +*.env +credentials.json +token.pickle + +# === Logs & Cache === +*.log +logs/ +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +htmlcov/ +.coverage + +# === Backups === +*_backup_* +backups/ diff --git a/t2a-extractor/.gitignore b/t2a-extractor/.gitignore new file mode 100644 index 0000000..0344e59 --- /dev/null +++ b/t2a-extractor/.gitignore @@ -0,0 +1,6 @@ +.venv/ +__pycache__/ +*.pyc +output/ +*.pdf +files.zip diff --git a/t2a-extractor/README.md b/t2a-extractor/README.md new file mode 100644 index 0000000..2e4fb4d --- /dev/null +++ b/t2a-extractor/README.md @@ -0,0 +1,63 @@ +# T2A Extractor + +Extraction structurée de rapports de contrôle T2A (décisions UCR) depuis des PDF natifs et scannés. + +## Architecture + +``` +PDF UCR → [pymupdf/docTR] → texte brut → [regex] → blocs OGC → [VLM Ollama] → JSON → [validation] → Excel/CSV +``` + +### Pipeline + +1. **Extraction texte** — Détection automatique natif/scanné par page. pymupdf pour le natif, docTR pour l'OCR. +2. **Segmentation** — Découpage en blocs par Champ et par OGC (individuels et groupés) via regex. +3. **Extraction structurée** — Chaque bloc est envoyé au VLM local (Ollama) qui retourne un JSON structuré. +4. **Validation** — Vérification des codes CIM-10/CCAM, cohérence des décisions. +5. **Export** — Excel formaté (avec coloration des décisions) et CSV optionnel. + +## Schéma de sortie (11 colonnes) + +| Colonne | Description | +|---|---| +| `champ` | Numéro de champ | +| `num_ogc` | Numéro OGC | +| `type_desaccord` | DP / DAS / DP+DAS / Actes | +| `codes_etablissement` | Codes CIM-10/CCAM de l'établissement | +| `libelle_etablissement` | Libellé du codage établissement | +| `codes_controleurs` | Codes CIM-10/CCAM des contrôleurs | +| `libelle_controleurs` | Libellé du codage contrôleurs | +| `decision_ucr` | Favorable / Défavorable (pour l'établissement) | +| `codes_retenus` | Codes finalement retenus | +| `ghm_ghs` | GHM/GHS si mentionnés | +| `texte_decision` | Texte intégral de la décision UCR | + +## Installation + +```bash +chmod +x setup.sh +./setup.sh +``` + +## Usage + +```bash +source .venv/bin/activate +python main.py rapport_ucr.pdf +python main.py rapport_ucr.pdf --csv --verbose +python main.py rapport_ucr.pdf -o /chemin/sortie --csv -v +``` + +## Prérequis + +- Python 3.12+ +- Ollama avec un VLM (gemma3:27b-cloud par défaut) +- GPU recommandé pour docTR (fonctionne aussi en CPU) + +## Configuration + +Éditer `config.py` pour ajuster : +- `OLLAMA_MODEL` — modèle à utiliser +- `OLLAMA_BASE_URL` — URL du serveur Ollama +- `OCR_DPI` — résolution OCR (défaut: 200) +- `NATIVE_TEXT_MIN_CHARS` — seuil de détection natif/scanné diff --git a/t2a-extractor/architecture.html b/t2a-extractor/architecture.html new file mode 100644 index 0000000..d4f0a1a --- /dev/null +++ b/t2a-extractor/architecture.html @@ -0,0 +1,115 @@ + + + + + T2A Extractor — Architecture + + + +

T2A Extractor

+

Pipeline d'extraction structurée de rapports UCR

+
+flowchart TD + subgraph INPUT["📄 Entrée"] + PDF["PDF UCR<br/>natif + scanné"] + end + + subgraph ETAPE1["📖 Étape 1 — Extraction texte"] + DETECT{"Page native<br/>ou scannée ?"} + PYMUPDF["<b>PyMuPDF</b><br/>texte natif"] + DOCTR["<b>docTR + Torch</b><br/>OCR"] + MERGE(["Texte brut complet"]) + end + + subgraph ETAPE2["✂️ Étape 2 — Segmentation"] + REGEX["Regex<br/>par Champ + OGC"] + OGC_BLOCKS["Blocs OGC<br/>individuels / groupés"] + CHAMP_BLOCKS["Blocs Champ<br/>décisions globales"] + end + + subgraph ETAPE3["🤖 Étape 3 — Extraction structurée"] + OLLAMA["<b>Ollama</b><br/>gemma3:27b-cloud"] + JSON["JSON structuré<br/>11 champs par OGC"] + end + + subgraph ETAPE35["🔧 Étape 3.5 — Normalisation"] + CIM10["Correction codes CIM-10<br/>&bull; OCR chiffre ↔ lettre<br/>&bull; Point manquant / mal placé<br/>&bull; Décimales excédentaires"] + RETENUS["Auto-remplissage<br/>codes_retenus"] + TEXTE["Fallback regex<br/>texte_decision"] + end + + subgraph ETAPE4["✅ Étape 4 — Validation"] + VALID["Vérification formats<br/>CIM-10 / CCAM"] + SAFETY["Safety-net<br/>2e passe normalizer"] + COHERENCE["Cohérence<br/>décision ↔ codes"] + end + + subgraph ETAPE5["📊 Étape 5 — Export"] + EXCEL["<b>Excel</b> .xlsx<br/>coloration décisions"] + CSV["<b>CSV</b><br/>optionnel"] + end + + PDF --> DETECT + DETECT -->|"≥ 50 chars"| PYMUPDF + DETECT -->|"< 50 chars"| DOCTR + PYMUPDF --> MERGE + DOCTR --> MERGE + + MERGE --> REGEX + REGEX --> OGC_BLOCKS + REGEX --> CHAMP_BLOCKS + + OGC_BLOCKS --> OLLAMA + CHAMP_BLOCKS --> OLLAMA + OLLAMA --> JSON + + JSON --> CIM10 + CIM10 --> RETENUS + RETENUS --> TEXTE + + TEXTE --> VALID + VALID --> SAFETY + SAFETY --> COHERENCE + + COHERENCE --> EXCEL + COHERENCE --> CSV + + style INPUT fill:#e8f4fd,stroke:#2196F3,stroke-width:2px,color:#000 + style ETAPE1 fill:#fff3e0,stroke:#FF9800,stroke-width:2px,color:#000 + style ETAPE2 fill:#f3e5f5,stroke:#9C27B0,stroke-width:2px,color:#000 + style ETAPE3 fill:#e8f5e9,stroke:#4CAF50,stroke-width:2px,color:#000 + style ETAPE35 fill:#fce4ec,stroke:#E91E63,stroke-width:2px,color:#000 + style ETAPE4 fill:#fff8e1,stroke:#FFC107,stroke-width:2px,color:#000 + style ETAPE5 fill:#e0f2f1,stroke:#009688,stroke-width:2px,color:#000 +
+ + + + diff --git a/t2a-extractor/config.py b/t2a-extractor/config.py new file mode 100644 index 0000000..4024f5d --- /dev/null +++ b/t2a-extractor/config.py @@ -0,0 +1,44 @@ +""" +Configuration T2A Extractor +""" +from pathlib import Path + +# === Ollama === +OLLAMA_BASE_URL = "http://localhost:11434" +OLLAMA_MODEL = "gemma3:27b-cloud" # À adapter selon le tag exact +OLLAMA_TIMEOUT = 120 # secondes par requête +OLLAMA_MAX_RETRIES = 2 + +# === OCR (docTR) === +DOCTR_DET_ARCH = "db_resnet50" +DOCTR_RECO_ARCH = "crnn_vgg16_bn" +OCR_DPI = 200 # résolution pour conversion page → image +OCR_MIN_CONFIDENCE = 0.5 # seuil de confiance minimum docTR + +# === Extraction PDF === +# Seuil de caractères pour considérer une page comme "native" +# (certaines pages scannées ont quelques caractères parasites) +NATIVE_TEXT_MIN_CHARS = 50 + +# === Schéma de sortie === +OUTPUT_COLUMNS = [ + "champ", + "num_ogc", + "type_desaccord", + "codes_etablissement", + "libelle_etablissement", + "codes_controleurs", + "libelle_controleurs", + "decision_ucr", + "codes_retenus", + "ghm_ghs", + "texte_decision", +] + +# Valeurs autorisées pour les enums +DECISION_VALUES = ["Favorable", "Défavorable"] +TYPE_DESACCORD_VALUES = ["DP", "DAS", "DP+DAS", "Actes"] + +# === Chemins === +PROJECT_ROOT = Path(__file__).parent +DEFAULT_OUTPUT_DIR = PROJECT_ROOT / "output" diff --git a/t2a-extractor/extractor/__init__.py b/t2a-extractor/extractor/__init__.py new file mode 100644 index 0000000..6a079c2 --- /dev/null +++ b/t2a-extractor/extractor/__init__.py @@ -0,0 +1,6 @@ +import sys +from pathlib import Path + +_project_root = str(Path(__file__).resolve().parent.parent) +if _project_root not in sys.path: + sys.path.insert(0, _project_root) diff --git a/t2a-extractor/extractor/exporter.py b/t2a-extractor/extractor/exporter.py new file mode 100644 index 0000000..5492df9 --- /dev/null +++ b/t2a-extractor/extractor/exporter.py @@ -0,0 +1,157 @@ +""" +Export des données extraites en Excel et CSV. +""" +import csv +import logging +from pathlib import Path +from openpyxl import Workbook +from openpyxl.styles import Font, PatternFill, Alignment, Border, Side + +from config import OUTPUT_COLUMNS + +logger = logging.getLogger(__name__) + +# Styles Excel +HEADER_FONT = Font(bold=True, color="FFFFFF", size=11, name="Arial") +HEADER_FILL = PatternFill("solid", fgColor="2F5496") +HEADER_ALIGN = Alignment(horizontal="center", vertical="center", wrap_text=True) +CELL_ALIGN = Alignment(vertical="top", wrap_text=True) +CELL_FONT = Font(name="Arial", size=10) +THIN_BORDER = Border( + left=Side(style='thin'), + right=Side(style='thin'), + top=Side(style='thin'), + bottom=Side(style='thin'), +) + +# Couleurs de décision +FILL_FAVORABLE = PatternFill("solid", fgColor="CCFFCC") # Vert clair +FILL_DEFAVORABLE = PatternFill("solid", fgColor="FFCCCC") # Rouge clair +FILL_UNKNOWN = PatternFill("solid", fgColor="FFFFCC") # Jaune clair +FILL_ERROR = PatternFill("solid", fgColor="FFD9CC") # Orange clair + +# Largeurs de colonnes +COLUMN_WIDTHS = { + "champ": 8, + "num_ogc": 10, + "type_desaccord": 14, + "codes_etablissement": 22, + "libelle_etablissement": 40, + "codes_controleurs": 22, + "libelle_controleurs": 40, + "decision_ucr": 16, + "codes_retenus": 22, + "ghm_ghs": 22, + "texte_decision": 80, +} + +# Labels d'en-tête plus lisibles +HEADER_LABELS = { + "champ": "Champ", + "num_ogc": "N° OGC", + "type_desaccord": "Type désaccord", + "codes_etablissement": "Codes Établissement", + "libelle_etablissement": "Libellé Établissement", + "codes_controleurs": "Codes Contrôleurs", + "libelle_controleurs": "Libellé Contrôleurs", + "decision_ucr": "Décision UCR", + "codes_retenus": "Codes retenus", + "ghm_ghs": "GHM / GHS", + "texte_decision": "Texte décision", +} + + +def _extraction_to_row(extraction) -> dict: + """Convertit une extraction en dictionnaire pour l'export.""" + return { + "champ": extraction.champ, + "num_ogc": extraction.num_ogc, + "type_desaccord": extraction.type_desaccord, + "codes_etablissement": extraction.codes_etablissement, + "libelle_etablissement": extraction.libelle_etablissement, + "codes_controleurs": extraction.codes_controleurs, + "libelle_controleurs": extraction.libelle_controleurs, + "decision_ucr": extraction.decision_ucr, + "codes_retenus": extraction.codes_retenus, + "ghm_ghs": extraction.ghm_ghs, + "texte_decision": extraction.texte_decision, + } + + +def export_excel(extractions: list, output_path: str | Path) -> int: + """ + Exporte les extractions en fichier Excel formaté. + Retourne le nombre de lignes exportées. + """ + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + + wb = Workbook() + ws = wb.active + ws.title = "Décisions UCR" + + # En-têtes + for col_idx, col_name in enumerate(OUTPUT_COLUMNS, 1): + cell = ws.cell(row=1, column=col_idx, value=HEADER_LABELS.get(col_name, col_name)) + cell.font = HEADER_FONT + cell.fill = HEADER_FILL + cell.alignment = HEADER_ALIGN + cell.border = THIN_BORDER + + # Données + for row_idx, extraction in enumerate(extractions, 2): + row_data = _extraction_to_row(extraction) + + for col_idx, col_name in enumerate(OUTPUT_COLUMNS, 1): + value = row_data.get(col_name) + cell = ws.cell(row=row_idx, column=col_idx, value=value) + cell.alignment = CELL_ALIGN + cell.font = CELL_FONT + cell.border = THIN_BORDER + + # Colorer la cellule décision + decision_col = OUTPUT_COLUMNS.index("decision_ucr") + 1 + decision_cell = ws.cell(row=row_idx, column=decision_col) + decision_value = row_data.get("decision_ucr", "") + + if not extraction.extraction_success: + decision_cell.fill = FILL_ERROR + elif decision_value == "Favorable": + decision_cell.fill = FILL_FAVORABLE + elif decision_value == "Défavorable": + decision_cell.fill = FILL_DEFAVORABLE + else: + decision_cell.fill = FILL_UNKNOWN + + # Largeurs de colonnes + for col_idx, col_name in enumerate(OUTPUT_COLUMNS, 1): + col_letter = chr(64 + col_idx) if col_idx <= 26 else chr(64 + (col_idx - 1) // 26) + chr(65 + (col_idx - 1) % 26) + ws.column_dimensions[col_letter].width = COLUMN_WIDTHS.get(col_name, 15) + + # Filtres et gel + ws.auto_filter.ref = f"A1:{chr(64 + len(OUTPUT_COLUMNS))}{len(extractions) + 1}" + ws.freeze_panes = "A2" + + wb.save(str(output_path)) + logger.info(f"Excel exporté : {output_path} ({len(extractions)} lignes)") + return len(extractions) + + +def export_csv(extractions: list, output_path: str | Path) -> int: + """ + Exporte les extractions en fichier CSV. + Retourne le nombre de lignes exportées. + """ + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + + with open(output_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=OUTPUT_COLUMNS, delimiter=';') + writer.writeheader() + + for extraction in extractions: + row = _extraction_to_row(extraction) + writer.writerow(row) + + logger.info(f"CSV exporté : {output_path} ({len(extractions)} lignes)") + return len(extractions) diff --git a/t2a-extractor/extractor/llm_extractor.py b/t2a-extractor/extractor/llm_extractor.py new file mode 100644 index 0000000..95de4e9 --- /dev/null +++ b/t2a-extractor/extractor/llm_extractor.py @@ -0,0 +1,279 @@ +""" +Extraction structurée des données OGC via VLM (Ollama). +Envoie chaque bloc de texte au modèle et parse la réponse JSON. +""" +import json +import re +import logging +import requests +from dataclasses import dataclass + +from config import OLLAMA_BASE_URL, OLLAMA_MODEL, OLLAMA_TIMEOUT, OLLAMA_MAX_RETRIES + +logger = logging.getLogger(__name__) + +# Prompt système pour l'extraction +SYSTEM_PROMPT = """Tu es un expert en codage PMSI et contrôle T2A. +Tu extrais des données structurées depuis des rapports de décision UCR (Unité de Coordination Régionale). + +Tu dois retourner UNIQUEMENT un objet JSON valide, sans aucun texte avant ou après. +Pas de markdown, pas de commentaires, pas de ```json```. + +Le JSON doit respecter exactement ce schéma : +{ + "type_desaccord": "DP" | "DAS" | "DP+DAS" | "Actes" | null, + "codes_etablissement": "code(s) CIM-10 ou CCAM séparés par des virgules" | null, + "libelle_etablissement": "libellé/description du codage établissement" | null, + "codes_controleurs": "code(s) CIM-10 ou CCAM séparés par des virgules" | null, + "libelle_controleurs": "libellé/description du codage contrôleurs" | null, + "decision_ucr": "Favorable" | "Défavorable", + "codes_retenus": "code(s) finalement retenus par l'UCR" | null, + "ghm_ghs": "GHM et/ou GHS mentionnés (ex: 07C133 / GHS 2349)" | null, + "texte_decision": "texte intégral de la décision UCR, copié tel quel" +} + +Règles importantes : +- "decision_ucr" : "Favorable" = l'UCR retient/confirme la position de l'établissement. "Défavorable" = l'UCR confirme l'avis des médecins contrôleurs ou rejette la demande de l'établissement. +- "codes_etablissement" : uniquement les codes (K85.1, T81.0, ZZQK002...), PAS les libellés +- "libelle_etablissement" : le texte descriptif du codage (ex: "pancréatite aigüe d'origine biliaire") +- "codes_controleurs" : idem, uniquement les codes. Si "non repris" ou "DAS non repris", mettre null +- "codes_retenus" : les codes qui résultent de la décision finale de l'UCR. Ce champ ne doit JAMAIS être vide : si Défavorable, retenir les codes contrôleurs ; si Favorable, retenir les codes établissement. +- "ghm_ghs" : extraire si mentionné dans le texte, sinon null +- "texte_decision" : le paragraphe complet commençant par "DECISION UCR" ou "PROPOSITION UCR", copié intégralement (minimum 50 caractères). Inclure tout le paragraphe de décision, pas un résumé. +- Les codes CIM-10 commencent TOUJOURS par une lettre majuscule (A-Z), jamais par un chiffre. Si l'OCR a lu "167.3", c'est "I67.3". Si "085", c'est "O85". Corriger systématiquement. +- Pour les OGC groupés, la même décision s'applique à tous les OGC du groupe +""" + +USER_PROMPT_TEMPLATE = """Extrais les données structurées de ce bloc de rapport UCR. + +Champ : {champ} +OGC concerné(s) : {ogc_numbers} + +--- TEXTE DU BLOC --- +{block_text} +--- FIN DU TEXTE --- + +Retourne UNIQUEMENT le JSON structuré.""" + + +@dataclass +class OGCExtraction: + """Données extraites d'un bloc OGC.""" + champ: int + num_ogc: int + type_desaccord: str | None + codes_etablissement: str | None + libelle_etablissement: str | None + codes_controleurs: str | None + libelle_controleurs: str | None + decision_ucr: str | None + codes_retenus: str | None + ghm_ghs: str | None + texte_decision: str | None + extraction_success: bool = True + error_message: str | None = None + _raw_block_text: str | None = None + + +def _call_ollama(system_prompt: str, user_prompt: str) -> str: + """Appelle l'API Ollama et retourne la réponse texte.""" + url = f"{OLLAMA_BASE_URL}/api/chat" + payload = { + "model": OLLAMA_MODEL, + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + "stream": False, + "options": { + "temperature": 0.1, # Extraction factuelle → température basse + "num_predict": 4096, + }, + } + + response = requests.post(url, json=payload, timeout=OLLAMA_TIMEOUT) + response.raise_for_status() + data = response.json() + return data["message"]["content"] + + +def _parse_json_response(response_text: str) -> dict | None: + """ + Parse la réponse du VLM en JSON. + Gère les cas où le modèle entoure le JSON de markdown. + """ + text = response_text.strip() + + # Supprimer les blocs markdown ```json ... ``` + json_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', text, re.DOTALL) + if json_match: + text = json_match.group(1).strip() + + # Essayer de trouver un objet JSON dans le texte + # Chercher la première { et la dernière } + first_brace = text.find('{') + last_brace = text.rfind('}') + if first_brace != -1 and last_brace != -1: + text = text[first_brace:last_brace + 1] + + try: + return json.loads(text) + except json.JSONDecodeError as e: + logger.warning(f"Échec parsing JSON : {e}") + logger.debug(f"Réponse brute : {response_text[:500]}") + return None + + +def _normalize_decision(decision: str | None) -> str | None: + """Normalise la décision en Favorable/Défavorable.""" + if not decision: + return None + d = decision.strip().lower() + if d in ("favorable", "favorable établissement", "favorable etab"): + return "Favorable" + if d in ("défavorable", "defavorable", "défavorable établissement", "defavorable etab"): + return "Défavorable" + # Heuristiques + if "favorable" in d and "défavorable" not in d and "defavorable" not in d: + return "Favorable" + if "défavorable" in d or "defavorable" in d: + return "Défavorable" + return decision # Garder tel quel si non reconnu + + +def _normalize_type_desaccord(type_d: str | None) -> str | None: + """Normalise le type de désaccord.""" + if not type_d: + return None + t = type_d.strip().upper() + if t in ("DP", "DAS", "ACTES"): + return t + if "DP" in t and "DAS" in t: + return "DP+DAS" + if t in ("DP+DAS", "DP ET DAS"): + return "DP+DAS" + return type_d + + +def extract_ogc_block(champ: int, ogc_numbers: list[int], block_text: str) -> list[OGCExtraction]: + """ + Extrait les données structurées d'un bloc OGC via le VLM. + Retourne une extraction par numéro OGC (dédoublonnage pour les groupés). + """ + user_prompt = USER_PROMPT_TEMPLATE.format( + champ=champ, + ogc_numbers=", ".join(str(n) for n in ogc_numbers), + block_text=block_text, + ) + + results = [] + parsed_data = None + + for attempt in range(1, OLLAMA_MAX_RETRIES + 1): + try: + logger.debug(f"OGC {ogc_numbers} — tentative {attempt}") + response_text = _call_ollama(SYSTEM_PROMPT, user_prompt) + parsed_data = _parse_json_response(response_text) + + if parsed_data: + break + + logger.warning(f"OGC {ogc_numbers} — réponse non parsable, retry...") + + except requests.exceptions.Timeout: + logger.warning(f"OGC {ogc_numbers} — timeout (tentative {attempt})") + except requests.exceptions.RequestException as e: + logger.error(f"OGC {ogc_numbers} — erreur réseau : {e}") + break + + if not parsed_data: + # Échec total : créer des entrées d'erreur + for num in ogc_numbers: + results.append(OGCExtraction( + champ=champ, + num_ogc=num, + type_desaccord=None, + codes_etablissement=None, + libelle_etablissement=None, + codes_controleurs=None, + libelle_controleurs=None, + decision_ucr=None, + codes_retenus=None, + ghm_ghs=None, + texte_decision=block_text, # On garde au moins le texte brut + extraction_success=False, + error_message="Échec extraction VLM après retries", + _raw_block_text=block_text, + )) + return results + + # Créer une extraction par OGC + for num in ogc_numbers: + results.append(OGCExtraction( + champ=champ, + num_ogc=num, + type_desaccord=_normalize_type_desaccord(parsed_data.get("type_desaccord")), + codes_etablissement=parsed_data.get("codes_etablissement"), + libelle_etablissement=parsed_data.get("libelle_etablissement"), + codes_controleurs=parsed_data.get("codes_controleurs"), + libelle_controleurs=parsed_data.get("libelle_controleurs"), + decision_ucr=_normalize_decision(parsed_data.get("decision_ucr")), + codes_retenus=parsed_data.get("codes_retenus"), + ghm_ghs=parsed_data.get("ghm_ghs"), + texte_decision=parsed_data.get("texte_decision"), + extraction_success=True, + _raw_block_text=block_text, + )) + + return results + + +def extract_champ_block(champ: int, block_text: str) -> OGCExtraction: + """ + Extrait les données d'un bloc Champ (décision globale sans OGC individuels). + """ + extractions = extract_ogc_block(champ, [0], block_text) + if extractions: + extraction = extractions[0] + extraction.num_ogc = None # Pas de numéro OGC pour un champ global + return extraction + + return OGCExtraction( + champ=champ, + num_ogc=None, + type_desaccord=None, + codes_etablissement=None, + libelle_etablissement=None, + codes_controleurs=None, + libelle_controleurs=None, + decision_ucr=None, + codes_retenus=None, + ghm_ghs=None, + texte_decision=block_text, + extraction_success=False, + error_message="Échec extraction VLM pour bloc champ", + ) + + +def check_ollama_available() -> bool: + """Vérifie que Ollama est accessible et que le modèle est chargé.""" + try: + # Vérifier la connexion + response = requests.get(f"{OLLAMA_BASE_URL}/api/tags", timeout=5) + response.raise_for_status() + models = response.json().get("models", []) + model_names = [m["name"] for m in models] + + if OLLAMA_MODEL in model_names or any(OLLAMA_MODEL in name for name in model_names): + logger.info(f"Ollama OK — modèle {OLLAMA_MODEL} disponible") + return True + + logger.error(f"Modèle {OLLAMA_MODEL} non trouvé. Modèles disponibles : {model_names}") + return False + + except requests.exceptions.ConnectionError: + logger.error(f"Ollama non accessible à {OLLAMA_BASE_URL}") + return False + except Exception as e: + logger.error(f"Erreur vérification Ollama : {e}") + return False diff --git a/t2a-extractor/extractor/normalizer.py b/t2a-extractor/extractor/normalizer.py new file mode 100644 index 0000000..23112da --- /dev/null +++ b/t2a-extractor/extractor/normalizer.py @@ -0,0 +1,248 @@ +""" +Post-traitement déterministe des extractions OGC. +Corrige les erreurs OCR sur les codes CIM-10, remplit les champs manquants, +et normalise les données avant validation. +""" +import re +import logging + +logger = logging.getLogger(__name__) + +# Pattern CIM-10 valide : lettre majuscule + 2 chiffres + optionnel .1-2 chiffres +CIM10_PATTERN = re.compile(r'^[A-Z]\d{2}(?:\.\d{1,2})?$') + +# Mapping OCR chiffre → lettre CIM-10 (position 1) +# L'OCR confond fréquemment ces lettres avec des chiffres +OCR_DIGIT_TO_LETTER = { + '1': 'I', # Chapitre I : Appareil circulatoire (I00-I99) + '0': 'O', # Chapitre O : Grossesse (O00-O99) + '5': 'S', # Chapitre S : Traumatismes (S00-S99) + '2': 'Z', # Chapitre Z : Facteurs influençant la santé (Z00-Z99) + '8': 'B', # Chapitre B : Infections (B00-B99) +} + + +def _fix_cim10_format(code: str) -> str: + """ + Corrige les problèmes de formatage d'un code CIM-10 : + - '+' au lieu de '.' (OCR/LLM confusion) : B99+1 → B99.1, J961+0 → J96.10 + - Point manquant : F050 → F05.0, F0110 → F01.10, R410 → R41.0 + - Point mal positionné : J961.0 → J96.10 (après remplacement +→.) + - Trop de décimales : Z04.880 → Z04.88 (max 2 après le point) + - OCR lettre→chiffre en positions 2-3 : LO2.2 → L02.2, LI3.3 → L13.3 + """ + # OCR lettre→chiffre dans les positions qui doivent être des chiffres (pos 1, 2) + # Ex: LO2.2 → L02.2 (O lu au lieu de 0), LI3.3 → L13.3 (I lu au lieu de 1) + OCR_LETTER_TO_DIGIT = {'O': '0', 'I': '1', 'S': '5', 'Z': '2', 'B': '8'} + if len(code) >= 3 and code[0].isalpha(): + chars = list(code) + fixed = False + for pos in (1, 2): + if chars[pos] in OCR_LETTER_TO_DIGIT: + chars[pos] = OCR_LETTER_TO_DIGIT[chars[pos]] + fixed = True + if fixed: + code = ''.join(chars) + + # Remplacer '+' par '.' (confusion OCR fréquente) + if '+' in code: + code = code.replace('+', '.') + + # Point mal positionné : lettre + 3+ chiffres + point + chiffres → repositionner + # Ex: J961.0 (de J961+0) → J96.10 + m = re.match(r'^([A-Z])(\d{3,})\.(\d+)$', code) + if m: + letter = m.group(1) + all_digits = m.group(2) + m.group(3) + code = letter + all_digits[:2] + '.' + all_digits[2:] + + # Point manquant : lettre + 3-4 chiffres sans point → insérer le point après pos 3 + if re.match(r'^[A-Z]\d{3,4}$', code): + code = code[:3] + '.' + code[3:] + + # Trop de décimales : tronquer à 2 chiffres après le point + m = re.match(r'^([A-Z]\d{2}\.)(\d{3,})$', code) + if m: + code = m.group(1) + m.group(2)[:2] + + return code + + +def normalize_cim10_code(code: str) -> tuple[str, bool]: + """ + Corrige un code CIM-10 : + 1. Formatage (point manquant, '+' → '.', décimales excédentaires) + 2. Confusion OCR chiffre/lettre en position 1 + + Retourne (code_corrigé, a_été_corrigé). + """ + code = code.strip() + if not code: + return code, False + + # Déjà valide → ne rien faire + if CIM10_PATTERN.match(code): + return code, False + + original = code + + # Étape 1 : corriger le formatage (point, +, décimales) + code = _fix_cim10_format(code) + if CIM10_PATTERN.match(code): + return code, code != original + + # Étape 2 : corriger la confusion OCR chiffre → lettre en position 1 + if code[0] in OCR_DIGIT_TO_LETTER: + candidate = OCR_DIGIT_TO_LETTER[code[0]] + code[1:] + if CIM10_PATTERN.match(candidate): + return candidate, True + + # Étape 3 : combiner les deux (formatage puis OCR) + # Ex: 1500 → _fix_format → 150.0 → OCR → I50.0 (peu probable mais safe) + if original[0] in OCR_DIGIT_TO_LETTER: + reformatted = _fix_cim10_format(OCR_DIGIT_TO_LETTER[original[0]] + original[1:]) + if CIM10_PATTERN.match(reformatted): + return reformatted, True + + return original, False + + +def normalize_codes_field(codes_str: str | None) -> tuple[str | None, list[str]]: + """ + Applique la correction CIM-10 à chaque code d'une chaîne séparée par virgules. + + Retourne (chaîne_corrigée, liste_des_corrections). + """ + if not codes_str: + return codes_str, [] + + # Purger le littéral "null" (le LLM écrit parfois le mot au lieu de ne rien mettre) + if codes_str.strip().lower() == 'null': + return None, ["'null' (littéral) → supprimé"] + + codes = [c.strip() for c in codes_str.split(',')] + corrections = [] + normalized = [] + + for code in codes: + if not code: + continue + # Purger les "null" individuels dans une liste de codes + if code.lower() == 'null': + corrections.append(f"'{code}' → supprimé") + continue + new_code, was_fixed = normalize_cim10_code(code) + if was_fixed: + corrections.append(f"'{code}' → '{new_code}'") + normalized.append(new_code) + + result = ', '.join(normalized) if normalized else None + return result, corrections + + +def autofill_codes_retenus(extraction) -> list[str]: + """ + Quand codes_retenus est vide, le remplit selon la décision : + - Défavorable → copie codes_controleurs + - Favorable → copie codes_etablissement + + Retourne la liste des corrections appliquées. + """ + fixes = [] + + if extraction.codes_retenus: + return fixes + + if extraction.decision_ucr == "Défavorable" and extraction.codes_controleurs: + extraction.codes_retenus = extraction.codes_controleurs + fixes.append(f"codes_retenus auto-rempli depuis codes_controleurs (Défavorable)") + elif extraction.decision_ucr == "Favorable" and extraction.codes_etablissement: + extraction.codes_retenus = extraction.codes_etablissement + fixes.append(f"codes_retenus auto-rempli depuis codes_etablissement (Favorable)") + + return fixes + + +def normalize_extraction(extraction) -> list[str]: + """ + Orchestre toutes les normalisations sur un OGCExtraction. + Retourne la liste des corrections appliquées. + """ + if not extraction.extraction_success: + return [] + + all_fixes = [] + + # 1. Normaliser les codes CIM-10 + for field in ('codes_etablissement', 'codes_controleurs', 'codes_retenus'): + value = getattr(extraction, field) + new_value, corrections = normalize_codes_field(value) + if corrections: + setattr(extraction, field, new_value) + for c in corrections: + all_fixes.append(f"{field}: {c}") + + # 2. Auto-remplir codes_retenus si vide + all_fixes.extend(autofill_codes_retenus(extraction)) + + # 3. Fallback texte_decision depuis _raw_block_text + raw_text = getattr(extraction, '_raw_block_text', None) + if raw_text and (not extraction.texte_decision or len(extraction.texte_decision.strip()) < 20): + # Chercher le paragraphe DECISION/PROPOSITION UCR dans le texte brut + match = re.search( + r'((?:DECISION|PROPOSITION|Décision|D[ée]cision)\s+UCR[:\s].*?)(?:\n\s*\n|\Z)', + raw_text, + re.DOTALL | re.IGNORECASE, + ) + if match and len(match.group(1).strip()) >= 20: + old = extraction.texte_decision or "(vide)" + extraction.texte_decision = match.group(1).strip() + all_fixes.append(f"texte_decision récupéré par regex (était: {old[:30]}...)") + + return all_fixes + + +def normalize_all(extractions: list) -> dict: + """ + Applique normalize_extraction à toute la liste. + + Retourne un rapport des corrections : + { + "total_fixes": int, + "details": list[str], + "by_type": {"cim10": int, "codes_retenus": int, "texte_decision": int} + } + """ + all_details = [] + by_type = {"cim10": 0, "codes_retenus": 0, "texte_decision": 0} + + for ext in extractions: + fixes = normalize_extraction(ext) + if fixes: + ogc_id = f"OGC {ext.num_ogc} (Champ {ext.champ})" + for fix in fixes: + detail = f"{ogc_id} : {fix}" + all_details.append(detail) + logger.info(f" 🔧 {detail}") + + # Catégoriser + if "→" in fix and ("codes_etablissement" in fix or "codes_controleurs" in fix or "codes_retenus:" in fix): + by_type["cim10"] += 1 + elif "codes_retenus auto-rempli" in fix: + by_type["codes_retenus"] += 1 + elif "texte_decision" in fix: + by_type["texte_decision"] += 1 + + total = len(all_details) + if total: + logger.info(f" Normalisation : {total} corrections " + f"(CIM-10: {by_type['cim10']}, codes_retenus: {by_type['codes_retenus']}, " + f"texte_decision: {by_type['texte_decision']})") + else: + logger.info(" Normalisation : aucune correction nécessaire") + + return { + "total_fixes": total, + "details": all_details, + "by_type": by_type, + } diff --git a/t2a-extractor/extractor/pdf_reader.py b/t2a-extractor/extractor/pdf_reader.py new file mode 100644 index 0000000..130168b --- /dev/null +++ b/t2a-extractor/extractor/pdf_reader.py @@ -0,0 +1,169 @@ +""" +Extraction de texte depuis des PDF natifs et scannés. +- PDF natif → pymupdf (extraction directe) +- PDF scanné → docTR (OCR) +""" +import fitz # pymupdf +import numpy as np +from PIL import Image +from pathlib import Path +from dataclasses import dataclass +import io +import logging + +from config import NATIVE_TEXT_MIN_CHARS, OCR_DPI, DOCTR_DET_ARCH, DOCTR_RECO_ARCH, OCR_MIN_CONFIDENCE + +logger = logging.getLogger(__name__) + +# Lazy loading de docTR (lourd à importer) +_doctr_predictor = None + + +def _get_doctr_predictor(): + """Charge le modèle docTR une seule fois (lazy).""" + global _doctr_predictor + if _doctr_predictor is None: + logger.info("Chargement du modèle docTR...") + from doctr.models import ocr_predictor + _doctr_predictor = ocr_predictor( + det_arch=DOCTR_DET_ARCH, + reco_arch=DOCTR_RECO_ARCH, + pretrained=True, + assume_straight_pages=True, + ) + logger.info("Modèle docTR chargé.") + return _doctr_predictor + + +@dataclass +class PageResult: + """Résultat d'extraction d'une page.""" + page_num: int + text: str + method: str # "native" ou "ocr" + confidence: float # 1.0 pour natif, score moyen docTR pour OCR + + +@dataclass +class PDFExtractionResult: + """Résultat complet d'extraction d'un PDF.""" + file_path: str + total_pages: int + pages: list # list[PageResult] + native_pages: int + ocr_pages: int + + @property + def full_text(self) -> str: + """Texte complet du document.""" + return "\n\n".join(p.text for p in self.pages if p.text.strip()) + + +def _page_to_image(page: fitz.Page, dpi: int = OCR_DPI) -> np.ndarray: + """Convertit une page PDF en image numpy (RGB) pour docTR.""" + zoom = dpi / 72 + mat = fitz.Matrix(zoom, zoom) + pix = page.get_pixmap(matrix=mat) + img = Image.open(io.BytesIO(pix.tobytes("png"))).convert("RGB") + return np.array(img) + + +def _extract_text_doctr(image_array: np.ndarray) -> tuple[str, float]: + """ + Extrait le texte d'une image via docTR. + Retourne (texte, score_confiance_moyen). + """ + predictor = _get_doctr_predictor() + result = predictor([image_array]) + + lines = [] + confidences = [] + + for page in result.pages: + for block in page.blocks: + for line in block.lines: + words = [] + for word in line.words: + if word.confidence >= OCR_MIN_CONFIDENCE: + words.append(word.value) + confidences.append(word.confidence) + if words: + lines.append(" ".join(words)) + + text = "\n".join(lines) + avg_conf = sum(confidences) / len(confidences) if confidences else 0.0 + return text, avg_conf + + +def extract_pdf(pdf_path: str | Path) -> PDFExtractionResult: + """ + Extrait le texte d'un PDF en détectant automatiquement + les pages natives vs scannées. + """ + pdf_path = Path(pdf_path) + if not pdf_path.exists(): + raise FileNotFoundError(f"PDF non trouvé : {pdf_path}") + + doc = fitz.open(str(pdf_path)) + pages = [] + native_count = 0 + ocr_count = 0 + + logger.info(f"Extraction de {pdf_path.name} ({len(doc)} pages)") + + for page_num in range(len(doc)): + page = doc[page_num] + native_text = page.get_text().strip() + + if len(native_text) >= NATIVE_TEXT_MIN_CHARS: + # Page native + pages.append(PageResult( + page_num=page_num + 1, + text=native_text, + method="native", + confidence=1.0, + )) + native_count += 1 + logger.debug(f" Page {page_num + 1}/{len(doc)} : natif ({len(native_text)} chars)") + else: + # Page scannée → OCR docTR + image = _page_to_image(page) + text, confidence = _extract_text_doctr(image) + + if text.strip(): + pages.append(PageResult( + page_num=page_num + 1, + text=text, + method="ocr", + confidence=confidence, + )) + ocr_count += 1 + logger.debug(f" Page {page_num + 1}/{len(doc)} : OCR (conf={confidence:.2f}, {len(text)} chars)") + else: + # Page vide (page de garde, séparateur, etc.) + pages.append(PageResult( + page_num=page_num + 1, + text="", + method="ocr", + confidence=0.0, + )) + ocr_count += 1 + logger.debug(f" Page {page_num + 1}/{len(doc)} : vide") + + total_pages = len(doc) + doc.close() + + result = PDFExtractionResult( + file_path=str(pdf_path), + total_pages=total_pages, + pages=pages, + native_pages=native_count, + ocr_pages=ocr_count, + ) + + logger.info( + f"Extraction terminée : {native_count} pages natives, " + f"{ocr_count} pages OCR, {len(result.full_text)} chars total" + ) + + return result diff --git a/t2a-extractor/extractor/segmenter.py b/t2a-extractor/extractor/segmenter.py new file mode 100644 index 0000000..8a8be9a --- /dev/null +++ b/t2a-extractor/extractor/segmenter.py @@ -0,0 +1,279 @@ +""" +Segmentation du texte UCR en blocs exploitables. +Découpe le texte extrait en : +- Entête (métadonnées du contrôle) +- Blocs par Champ +- Blocs par OGC (individuels et groupés) +""" +import re +import logging +from dataclasses import dataclass, field + +logger = logging.getLogger(__name__) + + +@dataclass +class OGCBlock: + """Un bloc de texte correspondant à un ou plusieurs OGC.""" + champ: int + ogc_numbers: list # list[int] — un seul pour individuel, plusieurs pour groupé + text: str + is_grouped: bool = False + + +@dataclass +class ChampBlock: + """Un bloc de texte correspondant à un Champ entier (décision globale sans OGC individuels).""" + champ: int + text: str + + +@dataclass +class SegmentationResult: + """Résultat de la segmentation.""" + header_text: str + ogc_blocks: list # list[OGCBlock] + champ_blocks: list # list[ChampBlock] — champs avec décision globale + total_ogc_count: int + + +def _clean_text(text: str) -> str: + """Nettoie le texte extrait (en-têtes/pieds de page, artefacts OCR).""" + cleaned_lines = [] + for line in text.split('\n'): + line_lower = line.lower().strip() + + # Supprimer les en-têtes/pieds de page UCR + markers = sum([ + bool(re.search(r'ucr\s*na', line_lower)), + bool(re.search(r'confidentiel', line_lower)), + bool(re.search(r'page\s*\d', line_lower)), + bool(re.search(r'p\s*a\s*g\s*e', line_lower)), + bool(re.search(r'\d+\s*[\|/]\s*\d+', line_lower)), + ]) + if markers >= 2: + continue + + # Lignes d'artefacts OCR (tirets, underscores, etc.) + if re.match(r'^[\s_\-—–\.\"\'eElL\|\]\[\(\)\{\},;:!/\\}{]{10,}$', line): + continue + + # Lignes avec trop de caractères parasites + if len(line.strip()) > 10 and len(re.findall(r'[_\-—–\|]', line)) > len(line.strip()) * 0.5: + continue + + # Lignes trop courtes (sauf nombres) + if len(line.strip()) <= 3 and not re.match(r'^\d+$', line.strip()): + continue + + cleaned_lines.append(line) + + text = '\n'.join(cleaned_lines) + # Réduire les sauts de ligne multiples + text = re.sub(r'\n{3,}', '\n\n', text) + + # Supprimer la signature UCR en fin de document (dernières 2000 chars seulement) + tail_start = max(0, len(text) - 2000) + tail = text[tail_start:] + patterns = [ + r'Le\s+\d{1,2}\s+\w+\s+\d{4}\s*\.?\s*Pour\s+.*$', + r'Pour\s+l\W{0,2}UCR.*$', + r'Pour\s+(?:I|l)\s*UCR.*$', + r'Docteur\s+\w+\s+\w+\s+Membre\s+.*$', + ] + for p in patterns: + tail = re.sub(p, '', tail, flags=re.DOTALL | re.IGNORECASE) + text = text[:tail_start] + tail + + return text.strip() + + +def _find_champ_boundaries(text: str) -> list[tuple[int, int]]: + """ + Trouve les positions de chaque Champ dans le texte. + Retourne [(position, numéro_champ), ...] trié par position. + """ + boundaries = [] + for m in re.finditer(r'Champ\s+(?:n°\s*)?(\d+)\s*[\s:–\-]', text, re.IGNORECASE): + boundaries.append((m.start(), int(m.group(1)))) + boundaries.sort(key=lambda x: x[0]) + return boundaries + + +def _get_champ_for_position(pos: int, champ_boundaries: list[tuple[int, int]]) -> int | None: + """Retourne le numéro de champ pour une position donnée dans le texte.""" + current_champ = None + for boundary_pos, champ_num in champ_boundaries: + if boundary_pos <= pos: + current_champ = champ_num + else: + break + return current_champ + + +def _extract_header(text: str, champ_boundaries: list[tuple[int, int]]) -> str: + """Extrait le texte d'en-tête (avant le premier Champ).""" + if champ_boundaries: + return text[:champ_boundaries[0][0]].strip() + return text.strip() + + +def _find_grouped_ogcs(text: str, champ_boundaries: list[tuple[int, int]]) -> list[OGCBlock]: + """ + Détecte les blocs où plusieurs OGC sont traités ensemble. + Pattern : "Concernant les OGC X,Y,Z..." + """ + results = [] + pattern = r'Concernant\s+les?\s+OGC\s+([\d\s,]+?)[\s,]*(le\s+désaccord|la\s+discussion)' + + for m in re.finditer(pattern, text, re.IGNORECASE): + nums_str = m.group(1) + ogc_nums = [int(n.strip()) for n in re.findall(r'\d+', nums_str)] + + if not ogc_nums: + continue + + # Trouver la fin du bloc + block_start = m.start() + end_offsets = [] + for next_pattern in [ + r'\nOGC\s+\d+\s*:', + r'\nConcernant\s+les?\s+OGC', + r'\nChamp\s+(?:n°\s*)?\d+', + ]: + next_match = re.search(next_pattern, text[m.end():], re.IGNORECASE) + if next_match: + end_offsets.append(m.end() + next_match.start()) + + block_end = min(end_offsets) if end_offsets else len(text) + block_text = text[block_start:block_end].strip() + + # Vérifier que ces OGC n'ont pas de bloc individuel plus loin + individually_treated = set() + for num in ogc_nums: + if re.search(rf'\bOGC\s+{num}\s*:', text[block_end:]): + individually_treated.add(num) + + grouped_only_nums = [n for n in ogc_nums if n not in individually_treated] + if not grouped_only_nums: + continue + + champ = _get_champ_for_position(block_start, champ_boundaries) + + results.append(OGCBlock( + champ=champ, + ogc_numbers=grouped_only_nums, + text=block_text, + is_grouped=True, + )) + + return results + + +def _find_individual_ogcs(text: str, champ_boundaries: list[tuple[int, int]], + already_grouped: set[int]) -> list[OGCBlock]: + """ + Détecte les blocs OGC individuels (OGC XX : ...). + Exclut les OGC déjà traités en groupe. + """ + results = [] + pattern = r'(OGC\s*:?\s*\d+\s*:?\s*.*?)(?=OGC\s*:?\s*\d+\s*:?\s|$)' + blocks = re.findall(pattern, text, re.DOTALL) + + for block in blocks: + num_match = re.search(r'OGC\s*:?\s*(\d+)', block) + if not num_match: + continue + + num = int(num_match.group(1)) + if num in already_grouped: + continue + + block_pos = text.find(block) + champ = _get_champ_for_position(block_pos, champ_boundaries) + + results.append(OGCBlock( + champ=champ, + ogc_numbers=[num], + text=block.strip(), + is_grouped=False, + )) + + return results + + +def _find_champ_level_decisions(text: str, champ_boundaries: list[tuple[int, int]]) -> list[ChampBlock]: + """ + Détecte les champs qui ont une décision globale sans OGC individuels/groupés. + """ + results = [] + for i, (pos, champ_num) in enumerate(champ_boundaries): + if i + 1 < len(champ_boundaries): + champ_text = text[pos:champ_boundaries[i + 1][0]] + else: + champ_text = text[pos:] + + # Skip si des OGC individuels/groupés existent dans ce champ + has_individual = bool(re.search(r'\bOGC\s*:?\s*\d+\s*:', champ_text)) + has_grouped = bool(re.search(r'Concernant\s+les?\s+OGC', champ_text, re.IGNORECASE)) + if has_individual or has_grouped: + continue + + # Vérifier qu'il y a bien une décision + has_decision = bool(re.search( + r'(DEC\w*ION|PROPOSITION)\s+UCR', champ_text, re.IGNORECASE + )) + if not has_decision: + continue + + results.append(ChampBlock( + champ=champ_num, + text=champ_text.strip(), + )) + + return results + + +def segment_text(text: str) -> SegmentationResult: + """ + Segmente le texte UCR complet en blocs exploitables. + """ + # Nettoyage + text = _clean_text(text) + logger.info(f"Texte nettoyé : {len(text)} caractères") + + # Trouver les limites des champs + champ_boundaries = _find_champ_boundaries(text) + logger.info(f"Champs détectés : {[num for _, num in champ_boundaries]}") + + # Entête + header = _extract_header(text, champ_boundaries) + + # OGC groupés + grouped_blocks = _find_grouped_ogcs(text, champ_boundaries) + already_grouped = set() + for block in grouped_blocks: + already_grouped.update(block.ogc_numbers) + logger.info(f"OGC groupés : {sum(len(b.ogc_numbers) for b in grouped_blocks)} OGC en {len(grouped_blocks)} groupes") + + # OGC individuels + individual_blocks = _find_individual_ogcs(text, champ_boundaries, already_grouped) + logger.info(f"OGC individuels : {len(individual_blocks)}") + + # Décisions au niveau champ + champ_blocks = _find_champ_level_decisions(text, champ_boundaries) + logger.info(f"Décisions au niveau champ : {len(champ_blocks)}") + + # Fusion et tri + all_ogc_blocks = grouped_blocks + individual_blocks + all_ogc_blocks.sort(key=lambda b: (b.champ or 0, min(b.ogc_numbers) if b.ogc_numbers else 0)) + + total_ogc = sum(len(b.ogc_numbers) for b in all_ogc_blocks) + logger.info(f"Total : {total_ogc} OGC segmentés") + + return SegmentationResult( + header_text=header, + ogc_blocks=all_ogc_blocks, + champ_blocks=champ_blocks, + total_ogc_count=total_ogc, + ) diff --git a/t2a-extractor/extractor/validator.py b/t2a-extractor/extractor/validator.py new file mode 100644 index 0000000..56b790c --- /dev/null +++ b/t2a-extractor/extractor/validator.py @@ -0,0 +1,148 @@ +""" +Validation des données extraites. +Vérifie les formats, la cohérence, et signale les anomalies. +Applique un auto-fix en safety-net via le normalizer avant validation. +""" +import re +import logging +from dataclasses import dataclass, field + +from config import DECISION_VALUES, TYPE_DESACCORD_VALUES +from extractor.normalizer import normalize_extraction + +logger = logging.getLogger(__name__) + +# Patterns de codes médicaux +CIM10_PATTERN = re.compile(r'^[A-Z]\d{2}(?:\.\d{1,2})?$') +CCAM_PATTERN = re.compile(r'^[A-Z]{4}\d{3}(?:-\d)?$') + + +@dataclass +class ValidationResult: + """Résultat de validation d'une extraction.""" + is_valid: bool + warnings: list # list[str] + errors: list # list[str] + fixes: list = field(default_factory=list) # list[str] — auto-corrections appliquées + + +def _validate_codes(codes_str: str | None, field_name: str) -> list[str]: + """Valide une chaîne de codes CIM-10/CCAM.""" + warnings = [] + if not codes_str: + return warnings + + codes = [c.strip() for c in codes_str.split(',')] + for code in codes: + if not code: + continue + is_cim10 = CIM10_PATTERN.match(code) + is_ccam = CCAM_PATTERN.match(code) + if not is_cim10 and not is_ccam: + warnings.append(f"{field_name} : code '{code}' ne correspond ni à CIM-10 ni à CCAM") + + return warnings + + +def validate_extraction(extraction) -> ValidationResult: + """ + Valide une extraction OGC. + Applique d'abord un auto-fix en safety-net via le normalizer, + puis retourne les warnings et erreurs détectés. + """ + warnings = [] + errors = [] + fixes = [] + + # Vérifier l'extraction elle-même + if not extraction.extraction_success: + errors.append(f"Extraction échouée : {extraction.error_message}") + return ValidationResult(is_valid=False, warnings=warnings, errors=errors) + + # Safety-net : auto-fix via normalizer avant validation + fixes = normalize_extraction(extraction) + + # Vérifier la décision + if extraction.decision_ucr and extraction.decision_ucr not in DECISION_VALUES: + warnings.append(f"Décision non standard : '{extraction.decision_ucr}'") + + # Vérifier le type de désaccord + if extraction.type_desaccord and extraction.type_desaccord not in TYPE_DESACCORD_VALUES: + warnings.append(f"Type désaccord non standard : '{extraction.type_desaccord}'") + + # Vérifier les codes + warnings.extend(_validate_codes(extraction.codes_etablissement, "codes_etablissement")) + warnings.extend(_validate_codes(extraction.codes_controleurs, "codes_controleurs")) + warnings.extend(_validate_codes(extraction.codes_retenus, "codes_retenus")) + + # Vérifier la cohérence décision / codes retenus + if extraction.decision_ucr == "Défavorable" and not extraction.codes_retenus: + if extraction.codes_controleurs: + warnings.append("Décision défavorable mais codes_retenus vide — les codes contrôleurs devraient être retenus") + + # Vérifier que le texte de décision n'est pas vide + if not extraction.texte_decision or len(extraction.texte_decision.strip()) < 20: + warnings.append("Texte de décision absent ou très court") + + is_valid = len(errors) == 0 + return ValidationResult(is_valid=is_valid, warnings=warnings, errors=errors, fixes=fixes) + + +def validate_all(extractions: list) -> dict: + """ + Valide toutes les extractions et retourne un rapport. + Inclut les auto-corrections appliquées par le safety-net. + """ + total = len(extractions) + valid = 0 + with_warnings = 0 + failed = 0 + all_warnings = [] + all_errors = [] + all_fixes = [] + + for ext in extractions: + result = validate_extraction(ext) + + # Collecter les auto-corrections + if result.fixes: + for f in result.fixes: + fix_msg = f"OGC {ext.num_ogc} (Champ {ext.champ}) : {f}" + all_fixes.append(fix_msg) + logger.info(f" 🔧 {fix_msg}") + + if result.is_valid: + valid += 1 + if result.warnings: + with_warnings += 1 + for w in result.warnings: + all_warnings.append(f"OGC {ext.num_ogc} (Champ {ext.champ}) : {w}") + else: + failed += 1 + for e in result.errors: + all_errors.append(f"OGC {ext.num_ogc} (Champ {ext.champ}) : {e}") + + report = { + "total": total, + "valid": valid, + "with_warnings": with_warnings, + "failed": failed, + "warnings": all_warnings, + "errors": all_errors, + "fixes": all_fixes, + "total_fixes": len(all_fixes), + } + + logger.info(f"Validation : {valid}/{total} OK, {with_warnings} avec warnings, {failed} échoués") + if all_fixes: + logger.info(f" {len(all_fixes)} auto-corrections appliquées par le safety-net") + if all_warnings: + for w in all_warnings[:10]: # Limiter l'affichage + logger.warning(f" ⚠ {w}") + if len(all_warnings) > 10: + logger.warning(f" ... et {len(all_warnings) - 10} autres warnings") + if all_errors: + for e in all_errors: + logger.error(f" ✗ {e}") + + return report diff --git a/t2a-extractor/main.py b/t2a-extractor/main.py new file mode 100644 index 0000000..e67c184 --- /dev/null +++ b/t2a-extractor/main.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python3 +""" +T2A Extractor — Extraction structurée de rapports UCR +Usage : python main.py [--output-dir ] [--csv] [--verbose] +""" +import argparse +import sys +import time +import logging +from pathlib import Path + +# Ajouter le répertoire du projet au path +sys.path.insert(0, str(Path(__file__).parent)) + +from config import DEFAULT_OUTPUT_DIR, OLLAMA_MODEL +from extractor.pdf_reader import extract_pdf +from extractor.segmenter import segment_text +from extractor.llm_extractor import extract_ogc_block, extract_champ_block, check_ollama_available +from extractor.normalizer import normalize_all +from extractor.validator import validate_all +from extractor.exporter import export_excel, export_csv + + +def setup_logging(verbose: bool = False): + """Configure le logging.""" + level = logging.DEBUG if verbose else logging.INFO + formatter = logging.Formatter( + '%(asctime)s [%(levelname)s] %(message)s', + datefmt='%H:%M:%S' + ) + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + root_logger = logging.getLogger() + root_logger.setLevel(level) + root_logger.addHandler(handler) + + +def main(): + parser = argparse.ArgumentParser( + description="Extraction structurée de rapports de contrôle T2A (UCR)" + ) + parser.add_argument("pdf", help="Chemin vers le fichier PDF à traiter") + parser.add_argument("--output-dir", "-o", default=None, + help="Dossier de sortie (défaut: ./output)") + parser.add_argument("--csv", action="store_true", + help="Exporter aussi en CSV") + parser.add_argument("--verbose", "-v", action="store_true", + help="Mode verbeux (debug)") + parser.add_argument("--skip-validation", action="store_true", + help="Ne pas valider les extractions") + + args = parser.parse_args() + setup_logging(args.verbose) + logger = logging.getLogger(__name__) + + pdf_path = Path(args.pdf) + if not pdf_path.exists(): + logger.error(f"Fichier non trouvé : {pdf_path}") + sys.exit(1) + + output_dir = Path(args.output_dir) if args.output_dir else DEFAULT_OUTPUT_DIR + output_dir.mkdir(parents=True, exist_ok=True) + + stem = pdf_path.stem + excel_path = output_dir / f"{stem}_ucr_extract.xlsx" + csv_path = output_dir / f"{stem}_ucr_extract.csv" + + # ============================================================ + # Étape 0 : Vérification Ollama + # ============================================================ + logger.info(f"Vérification Ollama ({OLLAMA_MODEL})...") + if not check_ollama_available(): + logger.error("Ollama non disponible. Assurez-vous que le service est démarré et le modèle chargé.") + logger.error(f" → ollama serve") + logger.error(f" → ollama pull {OLLAMA_MODEL}") + sys.exit(1) + + # ============================================================ + # Étape 1 : Extraction du texte PDF + # ============================================================ + logger.info("=" * 60) + logger.info("ÉTAPE 1 : Extraction du texte PDF") + logger.info("=" * 60) + t0 = time.time() + + pdf_result = extract_pdf(pdf_path) + logger.info(f" {pdf_result.total_pages} pages ({pdf_result.native_pages} natives, {pdf_result.ocr_pages} OCR)") + logger.info(f" {len(pdf_result.full_text)} caractères extraits en {time.time() - t0:.1f}s") + + # ============================================================ + # Étape 2 : Segmentation en blocs OGC + # ============================================================ + logger.info("=" * 60) + logger.info("ÉTAPE 2 : Segmentation en blocs OGC") + logger.info("=" * 60) + t1 = time.time() + + segments = segment_text(pdf_result.full_text) + logger.info(f" {segments.total_ogc_count} OGC détectés en {len(segments.ogc_blocks)} blocs") + logger.info(f" {len(segments.champ_blocks)} décisions au niveau champ") + logger.info(f" Segmentation en {time.time() - t1:.1f}s") + + # ============================================================ + # Étape 3 : Extraction structurée via VLM + # ============================================================ + logger.info("=" * 60) + logger.info("ÉTAPE 3 : Extraction structurée via VLM") + logger.info("=" * 60) + t2 = time.time() + + all_extractions = [] + total_blocks = len(segments.ogc_blocks) + len(segments.champ_blocks) + current = 0 + + # Extraire les blocs OGC + for block in segments.ogc_blocks: + current += 1 + ogc_str = ",".join(str(n) for n in block.ogc_numbers) + logger.info(f" [{current}/{total_blocks}] Champ {block.champ} — OGC {ogc_str}...") + + extractions = extract_ogc_block( + champ=block.champ, + ogc_numbers=block.ogc_numbers, + block_text=block.text, + ) + all_extractions.extend(extractions) + + for ext in extractions: + status = "✓" if ext.extraction_success else "✗" + logger.info(f" {status} OGC {ext.num_ogc} → {ext.decision_ucr or 'N/A'}") + + # Extraire les blocs Champ + for block in segments.champ_blocks: + current += 1 + logger.info(f" [{current}/{total_blocks}] Champ {block.champ} (décision globale)...") + + extraction = extract_champ_block( + champ=block.champ, + block_text=block.text, + ) + all_extractions.append(extraction) + + # Tri final + all_extractions.sort(key=lambda x: (x.champ or 0, x.num_ogc or 0)) + + elapsed = time.time() - t2 + logger.info(f" {len(all_extractions)} extractions en {elapsed:.1f}s ({elapsed/max(len(all_extractions),1):.1f}s/extraction)") + + # ============================================================ + # Étape 3.5 : Normalisation (post-traitement déterministe) + # ============================================================ + logger.info("=" * 60) + logger.info("ÉTAPE 3.5 : Normalisation (codes CIM-10, codes retenus, texte)") + logger.info("=" * 60) + t_norm = time.time() + + norm_report = normalize_all(all_extractions) + logger.info(f" {norm_report['total_fixes']} corrections en {time.time() - t_norm:.1f}s") + + # ============================================================ + # Étape 4 : Validation + # ============================================================ + if not args.skip_validation: + logger.info("=" * 60) + logger.info("ÉTAPE 4 : Validation") + logger.info("=" * 60) + + report = validate_all(all_extractions) + logger.info(f" {report['valid']}/{report['total']} valides, " + f"{report['with_warnings']} avec warnings, " + f"{report['failed']} échoués") + if report.get('total_fixes'): + logger.info(f" {report['total_fixes']} auto-corrections supplémentaires (safety-net)") + + # ============================================================ + # Étape 5 : Export + # ============================================================ + logger.info("=" * 60) + logger.info("ÉTAPE 5 : Export") + logger.info("=" * 60) + + n = export_excel(all_extractions, excel_path) + logger.info(f" Excel : {excel_path} ({n} lignes)") + + if args.csv: + n = export_csv(all_extractions, csv_path) + logger.info(f" CSV : {csv_path} ({n} lignes)") + + # ============================================================ + # Résumé + # ============================================================ + total_time = time.time() - t0 + logger.info("=" * 60) + logger.info("TERMINÉ") + logger.info(f" Durée totale : {total_time:.1f}s") + logger.info(f" OGC extraits : {len(all_extractions)}") + success_count = sum(1 for e in all_extractions if e.extraction_success) + logger.info(f" Succès : {success_count}/{len(all_extractions)}") + logger.info(f" Sortie : {excel_path}") + logger.info("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/t2a-extractor/requirements.txt b/t2a-extractor/requirements.txt new file mode 100644 index 0000000..ac9c1b7 --- /dev/null +++ b/t2a-extractor/requirements.txt @@ -0,0 +1,17 @@ +# T2A Extractor - Dependencies +# PDF +PyMuPDF>=1.24.0 + +# OCR +python-doctr[torch]>=0.9.0 +torch>=2.0.0 +torchvision>=0.15.0 + +# LLM +requests>=2.31.0 + +# Export +openpyxl>=3.1.0 + +# Validation (optionnel, pour usage futur) +# pydantic>=2.0.0 diff --git a/t2a-extractor/setup.sh b/t2a-extractor/setup.sh new file mode 100755 index 0000000..2cb35a9 --- /dev/null +++ b/t2a-extractor/setup.sh @@ -0,0 +1,78 @@ +#!/bin/bash +# ============================================================= +# T2A Extractor — Installation complète +# Ubuntu 24.04 — Python 3.12+ +# ============================================================= +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +VENV_DIR="$SCRIPT_DIR/.venv" + +echo "============================================" +echo " T2A Extractor — Installation" +echo "============================================" + +# --- 1. Vérifier Python --- +echo "" +echo "[1/5] Vérification de Python..." +if ! command -v python3 &>/dev/null; then + echo " ✗ Python3 non trouvé. Installation..." + sudo apt update && sudo apt install -y python3 python3-venv python3-pip +fi +PYTHON_VERSION=$(python3 --version 2>&1) +echo " ✓ $PYTHON_VERSION" + +# --- 2. Créer le venv --- +echo "" +echo "[2/5] Création de l'environnement virtuel..." +if [ -d "$VENV_DIR" ]; then + echo " → Suppression de l'ancien venv..." + rm -rf "$VENV_DIR" +fi +python3 -m venv "$VENV_DIR" +source "$VENV_DIR/bin/activate" +pip install --upgrade pip setuptools wheel -q +echo " ✓ Venv créé : $VENV_DIR" + +# --- 3. Installer les dépendances --- +echo "" +echo "[3/5] Installation des dépendances Python..." +pip install -r "$SCRIPT_DIR/requirements.txt" 2>&1 | tail -5 +echo " ✓ Dépendances installées" + +# --- 4. Vérifier Ollama --- +echo "" +echo "[4/5] Vérification d'Ollama..." +if ! command -v ollama &>/dev/null; then + echo " ⚠ Ollama non installé." + echo " → Installer avec : curl -fsSL https://ollama.com/install.sh | sh" + echo " → Puis : ollama pull gemma3:27b-cloud" +else + echo " ✓ Ollama installé : $(ollama --version 2>&1 || echo 'version inconnue')" + echo " → Assurez-vous que le modèle est chargé : ollama pull gemma3:27b-cloud" +fi + +# --- 5. Créer le dossier output --- +echo "" +echo "[5/5] Structure du projet..." +mkdir -p "$SCRIPT_DIR/output" +echo " ✓ Dossier output créé" + +# --- Résumé --- +echo "" +echo "============================================" +echo " Installation terminée !" +echo "============================================" +echo "" +echo " Activation du venv :" +echo " source $VENV_DIR/bin/activate" +echo "" +echo " Usage :" +echo " python main.py " +echo " python main.py --csv --verbose" +echo "" +echo " Avant la première utilisation :" +echo " 1. Démarrer Ollama : ollama serve" +echo " 2. Charger le modèle : ollama pull gemma3:27b-cloud" +echo " 3. Adapter config.py si nécessaire (OLLAMA_MODEL)" +echo ""