Feat: Actions validation avec OCR Ollama (qwen2.5-vl:7b)
- verify_element_exists: recherche visuelle OpenCV réelle - verify_text_content: OCR via Ollama (GPU) avec fallback easyocr - Paramètres ocr_mode et ollama_model dans le catalogue frontend - Support des modes de matching: exact, contains, regex, starts_with, ends_with Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,296 @@
|
|||||||
|
"""
|
||||||
|
Action Validation Existence - Vérifier qu'un élément visuel existe
|
||||||
|
Auteur : Dom, Alice, Kiro, Claude - 14 janvier 2026
|
||||||
|
|
||||||
|
Cette action vérifie l'existence d'un élément visuel sur l'écran
|
||||||
|
en utilisant le template matching OpenCV.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Dict, Any, List, Optional
|
||||||
|
from datetime import datetime
|
||||||
|
import time
|
||||||
|
|
||||||
|
# Import des modules de base
|
||||||
|
from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus
|
||||||
|
from ...contracts.error import VWBErrorType, create_vwb_error
|
||||||
|
from ...contracts.evidence import VWBEvidenceType, create_screenshot_evidence
|
||||||
|
from ...contracts.visual_anchor import VWBVisualAnchor
|
||||||
|
|
||||||
|
|
||||||
|
class VWBVerifyElementExistsAction(BaseVWBAction):
|
||||||
|
"""
|
||||||
|
Action pour vérifier l'existence d'un élément visuel.
|
||||||
|
|
||||||
|
Cette action utilise le template matching OpenCV pour rechercher
|
||||||
|
un élément sur l'écran et valider s'il est présent ou absent.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
action_id: str,
|
||||||
|
parameters: Dict[str, Any],
|
||||||
|
screen_capturer=None
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Initialise l'action de vérification d'existence.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
action_id: Identifiant unique de l'action
|
||||||
|
parameters: Paramètres incluant l'ancre visuelle
|
||||||
|
screen_capturer: Instance du ScreenCapturer (Option A thread-safe)
|
||||||
|
"""
|
||||||
|
super().__init__(
|
||||||
|
action_id=action_id,
|
||||||
|
name="Vérifier Existence Élément",
|
||||||
|
description="Vérifie qu'un élément visuel existe ou n'existe pas sur l'écran",
|
||||||
|
parameters=parameters,
|
||||||
|
screen_capturer=screen_capturer
|
||||||
|
)
|
||||||
|
|
||||||
|
# Paramètres spécifiques à la vérification
|
||||||
|
self.visual_anchor: Optional[VWBVisualAnchor] = parameters.get('visual_anchor')
|
||||||
|
self.should_exist = parameters.get('should_exist', True)
|
||||||
|
|
||||||
|
# Configuration de matching
|
||||||
|
self.confidence_threshold = parameters.get('confidence_threshold', 0.7)
|
||||||
|
self.search_timeout_ms = parameters.get('search_timeout_ms', 5000)
|
||||||
|
|
||||||
|
def validate_parameters(self) -> List[str]:
|
||||||
|
"""Valide les paramètres de l'action."""
|
||||||
|
errors = []
|
||||||
|
|
||||||
|
# Vérifier l'ancre visuelle
|
||||||
|
if not self.visual_anchor:
|
||||||
|
errors.append("Ancre visuelle requise")
|
||||||
|
elif isinstance(self.visual_anchor, dict):
|
||||||
|
# Si c'est un dict, vérifier qu'il a les champs nécessaires
|
||||||
|
if not self.visual_anchor.get('screenshot') and not self.visual_anchor.get('image_base64'):
|
||||||
|
errors.append("Ancre visuelle doit contenir une image (screenshot ou image_base64)")
|
||||||
|
elif isinstance(self.visual_anchor, VWBVisualAnchor):
|
||||||
|
if not self.visual_anchor.is_active:
|
||||||
|
errors.append("Ancre visuelle inactive")
|
||||||
|
|
||||||
|
# Vérifier le seuil de confiance
|
||||||
|
if not (0.0 <= self.confidence_threshold <= 1.0):
|
||||||
|
errors.append("Seuil de confiance doit être entre 0.0 et 1.0")
|
||||||
|
|
||||||
|
return errors
|
||||||
|
|
||||||
|
def execute_core(self, step_id: str) -> VWBActionResult:
|
||||||
|
"""
|
||||||
|
Exécute la vérification d'existence de l'élément.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
step_id: Identifiant de l'étape
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Résultat d'exécution
|
||||||
|
"""
|
||||||
|
start_time = datetime.now()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Importer la fonction de recherche visuelle
|
||||||
|
from ...catalog_routes import find_visual_anchor_on_screen
|
||||||
|
|
||||||
|
# Obtenir l'image de l'ancre
|
||||||
|
anchor_image = self._get_anchor_image()
|
||||||
|
if not anchor_image:
|
||||||
|
return self._create_error_result(
|
||||||
|
step_id=step_id,
|
||||||
|
start_time=start_time,
|
||||||
|
error_type=VWBErrorType.PARAMETER_INVALID,
|
||||||
|
message="Impossible d'obtenir l'image de l'ancre visuelle"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Obtenir le bounding_box si disponible
|
||||||
|
bounding_box = self._get_bounding_box()
|
||||||
|
|
||||||
|
# Rechercher l'élément sur l'écran
|
||||||
|
anchor_name = self._get_anchor_name()
|
||||||
|
print(f"🔍 Recherche de l'élément: {anchor_name}")
|
||||||
|
|
||||||
|
search_result = find_visual_anchor_on_screen(
|
||||||
|
anchor_image_base64=anchor_image,
|
||||||
|
confidence_threshold=self.confidence_threshold,
|
||||||
|
bounding_box=bounding_box
|
||||||
|
)
|
||||||
|
|
||||||
|
# Analyser le résultat
|
||||||
|
element_found = search_result is not None and search_result.get('found', False)
|
||||||
|
confidence = search_result.get('confidence', 0.0) if search_result else 0.0
|
||||||
|
|
||||||
|
# Vérifier si le résultat correspond à l'attente
|
||||||
|
verification_success = (element_found == self.should_exist)
|
||||||
|
|
||||||
|
end_time = datetime.now()
|
||||||
|
execution_time = (end_time - start_time).total_seconds() * 1000
|
||||||
|
|
||||||
|
# Créer le résultat
|
||||||
|
if verification_success:
|
||||||
|
status_msg = "présent" if element_found else "absent"
|
||||||
|
print(f"✅ Vérification réussie: élément {status_msg} (confiance: {confidence:.2%})")
|
||||||
|
|
||||||
|
result = VWBActionResult(
|
||||||
|
action_id=self.action_id,
|
||||||
|
step_id=step_id,
|
||||||
|
status=VWBActionStatus.SUCCESS,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
execution_time_ms=execution_time,
|
||||||
|
output_data={
|
||||||
|
'element_found': element_found,
|
||||||
|
'should_exist': self.should_exist,
|
||||||
|
'verification_success': True,
|
||||||
|
'confidence': confidence,
|
||||||
|
'match_position': search_result if element_found else None
|
||||||
|
},
|
||||||
|
evidence_list=self.evidence_list.copy()
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
else:
|
||||||
|
expected = "présent" if self.should_exist else "absent"
|
||||||
|
actual = "présent" if element_found else "absent"
|
||||||
|
error_msg = f"Élément attendu {expected} mais trouvé {actual}"
|
||||||
|
print(f"❌ Vérification échouée: {error_msg}")
|
||||||
|
|
||||||
|
return self._create_error_result(
|
||||||
|
step_id=step_id,
|
||||||
|
start_time=start_time,
|
||||||
|
error_type=VWBErrorType.ELEMENT_NOT_FOUND if self.should_exist else VWBErrorType.VALIDATION_FAILED,
|
||||||
|
message=error_msg,
|
||||||
|
technical_details={
|
||||||
|
'element_found': element_found,
|
||||||
|
'should_exist': self.should_exist,
|
||||||
|
'confidence': confidence,
|
||||||
|
'match_position': search_result if element_found else None
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
except ImportError as e:
|
||||||
|
print(f"⚠️ Erreur import: {e}")
|
||||||
|
# Fallback si catalog_routes n'est pas disponible
|
||||||
|
return self._execute_fallback(step_id, start_time)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
return self._create_error_result(
|
||||||
|
step_id=step_id,
|
||||||
|
start_time=start_time,
|
||||||
|
error_type=VWBErrorType.SYSTEM_ERROR,
|
||||||
|
message=f"Erreur lors de la vérification: {str(e)}",
|
||||||
|
technical_details={'exception': str(e)}
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_anchor_image(self) -> Optional[str]:
|
||||||
|
"""Récupère l'image base64 de l'ancre."""
|
||||||
|
if isinstance(self.visual_anchor, dict):
|
||||||
|
# Format dictionnaire
|
||||||
|
return self.visual_anchor.get('screenshot') or self.visual_anchor.get('image_base64')
|
||||||
|
elif isinstance(self.visual_anchor, VWBVisualAnchor):
|
||||||
|
# Format objet VWBVisualAnchor
|
||||||
|
return self.visual_anchor.screenshot_base64
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _get_bounding_box(self) -> Optional[Dict]:
|
||||||
|
"""Récupère le bounding_box de l'ancre si disponible."""
|
||||||
|
if isinstance(self.visual_anchor, dict):
|
||||||
|
return self.visual_anchor.get('bounding_box')
|
||||||
|
elif isinstance(self.visual_anchor, VWBVisualAnchor):
|
||||||
|
if self.visual_anchor.has_bounding_box():
|
||||||
|
return self.visual_anchor.bounding_box
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _get_anchor_name(self) -> str:
|
||||||
|
"""Récupère le nom de l'ancre."""
|
||||||
|
if isinstance(self.visual_anchor, dict):
|
||||||
|
return self.visual_anchor.get('name', self.visual_anchor.get('anchor_id', 'Unknown'))
|
||||||
|
elif isinstance(self.visual_anchor, VWBVisualAnchor):
|
||||||
|
return self.visual_anchor.name
|
||||||
|
return "Unknown"
|
||||||
|
|
||||||
|
def _execute_fallback(self, step_id: str, start_time: datetime) -> VWBActionResult:
|
||||||
|
"""
|
||||||
|
Fallback si la recherche visuelle n'est pas disponible.
|
||||||
|
Utilise pyautogui.locateOnScreen si disponible.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
import pyautogui
|
||||||
|
from PIL import Image
|
||||||
|
from io import BytesIO
|
||||||
|
import base64
|
||||||
|
|
||||||
|
anchor_image = self._get_anchor_image()
|
||||||
|
if not anchor_image:
|
||||||
|
return self._create_error_result(
|
||||||
|
step_id=step_id,
|
||||||
|
start_time=start_time,
|
||||||
|
error_type=VWBErrorType.PARAMETER_INVALID,
|
||||||
|
message="Image de l'ancre non disponible"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Décoder l'image
|
||||||
|
if ',' in anchor_image:
|
||||||
|
anchor_image = anchor_image.split(',')[1]
|
||||||
|
|
||||||
|
image_bytes = base64.b64decode(anchor_image)
|
||||||
|
pil_image = Image.open(BytesIO(image_bytes))
|
||||||
|
|
||||||
|
# Rechercher avec pyautogui
|
||||||
|
location = pyautogui.locateOnScreen(pil_image, confidence=self.confidence_threshold)
|
||||||
|
|
||||||
|
element_found = location is not None
|
||||||
|
verification_success = (element_found == self.should_exist)
|
||||||
|
|
||||||
|
end_time = datetime.now()
|
||||||
|
execution_time = (end_time - start_time).total_seconds() * 1000
|
||||||
|
|
||||||
|
if verification_success:
|
||||||
|
return VWBActionResult(
|
||||||
|
action_id=self.action_id,
|
||||||
|
step_id=step_id,
|
||||||
|
status=VWBActionStatus.SUCCESS,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
execution_time_ms=execution_time,
|
||||||
|
output_data={
|
||||||
|
'element_found': element_found,
|
||||||
|
'should_exist': self.should_exist,
|
||||||
|
'verification_success': True,
|
||||||
|
'method': 'pyautogui_fallback'
|
||||||
|
},
|
||||||
|
evidence_list=self.evidence_list.copy()
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
expected = "présent" if self.should_exist else "absent"
|
||||||
|
actual = "présent" if element_found else "absent"
|
||||||
|
|
||||||
|
return self._create_error_result(
|
||||||
|
step_id=step_id,
|
||||||
|
start_time=start_time,
|
||||||
|
error_type=VWBErrorType.VALIDATION_FAILED,
|
||||||
|
message=f"Élément attendu {expected} mais trouvé {actual}",
|
||||||
|
technical_details={'method': 'pyautogui_fallback'}
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
return self._create_error_result(
|
||||||
|
step_id=step_id,
|
||||||
|
start_time=start_time,
|
||||||
|
error_type=VWBErrorType.SYSTEM_ERROR,
|
||||||
|
message=f"Erreur fallback: {str(e)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_action_info(self) -> Dict[str, Any]:
|
||||||
|
"""Retourne les informations de l'action pour l'interface."""
|
||||||
|
return {
|
||||||
|
'action_id': self.action_id,
|
||||||
|
'name': self.name,
|
||||||
|
'description': self.description,
|
||||||
|
'type': 'verify_element_exists',
|
||||||
|
'parameters': {
|
||||||
|
'anchor_name': self._get_anchor_name(),
|
||||||
|
'should_exist': self.should_exist,
|
||||||
|
'confidence_threshold': self.confidence_threshold
|
||||||
|
},
|
||||||
|
'status': self.current_status.value
|
||||||
|
}
|
||||||
@@ -0,0 +1,391 @@
|
|||||||
|
"""
|
||||||
|
Action Validation Texte - Vérifier le contenu textuel d'un élément
|
||||||
|
Auteur : Dom, Alice, Kiro, Claude - 14 janvier 2026
|
||||||
|
|
||||||
|
Cette action vérifie qu'un texte spécifique est présent dans une zone
|
||||||
|
de l'écran, en utilisant l'OCR pour extraire et comparer le texte.
|
||||||
|
|
||||||
|
Modes OCR disponibles:
|
||||||
|
- ollama: Utilise un modèle de vision local (GPU, meilleure qualité)
|
||||||
|
- easyocr: OCR traditionnel (CPU/GPU, plus rapide)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Dict, Any, List, Optional
|
||||||
|
from datetime import datetime
|
||||||
|
import time
|
||||||
|
import re
|
||||||
|
import base64
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
# Import des modules de base
|
||||||
|
from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus
|
||||||
|
from ...contracts.error import VWBErrorType, create_vwb_error
|
||||||
|
from ...contracts.evidence import VWBEvidenceType, create_screenshot_evidence
|
||||||
|
from ...contracts.visual_anchor import VWBVisualAnchor
|
||||||
|
|
||||||
|
|
||||||
|
class VWBVerifyTextContentAction(BaseVWBAction):
|
||||||
|
"""
|
||||||
|
Action pour vérifier le contenu textuel d'un élément.
|
||||||
|
|
||||||
|
Cette action extrait le texte d'une zone de l'écran via OCR
|
||||||
|
et vérifie qu'il correspond au texte attendu.
|
||||||
|
|
||||||
|
Supporte deux modes OCR:
|
||||||
|
- ollama: Modèle de vision local (meilleure qualité, utilise GPU)
|
||||||
|
- easyocr: OCR traditionnel (plus rapide, fallback)
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Configuration Ollama par défaut
|
||||||
|
OLLAMA_URL = "http://localhost:11434"
|
||||||
|
OLLAMA_MODEL = "qwen2.5-vl:7b" # Modèle de vision Qwen - excellent pour OCR
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
action_id: str,
|
||||||
|
parameters: Dict[str, Any],
|
||||||
|
screen_capturer=None
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Initialise l'action de vérification de texte.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
action_id: Identifiant unique de l'action
|
||||||
|
parameters: Paramètres incluant l'ancre visuelle et le texte attendu
|
||||||
|
screen_capturer: Instance du ScreenCapturer (Option A thread-safe)
|
||||||
|
"""
|
||||||
|
super().__init__(
|
||||||
|
action_id=action_id,
|
||||||
|
name="Vérifier Contenu Texte",
|
||||||
|
description="Vérifie qu'un texte spécifique est présent dans une zone de l'écran",
|
||||||
|
parameters=parameters,
|
||||||
|
screen_capturer=screen_capturer
|
||||||
|
)
|
||||||
|
|
||||||
|
# Paramètres spécifiques à la vérification
|
||||||
|
self.visual_anchor: Optional[VWBVisualAnchor] = parameters.get('visual_anchor')
|
||||||
|
self.expected_text = parameters.get('expected_text', '')
|
||||||
|
self.match_mode = parameters.get('match_mode', 'contains') # exact, contains, regex
|
||||||
|
self.case_sensitive = parameters.get('case_sensitive', False)
|
||||||
|
|
||||||
|
# Configuration OCR
|
||||||
|
self.ocr_mode = parameters.get('ocr_mode', 'ollama') # ollama (GPU) ou easyocr
|
||||||
|
self.ollama_model = parameters.get('ollama_model', self.OLLAMA_MODEL)
|
||||||
|
self.ollama_url = parameters.get('ollama_url', self.OLLAMA_URL)
|
||||||
|
|
||||||
|
# Configuration de matching
|
||||||
|
self.confidence_threshold = parameters.get('confidence_threshold', 0.7)
|
||||||
|
|
||||||
|
def validate_parameters(self) -> List[str]:
|
||||||
|
"""Valide les paramètres de l'action."""
|
||||||
|
errors = []
|
||||||
|
|
||||||
|
# Vérifier le texte attendu
|
||||||
|
if not self.expected_text:
|
||||||
|
errors.append("Texte attendu requis")
|
||||||
|
|
||||||
|
# Vérifier l'ancre visuelle (optionnelle pour recherche plein écran)
|
||||||
|
if self.visual_anchor:
|
||||||
|
if isinstance(self.visual_anchor, dict):
|
||||||
|
if not self.visual_anchor.get('screenshot') and not self.visual_anchor.get('image_base64'):
|
||||||
|
if not self.visual_anchor.get('bounding_box'):
|
||||||
|
errors.append("Ancre visuelle doit contenir une image ou un bounding_box")
|
||||||
|
elif isinstance(self.visual_anchor, VWBVisualAnchor):
|
||||||
|
if not self.visual_anchor.is_active:
|
||||||
|
errors.append("Ancre visuelle inactive")
|
||||||
|
|
||||||
|
# Vérifier le mode de matching
|
||||||
|
if self.match_mode not in ['exact', 'contains', 'regex', 'starts_with', 'ends_with']:
|
||||||
|
errors.append(f"Mode de matching invalide: {self.match_mode}")
|
||||||
|
|
||||||
|
# Vérifier le mode OCR
|
||||||
|
if self.ocr_mode not in ['ollama', 'easyocr']:
|
||||||
|
errors.append(f"Mode OCR invalide: {self.ocr_mode} (utilisez 'ollama' ou 'easyocr')")
|
||||||
|
|
||||||
|
return errors
|
||||||
|
|
||||||
|
def execute_core(self, step_id: str) -> VWBActionResult:
|
||||||
|
"""
|
||||||
|
Exécute la vérification du contenu textuel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
step_id: Identifiant de l'étape
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Résultat d'exécution
|
||||||
|
"""
|
||||||
|
start_time = datetime.now()
|
||||||
|
|
||||||
|
try:
|
||||||
|
anchor_name = self._get_anchor_name()
|
||||||
|
print(f"🔍 Vérification texte: '{self.expected_text}' dans {anchor_name}")
|
||||||
|
print(f" Mode OCR: {self.ocr_mode}")
|
||||||
|
|
||||||
|
# Extraire le texte de la zone
|
||||||
|
extracted_text = self._extract_text_from_screen()
|
||||||
|
|
||||||
|
if extracted_text is None:
|
||||||
|
return self._create_error_result(
|
||||||
|
step_id=step_id,
|
||||||
|
start_time=start_time,
|
||||||
|
error_type=VWBErrorType.SCREEN_CAPTURE_FAILED,
|
||||||
|
message="Impossible d'extraire le texte de l'écran"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Comparer le texte
|
||||||
|
text_matches = self._compare_text(extracted_text, self.expected_text)
|
||||||
|
|
||||||
|
end_time = datetime.now()
|
||||||
|
execution_time = (end_time - start_time).total_seconds() * 1000
|
||||||
|
|
||||||
|
if text_matches:
|
||||||
|
print(f"✅ Texte trouvé: '{self.expected_text}'")
|
||||||
|
|
||||||
|
return VWBActionResult(
|
||||||
|
action_id=self.action_id,
|
||||||
|
step_id=step_id,
|
||||||
|
status=VWBActionStatus.SUCCESS,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
execution_time_ms=execution_time,
|
||||||
|
output_data={
|
||||||
|
'text_matches': True,
|
||||||
|
'expected_text': self.expected_text,
|
||||||
|
'extracted_text': extracted_text,
|
||||||
|
'match_mode': self.match_mode,
|
||||||
|
'ocr_mode': self.ocr_mode
|
||||||
|
},
|
||||||
|
evidence_list=self.evidence_list.copy()
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
error_msg = f"Texte attendu '{self.expected_text}' non trouvé"
|
||||||
|
print(f"❌ {error_msg}")
|
||||||
|
if extracted_text:
|
||||||
|
print(f" Texte extrait: '{extracted_text[:100]}...'")
|
||||||
|
|
||||||
|
return self._create_error_result(
|
||||||
|
step_id=step_id,
|
||||||
|
start_time=start_time,
|
||||||
|
error_type=VWBErrorType.VALIDATION_FAILED,
|
||||||
|
message=error_msg,
|
||||||
|
technical_details={
|
||||||
|
'expected_text': self.expected_text,
|
||||||
|
'extracted_text': extracted_text[:500] if extracted_text else '',
|
||||||
|
'match_mode': self.match_mode,
|
||||||
|
'ocr_mode': self.ocr_mode
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
return self._create_error_result(
|
||||||
|
step_id=step_id,
|
||||||
|
start_time=start_time,
|
||||||
|
error_type=VWBErrorType.SYSTEM_ERROR,
|
||||||
|
message=f"Erreur lors de la vérification: {str(e)}",
|
||||||
|
technical_details={'exception': str(e)}
|
||||||
|
)
|
||||||
|
|
||||||
|
def _extract_text_from_screen(self) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Extrait le texte de l'écran ou d'une zone spécifique.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Texte extrait ou None si erreur
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
import pyautogui
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
# Capturer l'écran
|
||||||
|
screenshot = pyautogui.screenshot()
|
||||||
|
|
||||||
|
# Si on a un bounding_box, extraire la région
|
||||||
|
if self.visual_anchor:
|
||||||
|
bbox = self._get_bounding_box()
|
||||||
|
if bbox:
|
||||||
|
x = int(bbox.get('x', 0))
|
||||||
|
y = int(bbox.get('y', 0))
|
||||||
|
w = int(bbox.get('width', 100))
|
||||||
|
h = int(bbox.get('height', 100))
|
||||||
|
screenshot = screenshot.crop((x, y, x + w, y + h))
|
||||||
|
|
||||||
|
# Choisir le mode OCR
|
||||||
|
if self.ocr_mode == 'ollama':
|
||||||
|
return self._extract_with_ollama(screenshot)
|
||||||
|
else:
|
||||||
|
return self._extract_with_easyocr(screenshot)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Erreur extraction texte: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _extract_with_ollama(self, image) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Extrait le texte en utilisant Ollama avec un modèle de vision.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
image: Image PIL à analyser
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Texte extrait ou None si erreur
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
import requests
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
print(f"🤖 Extraction OCR via Ollama ({self.ollama_model})...")
|
||||||
|
|
||||||
|
# Convertir l'image en base64
|
||||||
|
buffer = BytesIO()
|
||||||
|
image.save(buffer, format='PNG')
|
||||||
|
image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
|
||||||
|
|
||||||
|
# Appel à l'API Ollama
|
||||||
|
response = requests.post(
|
||||||
|
f"{self.ollama_url}/api/generate",
|
||||||
|
json={
|
||||||
|
"model": self.ollama_model,
|
||||||
|
"prompt": "Extract and return ONLY the text visible in this image. Do not add any explanation or commentary. Just return the raw text content.",
|
||||||
|
"images": [image_base64],
|
||||||
|
"stream": False,
|
||||||
|
"options": {
|
||||||
|
"temperature": 0.1, # Réponse plus déterministe
|
||||||
|
"num_predict": 500 # Limite de tokens
|
||||||
|
}
|
||||||
|
},
|
||||||
|
timeout=30
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
result = response.json()
|
||||||
|
extracted_text = result.get('response', '').strip()
|
||||||
|
print(f" ✅ Texte extrait ({len(extracted_text)} caractères)")
|
||||||
|
return extracted_text
|
||||||
|
else:
|
||||||
|
print(f" ⚠️ Erreur Ollama: {response.status_code}")
|
||||||
|
# Fallback sur easyocr
|
||||||
|
return self._extract_with_easyocr(image)
|
||||||
|
|
||||||
|
except requests.exceptions.ConnectionError:
|
||||||
|
print(f" ⚠️ Ollama non disponible, fallback sur easyocr")
|
||||||
|
return self._extract_with_easyocr(image)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠️ Erreur Ollama: {e}, fallback sur easyocr")
|
||||||
|
return self._extract_with_easyocr(image)
|
||||||
|
|
||||||
|
def _extract_with_easyocr(self, image) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Extrait le texte en utilisant EasyOCR.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
image: Image PIL à analyser
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Texte extrait ou None si erreur
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
import easyocr
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
print("📝 Extraction OCR via EasyOCR...")
|
||||||
|
|
||||||
|
# Convertir en array numpy
|
||||||
|
img_array = np.array(image)
|
||||||
|
|
||||||
|
# EasyOCR (GPU si disponible)
|
||||||
|
reader = easyocr.Reader(['fr', 'en'], gpu=True)
|
||||||
|
results = reader.readtext(img_array)
|
||||||
|
|
||||||
|
# Combiner les résultats
|
||||||
|
extracted_text = ' '.join([result[1] for result in results])
|
||||||
|
print(f" ✅ Texte extrait ({len(extracted_text)} caractères)")
|
||||||
|
return extracted_text.strip()
|
||||||
|
|
||||||
|
except ImportError:
|
||||||
|
print(" ⚠️ easyocr non disponible")
|
||||||
|
return ""
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ❌ Erreur EasyOCR: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _compare_text(self, extracted: str, expected: str) -> bool:
|
||||||
|
"""
|
||||||
|
Compare le texte extrait avec le texte attendu.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
extracted: Texte extrait de l'écran
|
||||||
|
expected: Texte attendu
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True si le texte correspond selon le mode de matching
|
||||||
|
"""
|
||||||
|
if not extracted:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Normaliser si non sensible à la casse
|
||||||
|
if not self.case_sensitive:
|
||||||
|
extracted = extracted.lower()
|
||||||
|
expected = expected.lower()
|
||||||
|
|
||||||
|
# Appliquer le mode de matching
|
||||||
|
if self.match_mode == 'exact':
|
||||||
|
return extracted.strip() == expected.strip()
|
||||||
|
|
||||||
|
elif self.match_mode == 'contains':
|
||||||
|
return expected in extracted
|
||||||
|
|
||||||
|
elif self.match_mode == 'starts_with':
|
||||||
|
return extracted.strip().startswith(expected)
|
||||||
|
|
||||||
|
elif self.match_mode == 'ends_with':
|
||||||
|
return extracted.strip().endswith(expected)
|
||||||
|
|
||||||
|
elif self.match_mode == 'regex':
|
||||||
|
try:
|
||||||
|
flags = 0 if self.case_sensitive else re.IGNORECASE
|
||||||
|
pattern = re.compile(expected, flags)
|
||||||
|
return bool(pattern.search(extracted))
|
||||||
|
except re.error:
|
||||||
|
print(f" ⚠️ Pattern regex invalide: {expected}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _get_bounding_box(self) -> Optional[Dict]:
|
||||||
|
"""Récupère le bounding_box de l'ancre si disponible."""
|
||||||
|
if isinstance(self.visual_anchor, dict):
|
||||||
|
return self.visual_anchor.get('bounding_box')
|
||||||
|
elif isinstance(self.visual_anchor, VWBVisualAnchor):
|
||||||
|
if self.visual_anchor.has_bounding_box():
|
||||||
|
return self.visual_anchor.bounding_box
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _get_anchor_name(self) -> str:
|
||||||
|
"""Récupère le nom de l'ancre."""
|
||||||
|
if not self.visual_anchor:
|
||||||
|
return "écran entier"
|
||||||
|
if isinstance(self.visual_anchor, dict):
|
||||||
|
return self.visual_anchor.get('name', self.visual_anchor.get('anchor_id', 'zone'))
|
||||||
|
elif isinstance(self.visual_anchor, VWBVisualAnchor):
|
||||||
|
return self.visual_anchor.name
|
||||||
|
return "zone"
|
||||||
|
|
||||||
|
def get_action_info(self) -> Dict[str, Any]:
|
||||||
|
"""Retourne les informations de l'action pour l'interface."""
|
||||||
|
return {
|
||||||
|
'action_id': self.action_id,
|
||||||
|
'name': self.name,
|
||||||
|
'description': self.description,
|
||||||
|
'type': 'verify_text_content',
|
||||||
|
'parameters': {
|
||||||
|
'anchor_name': self._get_anchor_name(),
|
||||||
|
'expected_text': self.expected_text,
|
||||||
|
'match_mode': self.match_mode,
|
||||||
|
'case_sensitive': self.case_sensitive,
|
||||||
|
'ocr_mode': self.ocr_mode,
|
||||||
|
'ollama_model': self.ollama_model
|
||||||
|
},
|
||||||
|
'status': self.current_status.value
|
||||||
|
}
|
||||||
1434
visual_workflow_builder/frontend/src/data/staticCatalog.ts
Normal file
1434
visual_workflow_builder/frontend/src/data/staticCatalog.ts
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user