Files
t2a-extractor/extractor/segmenter.py
dom f70d138db3 feat: T2A-Extractor pipeline with CIM-10 normalizer (31→0 warnings)
Initial commit with full extraction pipeline: PDF OCR (docTR), text
segmentation, LLM extraction (Ollama), deterministic post-processing
normalizer, validation, and Excel/CSV export.

The normalizer fixes OCR/LLM errors on CIM-10 codes:
- OCR digit→letter confusion in position 1 (1→I, 0→O, 5→S, 2→Z, 8→B)
- Missing dot separator (F050→F05.0, R410→R41.0)
- '+' instead of '.' (B99+1→B99.1, J961+0→J96.10)
- Excess decimals (Z04.880→Z04.88)
- OCR letter→digit in positions 2-3 (LO2.2→L02.2)
- Literal "null" string purge
- Auto-fill codes_retenus from decision context

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 20:44:32 +01:00

280 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Segmentation du texte UCR en blocs exploitables.
Découpe le texte extrait en :
- Entête (métadonnées du contrôle)
- Blocs par Champ
- Blocs par OGC (individuels et groupés)
"""
import re
import logging
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class OGCBlock:
"""Un bloc de texte correspondant à un ou plusieurs OGC."""
champ: int
ogc_numbers: list # list[int] — un seul pour individuel, plusieurs pour groupé
text: str
is_grouped: bool = False
@dataclass
class ChampBlock:
"""Un bloc de texte correspondant à un Champ entier (décision globale sans OGC individuels)."""
champ: int
text: str
@dataclass
class SegmentationResult:
"""Résultat de la segmentation."""
header_text: str
ogc_blocks: list # list[OGCBlock]
champ_blocks: list # list[ChampBlock] — champs avec décision globale
total_ogc_count: int
def _clean_text(text: str) -> str:
"""Nettoie le texte extrait (en-têtes/pieds de page, artefacts OCR)."""
cleaned_lines = []
for line in text.split('\n'):
line_lower = line.lower().strip()
# Supprimer les en-têtes/pieds de page UCR
markers = sum([
bool(re.search(r'ucr\s*na', line_lower)),
bool(re.search(r'confidentiel', line_lower)),
bool(re.search(r'page\s*\d', line_lower)),
bool(re.search(r'p\s*a\s*g\s*e', line_lower)),
bool(re.search(r'\d+\s*[\|/]\s*\d+', line_lower)),
])
if markers >= 2:
continue
# Lignes d'artefacts OCR (tirets, underscores, etc.)
if re.match(r'^[\s_\-—–\.\"\'eElL\|\]\[\(\)\{\},;:!/\\}{]{10,}$', line):
continue
# Lignes avec trop de caractères parasites
if len(line.strip()) > 10 and len(re.findall(r'[_\-—–\|]', line)) > len(line.strip()) * 0.5:
continue
# Lignes trop courtes (sauf nombres)
if len(line.strip()) <= 3 and not re.match(r'^\d+$', line.strip()):
continue
cleaned_lines.append(line)
text = '\n'.join(cleaned_lines)
# Réduire les sauts de ligne multiples
text = re.sub(r'\n{3,}', '\n\n', text)
# Supprimer la signature UCR en fin de document (dernières 2000 chars seulement)
tail_start = max(0, len(text) - 2000)
tail = text[tail_start:]
patterns = [
r'Le\s+\d{1,2}\s+\w+\s+\d{4}\s*\.?\s*Pour\s+.*$',
r'Pour\s+l\W{0,2}UCR.*$',
r'Pour\s+(?:I|l)\s*UCR.*$',
r'Docteur\s+\w+\s+\w+\s+Membre\s+.*$',
]
for p in patterns:
tail = re.sub(p, '', tail, flags=re.DOTALL | re.IGNORECASE)
text = text[:tail_start] + tail
return text.strip()
def _find_champ_boundaries(text: str) -> list[tuple[int, int]]:
"""
Trouve les positions de chaque Champ dans le texte.
Retourne [(position, numéro_champ), ...] trié par position.
"""
boundaries = []
for m in re.finditer(r'Champ\s+(?:n°\s*)?(\d+)\s*[\s:\-]', text, re.IGNORECASE):
boundaries.append((m.start(), int(m.group(1))))
boundaries.sort(key=lambda x: x[0])
return boundaries
def _get_champ_for_position(pos: int, champ_boundaries: list[tuple[int, int]]) -> int | None:
"""Retourne le numéro de champ pour une position donnée dans le texte."""
current_champ = None
for boundary_pos, champ_num in champ_boundaries:
if boundary_pos <= pos:
current_champ = champ_num
else:
break
return current_champ
def _extract_header(text: str, champ_boundaries: list[tuple[int, int]]) -> str:
"""Extrait le texte d'en-tête (avant le premier Champ)."""
if champ_boundaries:
return text[:champ_boundaries[0][0]].strip()
return text.strip()
def _find_grouped_ogcs(text: str, champ_boundaries: list[tuple[int, int]]) -> list[OGCBlock]:
"""
Détecte les blocs où plusieurs OGC sont traités ensemble.
Pattern : "Concernant les OGC X,Y,Z..."
"""
results = []
pattern = r'Concernant\s+les?\s+OGC\s+([\d\s,]+?)[\s,]*(le\s+désaccord|la\s+discussion)'
for m in re.finditer(pattern, text, re.IGNORECASE):
nums_str = m.group(1)
ogc_nums = [int(n.strip()) for n in re.findall(r'\d+', nums_str)]
if not ogc_nums:
continue
# Trouver la fin du bloc
block_start = m.start()
end_offsets = []
for next_pattern in [
r'\nOGC\s+\d+\s*:',
r'\nConcernant\s+les?\s+OGC',
r'\nChamp\s+(?:n°\s*)?\d+',
]:
next_match = re.search(next_pattern, text[m.end():], re.IGNORECASE)
if next_match:
end_offsets.append(m.end() + next_match.start())
block_end = min(end_offsets) if end_offsets else len(text)
block_text = text[block_start:block_end].strip()
# Vérifier que ces OGC n'ont pas de bloc individuel plus loin
individually_treated = set()
for num in ogc_nums:
if re.search(rf'\bOGC\s+{num}\s*:', text[block_end:]):
individually_treated.add(num)
grouped_only_nums = [n for n in ogc_nums if n not in individually_treated]
if not grouped_only_nums:
continue
champ = _get_champ_for_position(block_start, champ_boundaries)
results.append(OGCBlock(
champ=champ,
ogc_numbers=grouped_only_nums,
text=block_text,
is_grouped=True,
))
return results
def _find_individual_ogcs(text: str, champ_boundaries: list[tuple[int, int]],
already_grouped: set[int]) -> list[OGCBlock]:
"""
Détecte les blocs OGC individuels (OGC XX : ...).
Exclut les OGC déjà traités en groupe.
"""
results = []
pattern = r'(OGC\s*:?\s*\d+\s*:?\s*.*?)(?=OGC\s*:?\s*\d+\s*:?\s|$)'
blocks = re.findall(pattern, text, re.DOTALL)
for block in blocks:
num_match = re.search(r'OGC\s*:?\s*(\d+)', block)
if not num_match:
continue
num = int(num_match.group(1))
if num in already_grouped:
continue
block_pos = text.find(block)
champ = _get_champ_for_position(block_pos, champ_boundaries)
results.append(OGCBlock(
champ=champ,
ogc_numbers=[num],
text=block.strip(),
is_grouped=False,
))
return results
def _find_champ_level_decisions(text: str, champ_boundaries: list[tuple[int, int]]) -> list[ChampBlock]:
"""
Détecte les champs qui ont une décision globale sans OGC individuels/groupés.
"""
results = []
for i, (pos, champ_num) in enumerate(champ_boundaries):
if i + 1 < len(champ_boundaries):
champ_text = text[pos:champ_boundaries[i + 1][0]]
else:
champ_text = text[pos:]
# Skip si des OGC individuels/groupés existent dans ce champ
has_individual = bool(re.search(r'\bOGC\s*:?\s*\d+\s*:', champ_text))
has_grouped = bool(re.search(r'Concernant\s+les?\s+OGC', champ_text, re.IGNORECASE))
if has_individual or has_grouped:
continue
# Vérifier qu'il y a bien une décision
has_decision = bool(re.search(
r'(DEC\w*ION|PROPOSITION)\s+UCR', champ_text, re.IGNORECASE
))
if not has_decision:
continue
results.append(ChampBlock(
champ=champ_num,
text=champ_text.strip(),
))
return results
def segment_text(text: str) -> SegmentationResult:
"""
Segmente le texte UCR complet en blocs exploitables.
"""
# Nettoyage
text = _clean_text(text)
logger.info(f"Texte nettoyé : {len(text)} caractères")
# Trouver les limites des champs
champ_boundaries = _find_champ_boundaries(text)
logger.info(f"Champs détectés : {[num for _, num in champ_boundaries]}")
# Entête
header = _extract_header(text, champ_boundaries)
# OGC groupés
grouped_blocks = _find_grouped_ogcs(text, champ_boundaries)
already_grouped = set()
for block in grouped_blocks:
already_grouped.update(block.ogc_numbers)
logger.info(f"OGC groupés : {sum(len(b.ogc_numbers) for b in grouped_blocks)} OGC en {len(grouped_blocks)} groupes")
# OGC individuels
individual_blocks = _find_individual_ogcs(text, champ_boundaries, already_grouped)
logger.info(f"OGC individuels : {len(individual_blocks)}")
# Décisions au niveau champ
champ_blocks = _find_champ_level_decisions(text, champ_boundaries)
logger.info(f"Décisions au niveau champ : {len(champ_blocks)}")
# Fusion et tri
all_ogc_blocks = grouped_blocks + individual_blocks
all_ogc_blocks.sort(key=lambda b: (b.champ or 0, min(b.ogc_numbers) if b.ogc_numbers else 0))
total_ogc = sum(len(b.ogc_numbers) for b in all_ogc_blocks)
logger.info(f"Total : {total_ogc} OGC segmentés")
return SegmentationResult(
header_text=header,
ogc_blocks=all_ogc_blocks,
champ_blocks=champ_blocks,
total_ogc_count=total_ogc,
)