Files
rpa_vision_v3/core/matching/hierarchical_matcher.py
Dom cf495dd82f feat: chat unifié, GestureCatalog, Copilot, Léa UI, extraction données, vérification replay
Refonte majeure du système Agent Chat et ajout de nombreux modules :

- Chat unifié : suppression du dual Workflows/Agent Libre, tout passe par /api/chat
  avec résolution en 3 niveaux (workflow → geste → "montre-moi")
- GestureCatalog : 38 raccourcis clavier universels Windows avec matching sémantique,
  substitution automatique dans les replays, et endpoint /api/gestures
- Mode Copilot : exécution pas-à-pas des workflows avec validation humaine via WebSocket
  (approve/skip/abort) avant chaque action
- Léa UI (agent_v0/lea_ui/) : interface PyQt5 pour Windows avec overlay transparent
  pour feedback visuel pendant le replay
- Data Extraction (core/extraction/) : moteur d'extraction visuelle de données
  (OCR + VLM → SQLite), avec schémas YAML et export CSV/Excel
- ReplayVerifier (agent_v0/server_v1/) : vérification post-action par comparaison
  de screenshots, avec logique de retry (max 3)
- IntentParser durci : meilleur fallback regex, type GREETING, patterns améliorés
- Dashboard : nouvelles pages gestures, streaming, extractions
- Tests : 63 tests GestureCatalog, 47 tests extraction, corrections tests existants
- Dépréciation : /api/agent/plan et /api/agent/execute retournent HTTP 410,
  suppression du code hardcodé _plan_to_replay_actions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 10:02:09 +01:00

597 lines
20 KiB
Python

"""
HierarchicalMatcher - Système de matching multi-niveau
Ce module implémente un système de matching hiérarchique qui combine:
- Niveau fenêtre: titre, processus, classe de fenêtre
- Niveau région: régions UI détectées
- Niveau élément: éléments UI individuels
Formule de confiance: 0.2*fenêtre + 0.3*région + 0.5*élément
Boost temporel: +0.1 si successeur valide du node précédent
"""
import logging
import re
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass, field
from datetime import datetime
import numpy as np
logger = logging.getLogger(__name__)
# =============================================================================
# Dataclasses
# =============================================================================
@dataclass
class TemporalContext:
"""Contexte temporel pour le matching"""
previous_nodes: List[str] = field(default_factory=list) # N derniers nodes matchés
previous_confidences: List[float] = field(default_factory=list)
time_since_last_match: float = 0.0 # Temps depuis dernier match (secondes)
max_history: int = 5 # Nombre max de nodes à garder
def add_match(self, node_id: str, confidence: float) -> None:
"""Ajouter un match à l'historique"""
self.previous_nodes.append(node_id)
self.previous_confidences.append(confidence)
# Limiter la taille de l'historique
if len(self.previous_nodes) > self.max_history:
self.previous_nodes = self.previous_nodes[-self.max_history:]
self.previous_confidences = self.previous_confidences[-self.max_history:]
@property
def last_node(self) -> Optional[str]:
"""Dernier node matché"""
return self.previous_nodes[-1] if self.previous_nodes else None
@dataclass
class AlternativeMatch:
"""Match alternatif avec score"""
node_id: str
confidence: float
window_confidence: float
region_confidence: float
element_confidence: float
@dataclass
class MatchResult:
"""Résultat complet d'un match hiérarchique"""
node_id: str
confidence: float # Confiance globale (0-1)
window_confidence: float # Confiance niveau fenêtre
region_confidence: float # Confiance niveau région
element_confidence: float # Confiance niveau élément
temporal_boost: float = 0.0 # Bonus temporel appliqué
matched_variant: Optional[str] = None # ID de variante matchée
alternatives: List[AlternativeMatch] = field(default_factory=list)
match_time_ms: float = 0.0 # Temps de matching
@property
def raw_confidence(self) -> float:
"""Confiance avant boost temporel"""
return self.confidence - self.temporal_boost
def to_dict(self) -> Dict[str, Any]:
"""Sérialiser en dictionnaire"""
return {
"node_id": self.node_id,
"confidence": self.confidence,
"window_confidence": self.window_confidence,
"region_confidence": self.region_confidence,
"element_confidence": self.element_confidence,
"temporal_boost": self.temporal_boost,
"matched_variant": self.matched_variant,
"alternatives_count": len(self.alternatives),
"match_time_ms": self.match_time_ms
}
@dataclass
class HierarchicalMatcherConfig:
"""Configuration du matcher hiérarchique"""
# Poids pour la combinaison de confiance
window_weight: float = 0.2
region_weight: float = 0.3
element_weight: float = 0.5
# Boost temporel
temporal_boost: float = 0.1
max_confidence: float = 1.0
# Seuils
min_confidence_threshold: float = 0.5
window_title_similarity_threshold: float = 0.8
region_iou_threshold: float = 0.5
element_similarity_threshold: float = 0.7
# Matching de fenêtre
use_regex_title_matching: bool = True
case_sensitive_title: bool = False
# =============================================================================
# Matcher Hiérarchique
# =============================================================================
class HierarchicalMatcher:
"""
Système de matching multi-niveau pour reconnaissance d'états d'écran.
Combine trois niveaux de matching:
1. Fenêtre: titre, processus, classe
2. Région: zones UI détectées
3. Élément: éléments UI individuels
Example:
>>> matcher = HierarchicalMatcher()
>>> result = matcher.match(screenshot, workflow, temporal_context)
>>> if result.confidence > 0.8:
... print(f"Matched node: {result.node_id}")
"""
def __init__(self, config: Optional[HierarchicalMatcherConfig] = None):
"""
Initialiser le matcher.
Args:
config: Configuration du matcher (utilise défaut si None)
"""
self.config = config or HierarchicalMatcherConfig()
logger.info(
f"HierarchicalMatcher initialisé: "
f"weights=({self.config.window_weight}, {self.config.region_weight}, {self.config.element_weight})"
)
def match(
self,
screenshot: Any,
workflow: Any,
window_info: Optional[Dict] = None,
detected_elements: Optional[List] = None,
temporal_context: Optional[TemporalContext] = None
) -> MatchResult:
"""
Effectuer un match hiérarchique contre tous les nodes du workflow.
Args:
screenshot: Image du screenshot (PIL Image ou numpy array)
workflow: Workflow avec nodes à matcher
window_info: Informations de fenêtre (titre, processus, etc.)
detected_elements: Éléments UI détectés
temporal_context: Contexte temporel pour boost
Returns:
MatchResult avec le meilleur match et alternatives
"""
import time
start_time = time.time()
nodes = getattr(workflow, 'nodes', [])
if not nodes:
logger.warning("Workflow sans nodes")
return MatchResult(
node_id="",
confidence=0.0,
window_confidence=0.0,
region_confidence=0.0,
element_confidence=0.0
)
# Calculer scores pour chaque node
node_scores = []
for node in nodes:
score = self._compute_node_score(
node, screenshot, window_info, detected_elements
)
node_scores.append((node, score))
# Trier par confiance décroissante
node_scores.sort(key=lambda x: x[1]['combined'], reverse=True)
# Meilleur match
best_node, best_scores = node_scores[0]
# Appliquer boost temporel si applicable
temporal_boost = 0.0
if temporal_context and temporal_context.last_node:
if self._is_valid_successor(
temporal_context.last_node,
best_node.node_id,
workflow
):
temporal_boost = self.config.temporal_boost
# Calculer confiance finale (plafonnée à 1.0)
final_confidence = min(
best_scores['combined'] + temporal_boost,
self.config.max_confidence
)
# Construire alternatives
alternatives = []
for node, scores in node_scores[1:4]: # Top 3 alternatives
alternatives.append(AlternativeMatch(
node_id=node.node_id,
confidence=scores['combined'],
window_confidence=scores['window'],
region_confidence=scores['region'],
element_confidence=scores['element']
))
match_time = (time.time() - start_time) * 1000
result = MatchResult(
node_id=best_node.node_id,
confidence=final_confidence,
window_confidence=best_scores['window'],
region_confidence=best_scores['region'],
element_confidence=best_scores['element'],
temporal_boost=temporal_boost,
alternatives=alternatives,
match_time_ms=match_time
)
logger.debug(
f"Match: {result.node_id} (conf={result.confidence:.3f}, "
f"w={result.window_confidence:.3f}, r={result.region_confidence:.3f}, "
f"e={result.element_confidence:.3f}, boost={temporal_boost:.2f})"
)
return result
def _compute_node_score(
self,
node: Any,
screenshot: Any,
window_info: Optional[Dict],
detected_elements: Optional[List]
) -> Dict[str, float]:
"""
Calculer les scores de matching pour un node.
Returns:
Dict avec scores 'window', 'region', 'element', 'combined'
"""
# Score niveau fenêtre
window_score = self.match_window_level(window_info, node)
# Score niveau région
region_score = self.match_region_level(screenshot, node)
# Score niveau élément
element_score = self.match_element_level(detected_elements, node)
# Combinaison pondérée
combined = (
self.config.window_weight * window_score +
self.config.region_weight * region_score +
self.config.element_weight * element_score
)
return {
'window': window_score,
'region': region_score,
'element': element_score,
'combined': combined
}
def match_window_level(
self,
window_info: Optional[Dict],
node: Any
) -> float:
"""
Matcher au niveau fenêtre.
Compare:
- Titre de fenêtre (pattern regex ou similarité)
- Nom du processus
- Classe de fenêtre
Args:
window_info: Dict avec 'title', 'process_name', 'window_class'
node: WorkflowNode avec template
Returns:
Score de confiance 0.0-1.0
"""
if not window_info:
return 0.5 # Score neutre si pas d'info
template = getattr(node, 'template', None)
if not template:
return 0.5
scores = []
# Matching du titre
current_title = window_info.get('title', '')
template_pattern = getattr(template.window, 'title_pattern', None) if getattr(template, 'window', None) else None
if template_pattern and current_title:
if self.config.use_regex_title_matching:
try:
flags = 0 if self.config.case_sensitive_title else re.IGNORECASE
if re.search(template_pattern, current_title, flags):
scores.append(1.0)
else:
# Fallback sur similarité de chaîne
scores.append(self._string_similarity(template_pattern, current_title))
except re.error:
scores.append(self._string_similarity(template_pattern, current_title))
else:
scores.append(self._string_similarity(template_pattern, current_title))
# Matching du processus
current_process = window_info.get('process_name', '')
template_process = getattr(template.window, 'process_name', None) if getattr(template, 'window', None) else None
if template_process and current_process:
if current_process.lower() == template_process.lower():
scores.append(1.0)
else:
scores.append(0.0)
# Matching de la classe de fenêtre
current_class = window_info.get('window_class', '')
template_class = getattr(template, 'window_class', None)
if template_class and current_class:
if current_class == template_class:
scores.append(1.0)
else:
scores.append(0.0)
return np.mean(scores) if scores else 0.5
def match_region_level(
self,
screenshot: Any,
node: Any
) -> float:
"""
Matcher au niveau région.
Compare les régions UI détectées avec les régions template.
Utilise IoU (Intersection over Union) et similarité d'embedding.
Args:
screenshot: Image du screenshot
node: WorkflowNode avec template
Returns:
Score de confiance 0.0-1.0
"""
template = getattr(node, 'template', None)
if not template:
return 0.5
# Récupérer embedding prototype du template
prototype = getattr(template.embedding, 'vector_id', None) if getattr(template, 'embedding', None) else None
if prototype is None:
return 0.5
# Calculer embedding du screenshot actuel
try:
from core.embedding.state_embedding_builder import StateEmbeddingBuilder
builder = StateEmbeddingBuilder()
# Créer un ScreenState minimal pour le builder
from core.models.screen_state import ScreenState, WindowContext, RawLevel, PerceptionLevel, ContextLevel, EmbeddingRef
temp_state = ScreenState(
screen_state_id="temp_match",
timestamp=datetime.now(),
session_id="temp",
window=WindowContext(
app_name="unknown",
window_title="Unknown",
screen_resolution=[1920, 1080],
workspace="main"
),
raw=RawLevel(screenshot_path="", capture_method="memory", file_size_bytes=0),
perception=PerceptionLevel(
embedding=EmbeddingRef(provider="temp", vector_id="", dimensions=512),
detected_text=[],
text_detection_method="none",
confidence_avg=0.0
),
context=ContextLevel(
current_workflow_candidate=None,
workflow_step=0,
user_id="temp",
tags=[],
business_variables={}
),
metadata={"screenshot_data": screenshot}
)
state_embedding = builder.build(temp_state)
current_vector = state_embedding.get_vector()
# Calculer similarité cosinus
prototype_array = np.array(prototype)
similarity = self._cosine_similarity(current_vector, prototype_array)
return float(similarity)
except Exception as e:
logger.warning(f"Erreur matching région: {e}")
return 0.5
def match_element_level(
self,
detected_elements: Optional[List],
node: Any
) -> float:
"""
Matcher au niveau élément.
Compare les éléments UI détectés avec les éléments template.
Utilise rôle, texte et similarité visuelle.
Args:
detected_elements: Liste d'éléments UI détectés
node: WorkflowNode avec template
Returns:
Score de confiance 0.0-1.0
"""
if not detected_elements:
return 0.5
template = getattr(node, 'template', None)
if not template:
return 0.5
required_elements = getattr(template, 'required_ui_elements', [])
if not required_elements:
return 0.5
# Compter les éléments requis trouvés
found_count = 0
for required in required_elements:
req_role = required.get('role', '')
req_text = required.get('text', '')
for detected in detected_elements:
det_role = getattr(detected, 'role', '') or detected.get('role', '')
det_text = getattr(detected, 'text', '') or detected.get('text', '')
# Matching par rôle
role_match = req_role.lower() == det_role.lower() if req_role and det_role else True
# Matching par texte (similarité)
if req_text and det_text:
text_match = self._string_similarity(req_text, det_text) > self.config.element_similarity_threshold
else:
text_match = True
if role_match and text_match:
found_count += 1
break
return found_count / len(required_elements) if required_elements else 0.5
def _is_valid_successor(
self,
from_node_id: str,
to_node_id: str,
workflow: Any
) -> bool:
"""
Vérifier si to_node est un successeur valide de from_node.
Args:
from_node_id: ID du node source
to_node_id: ID du node destination
workflow: Workflow avec edges
Returns:
True si transition valide
"""
edges = getattr(workflow, 'edges', [])
for edge in edges:
if edge.from_node == from_node_id and edge.to_node == to_node_id:
return True
return False
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
"""Calculer similarité cosinus entre deux vecteurs."""
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0 or norm_b == 0:
return 0.0
return float(np.dot(a, b) / (norm_a * norm_b))
def _string_similarity(self, s1: str, s2: str) -> float:
"""
Calculer similarité entre deux chaînes.
Utilise la distance de Levenshtein normalisée.
"""
if not s1 or not s2:
return 0.0
if not self.config.case_sensitive_title:
s1 = s1.lower()
s2 = s2.lower()
# Distance de Levenshtein simplifiée
if s1 == s2:
return 1.0
len1, len2 = len(s1), len(s2)
if len1 == 0 or len2 == 0:
return 0.0
# Matrice de distance
matrix = [[0] * (len2 + 1) for _ in range(len1 + 1)]
for i in range(len1 + 1):
matrix[i][0] = i
for j in range(len2 + 1):
matrix[0][j] = j
for i in range(1, len1 + 1):
for j in range(1, len2 + 1):
cost = 0 if s1[i-1] == s2[j-1] else 1
matrix[i][j] = min(
matrix[i-1][j] + 1, # Suppression
matrix[i][j-1] + 1, # Insertion
matrix[i-1][j-1] + cost # Substitution
)
distance = matrix[len1][len2]
max_len = max(len1, len2)
return 1.0 - (distance / max_len)
def get_config(self) -> HierarchicalMatcherConfig:
"""Récupérer la configuration actuelle."""
return self.config
def set_config(self, config: HierarchicalMatcherConfig) -> None:
"""Mettre à jour la configuration."""
self.config = config
logger.info("Configuration du matcher mise à jour")
# =============================================================================
# Fonctions utilitaires
# =============================================================================
def create_matcher(
window_weight: float = 0.2,
region_weight: float = 0.3,
element_weight: float = 0.5,
temporal_boost: float = 0.1
) -> HierarchicalMatcher:
"""
Créer un matcher avec configuration personnalisée.
Args:
window_weight: Poids du niveau fenêtre
region_weight: Poids du niveau région
element_weight: Poids du niveau élément
temporal_boost: Boost pour successeurs valides
Returns:
HierarchicalMatcher configuré
"""
config = HierarchicalMatcherConfig(
window_weight=window_weight,
region_weight=region_weight,
element_weight=element_weight,
temporal_boost=temporal_boost
)
return HierarchicalMatcher(config)