Files
rpa_vision_v3/core/extraction/iteration_controller.py
Dom cf495dd82f feat: chat unifié, GestureCatalog, Copilot, Léa UI, extraction données, vérification replay
Refonte majeure du système Agent Chat et ajout de nombreux modules :

- Chat unifié : suppression du dual Workflows/Agent Libre, tout passe par /api/chat
  avec résolution en 3 niveaux (workflow → geste → "montre-moi")
- GestureCatalog : 38 raccourcis clavier universels Windows avec matching sémantique,
  substitution automatique dans les replays, et endpoint /api/gestures
- Mode Copilot : exécution pas-à-pas des workflows avec validation humaine via WebSocket
  (approve/skip/abort) avant chaque action
- Léa UI (agent_v0/lea_ui/) : interface PyQt5 pour Windows avec overlay transparent
  pour feedback visuel pendant le replay
- Data Extraction (core/extraction/) : moteur d'extraction visuelle de données
  (OCR + VLM → SQLite), avec schémas YAML et export CSV/Excel
- ReplayVerifier (agent_v0/server_v1/) : vérification post-action par comparaison
  de screenshots, avec logique de retry (max 3)
- IntentParser durci : meilleur fallback regex, type GREETING, patterns améliorés
- Dashboard : nouvelles pages gestures, streaming, extractions
- Tests : 63 tests GestureCatalog, 47 tests extraction, corrections tests existants
- Dépréciation : /api/agent/plan et /api/agent/execute retournent HTTP 410,
  suppression du code hardcodé _plan_to_replay_actions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 10:02:09 +01:00

259 lines
8.3 KiB
Python

"""
IterationController - Controle de navigation entre enregistrements
Gere la boucle de navigation : passage au record suivant, pagination,
scroll, etc. Communique avec le streaming server (Agent V1) pour
envoyer les actions de navigation sur la machine cible.
"""
import logging
import time
from typing import Any, Dict, Optional
import requests
from .schema import ExtractionSchema
logger = logging.getLogger(__name__)
class IterationController:
"""
Controle la navigation entre les enregistrements a extraire.
Types de navigation supportes :
- list_detail : cliquer sur chaque element d'une liste
- pagination : bouton suivant / page suivante
- scroll : defilement vertical
- manual : l'utilisateur navigue manuellement
"""
def __init__(
self,
schema: ExtractionSchema,
streaming_server_url: str = "http://localhost:5005",
):
"""
Args:
schema: Schema d'extraction (contient les regles de navigation)
streaming_server_url: URL du streaming server Agent V1
"""
self.schema = schema
self.server_url = streaming_server_url.rstrip("/")
self.current_index = 0
self.max_records = schema.navigation.get("max_records", 100)
self.nav_type = schema.navigation.get("type", "manual")
self.nav_action = schema.navigation.get("next_record", "click_next_in_list")
self.nav_delay = schema.navigation.get("delay_ms", 1000)
# Etat interne
self._started = False
self._finished = False
# ------------------------------------------------------------------
# API publique
# ------------------------------------------------------------------
def has_next(self) -> bool:
"""Retourne True s'il reste des enregistrements a traiter."""
if self._finished:
return False
return self.current_index < self.max_records
def navigate_to_next(self, session_id: str) -> bool:
"""
Naviguer vers l'enregistrement suivant.
Envoie les actions de navigation au streaming server
en fonction du type de navigation defini dans le schema.
Args:
session_id: ID de la session de streaming
Returns:
True si la navigation a reussi
"""
if not self.has_next():
logger.info("Plus d'enregistrements a traiter (index=%d)", self.current_index)
return False
success = False
if self.nav_type == "manual":
# Mode manuel : on attend juste un delai
logger.info(
"Navigation manuelle : attente de %dms (index=%d)",
self.nav_delay,
self.current_index,
)
time.sleep(self.nav_delay / 1000)
success = True
elif self.nav_type == "pagination":
success = self._navigate_pagination(session_id)
elif self.nav_type == "list_detail":
success = self._navigate_list_detail(session_id)
elif self.nav_type == "scroll":
success = self._navigate_scroll(session_id)
else:
logger.warning("Type de navigation inconnu : %s", self.nav_type)
success = False
if success:
self.current_index += 1
logger.debug(
"Navigation reussie -> index=%d/%d",
self.current_index,
self.max_records,
)
return success
def navigate_to_record(self, session_id: str, index: int) -> bool:
"""
Naviguer vers un enregistrement specifique.
Args:
session_id: ID de la session de streaming
index: Index de l'enregistrement cible
Returns:
True si la navigation a reussi
"""
if index < 0 or index >= self.max_records:
logger.error("Index hors limites : %d (max=%d)", index, self.max_records)
return False
# Naviguer pas a pas jusqu'a l'index cible
steps = index - self.current_index
if steps < 0:
logger.warning(
"Navigation arriere non supportee (current=%d, target=%d)",
self.current_index,
index,
)
return False
for _ in range(steps):
if not self.navigate_to_next(session_id):
return False
return True
def reset(self) -> None:
"""Reinitialiser le controleur."""
self.current_index = 0
self._started = False
self._finished = False
def mark_finished(self) -> None:
"""Marquer l'iteration comme terminee (ex: fin de liste detectee)."""
self._finished = True
logger.info("Iteration marquee comme terminee a l'index %d", self.current_index)
@property
def progress(self) -> Dict[str, Any]:
"""Retourne la progression actuelle."""
return {
"current_index": self.current_index,
"max_records": self.max_records,
"progress_pct": round(
(self.current_index / self.max_records * 100)
if self.max_records > 0 else 0,
1,
),
"nav_type": self.nav_type,
"finished": self._finished,
}
# ------------------------------------------------------------------
# Navigation specifique
# ------------------------------------------------------------------
def _navigate_pagination(self, session_id: str) -> bool:
"""Navigation par pagination (bouton suivant)."""
action = {
"type": "click",
"target": self.nav_action,
"description": "Cliquer sur le bouton suivant / page suivante",
}
return self._send_action(session_id, action)
def _navigate_list_detail(self, session_id: str) -> bool:
"""Navigation dans une liste (cliquer sur l'element suivant)."""
action = {
"type": "click",
"target": self.nav_action,
"index": self.current_index,
"description": f"Cliquer sur l'element {self.current_index + 1} de la liste",
}
return self._send_action(session_id, action)
def _navigate_scroll(self, session_id: str) -> bool:
"""Navigation par defilement."""
action = {
"type": "scroll",
"direction": "down",
"amount": self.schema.navigation.get("scroll_amount", 300),
"description": "Defiler vers le bas",
}
return self._send_action(session_id, action)
# ------------------------------------------------------------------
# Communication avec le streaming server
# ------------------------------------------------------------------
def _send_action(self, session_id: str, action: Dict[str, Any]) -> bool:
"""
Envoyer une action de navigation au streaming server.
L'action est envoyee via l'API du streaming server (port 5005).
Si le serveur n'est pas disponible, on simule un delai.
Args:
session_id: ID de la session de streaming
action: Description de l'action a executer
Returns:
True si l'action a ete executee ou simulee
"""
try:
payload = {
"session_id": session_id,
"action": action,
}
response = requests.post(
f"{self.server_url}/api/action",
json=payload,
timeout=10,
)
if response.status_code == 200:
# Attendre le delai de navigation
if self.nav_delay > 0:
time.sleep(self.nav_delay / 1000)
return True
else:
logger.warning(
"Action de navigation echouee : HTTP %d", response.status_code
)
return False
except requests.exceptions.ConnectionError:
logger.warning(
"Streaming server non accessible a %s — simulation du delai",
self.server_url,
)
# Simuler l'attente de navigation (mode degrade)
if self.nav_delay > 0:
time.sleep(self.nav_delay / 1000)
return True
except Exception as e:
logger.error("Erreur envoi action de navigation : %s", e)
return False