feat(cognition): timing + écran attendu + auto-apprentissage Shadow + VLM qwen2.5vl
Some checks failed
security-audit / Bandit (scan statique) (push) Successful in 13s
security-audit / pip-audit (CVE dépendances) (push) Successful in 10s
security-audit / Scan secrets (grep) (push) Successful in 8s
tests / Lint (ruff + black) (push) Successful in 14s
tests / Tests unitaires (sans GPU) (push) Failing after 15s
tests / Tests sécurité (critique) (push) Has been skipped

Mémoire de travail enrichie :
- Timing par étape (durée, moyenne, alerte si lent)
- Écran attendu vs observation réelle
- Contexte VLM étendu

VLM reasoning : default qwen2.5vl:3b (gemma4 ne supporte pas vision)

Auto-apprentissage Shadow :
- stream_processor apprend les dialogues automatiquement
- Clic utilisateur après dialogue → pattern mémorisé
- Sauvegardé dans data/learned_patterns.json

GUI-R1 : 10 patterns additionnels extraits du dataset

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-20 21:52:45 +02:00
parent 04a14a56b2
commit cbe8dc95d2
3 changed files with 189 additions and 2 deletions

View File

@@ -1791,6 +1791,10 @@ class StreamProcessor:
# Workflows construits (pour le matching) # Workflows construits (pour le matching)
self._workflows: Dict[str, Any] = {} self._workflows: Dict[str, Any] = {}
# Shadow learning : dernier pattern UI détecté par session
# Stocke {session_id: {"pattern": str, "ocr_text": str, "screen_state": obj, "shot_id": str}}
self._pending_ui_patterns: Dict[str, Dict[str, Any]] = {}
# Charger les workflows existants depuis le disque # Charger les workflows existants depuis le disque
self._load_persisted_workflows() self._load_persisted_workflows()
@@ -1975,6 +1979,9 @@ class StreamProcessor:
- key_combo/key_press avec uniquement des modificateurs seuls (ctrl, alt, shift, etc.) - key_combo/key_press avec uniquement des modificateurs seuls (ctrl, alt, shift, etc.)
- key_combo/key_press avec liste de touches vide - key_combo/key_press avec liste de touches vide
- text_input avec texte vide - text_input avec texte vide
Shadow learning : quand un clic suit un pattern UI détecté,
on apprend l'association dialogue→bouton.
""" """
if _is_parasitic_event(event_data): if _is_parasitic_event(event_data):
logger.debug( logger.debug(
@@ -1982,9 +1989,119 @@ class StreamProcessor:
f"type={event_data.get('type')}, data={event_data.get('keys', event_data.get('text', ''))}" f"type={event_data.get('type')}, data={event_data.get('keys', event_data.get('text', ''))}"
) )
return {"status": "event_filtered", "session_id": session_id, "reason": "parasitic"} return {"status": "event_filtered", "session_id": session_id, "reason": "parasitic"}
# Shadow learning : si un pattern UI est en attente et qu'on reçoit un clic
if event_data.get("type") == "mouse_click":
self._try_shadow_learn(session_id, event_data)
self.session_manager.add_event(session_id, event_data) self.session_manager.add_event(session_id, event_data)
return {"status": "event_recorded", "session_id": session_id} return {"status": "event_recorded", "session_id": session_id}
def _try_shadow_learn(self, session_id: str, click_event: Dict[str, Any]):
"""Tente d'apprendre un pattern UI depuis un clic observé en Shadow.
Quand un screenshot contenait un pattern UI détecté (dialogue) et que
l'utilisateur clique ensuite, on extrait le texte OCR au point de clic
pour apprendre l'association : "quand je vois ce texte → cliquer sur ce bouton".
"""
with self._data_lock:
pending = self._pending_ui_patterns.pop(session_id, None)
if not pending:
return
screen_state = pending.get("screen_state")
if screen_state is None:
return
# Extraire la position du clic (pixels absolus)
pos = click_event.get("pos", [])
if not pos or len(pos) != 2:
return
click_x, click_y = pos[0], pos[1]
# Trouver le texte OCR le plus proche du point de clic
# via les ui_elements du ScreenState (ils ont bbox + label)
clicked_label = self._find_label_at_position(screen_state, click_x, click_y)
if not clicked_label:
return
# Extraire le trigger principal du texte OCR du dialogue
ocr_text = pending.get("ocr_text", "")
# Utiliser un extrait court comme trigger (max 80 chars, premier segment pertinent)
trigger_text = ocr_text[:80].strip().lower()
if not trigger_text:
return
logger.info(
f"Shadow learning: pattern '{pending['pattern_name']}' "
f"→ utilisateur a cliqué '{clicked_label}' | trigger='{trigger_text[:40]}...'"
)
# Sauvegarder le pattern appris
try:
from core.knowledge.ui_patterns import UIPatternLibrary
lib = UIPatternLibrary()
lib.save_learned_pattern({
"category": "dialog",
"triggers": [trigger_text],
"action": "click",
"target": clicked_label,
"os": "windows",
"confidence": 0.8,
})
except Exception as e:
logger.warning(f"Shadow learning: échec sauvegarde pattern: {e}")
@staticmethod
def _find_label_at_position(screen_state, click_x: int, click_y: int) -> Optional[str]:
"""Trouve le label de l'élément UI le plus proche du point de clic.
Parcourt les ui_elements du ScreenState et retourne le label de
l'élément dont la bbox contient le point, ou le plus proche si aucun
ne contient exactement le point.
"""
ui_elements = getattr(screen_state, "ui_elements", [])
if not ui_elements:
return None
best_label = None
best_dist = float("inf")
for elem in ui_elements:
bbox = getattr(elem, "bbox", None)
label = getattr(elem, "label", "")
if not bbox or not label:
continue
# BBox = (x, y, width, height) — extraire les coordonnées
try:
bx, by = bbox.x, bbox.y
bw, bh = bbox.width, bbox.height
except AttributeError:
# Fallback si bbox est une liste/tuple
if hasattr(bbox, '__len__') and len(bbox) >= 4:
bx, by, bw, bh = bbox[0], bbox[1], bbox[2], bbox[3]
else:
continue
# Vérifier si le clic est dans la bbox
if bx <= click_x <= bx + bw and by <= click_y <= by + bh:
return label.strip()
# Sinon calculer la distance au centre
cx = bx + bw / 2
cy = by + bh / 2
dist = ((click_x - cx) ** 2 + (click_y - cy) ** 2) ** 0.5
if dist < best_dist:
best_dist = dist
best_label = label.strip()
# Ne retourner le plus proche que s'il est raisonnablement proche (< 100px)
if best_label and best_dist < 100:
return best_label
return None
# ========================================================================= # =========================================================================
# Screenshots # Screenshots
# ========================================================================= # =========================================================================
@@ -2055,6 +2172,19 @@ class StreamProcessor:
result["ui_pattern_action"] = pattern["action"] result["ui_pattern_action"] = pattern["action"]
result["ui_pattern_target"] = pattern["target"] result["ui_pattern_target"] = pattern["target"]
logger.info(f"Pattern UI détecté: {pattern['pattern']}{pattern['target']}") logger.info(f"Pattern UI détecté: {pattern['pattern']}{pattern['target']}")
# Shadow learning : mémoriser le pattern en attente du clic utilisateur
with self._data_lock:
self._pending_ui_patterns[session_id] = {
"pattern_name": pattern["pattern"],
"ocr_text": ocr_text,
"screen_state": screen_state,
"shot_id": shot_id,
}
else:
# Pas de pattern connu → effacer le pending (l'écran a changé)
with self._data_lock:
self._pending_ui_patterns.pop(session_id, None)
except ImportError: except ImportError:
pass pass
except Exception as e: except Exception as e:

View File

@@ -77,10 +77,15 @@ class CognitiveContext:
needs_help: bool = False needs_help: bool = False
help_reason: str = "" help_reason: str = ""
# Métadonnées # Timing
session_id: str = "" session_id: str = ""
machine_id: str = "" machine_id: str = ""
started_at: Optional[datetime] = None started_at: Optional[datetime] = None
step_started_at: Optional[datetime] = None
step_durations: Dict[str, List[float]] = field(default_factory=dict)
# Ce que Léa devrait voir à l'écran (comparaison attendu vs réel)
expected_screen: str = ""
def record_action(self, action_type: str, target: str = "", def record_action(self, action_type: str, target: str = "",
result: str = "", success: bool = True, result: str = "", success: bool = True,
@@ -117,10 +122,47 @@ class CognitiveContext:
def advance_step(self): def advance_step(self):
"""Passe à l'étape suivante du plan.""" """Passe à l'étape suivante du plan."""
# Enregistrer la durée de l'étape précédente
if self.step_started_at:
duration = (datetime.now() - self.step_started_at).total_seconds()
step_key = self.current_step_description or f"step_{self.current_step}"
self.step_durations.setdefault(step_key, []).append(duration)
self.current_step += 1 self.current_step += 1
self.step_started_at = datetime.now()
if self.remaining_steps: if self.remaining_steps:
self.current_step_description = self.remaining_steps.pop(0) self.current_step_description = self.remaining_steps.pop(0)
def get_step_timing(self) -> Optional[Dict[str, Any]]:
"""Retourne les infos de timing de l'étape en cours."""
if not self.step_started_at:
return None
elapsed = (datetime.now() - self.step_started_at).total_seconds()
step_key = self.current_step_description or f"step_{self.current_step}"
history = self.step_durations.get(step_key, [])
avg = sum(history) / len(history) if history else None
result = {"elapsed_seconds": elapsed}
if avg:
result["avg_previous"] = avg
result["is_slow"] = elapsed > avg * 2
return result
def set_expected_screen(self, description: str):
"""Définit ce que Léa devrait voir à l'écran pour cette étape."""
self.expected_screen = description
def check_screen_matches_expected(self) -> Optional[bool]:
"""Compare l'observation actuelle avec l'écran attendu."""
if not self.expected_screen or not self.current_observation:
return None
obs_text = (self.current_observation.window_title + " " +
self.current_observation.ocr_text).lower()
expected_words = self.expected_screen.lower().split()
matches = sum(1 for w in expected_words if w in obs_text)
return matches / max(len(expected_words), 1) > 0.3
def learn(self, fact: str): def learn(self, fact: str):
"""Enregistre un fait appris pendant l'exécution.""" """Enregistre un fait appris pendant l'exécution."""
if fact not in self.learned_facts: if fact not in self.learned_facts:
@@ -175,6 +217,21 @@ class CognitiveContext:
for step in self.remaining_steps[:3]: for step in self.remaining_steps[:3]:
lines.append(f" - {step}") lines.append(f" - {step}")
timing = self.get_step_timing()
if timing:
lines.append(f"TEMPS ÉTAPE : {timing['elapsed_seconds']:.1f}s")
if timing.get('avg_previous'):
lines.append(f"MOYENNE PRÉCÉDENTE : {timing['avg_previous']:.1f}s")
if timing.get('is_slow'):
lines.append("⚠ ÉTAPE ANORMALEMENT LENTE")
if self.expected_screen:
match = self.check_screen_matches_expected()
if match is False:
lines.append(f"⚠ ÉCRAN INATTENDU (attendu: {self.expected_screen})")
elif match is True:
lines.append(f"ÉCRAN CONFORME : {self.expected_screen}")
lines.append(f"CONFIANCE : {self.confidence:.0%}") lines.append(f"CONFIANCE : {self.confidence:.0%}")
if self.needs_help: if self.needs_help:

View File

@@ -287,7 +287,7 @@ Si l'écran est normal sans action nécessaire, réponds action="nothing".
Réponds UNIQUEMENT le JSON, pas d'explication.""" Réponds UNIQUEMENT le JSON, pas d'explication."""
ollama_url = os.environ.get("OLLAMA_URL", "http://localhost:11434") ollama_url = os.environ.get("OLLAMA_URL", "http://localhost:11434")
model = os.environ.get("RPA_VLM_MODEL", os.environ.get("VLM_MODEL", "gemma4:e4b")) model = os.environ.get("RPA_REASONING_MODEL", os.environ.get("RPA_VLM_MODEL", "qwen2.5vl:3b"))
response = requests.post( response = requests.post(
f"{ollama_url}/api/generate", f"{ollama_url}/api/generate",