diff --git a/anonymizer_core_refactored_onnx.py b/anonymizer_core_refactored_onnx.py index ae98411..41f696c 100644 --- a/anonymizer_core_refactored_onnx.py +++ b/anonymizer_core_refactored_onnx.py @@ -192,7 +192,7 @@ RE_TEL = re.compile(r"(? str: + """Extrait le texte d'une page PyMuPDF en gérant les layouts multi-colonnes. + + Détecte si la page a un sidebar/colonne gauche parallèle à un corps droit + (typique des CRH/CRO hospitaliers). Si oui, lit chaque colonne séparément + pour éviter l'entrelacement du texte. + """ + blocks = page.get_text("blocks") + text_blocks = [] + for b in blocks: + x0, y0, x1, y1, text, _block_no, block_type = b + if block_type == 0 and text.strip(): + text_blocks.append((x0, y0, x1, y1, text.strip())) + if not text_blocks: + return "" + + page_w = page.rect.width + page_h = page.rect.height + + # --- Détection de colonnes --- + # Cherche une ligne verticale split_x qui sépare les blocs en deux groupes + # parallèles (chevauchement vertical significatif). + best_split = None + best_score = -1 + for split_x in range(int(page_w * 0.15), int(page_w * 0.45), 3): + left = [b for b in text_blocks if b[2] <= split_x + 5] + right = [b for b in text_blocks if b[0] >= split_x - 5] + crossing = [b for b in text_blocks if b[0] < split_x - 5 and b[2] > split_x + 5] + if len(left) < 3 or len(right) < 3: + continue + left_span = max(b[3] for b in left) - min(b[1] for b in left) + right_span = max(b[3] for b in right) - min(b[1] for b in right) + if left_span < page_h * 0.25 or right_span < page_h * 0.25: + continue + overlap_min = max(min(b[1] for b in left), min(b[1] for b in right)) + overlap_max = min(max(b[3] for b in left), max(b[3] for b in right)) + if overlap_max - overlap_min < page_h * 0.15: + continue + score = len(left) + len(right) - 5 * len(crossing) + if score > best_score: + best_score = score + best_split = split_x + + if best_split is not None: + left_blocks = sorted( + [b for b in text_blocks if b[2] <= best_split + 5], key=lambda b: b[1] + ) + right_blocks = sorted( + [b for b in text_blocks if b[0] >= best_split - 5], key=lambda b: b[1] + ) + full_width = sorted( + [b for b in text_blocks if b[0] < best_split - 5 and b[2] > best_split + 5], + key=lambda b: b[1], + ) + col_start_y = min( + min((b[1] for b in left_blocks), default=page_h), + min((b[1] for b in right_blocks), default=page_h), + ) + headers = [b for b in full_width if b[1] < col_start_y + 5] + footers = [b for b in full_width if b[1] >= col_start_y + 5] + parts = [] + for b in headers: + parts.append(b[4]) + for b in left_blocks: + parts.append(b[4]) + for b in right_blocks: + parts.append(b[4]) + for b in footers: + parts.append(b[4]) + return "\n".join(parts) + else: + sorted_blocks = sorted(text_blocks, key=lambda b: (b[1], b[0])) + return "\n".join(b[4] for b in sorted_blocks) + + def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List[str]], bool, OcrWordMap]: """Extraction texte multi-passes avec fallback OCR (docTR). Retourne (pages_text, tables_lines, ocr_used, ocr_word_map). + + Passe 1 : PyMuPDF layout-aware (blocs avec détection de colonnes) + Passe 1b: pdfplumber si PyMuPDF échoue ou donne peu de texte + Passe 2 : pdfminer si CID ou texte pauvre + Passe 3 : OCR docTR si PDF scanné (très peu de texte) + Tables : toujours extraites via pdfplumber (indépendamment du texte). """ pages_text: List[str] = [] tables_lines: List[List[str]] = [] ocr_used = False + + # --- Tables : toujours via pdfplumber --- with pdfplumber.open(pdf_path) as pdf: for p in pdf.pages: - t = p.extract_text(x_tolerance=2.5, y_tolerance=4.0) or "" - pages_text.append(t) rows: List[str] = [] try: tables = p.extract_tables() @@ -733,28 +814,45 @@ def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List except Exception: pass tables_lines.append(rows) + + # --- Passe 1 : PyMuPDF layout-aware (détection multi-colonnes) --- + if fitz is not None: + try: + doc = fitz.open(str(pdf_path)) + pages_text = [_extract_page_layout_aware(doc[i]) for i in range(len(doc))] + doc.close() + except Exception: + pass + + # --- Passe 1b : pdfplumber si PyMuPDF n'a rien donné --- + total_chars = sum(len(x or "") for x in pages_text) + if total_chars < 500: + try: + with pdfplumber.open(pdf_path) as pdf: + pp_pages = [p.extract_text(x_tolerance=2.5, y_tolerance=4.0) or "" for p in pdf.pages] + if sum(len(x) for x in pp_pages) > total_chars: + pages_text = pp_pages + except Exception: + pass + + # --- Passe 2 : pdfminer si CID ou texte pauvre --- total_chars = sum(len(x or "") for x in pages_text) need_fallback = total_chars < 500 if not need_fallback: need_fallback = any(CID_PATTERN.search(x or "") for x in pages_text) if need_fallback: - text_all = pdfminer_extract_text( - str(pdf_path), - laparams=LAParams(char_margin=2.0, word_margin=0.1, line_margin=0.8, boxes_flow=0.5), - ) - split = [x for x in text_all.split("\f") if x] - if split: - pages_text = split - # 3e passe PyMuPDF si toujours pauvre/cid - total_chars = sum(len(x or "") for x in pages_text) - if (total_chars < 500 or any(CID_PATTERN.search(x or "") for x in pages_text)) and fitz is not None: try: - doc = fitz.open(str(pdf_path)) - pages_text = [doc[i].get_text("text") or "" for i in range(len(doc))] - doc.close() + text_all = pdfminer_extract_text( + str(pdf_path), + laparams=LAParams(char_margin=2.0, word_margin=0.1, line_margin=0.8, boxes_flow=0.5), + ) + split = [x for x in text_all.split("\f") if x] + if split and sum(len(x) for x in split) > total_chars: + pages_text = split except Exception: pass - # 4e passe : OCR docTR si toujours très peu de texte (PDF scanné) + + # --- Passe 3 : OCR docTR si PDF scanné (très peu de texte) --- total_chars = sum(len(x or "") for x in pages_text) ocr_word_map: OcrWordMap = {} if total_chars < 200 and _DOCTR_AVAILABLE and fitz is not None: @@ -866,6 +964,13 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict return PLACEHOLDERS["EMAIL"] line = RE_EMAIL.sub(_repl_email, line) + # URL hospitalière (www.ch-xxx.fr, www.hopital-xxx.fr, etc.) + _re_url_hospital = re.compile(r"(?:https?://)?www\.[a-z0-9\-]+\.(?:fr|com|org)(?:/[^\s]*)?", re.IGNORECASE) + m_url = _re_url_hospital.search(line) + if m_url: + audit.append(PiiHit(page_idx, "ETAB", m_url.group(0), PLACEHOLDERS["ETAB"])) + line = line[:m_url.start()] + PLACEHOLDERS["ETAB"] + line[m_url.end():] + # TEL def _repl_tel(m: re.Match) -> str: audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"])) @@ -1443,10 +1548,10 @@ def _apply_extracted_names(text: str, names: set, audit: List[PiiHit], force_nam def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit]) -> str: - """Applique les PiiHit non-NOM dans le texte (NDA footers, EPISODE, RPPS, etc.). - Ces hits sont détectés par _extract_trackare_identity mais n'étaient appliqués - qu'au PDF raster, pas au fichier .pseudonymise.txt.""" - _APPLY_KINDS = {"EPISODE", "RPPS"} + """Applique les PiiHit non-NOM dans le texte (NDA footers, EPISODE, RPPS, FINESS, etc.). + Ces hits sont détectés par _extract_trackare_identity ou la phase 0c + mais n'étaient appliqués qu'au PDF raster, pas au fichier .pseudonymise.txt.""" + _APPLY_KINDS = {"EPISODE", "RPPS", "FINESS"} # Collecter les valeurs à remplacer, groupées par placeholder replacements: Dict[str, str] = {} # original → placeholder for h in audit: @@ -1455,8 +1560,11 @@ def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit]) -> str: # Remplacer les plus longs d'abord (éviter les remplacements partiels) for original in sorted(replacements, key=len, reverse=True): placeholder = replacements[original] + escaped = re.escape(original) # Word boundary pour ne pas casser les mots (ex: ONDANSETRON) - text = re.sub(rf"\b{re.escape(original)}\b", placeholder, text) + text = re.sub(rf"\b{escaped}\b", placeholder, text) + # Aussi gérer les formats avec astérisques (*640000162*) + text = re.sub(rf"\*{escaped}\*", placeholder, text) return text @@ -1479,6 +1587,14 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str] extracted_names.update(trackare_names) audit.extend(trackare_hits) + # Phase 0c : détection FINESS multiline (label et numéro sur lignes séparées, + # avec possiblement 0-2 lignes intermédiaires masquées ou vides) + _RE_FINESS_MULTILINE = re.compile( + r"(?:N°\s*)?[Ff]iness?\s*\n(?:[^\n]*\n){0,2}\s*\*?(\d{9})\*?", re.MULTILINE + ) + for m in _RE_FINESS_MULTILINE.finditer(full_raw): + audit.append(PiiHit(-1, "FINESS", m.group(1), PLACEHOLDERS["FINESS"])) + # Phase 1 : masquage ligne par ligne (regex classiques) out_pages: List[str] = [] for i, page_txt in enumerate(pages_text): @@ -1506,10 +1622,8 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str] if extracted_names: text_out = _apply_extracted_names(text_out, extracted_names, audit, force_names=trackare_force_names) - # Phase 2b : application globale des PiiHit trackare (NDA footers, EPISODE, etc.) - # Ces hits sont détectés par _extract_trackare_identity mais pas encore remplacés dans le texte - if is_trackare: - text_out = _apply_trackare_hits_to_text(text_out, audit) + # Phase 2b : application globale des PiiHit (EPISODE, RPPS, FINESS) + text_out = _apply_trackare_hits_to_text(text_out, audit) return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit, is_trackare=is_trackare) @@ -2245,14 +2359,14 @@ def process_pdf( # 4b) Propagation globale SÉLECTIVE : uniquement pour les PII critiques # Les PII critiques (DATE_NAISSANCE, NIR, IPP, EMAIL) sont propagés sur toutes les pages # pour éviter les fuites sur les documents multi-pages (ex: CRO) - _CRITICAL_PII_TYPES = {"DATE_NAISSANCE", "NIR", "IPP", "EMAIL", "force_term", "force_regex"} + _CRITICAL_PII_TYPES = {"DATE_NAISSANCE", "NIR", "IPP", "EMAIL", "force_term", "force_regex", "FINESS"} _global_pii: Dict[str, set] = {} for h in anon.audit: # Collecter TOUS les types pour analyse, mais ne propager que les critiques if h.kind in {"TEL", "EMAIL", "ADRESSE", "CODE_POSTAL", "EPISODE", "RPPS", "VILLE", "ETAB", "VLM_SERVICE", "VLM_ETAB", "DATE_NAISSANCE", "NIR", "IPP", - "force_term", "force_regex"}: + "force_term", "force_regex", "FINESS"}: # Traitement spécial pour DATE_NAISSANCE : extraire la date pure et générer toutes les variations if h.kind == "DATE_NAISSANCE": # Extraire la date pure (DD/MM/YYYY ou DD/MM/YY) @@ -2404,6 +2518,14 @@ def process_pdf( # Sécurité : supprimer tout bloc [TABLES] résiduel (ne devrait plus arriver) final_text = re.sub(r"\n*\[TABLES\].*?\[/TABLES\]\n*", "\n", final_text, flags=re.DOTALL) + # Nettoyage crochets doubles : [[PLACEHOLDER]] → [PLACEHOLDER] (artefact quand + # le PDF original avait déjà des crochets autour de la valeur masquée) + _RE_BRACKET_CLEAN = re.compile( + r"\[+(\[(?:NOM|TEL|EMAIL|VILLE|ADRESSE|CODE_POSTAL|FINESS|ETABLISSEMENT|MASK|IPP|" + r"DOSSIER|NDA|EPISODE|RPPS|DATE_NAISSANCE|AGE|NIR|IBAN|OGC)\])\]+" + ) + final_text = _RE_BRACKET_CLEAN.sub(r"\1", final_text) + # Sauvegardes base = pdf_path.stem txt_path = out_dir / f"{base}.pseudonymise.txt"