""" Segmentation du texte UCR en blocs exploitables. Découpe le texte extrait en : - Entête (métadonnées du contrôle) - Blocs par Champ - Blocs par OGC (individuels et groupés) """ import re import logging from dataclasses import dataclass, field logger = logging.getLogger(__name__) @dataclass class OGCBlock: """Un bloc de texte correspondant à un ou plusieurs OGC.""" champ: int ogc_numbers: list # list[int] — un seul pour individuel, plusieurs pour groupé text: str is_grouped: bool = False @dataclass class ChampBlock: """Un bloc de texte correspondant à un Champ entier (décision globale sans OGC individuels).""" champ: int text: str @dataclass class SegmentationResult: """Résultat de la segmentation.""" header_text: str ogc_blocks: list # list[OGCBlock] champ_blocks: list # list[ChampBlock] — champs avec décision globale total_ogc_count: int def _clean_text(text: str) -> str: """Nettoie le texte extrait (en-têtes/pieds de page, artefacts OCR).""" cleaned_lines = [] for line in text.split('\n'): line_lower = line.lower().strip() # Supprimer les en-têtes/pieds de page UCR markers = sum([ bool(re.search(r'ucr\s*na', line_lower)), bool(re.search(r'confidentiel', line_lower)), bool(re.search(r'page\s*\d', line_lower)), bool(re.search(r'p\s*a\s*g\s*e', line_lower)), bool(re.search(r'\d+\s*[\|/]\s*\d+', line_lower)), ]) if markers >= 2: continue # Lignes d'artefacts OCR (tirets, underscores, etc.) if re.match(r'^[\s_\-—–\.\"\'eElL\|\]\[\(\)\{\},;:!/\\}{]{10,}$', line): continue # Lignes avec trop de caractères parasites if len(line.strip()) > 10 and len(re.findall(r'[_\-—–\|]', line)) > len(line.strip()) * 0.5: continue # Lignes trop courtes (sauf nombres) if len(line.strip()) <= 3 and not re.match(r'^\d+$', line.strip()): continue cleaned_lines.append(line) text = '\n'.join(cleaned_lines) # Réduire les sauts de ligne multiples text = re.sub(r'\n{3,}', '\n\n', text) # Supprimer la signature UCR en fin de document (dernières 2000 chars seulement) tail_start = max(0, len(text) - 2000) tail = text[tail_start:] patterns = [ r'Le\s+\d{1,2}\s+\w+\s+\d{4}\s*\.?\s*Pour\s+.*$', r'Pour\s+l\W{0,2}UCR.*$', r'Pour\s+(?:I|l)\s*UCR.*$', r'Docteur\s+\w+\s+\w+\s+Membre\s+.*$', ] for p in patterns: tail = re.sub(p, '', tail, flags=re.DOTALL | re.IGNORECASE) text = text[:tail_start] + tail return text.strip() def _find_champ_boundaries(text: str) -> list[tuple[int, int]]: """ Trouve les positions de chaque Champ dans le texte. Retourne [(position, numéro_champ), ...] trié par position. """ boundaries = [] for m in re.finditer(r'Champ\s+(?:n°\s*)?(\d+)\s*[\s:–\-]', text, re.IGNORECASE): boundaries.append((m.start(), int(m.group(1)))) boundaries.sort(key=lambda x: x[0]) return boundaries def _get_champ_for_position(pos: int, champ_boundaries: list[tuple[int, int]]) -> int | None: """Retourne le numéro de champ pour une position donnée dans le texte.""" current_champ = None for boundary_pos, champ_num in champ_boundaries: if boundary_pos <= pos: current_champ = champ_num else: break return current_champ def _extract_header(text: str, champ_boundaries: list[tuple[int, int]]) -> str: """Extrait le texte d'en-tête (avant le premier Champ).""" if champ_boundaries: return text[:champ_boundaries[0][0]].strip() return text.strip() def _find_grouped_ogcs(text: str, champ_boundaries: list[tuple[int, int]]) -> list[OGCBlock]: """ Détecte les blocs où plusieurs OGC sont traités ensemble. Pattern : "Concernant les OGC X,Y,Z..." """ results = [] pattern = r'Concernant\s+les?\s+OGC\s+([\d\s,]+?)[\s,]*(le\s+désaccord|la\s+discussion)' for m in re.finditer(pattern, text, re.IGNORECASE): nums_str = m.group(1) ogc_nums = [int(n.strip()) for n in re.findall(r'\d+', nums_str)] if not ogc_nums: continue # Trouver la fin du bloc block_start = m.start() end_offsets = [] for next_pattern in [ r'\nOGC\s+\d+\s*:', r'\nConcernant\s+les?\s+OGC', r'\nChamp\s+(?:n°\s*)?\d+', ]: next_match = re.search(next_pattern, text[m.end():], re.IGNORECASE) if next_match: end_offsets.append(m.end() + next_match.start()) block_end = min(end_offsets) if end_offsets else len(text) block_text = text[block_start:block_end].strip() # Vérifier que ces OGC n'ont pas de bloc individuel plus loin individually_treated = set() for num in ogc_nums: if re.search(rf'\bOGC\s+{num}\s*:', text[block_end:]): individually_treated.add(num) grouped_only_nums = [n for n in ogc_nums if n not in individually_treated] if not grouped_only_nums: continue champ = _get_champ_for_position(block_start, champ_boundaries) results.append(OGCBlock( champ=champ, ogc_numbers=grouped_only_nums, text=block_text, is_grouped=True, )) return results def _find_individual_ogcs(text: str, champ_boundaries: list[tuple[int, int]], already_grouped: set[int]) -> list[OGCBlock]: """ Détecte les blocs OGC individuels (OGC XX : ...). Exclut les OGC déjà traités en groupe. """ results = [] pattern = r'(OGC\s*:?\s*\d+\s*:?\s*.*?)(?=OGC\s*:?\s*\d+\s*:?\s|$)' blocks = re.findall(pattern, text, re.DOTALL) for block in blocks: num_match = re.search(r'OGC\s*:?\s*(\d+)', block) if not num_match: continue num = int(num_match.group(1)) if num in already_grouped: continue block_pos = text.find(block) champ = _get_champ_for_position(block_pos, champ_boundaries) results.append(OGCBlock( champ=champ, ogc_numbers=[num], text=block.strip(), is_grouped=False, )) return results def _find_champ_level_decisions(text: str, champ_boundaries: list[tuple[int, int]]) -> list[ChampBlock]: """ Détecte les champs qui ont une décision globale sans OGC individuels/groupés. """ results = [] for i, (pos, champ_num) in enumerate(champ_boundaries): if i + 1 < len(champ_boundaries): champ_text = text[pos:champ_boundaries[i + 1][0]] else: champ_text = text[pos:] # Skip si des OGC individuels/groupés existent dans ce champ has_individual = bool(re.search(r'\bOGC\s*:?\s*\d+\s*:', champ_text)) has_grouped = bool(re.search(r'Concernant\s+les?\s+OGC', champ_text, re.IGNORECASE)) if has_individual or has_grouped: continue # Vérifier qu'il y a bien une décision has_decision = bool(re.search( r'(DEC\w*ION|PROPOSITION)\s+UCR', champ_text, re.IGNORECASE )) if not has_decision: continue results.append(ChampBlock( champ=champ_num, text=champ_text.strip(), )) return results def segment_text(text: str) -> SegmentationResult: """ Segmente le texte UCR complet en blocs exploitables. """ # Nettoyage text = _clean_text(text) logger.info(f"Texte nettoyé : {len(text)} caractères") # Trouver les limites des champs champ_boundaries = _find_champ_boundaries(text) logger.info(f"Champs détectés : {[num for _, num in champ_boundaries]}") # Entête header = _extract_header(text, champ_boundaries) # OGC groupés grouped_blocks = _find_grouped_ogcs(text, champ_boundaries) already_grouped = set() for block in grouped_blocks: already_grouped.update(block.ogc_numbers) logger.info(f"OGC groupés : {sum(len(b.ogc_numbers) for b in grouped_blocks)} OGC en {len(grouped_blocks)} groupes") # OGC individuels individual_blocks = _find_individual_ogcs(text, champ_boundaries, already_grouped) logger.info(f"OGC individuels : {len(individual_blocks)}") # Décisions au niveau champ champ_blocks = _find_champ_level_decisions(text, champ_boundaries) logger.info(f"Décisions au niveau champ : {len(champ_blocks)}") # Fusion et tri all_ogc_blocks = grouped_blocks + individual_blocks all_ogc_blocks.sort(key=lambda b: (b.champ or 0, min(b.ogc_numbers) if b.ogc_numbers else 0)) total_ogc = sum(len(b.ogc_numbers) for b in all_ogc_blocks) logger.info(f"Total : {total_ogc} OGC segmentés") return SegmentationResult( header_text=header, ogc_blocks=all_ogc_blocks, champ_blocks=champ_blocks, total_ogc_count=total_ogc, )