feat(phase2): Extraction layout-aware multi-colonnes — 322 fuites → 0, -103 FP

Phase 2 de l'amélioration qualité anonymisation :

1. Extraction multi-colonnes (PyMuPDF layout-aware) :
   - Nouvelle fonction _extract_page_layout_aware() détecte les layouts
     sidebar+corps (typiques des CRH/CRO hospitaliers)
   - Remplace pdfplumber comme extraction primaire (PyMuPDF blocks)
   - Élimine l'entrelacement de texte entre sidebar et corps médical
   - pdfplumber conservé pour les tables et comme fallback

2. Masquage FINESS multiline :
   - Détection "N° Finess\n[...]\n640000162" (label et numéro séparés)
   - Propagation globale du numéro FINESS sur toutes les pages
   - Gestion du format *640000162* (avec astérisques Trackare)

3. Masquage URLs hospitalières (www.ch-xxx.fr)

4. Nettoyage crochets doubles [[PLACEHOLDER]] → [PLACEHOLDER]

Résultats non-régression (30 fichiers audit) :
- Fuites : 322 → 0 (-100%)
- Faux positifs : 113 → 10 (-91%)
- 0 régression fonctionnelle
- OGC 1-59 : 0 fuite soignant, 0 FINESS, 0 lieu de naissance

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-06 18:19:08 +01:00
parent bc2fe667a0
commit e967a67052
55 changed files with 38506 additions and 18343 deletions

View File

@@ -192,7 +192,7 @@ RE_TEL = re.compile(r"(?<!\d)(?:\+33\s?|0)\d(?:[\s.\-]?\d){8}(?!\d)")
RE_TEL_COMPACT = re.compile(r"(?<!\d)0[1-9]\d{8}(?!\d)")
RE_IBAN = re.compile(r"\b[A-Z]{2}\d{2}(?:\s?[A-Z0-9]{4}){3,7}(?:\s?[A-Z0-9]{1,4})\b")
RE_IPP = re.compile(r"\bIPP\s*[:\-]?\s*([A-Za-z0-9]{6,})\b", re.IGNORECASE)
RE_FINESS = re.compile(r"\bFINESS\s*[:\-]?\s*(\d{9})\b", re.IGNORECASE)
RE_FINESS = re.compile(r"\b(?:N°\s*)?FINESS?\s*[:\-]?\s*(\d{9})\b", re.IGNORECASE)
RE_OGC = re.compile(r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,})\b", re.IGNORECASE)
RE_RPPS = re.compile(r"\b(?:N°\s*)?RPPS\s*[:\-]?\s*(\d{8,11})\b", re.IGNORECASE)
RE_NIR = re.compile(
@@ -712,17 +712,98 @@ def _get_doctr_model():
)
return _doctr_model_cache
def _extract_page_layout_aware(page) -> str:
"""Extrait le texte d'une page PyMuPDF en gérant les layouts multi-colonnes.
Détecte si la page a un sidebar/colonne gauche parallèle à un corps droit
(typique des CRH/CRO hospitaliers). Si oui, lit chaque colonne séparément
pour éviter l'entrelacement du texte.
"""
blocks = page.get_text("blocks")
text_blocks = []
for b in blocks:
x0, y0, x1, y1, text, _block_no, block_type = b
if block_type == 0 and text.strip():
text_blocks.append((x0, y0, x1, y1, text.strip()))
if not text_blocks:
return ""
page_w = page.rect.width
page_h = page.rect.height
# --- Détection de colonnes ---
# Cherche une ligne verticale split_x qui sépare les blocs en deux groupes
# parallèles (chevauchement vertical significatif).
best_split = None
best_score = -1
for split_x in range(int(page_w * 0.15), int(page_w * 0.45), 3):
left = [b for b in text_blocks if b[2] <= split_x + 5]
right = [b for b in text_blocks if b[0] >= split_x - 5]
crossing = [b for b in text_blocks if b[0] < split_x - 5 and b[2] > split_x + 5]
if len(left) < 3 or len(right) < 3:
continue
left_span = max(b[3] for b in left) - min(b[1] for b in left)
right_span = max(b[3] for b in right) - min(b[1] for b in right)
if left_span < page_h * 0.25 or right_span < page_h * 0.25:
continue
overlap_min = max(min(b[1] for b in left), min(b[1] for b in right))
overlap_max = min(max(b[3] for b in left), max(b[3] for b in right))
if overlap_max - overlap_min < page_h * 0.15:
continue
score = len(left) + len(right) - 5 * len(crossing)
if score > best_score:
best_score = score
best_split = split_x
if best_split is not None:
left_blocks = sorted(
[b for b in text_blocks if b[2] <= best_split + 5], key=lambda b: b[1]
)
right_blocks = sorted(
[b for b in text_blocks if b[0] >= best_split - 5], key=lambda b: b[1]
)
full_width = sorted(
[b for b in text_blocks if b[0] < best_split - 5 and b[2] > best_split + 5],
key=lambda b: b[1],
)
col_start_y = min(
min((b[1] for b in left_blocks), default=page_h),
min((b[1] for b in right_blocks), default=page_h),
)
headers = [b for b in full_width if b[1] < col_start_y + 5]
footers = [b for b in full_width if b[1] >= col_start_y + 5]
parts = []
for b in headers:
parts.append(b[4])
for b in left_blocks:
parts.append(b[4])
for b in right_blocks:
parts.append(b[4])
for b in footers:
parts.append(b[4])
return "\n".join(parts)
else:
sorted_blocks = sorted(text_blocks, key=lambda b: (b[1], b[0]))
return "\n".join(b[4] for b in sorted_blocks)
def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List[str]], bool, OcrWordMap]:
"""Extraction texte multi-passes avec fallback OCR (docTR).
Retourne (pages_text, tables_lines, ocr_used, ocr_word_map).
Passe 1 : PyMuPDF layout-aware (blocs avec détection de colonnes)
Passe 1b: pdfplumber si PyMuPDF échoue ou donne peu de texte
Passe 2 : pdfminer si CID ou texte pauvre
Passe 3 : OCR docTR si PDF scanné (très peu de texte)
Tables : toujours extraites via pdfplumber (indépendamment du texte).
"""
pages_text: List[str] = []
tables_lines: List[List[str]] = []
ocr_used = False
# --- Tables : toujours via pdfplumber ---
with pdfplumber.open(pdf_path) as pdf:
for p in pdf.pages:
t = p.extract_text(x_tolerance=2.5, y_tolerance=4.0) or ""
pages_text.append(t)
rows: List[str] = []
try:
tables = p.extract_tables()
@@ -733,28 +814,45 @@ def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List
except Exception:
pass
tables_lines.append(rows)
# --- Passe 1 : PyMuPDF layout-aware (détection multi-colonnes) ---
if fitz is not None:
try:
doc = fitz.open(str(pdf_path))
pages_text = [_extract_page_layout_aware(doc[i]) for i in range(len(doc))]
doc.close()
except Exception:
pass
# --- Passe 1b : pdfplumber si PyMuPDF n'a rien donné ---
total_chars = sum(len(x or "") for x in pages_text)
if total_chars < 500:
try:
with pdfplumber.open(pdf_path) as pdf:
pp_pages = [p.extract_text(x_tolerance=2.5, y_tolerance=4.0) or "" for p in pdf.pages]
if sum(len(x) for x in pp_pages) > total_chars:
pages_text = pp_pages
except Exception:
pass
# --- Passe 2 : pdfminer si CID ou texte pauvre ---
total_chars = sum(len(x or "") for x in pages_text)
need_fallback = total_chars < 500
if not need_fallback:
need_fallback = any(CID_PATTERN.search(x or "") for x in pages_text)
if need_fallback:
text_all = pdfminer_extract_text(
str(pdf_path),
laparams=LAParams(char_margin=2.0, word_margin=0.1, line_margin=0.8, boxes_flow=0.5),
)
split = [x for x in text_all.split("\f") if x]
if split:
pages_text = split
# 3e passe PyMuPDF si toujours pauvre/cid
total_chars = sum(len(x or "") for x in pages_text)
if (total_chars < 500 or any(CID_PATTERN.search(x or "") for x in pages_text)) and fitz is not None:
try:
doc = fitz.open(str(pdf_path))
pages_text = [doc[i].get_text("text") or "" for i in range(len(doc))]
doc.close()
text_all = pdfminer_extract_text(
str(pdf_path),
laparams=LAParams(char_margin=2.0, word_margin=0.1, line_margin=0.8, boxes_flow=0.5),
)
split = [x for x in text_all.split("\f") if x]
if split and sum(len(x) for x in split) > total_chars:
pages_text = split
except Exception:
pass
# 4e passe : OCR docTR si toujours très peu de texte (PDF scanné)
# --- Passe 3 : OCR docTR si PDF scanné (très peu de texte) ---
total_chars = sum(len(x or "") for x in pages_text)
ocr_word_map: OcrWordMap = {}
if total_chars < 200 and _DOCTR_AVAILABLE and fitz is not None:
@@ -866,6 +964,13 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
return PLACEHOLDERS["EMAIL"]
line = RE_EMAIL.sub(_repl_email, line)
# URL hospitalière (www.ch-xxx.fr, www.hopital-xxx.fr, etc.)
_re_url_hospital = re.compile(r"(?:https?://)?www\.[a-z0-9\-]+\.(?:fr|com|org)(?:/[^\s]*)?", re.IGNORECASE)
m_url = _re_url_hospital.search(line)
if m_url:
audit.append(PiiHit(page_idx, "ETAB", m_url.group(0), PLACEHOLDERS["ETAB"]))
line = line[:m_url.start()] + PLACEHOLDERS["ETAB"] + line[m_url.end():]
# TEL
def _repl_tel(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"]))
@@ -1443,10 +1548,10 @@ def _apply_extracted_names(text: str, names: set, audit: List[PiiHit], force_nam
def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit]) -> str:
"""Applique les PiiHit non-NOM dans le texte (NDA footers, EPISODE, RPPS, etc.).
Ces hits sont détectés par _extract_trackare_identity mais n'étaient appliqués
qu'au PDF raster, pas au fichier .pseudonymise.txt."""
_APPLY_KINDS = {"EPISODE", "RPPS"}
"""Applique les PiiHit non-NOM dans le texte (NDA footers, EPISODE, RPPS, FINESS, etc.).
Ces hits sont détectés par _extract_trackare_identity ou la phase 0c
mais n'étaient appliqués qu'au PDF raster, pas au fichier .pseudonymise.txt."""
_APPLY_KINDS = {"EPISODE", "RPPS", "FINESS"}
# Collecter les valeurs à remplacer, groupées par placeholder
replacements: Dict[str, str] = {} # original → placeholder
for h in audit:
@@ -1455,8 +1560,11 @@ def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit]) -> str:
# Remplacer les plus longs d'abord (éviter les remplacements partiels)
for original in sorted(replacements, key=len, reverse=True):
placeholder = replacements[original]
escaped = re.escape(original)
# Word boundary pour ne pas casser les mots (ex: ONDANSETRON)
text = re.sub(rf"\b{re.escape(original)}\b", placeholder, text)
text = re.sub(rf"\b{escaped}\b", placeholder, text)
# Aussi gérer les formats avec astérisques (*640000162*)
text = re.sub(rf"\*{escaped}\*", placeholder, text)
return text
@@ -1479,6 +1587,14 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
extracted_names.update(trackare_names)
audit.extend(trackare_hits)
# Phase 0c : détection FINESS multiline (label et numéro sur lignes séparées,
# avec possiblement 0-2 lignes intermédiaires masquées ou vides)
_RE_FINESS_MULTILINE = re.compile(
r"(?:N°\s*)?[Ff]iness?\s*\n(?:[^\n]*\n){0,2}\s*\*?(\d{9})\*?", re.MULTILINE
)
for m in _RE_FINESS_MULTILINE.finditer(full_raw):
audit.append(PiiHit(-1, "FINESS", m.group(1), PLACEHOLDERS["FINESS"]))
# Phase 1 : masquage ligne par ligne (regex classiques)
out_pages: List[str] = []
for i, page_txt in enumerate(pages_text):
@@ -1506,10 +1622,8 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
if extracted_names:
text_out = _apply_extracted_names(text_out, extracted_names, audit, force_names=trackare_force_names)
# Phase 2b : application globale des PiiHit trackare (NDA footers, EPISODE, etc.)
# Ces hits sont détectés par _extract_trackare_identity mais pas encore remplacés dans le texte
if is_trackare:
text_out = _apply_trackare_hits_to_text(text_out, audit)
# Phase 2b : application globale des PiiHit (EPISODE, RPPS, FINESS)
text_out = _apply_trackare_hits_to_text(text_out, audit)
return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit, is_trackare=is_trackare)
@@ -2245,14 +2359,14 @@ def process_pdf(
# 4b) Propagation globale SÉLECTIVE : uniquement pour les PII critiques
# Les PII critiques (DATE_NAISSANCE, NIR, IPP, EMAIL) sont propagés sur toutes les pages
# pour éviter les fuites sur les documents multi-pages (ex: CRO)
_CRITICAL_PII_TYPES = {"DATE_NAISSANCE", "NIR", "IPP", "EMAIL", "force_term", "force_regex"}
_CRITICAL_PII_TYPES = {"DATE_NAISSANCE", "NIR", "IPP", "EMAIL", "force_term", "force_regex", "FINESS"}
_global_pii: Dict[str, set] = {}
for h in anon.audit:
# Collecter TOUS les types pour analyse, mais ne propager que les critiques
if h.kind in {"TEL", "EMAIL", "ADRESSE", "CODE_POSTAL", "EPISODE", "RPPS", "VILLE", "ETAB",
"VLM_SERVICE", "VLM_ETAB", "DATE_NAISSANCE", "NIR", "IPP",
"force_term", "force_regex"}:
"force_term", "force_regex", "FINESS"}:
# Traitement spécial pour DATE_NAISSANCE : extraire la date pure et générer toutes les variations
if h.kind == "DATE_NAISSANCE":
# Extraire la date pure (DD/MM/YYYY ou DD/MM/YY)
@@ -2404,6 +2518,14 @@ def process_pdf(
# Sécurité : supprimer tout bloc [TABLES] résiduel (ne devrait plus arriver)
final_text = re.sub(r"\n*\[TABLES\].*?\[/TABLES\]\n*", "\n", final_text, flags=re.DOTALL)
# Nettoyage crochets doubles : [[PLACEHOLDER]] → [PLACEHOLDER] (artefact quand
# le PDF original avait déjà des crochets autour de la valeur masquée)
_RE_BRACKET_CLEAN = re.compile(
r"\[+(\[(?:NOM|TEL|EMAIL|VILLE|ADRESSE|CODE_POSTAL|FINESS|ETABLISSEMENT|MASK|IPP|"
r"DOSSIER|NDA|EPISODE|RPPS|DATE_NAISSANCE|AGE|NIR|IBAN|OGC)\])\]+"
)
final_text = _RE_BRACKET_CLEAN.sub(r"\1", final_text)
# Sauvegardes
base = pdf_path.stem
txt_path = out_dir / f"{base}.pseudonymise.txt"