Files
t2a/analyze_pdfs.py
dom 4a12cd2676 feat: pipeline T2A - anonymisation, extraction CIM-10 et intégration edsnlp
Pipeline complet de traitement de documents médicaux PDF :
- Extraction texte (pdfplumber) et classification (Trackare/CRH)
- Anonymisation multi-couche (regex + NER CamemBERT + sweep)
- Extraction médicale CIM-10 hybride : edsnlp (AP-HP) enrichit les
  diagnostics, médicaments (codes ATC via Romedi) et négation,
  avec fallback regex pour les patterns spécifiques
- Fix sentencepiece pinné à <0.2.0 pour compatibilité CamemBERT

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 15:24:12 +01:00

255 lines
9.2 KiB
Python

#!/usr/bin/env python3
"""
Analyse structurelle detaillee des PDFs dans /home/dom/ai/t2a/input/
Utilise pdfplumber pour extraire texte, tableaux, headers et donnees personnelles.
"""
import pdfplumber
import os
import re
# Directory scanned for input PDFs and destination of the Markdown report.
INPUT_DIR = "/home/dom/ai/t2a/input/"
REPORT_FILE = "/home/dom/ai/t2a/rapport_analyse_pdfs.md"

# Regexes used to flag potential personal data in the extracted text.
# NOTE(review): these are deliberately loose heuristics — e.g. "telephone"
# matches many generic digit groups and "numero_dossier" is any 7-10 digit
# run — so matches are candidates to review, not confirmed PII.
PATTERNS = {
    # Optional country prefix, then 2-4 digit groups separated by space/./-
    "telephone": re.compile(r'(?:\+?\d{1,3}[\s.-]?)?\(?\d{2,4}\)?[\s.-]?\d{2,4}[\s.-]?\d{2,4}[\s.-]?\d{0,4}'),
    "email": re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'),
    # French postal code: exactly 5 digits on word boundaries.
    "code_postal": re.compile(r'\b\d{5}\b'),
    # Patient/record number heuristic: standalone 7-10 digit run.
    "numero_dossier": re.compile(r'\b\d{7,10}\b'),
    # Dates like 01/02/2024, 1-2-24, 01.02.2024.
    "date": re.compile(r'\b\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4}\b'),
    # Amounts followed by the euro sign or "EUR".
    "montant_euro": re.compile(r'\d+[\s.,]?\d*\s*[€]|\d+[\s.,]?\d*\s*EUR'),
}
def analyze_pdf(filepath):
    """Run a full structural analysis of one PDF file.

    Extracts per-page text, tables and object counts with pdfplumber, then
    scans the concatenated text for section headers and potential personal
    data (using the module-level PATTERNS regexes).

    Args:
        filepath: Path to the PDF file to analyze.

    Returns:
        dict with keys: "filename", "filepath", "pages" (per-page info
        dicts), "tables_all" (flat list of all table info dicts),
        "full_text" (pages joined with "--- PAGE n ---" markers),
        "headers_detected", "personal_data", "metadata".
    """
    result = {
        "filename": os.path.basename(filepath),
        "filepath": filepath,
        "pages": [],
        "tables_all": [],
        "full_text": "",
        "headers_detected": [],
        "personal_data": {},
        "metadata": {},
    }
    with pdfplumber.open(filepath) as pdf:
        result["metadata"] = {
            "num_pages": len(pdf.pages),
            "pdf_metadata": pdf.metadata if pdf.metadata else {},
        }
        for i, page in enumerate(pdf.pages):
            page_info, tables_info = _analyze_page(page, i + 1)
            # The same table-info dicts are shared between the per-page view
            # and the flat "tables_all" list.
            page_info["tables"] = tables_info
            result["tables_all"].extend(tables_info)
            result["pages"].append(page_info)
            result["full_text"] += f"\n--- PAGE {i+1} ---\n{page_info['text']}\n"
    result["headers_detected"] = _detect_headers(result["full_text"])
    result["personal_data"] = _detect_personal_data(result["full_text"])
    return result


def _analyze_page(page, page_num):
    """Collect text, object counts and extracted tables for one page.

    Returns a (page_info, tables_info) pair where tables_info is the list
    of per-table dicts found on the page.
    """
    text = page.extract_text() or ""
    page_info = {
        "page_num": page_num,
        "width": page.width,
        "height": page.height,
        "text": text,
        "tables": [],
        "lines_count": len(text.split('\n')) if text else 0,
        "chars_count": len(page.chars) if page.chars else 0,
        "rects_count": len(page.rects) if page.rects else 0,
        "images_count": len(page.images) if page.images else 0,
    }
    tables_info = []
    for t_idx, table in enumerate(page.extract_tables() or []):
        tables_info.append({
            "table_index": t_idx,
            "page": page_num,
            "rows": len(table),
            # Widest row defines the column count (rows can be ragged).
            "cols": max(len(row) for row in table) if table else 0,
            "data": table,
            "header_row": table[0] if table else [],
        })
    return page_info, tables_info


def _detect_headers(full_text):
    """Heuristically pick out section headers from the extracted text.

    A stripped line counts as a header when it is (a) at least 3 chars,
    fully uppercase and contains a letter, or (b) shorter than 80 chars,
    starts with a capital and contains ':'. Page markers are skipped.
    """
    headers = []
    for line in full_text.split('\n'):
        stripped = line.strip()
        if not stripped:
            continue
        if stripped.startswith("--- PAGE"):
            continue
        if len(stripped) >= 3 and stripped == stripped.upper() and any(c.isalpha() for c in stripped):
            headers.append(stripped)
        elif len(stripped) < 80 and stripped[0].isupper() and ':' in stripped:
            headers.append(stripped)
    return headers


def _detect_personal_data(full_text):
    """Match every PATTERNS regex against the text.

    Returns {pattern_name: unique matches longer than 3 chars}; categories
    with no surviving match are omitted. Match order is not preserved
    (deduplication goes through a set).
    """
    found = {}
    for pattern_name, pattern in PATTERNS.items():
        matches = pattern.findall(full_text)
        if matches:
            unique_matches = list(set(m.strip() for m in matches if len(m.strip()) > 3))
            if unique_matches:
                found[pattern_name] = unique_matches
    return found
def format_table_for_md(table_data, max_rows=30):
    """Render an extracted table as a Markdown table.

    The first row is used as the header row. Newlines inside cells become
    spaces and '|' becomes '/' so cell content cannot break the Markdown
    layout; missing/None cells render as empty strings. At most *max_rows*
    rows are shown, with a trailing note reporting how many were omitted.
    """
    if not table_data:
        return "_Tableau vide_"

    # Column count is fixed by the widest row; ragged rows are padded.
    width = max(len(r) for r in table_data)

    def clean(cell):
        if cell is None:
            return ""
        return str(cell).replace('\n', ' ').replace('|', '/').strip()

    rows = [
        [clean(r[c]) if c < len(r) else "" for c in range(width)]
        for r in table_data[:max_rows]
    ]

    out = [
        "| " + " | ".join(rows[0]) + " |",
        "| " + " | ".join(["---"] * width) + " |",
    ]
    out.extend("| " + " | ".join(r) + " |" for r in rows[1:])

    hidden = len(table_data) - max_rows
    if hidden > 0:
        out.append(f"\n_... ({hidden} lignes supplementaires non affichees)_")
    return "\n".join(out)
def generate_report(analyses):
    """Build the full Markdown report from a list of analyze_pdf() results.

    Emits, per file: PDF metadata, per-page structure stats, the complete
    extracted text, formatted tables, detected headers and personal data;
    then a final comparative summary table across all files.

    Args:
        analyses: list of result dicts as returned by analyze_pdf().

    Returns:
        The whole report as a single newline-joined string.
    """
    report = []
    report.append("# Rapport d'analyse structurelle des PDFs")
    report.append(f"\n**Repertoire analyse :** `{INPUT_DIR}`")
    report.append(f"**Nombre de fichiers :** {len(analyses)}")
    report.append("")
    # One full section per analyzed file, numbered from 1.
    for idx, analysis in enumerate(analyses, 1):
        report.append(f"\n{'='*80}")
        report.append(f"## {idx}. {analysis['filename']}")
        report.append(f"{'='*80}\n")
        meta = analysis["metadata"]
        report.append("### Metadonnees du PDF")
        report.append(f"- **Nombre de pages :** {meta['num_pages']}")
        # Only emit embedded-PDF metadata entries with a truthy value.
        if meta.get("pdf_metadata"):
            for k, v in meta["pdf_metadata"].items():
                if v:
                    report.append(f"- **{k} :** {v}")
        report.append("")
        report.append("### Structure par page")
        for page in analysis["pages"]:
            report.append(f"\n#### Page {page['page_num']}")
            report.append(f"- **Dimensions :** {page['width']} x {page['height']} pts")
            report.append(f"- **Lignes de texte :** {page['lines_count']}")
            report.append(f"- **Caracteres (objets) :** {page['chars_count']}")
            report.append(f"- **Rectangles :** {page['rects_count']}")
            report.append(f"- **Images :** {page['images_count']}")
            report.append(f"- **Tableaux detectes :** {len(page['tables'])}")
        report.append("")
        report.append("### Texte complet extrait")
        report.append("```")
        report.append(analysis["full_text"].strip())
        report.append("```")
        report.append("")
        if analysis["tables_all"]:
            report.append(f"### Tableaux detectes ({len(analysis['tables_all'])} au total)")
            for t in analysis["tables_all"]:
                report.append(f"\n#### Tableau {t['table_index']+1} (Page {t['page']}) - {t['rows']} lignes x {t['cols']} colonnes")
                report.append("")
                report.append(format_table_for_md(t["data"]))
                report.append("")
        else:
            report.append("### Tableaux detectes")
            report.append("_Aucun tableau detecte par pdfplumber._\n")
        report.append("### Sections / Headers identifies")
        if analysis["headers_detected"]:
            # Deduplicate while keeping first-occurrence order.
            seen = set()
            for h in analysis["headers_detected"]:
                if h not in seen:
                    report.append(f"- `{h}`")
                    seen.add(h)
        else:
            report.append("_Aucun header identifie._")
        report.append("")
        report.append("### Donnees personnelles detectees")
        if analysis["personal_data"]:
            for category, values in analysis["personal_data"].items():
                report.append(f"\n**{category.replace('_', ' ').title()} :**")
                for v in sorted(values):
                    report.append(f"- `{v}`")
        else:
            report.append("_Aucune donnee personnelle detectee._")
        report.append("")
    # Cross-file comparison table (one column per analyzed file).
    # NOTE(review): a filename containing '|' would break this Markdown
    # table — acceptable for the controlled input directory.
    report.append(f"\n{'='*80}")
    report.append("## Resume comparatif")
    report.append(f"{'='*80}\n")
    report.append("| Caracteristique | " + " | ".join(a["filename"] for a in analyses) + " |")
    report.append("| --- | " + " | ".join(["---"] * len(analyses)) + " |")
    report.append("| Pages | " + " | ".join(str(a["metadata"]["num_pages"]) for a in analyses) + " |")
    report.append("| Tableaux | " + " | ".join(str(len(a["tables_all"])) for a in analyses) + " |")
    report.append("| Headers | " + " | ".join(str(len(set(a["headers_detected"]))) for a in analyses) + " |")
    report.append("| Longueur texte | " + " | ".join(str(len(a["full_text"])) + " chars" for a in analyses) + " |")
    return "\n".join(report)
def main():
    """Entry point: analyze every PDF in INPUT_DIR and write the report."""
    pdf_files = sorted(
        os.path.join(INPUT_DIR, name)
        for name in os.listdir(INPUT_DIR)
        if name.lower().endswith('.pdf')
    )
    print(f"Fichiers PDF trouves : {len(pdf_files)}")
    for path in pdf_files:
        print(f" - {path}")

    analyses = []
    for path in pdf_files:
        print(f"\nAnalyse de : {os.path.basename(path)} ...")
        result = analyze_pdf(path)
        analyses.append(result)
        # Quick per-file summary on stdout while the run progresses.
        print(f" Pages: {result['metadata']['num_pages']}")
        print(f" Tableaux: {len(result['tables_all'])}")
        print(f" Headers: {len(set(result['headers_detected']))}")
        print(f" Texte: {len(result['full_text'])} chars")

    report = generate_report(analyses)
    with open(REPORT_FILE, "w", encoding="utf-8") as out:
        out.write(report)

    print(f"\n{'='*60}")
    print(f"Rapport ecrit dans : {REPORT_FILE}")
    print(f"{'='*60}")
    print("\n")
    print(report)


if __name__ == "__main__":
    main()