refactor: factorisation input_handler partagé + page cartographie processus
Some checks failed
security-audit / Bandit (scan statique) (push) Successful in 12s
security-audit / pip-audit (CVE dépendances) (push) Successful in 11s
security-audit / Scan secrets (grep) (push) Successful in 8s
tests / Lint (ruff + black) (push) Successful in 14s
tests / Tests unitaires (sans GPU) (push) Failing after 14s
tests / Tests sécurité (critique) (push) Has been skipped
Some checks failed
security-audit / Bandit (scan statique) (push) Successful in 12s
security-audit / pip-audit (CVE dépendances) (push) Successful in 11s
security-audit / Scan secrets (grep) (push) Successful in 8s
tests / Lint (ruff + black) (push) Successful in 14s
tests / Tests unitaires (sans GPU) (push) Failing after 14s
tests / Tests sécurité (critique) (push) Has been skipped
core/execution/input_handler.py (NOUVEAU) : - safe_type_text() : setxkbmap fr + xdotool, partagé entre les 2 executors - check_screen_for_patterns() : détection dialogues UI via OCR - handle_detected_pattern() : clic bouton par OCR (mot exact, le plus bas) - post_execution_cleanup() : vérification post-workflow VWB executor : suppression du code dupliqué, alias vers input_handler Core executor : pyautogui.write() remplacé par safe_type_text() Page dashboard "Cartographie des processus" : - GET /process-mining : vue analyse des flux de travail - POST /api/process-mining/discover : génère BPMN + indicateurs - 4 cartes indicateurs, diagramme, points d'attention, variantes - Dark theme, français, zéro jargon technique - Onglet ajouté dans la navigation Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -24,107 +24,17 @@ from . import api_v3_bp
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_CHAR_TO_KEYSYM = {
|
||||
' ': 'space', '\t': 'Tab', '\n': 'Return',
|
||||
'!': 'exclam', '"': 'quotedbl', '#': 'numbersign', '$': 'dollar',
|
||||
'%': 'percent', '&': 'ampersand', "'": 'apostrophe',
|
||||
'(': 'parenleft', ')': 'parenright', '*': 'asterisk', '+': 'plus',
|
||||
',': 'comma', '-': 'minus', '.': 'period', '/': 'slash',
|
||||
':': 'colon', ';': 'semicolon', '<': 'less', '=': 'equal',
|
||||
'>': 'greater', '?': 'question', '@': 'at',
|
||||
'[': 'bracketleft', '\\': 'backslash', ']': 'bracketright',
|
||||
'^': 'asciicircum', '_': 'underscore', '`': 'grave',
|
||||
'{': 'braceleft', '|': 'bar', '}': 'braceright', '~': 'asciitilde',
|
||||
}
|
||||
from core.execution.input_handler import (
|
||||
safe_type_text as _shared_safe_type_text,
|
||||
check_screen_for_patterns as _shared_check_patterns,
|
||||
handle_detected_pattern as _shared_handle_pattern,
|
||||
post_execution_cleanup as _shared_post_cleanup,
|
||||
)
|
||||
|
||||
|
||||
def _xdotool_type_by_keysym(text):
|
||||
"""Tape du texte via xdotool — hybride rapide + fiable.
|
||||
|
||||
Les caractères alphanumériques passent par xdotool type (un seul appel,
|
||||
rapide). Les caractères spéciaux (:, /, @, etc.) passent par xdotool key
|
||||
avec les noms de keysym X11 pour éviter les erreurs AZERTY dans les VM.
|
||||
"""
|
||||
segments = []
|
||||
buf = []
|
||||
|
||||
def flush_buf():
|
||||
if buf:
|
||||
segments.append(('type', ''.join(buf)))
|
||||
buf.clear()
|
||||
|
||||
for ch in text:
|
||||
if ch in _CHAR_TO_KEYSYM:
|
||||
flush_buf()
|
||||
segments.append(('key', _CHAR_TO_KEYSYM[ch]))
|
||||
else:
|
||||
buf.append(ch)
|
||||
flush_buf()
|
||||
|
||||
for kind, value in segments:
|
||||
if kind == 'type':
|
||||
subprocess.run(
|
||||
['xdotool', 'type', '--delay', '0', '--clearmodifiers', '--', value],
|
||||
timeout=10, check=True
|
||||
)
|
||||
else:
|
||||
subprocess.run(
|
||||
['xdotool', 'key', '--clearmodifiers', value],
|
||||
timeout=2, check=True
|
||||
)
|
||||
time.sleep(0.02)
|
||||
|
||||
|
||||
def safe_type_text(text):
|
||||
"""Saisie de texte compatible VM/Citrix et claviers AZERTY/QWERTY.
|
||||
|
||||
Priorité :
|
||||
1. xdotool type avec refresh layout → traverse les VM spice/QEMU
|
||||
2. Presse-papier (xclip) + Ctrl+V → fallback
|
||||
3. pyautogui.write() → dernier recours
|
||||
"""
|
||||
import shutil
|
||||
import pyautogui
|
||||
|
||||
# Méthode 1 : xdotool type avec refresh du layout clavier
|
||||
# setxkbmap fr AVANT xdotool force X11 à recharger le keymap
|
||||
# → xdotool utilise les bons keycodes AZERTY
|
||||
if shutil.which('xdotool') and shutil.which('setxkbmap'):
|
||||
try:
|
||||
subprocess.run(['setxkbmap', 'fr'], timeout=2)
|
||||
subprocess.run(
|
||||
['xdotool', 'type', '--delay', '0', '--clearmodifiers', '--', text],
|
||||
timeout=max(30, len(text) * 0.05),
|
||||
check=True
|
||||
)
|
||||
print(f" ✅ Saisie via xdotool type ({len(text)} car.)")
|
||||
return
|
||||
except Exception as e:
|
||||
print(f" ⚠️ xdotool type échoué: {e}")
|
||||
|
||||
# Méthode 2 : Presse-papier (fonctionne en local, pas toujours en VM)
|
||||
xclip = shutil.which('xclip')
|
||||
if xclip:
|
||||
try:
|
||||
p = subprocess.Popen(
|
||||
['xclip', '-selection', 'clipboard'],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL
|
||||
)
|
||||
p.stdin.write(text.encode('utf-8'))
|
||||
p.stdin.close()
|
||||
time.sleep(0.2)
|
||||
pyautogui.hotkey('ctrl', 'v')
|
||||
time.sleep(0.3)
|
||||
print(f" ✅ Saisie via presse-papier ({len(text)} car.)")
|
||||
return
|
||||
except Exception as e:
|
||||
print(f" ⚠️ xclip échoué: {e}")
|
||||
|
||||
# Méthode 3 : pyautogui (dernier recours)
|
||||
print(" ⚠️ Saisie via pyautogui.write()")
|
||||
pyautogui.write(text)
|
||||
safe_type_text = _shared_safe_type_text
|
||||
_check_screen_for_patterns = _shared_check_patterns
|
||||
_handle_detected_pattern = _shared_handle_pattern
|
||||
|
||||
|
||||
def minimize_active_window():
|
||||
@@ -179,151 +89,6 @@ _execution_state = {
|
||||
}
|
||||
|
||||
|
||||
def _check_screen_for_patterns() -> Optional[Dict[str, Any]]:
|
||||
"""Vérifie si l'écran actuel contient un pattern UI connu (dialogue, popup).
|
||||
|
||||
Capture l'écran, extrait le texte via OCR léger, et cherche
|
||||
un pattern dans la UIPatternLibrary.
|
||||
|
||||
Returns:
|
||||
Dict avec le pattern trouvé et l'action à effectuer, ou None.
|
||||
"""
|
||||
try:
|
||||
from core.knowledge.ui_patterns import UIPatternLibrary
|
||||
import mss
|
||||
from PIL import Image
|
||||
import numpy as np
|
||||
|
||||
lib = UIPatternLibrary()
|
||||
# Debug: vérifier les triggers du dialog_save
|
||||
save_patterns = [p for p in lib._patterns if p.name == 'dialog_save']
|
||||
if save_patterns:
|
||||
print(f" 🔎 [Pattern] dialog_save triggers: {save_patterns[0].triggers}")
|
||||
|
||||
with mss.mss() as sct:
|
||||
monitor = sct.monitors[1]
|
||||
screenshot = sct.grab(monitor)
|
||||
screen = Image.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX')
|
||||
|
||||
try:
|
||||
from services.ocr_service import ocr_extract_text
|
||||
ocr_text = ocr_extract_text(screen)
|
||||
except ImportError:
|
||||
return None
|
||||
|
||||
if not ocr_text or len(ocr_text) < 5:
|
||||
print(f" 🔎 [Pattern] OCR vide ou trop court ({len(ocr_text) if ocr_text else 0} chars)")
|
||||
return None
|
||||
|
||||
print(f" 🔎 [Pattern] OCR ({len(ocr_text)} chars): {ocr_text[:500]}")
|
||||
|
||||
pattern = lib.find_pattern(ocr_text)
|
||||
if pattern:
|
||||
print(f" 🔎 [Pattern] Match: {pattern['pattern']} (category={pattern['category']})")
|
||||
if pattern['category'] in ('dialog', 'popup'):
|
||||
print(f"🧠 [Pattern] DÉTECTÉ: {pattern['pattern']} → {pattern['action']} '{pattern['target']}'")
|
||||
return pattern
|
||||
else:
|
||||
print(f" 🔎 [Pattern] Ignoré (catégorie {pattern['category']})")
|
||||
return None
|
||||
else:
|
||||
print(f" 🔎 [Pattern] Aucun match dans le texte OCR")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
import traceback
|
||||
print(f" 🔎 [Pattern] EXCEPTION: {e}")
|
||||
traceback.print_exc()
|
||||
return None
|
||||
|
||||
|
||||
def _handle_detected_pattern(pattern: Dict[str, Any]) -> bool:
|
||||
"""Gère automatiquement un pattern UI détecté.
|
||||
|
||||
Cherche le bouton cible via OCR (position réelle sur l'écran),
|
||||
avec fallback sur les coordonnées typiques si l'OCR ne trouve pas.
|
||||
"""
|
||||
import pyautogui
|
||||
|
||||
action = pattern.get('action')
|
||||
target = pattern.get('target', '')
|
||||
alternatives = pattern.get('alternatives', [])
|
||||
|
||||
if action == 'click':
|
||||
candidates = [target] + alternatives
|
||||
|
||||
# Chercher le bouton via OCR sur l'écran actuel
|
||||
try:
|
||||
import mss
|
||||
from PIL import Image
|
||||
from services.ocr_service import ocr_extract_words
|
||||
|
||||
with mss.mss() as sct:
|
||||
monitor = sct.monitors[1]
|
||||
screenshot = sct.grab(monitor)
|
||||
screen = Image.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX')
|
||||
|
||||
words = ocr_extract_words(screen)
|
||||
|
||||
# Collecter TOUS les matchs, puis prendre le plus bas (boutons = bas du dialogue)
|
||||
all_matches = []
|
||||
|
||||
for candidate in candidates:
|
||||
candidate_lower = candidate.lower()
|
||||
for word in words:
|
||||
word_text = word['text'].lower()
|
||||
if len(word_text) < 2 or len(candidate_lower) < 2:
|
||||
continue
|
||||
if word_text == candidate_lower:
|
||||
x1, y1, x2, y2 = word['bbox']
|
||||
all_matches.append({
|
||||
'text': word['text'],
|
||||
'x': int((x1 + x2) / 2),
|
||||
'y': int((y1 + y2) / 2),
|
||||
'match_type': 'exact',
|
||||
})
|
||||
|
||||
# Recherche partielle (ex: "nregistrer" sans le E souligné)
|
||||
if not all_matches:
|
||||
for candidate in candidates:
|
||||
if len(candidate) > 3:
|
||||
partial = candidate[1:].lower()
|
||||
for word in words:
|
||||
if partial in word['text'].lower():
|
||||
x1, y1, x2, y2 = word['bbox']
|
||||
all_matches.append({
|
||||
'text': word['text'],
|
||||
'x': int((x1 + x2) / 2),
|
||||
'y': int((y1 + y2) / 2),
|
||||
'match_type': 'partial',
|
||||
})
|
||||
|
||||
if all_matches:
|
||||
for m in all_matches:
|
||||
print(f" 🔎 [Pattern] Candidat: '{m['text']}' à ({m['x']}, {m['y']}) [{m['match_type']}]")
|
||||
|
||||
best = max(all_matches, key=lambda m: m['y'])
|
||||
print(f"🤖 [Pattern] Clic sur '{best['text']}' à ({best['x']}, {best['y']}) [le plus bas = bouton]")
|
||||
pyautogui.click(best['x'], best['y'])
|
||||
time.sleep(1.0)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f" 🔎 [Pattern] OCR bouton échoué: {e}")
|
||||
|
||||
print(f" 🔎 [Pattern] Bouton '{target}' introuvable par OCR — pas de clic")
|
||||
return False
|
||||
|
||||
elif action == 'hotkey':
|
||||
keys = target.split('+')
|
||||
print(f"🤖 [Pattern] Raccourci automatique: {target}")
|
||||
pyautogui.hotkey(*keys)
|
||||
time.sleep(0.5)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def execute_workflow_thread(execution_id: str, workflow_id: str, app):
|
||||
"""
|
||||
Thread d'exécution du workflow.
|
||||
|
||||
Reference in New Issue
Block a user