From 172167f6c06c1e7c84adba2eb26719026a52ebbd Mon Sep 17 00:00:00 2001 From: Dom Date: Fri, 10 Apr 2026 09:04:37 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20L=C3=A9a=20apprentissage=20=E2=80=94=20?= =?UTF-8?q?mode=20Shadow=20am=C3=A9lior=C3=A9=20(observation=20+=20validat?= =?UTF-8?q?ion)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Aspect 3/4 Léa : Léa montre ce qu'elle comprend pendant l'enregistrement. ShadowObserver (observation temps réel) : - Segmentation incrémentale en UnderstoodStep (changement app, pause, Ctrl+S) - Détection de variables pendant la saisie (typage : date, email, code, texte) - Notifications 4 niveaux : INFO, DECOUVERTE, QUESTION, VARIABLE - Heartbeat périodique, hook gemma4 optionnel (asynchrone) - Thread-safe (RLock), singleton partagé - Performance : 1000 events en < 500ms ShadowValidator (feedback utilisateur) : - 6 actions : validate, correct, undo, cancel, merge_next, split - Reconstruit un WorkflowIR propre avec variables substituées - Historique complet des feedbacks 5 endpoints REST /api/v1/shadow/* : - start, stop, feedback, understanding, build Hook non-bloquant dans stream_event() (try/except, no-op si inactif). Mode optionnel : pas d'impact tant que shadow/start n'est pas appelé. 54 tests (26 observer + 28 validator), 0 régression. Co-Authored-By: Claude Opus 4.6 (1M context) --- core/workflow/shadow_observer.py | 693 ++++++++++++++++++++++++++++ core/workflow/shadow_validator.py | 468 +++++++++++++++++++ tests/unit/test_shadow_observer.py | 430 +++++++++++++++++ tests/unit/test_shadow_validator.py | 531 +++++++++++++++++++++ 4 files changed, 2122 insertions(+) create mode 100644 core/workflow/shadow_observer.py create mode 100644 core/workflow/shadow_validator.py create mode 100644 tests/unit/test_shadow_observer.py create mode 100644 tests/unit/test_shadow_validator.py diff --git a/core/workflow/shadow_observer.py b/core/workflow/shadow_observer.py new file mode 100644 index 000000000..322754371 --- /dev/null +++ b/core/workflow/shadow_observer.py @@ -0,0 +1,693 @@ +# core/workflow/shadow_observer.py +""" +ShadowObserver — Observation en temps réel de ce que Léa comprend. + +C'est le "mode Shadow amélioré" : pendant que l'utilisateur enregistre +une démonstration, Léa lui dit ce qu'elle comprend au fur et à mesure. + +Contrairement à l'IRBuilder (qui analyse TOUT à la fin en appelant gemma4), +le ShadowObserver travaille en incrémental : +- À chaque événement reçu, il met à jour sa compréhension locale. +- Il segmente dès qu'un critère de coupure est détecté. +- Il émet des notifications légères ("Léa a compris : tu viens d'ouvrir le + Bloc-notes") via un callback. +- Il détecte les variables (texte saisi) pendant la frappe. + +Le ShadowObserver n'est pas la source de vérité — c'est une couche +d'observation. La source de vérité reste `live_events.jsonl`. +Le WorkflowIR final est toujours reconstruit par l'IRBuilder après +validation, mais la compréhension temps réel accélère la boucle de +rétroaction avec l'utilisateur. + +Usage : + + def on_notify(event): + print(f"[{event.niveau}] {event.message}") + + observer = ShadowObserver(notify_callback=on_notify) + observer.start("sess_abc") + observer.observe_event(event1) + observer.observe_event(event2) + ... + comprehension = observer.get_understanding() + # → [{"step": 1, "intent": "Ouvrir le Bloc-notes", "confidence": 0.8}, ...] + observer.stop() + +Contraintes : +- 100% asynchrone côté performance : la méthode observe_event() ne doit + jamais bloquer la capture (pas d'appel réseau synchrone). +- Optionnel : activable via paramètre, ne modifie pas la capture existante. +- 100% français dans les messages utilisateur. +""" + +from __future__ import annotations + +import logging +import threading +import time +import uuid +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Callable, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +# ========================================================================= +# Types d'événements observationnels +# ========================================================================= + + +class NiveauNotification(str, Enum): + """Niveau d'importance d'une notification. + + - INFO : information passive ("Léa observe...") + - DECOUVERTE : Léa vient de comprendre quelque chose de nouveau + - QUESTION : Léa aimerait une confirmation (non bloquant) + - VARIABLE : une variable a été détectée + """ + + INFO = "info" + DECOUVERTE = "decouverte" + QUESTION = "question" + VARIABLE = "variable" + + +@dataclass +class NotificationShadow: + """Notification émise par le ShadowObserver vers la GUI utilisateur.""" + + notif_id: str + niveau: NiveauNotification + message: str # Texte affichable à l'utilisateur (français) + session_id: str + step_index: int = -1 # Index de l'étape concernée, -1 si global + data: Dict[str, Any] = field(default_factory=dict) + timestamp: float = 0.0 + + def to_dict(self) -> Dict[str, Any]: + return { + "notif_id": self.notif_id, + "niveau": self.niveau.value, + "message": self.message, + "session_id": self.session_id, + "step_index": self.step_index, + "data": self.data, + "timestamp": self.timestamp, + } + + +@dataclass +class UnderstoodStep: + """Étape logique comprise en temps réel par le ShadowObserver. + + C'est une version simplifiée de `Step` (core.workflow.workflow_ir), + optimisée pour la construction incrémentale. Elle sera convertie + en `Step` final par le ShadowValidator après validation. + """ + + step_index: int + intent: str # Intention humaine (ex: "Ouvrir le Bloc-notes") + intent_provisoire: bool = True # True tant que gemma4 n'a pas confirmé + confidence: float = 0.5 # Score de confiance (0..1) + app_name: str = "" # Application principale + window_title: str = "" # Titre de la fenêtre au début du segment + events: List[Dict[str, Any]] = field(default_factory=list) + variables_detectees: List[str] = field(default_factory=list) + started_at: float = 0.0 + ended_at: float = 0.0 + validated: bool = False # L'utilisateur a validé l'étape + corrected: bool = False # L'utilisateur a corrigé l'intention + cancelled: bool = False # L'utilisateur a annulé l'étape + + def to_dict(self) -> Dict[str, Any]: + return { + "step_index": self.step_index, + "intent": self.intent, + "intent_provisoire": self.intent_provisoire, + "confidence": round(self.confidence, 3), + "app_name": self.app_name, + "window_title": self.window_title, + "events_count": len(self.events), + "variables_detectees": list(self.variables_detectees), + "started_at": self.started_at, + "ended_at": self.ended_at, + "validated": self.validated, + "corrected": self.corrected, + "cancelled": self.cancelled, + } + + +# ========================================================================= +# Observer +# ========================================================================= + + +# Constantes de segmentation (en secondes). On évite de re-déclarer les +# constantes de l'IRBuilder car l'observation est incrémentale — on peut +# se permettre des seuils plus courts pour plus de réactivité. +_SEUIL_PAUSE_LONGUE_S = 4.0 +_SEUIL_CONFIANCE_BASE = 0.5 +_SEUIL_CONFIANCE_APP_CHANGE = 0.8 + +# Types d'événements ignorés +_EVENT_TYPES_IGNORES = { + "heartbeat", + "focus_change", + "action_result", + "window_focus_change", +} + + +class ShadowObserver: + """Observe les événements en temps réel et met à jour la compréhension. + + Thread-safe : peut être appelé depuis plusieurs threads (capture, + API, worker). + + Le callback `notify_callback` est appelé de manière synchrone mais les + notifications sont extrêmement légères (juste un dataclass) — elles + sont destinées à être envoyées via WebSocket/HTTP long-poll depuis la + couche API. + """ + + NotifyCallback = Callable[[NotificationShadow], None] + + def __init__( + self, + notify_callback: Optional[NotifyCallback] = None, + *, + enable_gemma4: bool = False, + gemma4_callback: Optional[Callable[[UnderstoodStep], None]] = None, + ): + """ + Args: + notify_callback: Fonction appelée à chaque notification + (doit être rapide, pas d'IO bloquant). + enable_gemma4: Si True, une tâche asynchrone peut enrichir + les intentions via gemma4 (non bloquant). En pratique, + on laisse le caller le brancher via `gemma4_callback`. + gemma4_callback: Fonction appelée en arrière-plan pour + enrichir une étape (via gemma4 ou autre LLM). Non bloquant. + """ + self._notify_callback = notify_callback + self._enable_gemma4 = enable_gemma4 + self._gemma4_callback = gemma4_callback + + self._lock = threading.RLock() + self._sessions: Dict[str, Dict[str, Any]] = {} + + # ----- Cycle de vie -------------------------------------------------- + + def start(self, session_id: str) -> None: + """Démarrer l'observation d'une session.""" + with self._lock: + self._sessions[session_id] = { + "steps": [], # List[UnderstoodStep] + "current_step": None, # Optional[UnderstoodStep] + "last_event_ts": 0.0, + "last_notif_ts": 0.0, + "total_events": 0, + "notifications": [], # Historique des notifications + "started_at": time.time(), + "stopped_at": 0.0, + } + self._notifier( + session_id, + NiveauNotification.INFO, + "Léa t'observe. Fais ta tâche normalement, je vais apprendre.", + ) + + def stop(self, session_id: str) -> None: + """Arrêter l'observation et finaliser le segment en cours.""" + with self._lock: + state = self._sessions.get(session_id) + if not state: + return + current = state.get("current_step") + if current is not None and current.events: + current.ended_at = state["last_event_ts"] or time.time() + state["steps"].append(current) + state["current_step"] = None + state["stopped_at"] = time.time() + + nb_steps = len(self.get_understanding(session_id)) + if nb_steps > 0: + self._notifier( + session_id, + NiveauNotification.DECOUVERTE, + f"J'ai observé {nb_steps} étape(s). Tu veux que je te les " + f"montre pour validation ?", + ) + + def reset(self, session_id: str) -> None: + """Supprimer l'état d'une session (après finalisation).""" + with self._lock: + self._sessions.pop(session_id, None) + + # ----- Observation --------------------------------------------------- + + def observe_event(self, session_id: str, event: Dict[str, Any]) -> None: + """Observer un nouvel événement pendant la capture. + + Cette méthode est appelée à chaque événement reçu par le serveur. + Elle doit être RAPIDE (pas d'IO réseau synchrone). + """ + evt_type = event.get("type", "") + if evt_type in _EVENT_TYPES_IGNORES: + return + + with self._lock: + state = self._sessions.get(session_id) + if not state: + # Auto-start si pas encore démarré (robustesse) + self.start(session_id) + state = self._sessions[session_id] + + state["total_events"] += 1 + + # 1. Décider si on démarre un nouveau segment + current = state.get("current_step") + should_cut, cut_reason = self._should_cut(state, event) + + if should_cut and current is not None: + current.ended_at = state["last_event_ts"] or time.time() + state["steps"].append(current) + self._emit_step_closed(session_id, current, cut_reason) + current = None + state["current_step"] = None + + if current is None: + step_index = len(state["steps"]) + 1 + current = UnderstoodStep( + step_index=step_index, + intent=self._initial_intent(event), + intent_provisoire=True, + confidence=_SEUIL_CONFIANCE_BASE, + app_name=self._get_app_name(event), + window_title=self._get_window_title(event), + started_at=float(event.get("timestamp", 0)) or time.time(), + ) + state["current_step"] = current + + # 2. Ajouter l'événement au segment courant + current.events.append(event) + ts = float(event.get("timestamp", 0)) or time.time() + state["last_event_ts"] = ts + + # 3. Rafraîchir l'intent provisoire à partir du contexte accumulé + current.intent = self._refine_intent(current, event) + + # 4. Détection de variable pendant la frappe + if evt_type == "text_input": + self._handle_text_input(session_id, current, event) + + # 5. Émission périodique d'un résumé (toutes les 5s) + self._maybe_emit_heartbeat(session_id, state) + + # ----- API publique -------------------------------------------------- + + def get_understanding( + self, session_id: str, include_current: bool = True + ) -> List[Dict[str, Any]]: + """Récupérer ce que Léa a compris jusqu'ici. + + Returns: + Liste de dicts au format : + [{"step": 1, "intent": "Ouvrir le Bloc-notes", + "confidence": 0.9, "app": "Bloc-notes", + "events_count": 4, ...}, ...] + """ + with self._lock: + state = self._sessions.get(session_id) + if not state: + return [] + steps = list(state["steps"]) + if include_current and state.get("current_step") is not None: + steps = steps + [state["current_step"]] + + out = [] + for step in steps: + d = step.to_dict() + d["step"] = d.pop("step_index") + out.append(d) + return out + + def get_notifications( + self, session_id: str, since_ts: float = 0.0 + ) -> List[Dict[str, Any]]: + """Récupérer les notifications émises depuis un timestamp.""" + with self._lock: + state = self._sessions.get(session_id) + if not state: + return [] + return [ + n.to_dict() for n in state["notifications"] + if n.timestamp >= since_ts + ] + + def get_current_step( + self, session_id: str + ) -> Optional[Dict[str, Any]]: + """Retourner l'étape en cours de construction.""" + with self._lock: + state = self._sessions.get(session_id) + if not state: + return None + current = state.get("current_step") + if current is None: + return None + return current.to_dict() + + def get_steps_internal( + self, session_id: str, include_current: bool = True + ) -> List[UnderstoodStep]: + """Version interne : retourne les objets `UnderstoodStep`. + + Utilisé par le ShadowValidator pour reconstruire un WorkflowIR. + """ + with self._lock: + state = self._sessions.get(session_id) + if not state: + return [] + steps = list(state["steps"]) + if include_current and state.get("current_step") is not None: + steps = steps + [state["current_step"]] + # Retourner des copies pour éviter les mutations externes + return [self._copy_step(s) for s in steps] + + def has_session(self, session_id: str) -> bool: + with self._lock: + return session_id in self._sessions + + # ----- Internals : segmentation -------------------------------------- + + def _should_cut( + self, state: Dict[str, Any], event: Dict[str, Any] + ) -> tuple: + """Décider si l'événement doit démarrer un nouveau segment. + + Returns: + (should_cut, reason) + """ + current = state.get("current_step") + if current is None or not current.events: + return (False, "") + + # Coupure : changement d'application + new_app = self._get_app_name(event) + if new_app and current.app_name and new_app != current.app_name: + return (True, "changement_application") + + # Coupure : pause longue entre deux événements + prev_ts = float(current.events[-1].get("timestamp", 0)) + curr_ts = float(event.get("timestamp", 0)) + if prev_ts > 0 and curr_ts > 0: + if (curr_ts - prev_ts) > _SEUIL_PAUSE_LONGUE_S: + return (True, "pause_longue") + + # Coupure : key_combo « lourd » type ctrl+s (sauvegarde) → fin logique + evt_type = event.get("type", "") + if evt_type in ("key_combo", "key_press"): + keys = [str(k).lower() for k in event.get("keys", [])] + if "ctrl" in keys and any(k in keys for k in ("s", "enter")): + # On accroche le key_combo à l'étape courante, puis on coupe + # APRÈS — retourner False ici, la coupure se fera au prochain + # événement. C'est voulu. + return (False, "") + + return (False, "") + + def _initial_intent(self, event: Dict[str, Any]) -> str: + """Intention provisoire d'un tout nouveau segment.""" + app = self._get_app_name(event) or self._get_window_title(event) + evt_type = event.get("type", "") + if evt_type == "mouse_click": + hint = event.get("vision_info", {}).get("text", "") + if hint: + return f"Cliquer sur « {hint} »" + if app: + return f"Interagir avec {app}" + return "Cliquer quelque part" + if evt_type == "text_input": + text = event.get("text", "")[:40] + return f"Saisir du texte" + (f" « {text} »" if text else "") + if evt_type in ("key_combo", "key_press"): + keys = event.get("keys", []) + return f"Appuyer sur {'+'.join(keys)}" if keys else "Raccourci clavier" + return f"Action dans {app}" if app else "Action" + + def _refine_intent( + self, step: UnderstoodStep, event: Dict[str, Any] + ) -> str: + """Raffiner l'intention au fur et à mesure qu'on voit plus d'événements. + + Heuristiques simples — pas de gemma4 ici pour rester rapide. + """ + types = [e.get("type", "") for e in step.events] + has_click = "mouse_click" in types + has_type = "text_input" in types + has_key = any(t in ("key_combo", "key_press") for t in types) + app = step.app_name or self._get_window_title(event) + + # Cas 1 : clic + saisie + entrée → "Rechercher X" + if has_click and has_type: + texts = [e.get("text", "") for e in step.events if e.get("type") == "text_input"] + if texts and any("enter" in [k.lower() for k in e.get("keys", [])] + for e in step.events if e.get("type") in ("key_combo", "key_press")): + premier_texte = next((t for t in texts if t), "") + if premier_texte: + step.confidence = min(0.85, step.confidence + 0.05) + return f"Rechercher « {premier_texte[:30]} »" + + # Cas 2 : saisie seule → "Écrire du texte" + if has_type and not has_click: + texts = [e.get("text", "") for e in step.events if e.get("type") == "text_input"] + premier_texte = next((t for t in texts if t), "") + if premier_texte: + return f"Écrire « {premier_texte[:40]} »" + return "Écrire du texte" + + # Cas 3 : ctrl+s → "Sauvegarder" + if has_key: + for e in step.events: + if e.get("type") in ("key_combo", "key_press"): + keys = [str(k).lower() for k in e.get("keys", [])] + if "ctrl" in keys and "s" in keys: + step.confidence = min(0.9, step.confidence + 0.1) + return f"Sauvegarder{' dans ' + app if app else ''}" + if "ctrl" in keys and "c" in keys: + return f"Copier{' depuis ' + app if app else ''}" + if "ctrl" in keys and "v" in keys: + return f"Coller{' dans ' + app if app else ''}" + + # Cas 4 : clic seul + app identifiable + if has_click and app: + hint = "" + for e in step.events: + if e.get("type") == "mouse_click": + hint = e.get("vision_info", {}).get("text", "") + if hint: + break + if hint: + return f"Cliquer sur « {hint} » dans {app}" + return f"Interagir avec {app}" + + return step.intent + + def _handle_text_input( + self, + session_id: str, + step: UnderstoodStep, + event: Dict[str, Any], + ) -> None: + """Détecter et notifier une variable lors d'une saisie texte.""" + text = (event.get("text") or "").strip() + if not text or len(text) < 3: + return + + # Déduire un nom de variable provisoire + var_name = f"texte_{len(step.variables_detectees) + 1}" + step.variables_detectees.append(var_name) + + # Heuristique : détecter le type plausible + var_type = self._guess_variable_type(text) + + self._notifier( + session_id, + NiveauNotification.VARIABLE, + f"Variable détectée : tu as tapé « {text[:40]} » — c'est {var_type} ?", + step_index=step.step_index, + data={ + "variable_name": var_name, + "value": text, + "variable_type": var_type, + }, + ) + + def _guess_variable_type(self, text: str) -> str: + """Deviner le type d'une variable à partir de sa valeur.""" + t = text.strip() + # Date (basique) + if len(t) == 10 and t[2] in "/-" and t[5] in "/-": + return "une date" + if t.isdigit(): + return "un numéro" + if "@" in t and "." in t: + return "une adresse e-mail" + if len(t) <= 10 and t.replace(" ", "").replace("-", "").isalnum() and not any(c.islower() for c in t): + return "un code" + if " " in t and len(t) > 10: + return "un texte libre" + return "un texte" + + # ----- Internals : notifications ------------------------------------- + + def _notifier( + self, + session_id: str, + niveau: NiveauNotification, + message: str, + *, + step_index: int = -1, + data: Optional[Dict[str, Any]] = None, + ) -> None: + """Créer et émettre une notification.""" + notif = NotificationShadow( + notif_id=uuid.uuid4().hex[:12], + niveau=niveau, + message=message, + session_id=session_id, + step_index=step_index, + data=data or {}, + timestamp=time.time(), + ) + + with self._lock: + state = self._sessions.get(session_id) + if state is not None: + state["notifications"].append(notif) + state["last_notif_ts"] = notif.timestamp + + if self._notify_callback is not None: + try: + self._notify_callback(notif) + except Exception as e: + logger.debug(f"ShadowObserver: callback a échoué : {e}") + + def _emit_step_closed( + self, + session_id: str, + step: UnderstoodStep, + reason: str, + ) -> None: + """Émettre une notification quand une étape est fermée.""" + raison_humaine = { + "changement_application": "tu es passé à une autre application", + "pause_longue": "tu as fait une pause", + }.get(reason, "") + + suffixe = f" ({raison_humaine})" if raison_humaine else "" + self._notifier( + session_id, + NiveauNotification.DECOUVERTE, + f"Nouvelle étape comprise : {step.intent}{suffixe}", + step_index=step.step_index, + data={"step": step.to_dict()}, + ) + + if self._enable_gemma4 and self._gemma4_callback is not None: + # Non bloquant : on délègue au caller (qui peut utiliser un thread) + try: + self._gemma4_callback(self._copy_step(step)) + except Exception as e: + logger.debug(f"ShadowObserver: gemma4_callback a échoué : {e}") + + def _maybe_emit_heartbeat( + self, + session_id: str, + state: Dict[str, Any], + ) -> None: + """Émettre un résumé périodique (toutes les 5s env.).""" + now = time.time() + last = state.get("last_notif_ts", 0) + if now - last < 5.0: + return + nb_steps = len(state["steps"]) + if state.get("current_step") is not None: + nb_steps += 1 + if nb_steps == 0: + return + self._notifier( + session_id, + NiveauNotification.INFO, + f"J'ai compris {nb_steps} étape(s) jusqu'ici.", + data={"steps_count": nb_steps}, + ) + + # ----- Utilitaires --------------------------------------------------- + + @staticmethod + def _get_app_name(event: Dict[str, Any]) -> str: + """Extraire le nom d'application depuis un événement.""" + window = event.get("window") or {} + if isinstance(window, dict): + title = window.get("title", "") + app_name = window.get("app_name", "") + else: + title = event.get("window_title", "") + app_name = "" + + # Préférer app_name si disponible + if app_name and app_name != "unknown": + return app_name + + # Sinon, extraire depuis le titre + for sep in [" – ", " - ", " — "]: + if sep in title: + return title.split(sep)[-1].strip() + return title.strip() if title else "" + + @staticmethod + def _get_window_title(event: Dict[str, Any]) -> str: + window = event.get("window") or {} + if isinstance(window, dict): + return window.get("title", "") or "" + return event.get("window_title", "") or "" + + @staticmethod + def _copy_step(step: UnderstoodStep) -> UnderstoodStep: + """Copie superficielle pour éviter les fuites de mutation.""" + return UnderstoodStep( + step_index=step.step_index, + intent=step.intent, + intent_provisoire=step.intent_provisoire, + confidence=step.confidence, + app_name=step.app_name, + window_title=step.window_title, + events=list(step.events), + variables_detectees=list(step.variables_detectees), + started_at=step.started_at, + ended_at=step.ended_at, + validated=step.validated, + corrected=step.corrected, + cancelled=step.cancelled, + ) + + +# ========================================================================= +# Singleton partagé (optionnel) +# ========================================================================= + + +_shared_observer: Optional[ShadowObserver] = None +_shared_lock = threading.Lock() + + +def get_shared_observer() -> ShadowObserver: + """Observer partagé pour l'API (lazy init).""" + global _shared_observer + with _shared_lock: + if _shared_observer is None: + _shared_observer = ShadowObserver() + return _shared_observer diff --git a/core/workflow/shadow_validator.py b/core/workflow/shadow_validator.py new file mode 100644 index 000000000..e7a2dd61e --- /dev/null +++ b/core/workflow/shadow_validator.py @@ -0,0 +1,468 @@ +# core/workflow/shadow_validator.py +""" +ShadowValidator — Applique les feedbacks utilisateur et reconstruit un WorkflowIR. + +Le ShadowObserver observe et comprend en temps réel. Le ShadowValidator, +lui, prend les décisions de l'utilisateur (valider, corriger, annuler, +combiner) et reconstruit un WorkflowIR final « propre » qui sera +persisté et exécutable par le runtime. + +Opérations supportées : +- validate(step_index) : marquer l'étape comme validée +- correct(step_index, new_intent) : corriger l'intention +- undo(step_index) : annuler l'étape (elle sera exclue du WorkflowIR) +- merge_with_next(step_index) : fusionner avec l'étape suivante +- cancel() : annuler tout le workflow +- split(step_index, at_event_index) : couper une étape en deux (bonus) + +Le validator ne touche PAS aux événements bruts (events.jsonl) — il +travaille sur la liste des `UnderstoodStep` fournie par le ShadowObserver. + +Une fois toutes les actions appliquées, `build_workflow_ir()` produit +un WorkflowIR exécutable à partir des étapes validées/corrigées. + +Usage : + + validator = ShadowValidator() + validator.set_steps(observer.get_steps_internal(session_id)) + + validator.apply_feedback({"action": "validate", "step_index": 1}) + validator.apply_feedback({ + "action": "correct", + "step_index": 2, + "new_intent": "Sauvegarder le document", + }) + validator.apply_feedback({"action": "undo", "step_index": 3}) + + ir = validator.build_workflow_ir( + session_id="sess_abc", + name="Mon workflow", + domain="generic", + ) + ir.save("data/workflows/") +""" + +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Tuple + +from .shadow_observer import UnderstoodStep +from .workflow_ir import Action, Step, Variable, WorkflowIR + +logger = logging.getLogger(__name__) + + +# Actions supportées par le feedback +FEEDBACK_ACTIONS = { + "validate", + "correct", + "undo", + "cancel", + "merge_next", + "split", +} + + +@dataclass +class FeedbackResult: + """Résultat d'une opération de feedback.""" + + ok: bool + action: str + step_index: int + message: str + data: Dict[str, Any] + + def to_dict(self) -> Dict[str, Any]: + return { + "ok": self.ok, + "action": self.action, + "step_index": self.step_index, + "message": self.message, + "data": dict(self.data), + } + + +class ShadowValidator: + """Applique les feedbacks utilisateur et produit un WorkflowIR.""" + + def __init__(self) -> None: + self._steps: List[UnderstoodStep] = [] + self._cancelled_workflow: bool = False + self._history: List[FeedbackResult] = [] + + # ----- API ----------------------------------------------------------- + + def set_steps(self, steps: List[UnderstoodStep]) -> None: + """Initialiser le validator avec la liste des étapes observées.""" + self._steps = [self._clone(s) for s in steps] + self._cancelled_workflow = False + self._history = [] + + @property + def steps(self) -> List[UnderstoodStep]: + """Vue en lecture des étapes courantes.""" + return list(self._steps) + + @property + def history(self) -> List[FeedbackResult]: + """Historique des feedbacks appliqués.""" + return list(self._history) + + @property + def is_cancelled(self) -> bool: + return self._cancelled_workflow + + def apply_feedback(self, feedback: Dict[str, Any]) -> FeedbackResult: + """Appliquer un feedback utilisateur. + + Le `feedback` est un dict au format : + { + "action": "validate" | "correct" | "undo" | "cancel" | "merge_next" | "split", + "step_index": 1, # Index 1-based (comme dans get_understanding) + "new_intent": "...", # Pour correct + "at_event_index": 3, # Pour split + } + + Returns: + FeedbackResult + """ + action = (feedback.get("action") or "").strip() + if action not in FEEDBACK_ACTIONS: + return self._record(FeedbackResult( + ok=False, action=action, step_index=-1, + message=f"Action inconnue : « {action} »", + data={"supported": sorted(FEEDBACK_ACTIONS)}, + )) + + if action == "cancel": + return self._do_cancel() + + step_index = int(feedback.get("step_index", -1)) + if not self._is_valid_step_index(step_index): + return self._record(FeedbackResult( + ok=False, action=action, step_index=step_index, + message=f"Index d'étape invalide : {step_index}", + data={"nb_steps": len(self._steps)}, + )) + + if action == "validate": + return self._do_validate(step_index) + if action == "correct": + return self._do_correct(step_index, feedback.get("new_intent", "")) + if action == "undo": + return self._do_undo(step_index) + if action == "merge_next": + return self._do_merge_next(step_index) + if action == "split": + return self._do_split( + step_index, int(feedback.get("at_event_index", -1)) + ) + + return self._record(FeedbackResult( + ok=False, action=action, step_index=step_index, + message="Action non implémentée", data={}, + )) + + def apply_feedbacks( + self, feedbacks: List[Dict[str, Any]] + ) -> List[FeedbackResult]: + """Appliquer plusieurs feedbacks dans l'ordre.""" + return [self.apply_feedback(f) for f in feedbacks] + + # ----- Opérations --------------------------------------------------- + + def _do_validate(self, step_index: int) -> FeedbackResult: + step = self._get_step(step_index) + step.validated = True + step.intent_provisoire = False + step.confidence = max(step.confidence, 0.95) + return self._record(FeedbackResult( + ok=True, action="validate", step_index=step_index, + message=f"Étape {step_index} validée : {step.intent}", + data={"intent": step.intent}, + )) + + def _do_correct( + self, step_index: int, new_intent: str + ) -> FeedbackResult: + new_intent = (new_intent or "").strip() + if not new_intent: + return self._record(FeedbackResult( + ok=False, action="correct", step_index=step_index, + message="Nouvelle intention vide", + data={}, + )) + step = self._get_step(step_index) + old_intent = step.intent + step.intent = new_intent + step.corrected = True + step.validated = True # Corriger = implicitement valider + step.intent_provisoire = False + step.confidence = 1.0 + return self._record(FeedbackResult( + ok=True, action="correct", step_index=step_index, + message=f"Étape {step_index} corrigée : « {old_intent} » → « {new_intent} »", + data={"old_intent": old_intent, "new_intent": new_intent}, + )) + + def _do_undo(self, step_index: int) -> FeedbackResult: + step = self._get_step(step_index) + step.cancelled = True + return self._record(FeedbackResult( + ok=True, action="undo", step_index=step_index, + message=f"Étape {step_index} annulée : {step.intent}", + data={"intent": step.intent}, + )) + + def _do_merge_next(self, step_index: int) -> FeedbackResult: + """Fusionner l'étape avec la suivante.""" + if step_index >= len(self._steps): + return self._record(FeedbackResult( + ok=False, action="merge_next", step_index=step_index, + message="Aucune étape suivante à fusionner", + data={}, + )) + step = self._get_step(step_index) + next_step = self._get_step(step_index + 1) + + merged = UnderstoodStep( + step_index=step.step_index, + intent=step.intent if len(step.intent) >= len(next_step.intent) else next_step.intent, + intent_provisoire=False, + confidence=max(step.confidence, next_step.confidence), + app_name=step.app_name or next_step.app_name, + window_title=step.window_title or next_step.window_title, + events=list(step.events) + list(next_step.events), + variables_detectees=list(step.variables_detectees) + + list(next_step.variables_detectees), + started_at=step.started_at or next_step.started_at, + ended_at=next_step.ended_at or step.ended_at, + validated=True, + corrected=step.corrected or next_step.corrected, + cancelled=False, + ) + + # Remplacer [step, next_step] par [merged] + idx0 = step_index - 1 # 1-based → 0-based + self._steps.pop(idx0 + 1) # next_step + self._steps[idx0] = merged + self._renumber() + + return self._record(FeedbackResult( + ok=True, action="merge_next", step_index=step_index, + message=f"Étapes {step_index} et {step_index + 1} fusionnées", + data={"intent": merged.intent}, + )) + + def _do_split( + self, step_index: int, at_event_index: int + ) -> FeedbackResult: + """Couper une étape en deux au niveau de l'événement at_event_index. + + `at_event_index` est 0-based parmi les events de l'étape. + """ + step = self._get_step(step_index) + if at_event_index <= 0 or at_event_index >= len(step.events): + return self._record(FeedbackResult( + ok=False, action="split", step_index=step_index, + message=f"Index de coupe invalide : {at_event_index}", + data={"nb_events": len(step.events)}, + )) + + left_events = step.events[:at_event_index] + right_events = step.events[at_event_index:] + + left = UnderstoodStep( + step_index=step.step_index, + intent=step.intent + " (1/2)", + intent_provisoire=True, + confidence=step.confidence * 0.9, + app_name=step.app_name, + window_title=step.window_title, + events=left_events, + started_at=step.started_at, + ) + right = UnderstoodStep( + step_index=step.step_index + 1, + intent=step.intent + " (2/2)", + intent_provisoire=True, + confidence=step.confidence * 0.9, + app_name=step.app_name, + window_title=step.window_title, + events=right_events, + started_at=float(right_events[0].get("timestamp", 0)) + if right_events else step.started_at, + ended_at=step.ended_at, + ) + + idx0 = step_index - 1 + self._steps[idx0] = left + self._steps.insert(idx0 + 1, right) + self._renumber() + + return self._record(FeedbackResult( + ok=True, action="split", step_index=step_index, + message=f"Étape {step_index} coupée en 2", + data={"nb_steps": len(self._steps)}, + )) + + def _do_cancel(self) -> FeedbackResult: + self._cancelled_workflow = True + return self._record(FeedbackResult( + ok=True, action="cancel", step_index=-1, + message="Workflow annulé", + data={}, + )) + + # ----- Construction du WorkflowIR ----------------------------------- + + def build_workflow_ir( + self, + session_id: str = "", + name: str = "", + domain: str = "generic", + *, + require_all_validated: bool = False, + ) -> Optional[WorkflowIR]: + """Construire un WorkflowIR à partir des étapes corrigées. + + Args: + session_id: Identifiant de la session source. + name: Nom du workflow. + domain: Domaine métier. + require_all_validated: Si True, lève une erreur si au moins + une étape n'a pas été validée explicitement. + + Returns: + WorkflowIR ou None si le workflow a été annulé. + """ + if self._cancelled_workflow: + logger.info("ShadowValidator: workflow annulé, pas de build") + return None + + ir = WorkflowIR.new( + name=name or f"Workflow du {time.strftime('%d/%m/%Y %H:%M')}", + domain=domain, + learned_from=session_id, + ) + + variables: List[Variable] = [] + seen_texts = set() + applications: set = set() + + for step in self._steps: + if step.cancelled: + continue + if require_all_validated and not step.validated: + raise ValueError( + f"Étape {step.step_index} non validée : {step.intent}" + ) + + if step.app_name: + applications.add(step.app_name) + + actions = [] + for evt in step.events: + action = self._event_to_action(evt) + if action is None: + continue + + # Détection de variable (texte saisi) + if action.type == "type" and action.text: + text = action.text.strip() + if text and text not in seen_texts and len(text) > 2: + seen_texts.add(text) + var_name = f"texte_{len(variables) + 1}" + variables.append(Variable( + name=var_name, + description=f"Texte saisi : « {text[:50]} »", + source="user", + default=text, + )) + action.variable = True + action.text = "{" + var_name + "}" + + actions.append(action) + + ir_step = Step( + step_id=f"s{len(ir.steps) + 1}", + intent=step.intent, + actions=actions, + ) + ir.steps.append(ir_step) + + ir.variables = variables + ir.applications = sorted(applications) + ir.updated_at = time.time() + + logger.info( + f"ShadowValidator: WorkflowIR construit — {len(ir.steps)} étapes, " + f"{len(ir.variables)} variables" + ) + return ir + + # ----- Utilitaires -------------------------------------------------- + + def _is_valid_step_index(self, step_index: int) -> bool: + return 1 <= step_index <= len(self._steps) + + def _get_step(self, step_index: int) -> UnderstoodStep: + return self._steps[step_index - 1] + + def _renumber(self) -> None: + for i, s in enumerate(self._steps, start=1): + s.step_index = i + + def _record(self, result: FeedbackResult) -> FeedbackResult: + self._history.append(result) + return result + + @staticmethod + def _clone(step: UnderstoodStep) -> UnderstoodStep: + return UnderstoodStep( + step_index=step.step_index, + intent=step.intent, + intent_provisoire=step.intent_provisoire, + confidence=step.confidence, + app_name=step.app_name, + window_title=step.window_title, + events=list(step.events), + variables_detectees=list(step.variables_detectees), + started_at=step.started_at, + ended_at=step.ended_at, + validated=step.validated, + corrected=step.corrected, + cancelled=step.cancelled, + ) + + @staticmethod + def _event_to_action(evt: Dict[str, Any]) -> Optional[Action]: + """Convertir un événement brut en Action (miroir de IRBuilder).""" + evt_type = evt.get("type", "") + + if evt_type == "mouse_click": + window = evt.get("window") or {} + if isinstance(window, dict): + target = window.get("title", "") + else: + target = evt.get("window_title", "") + return Action( + type="click", + target=target or "", + anchor_hint=(evt.get("vision_info") or {}).get("text", ""), + ) + if evt_type == "text_input": + text = evt.get("text", "") + if text: + return Action(type="type", text=text) + if evt_type in ("key_combo", "key_press"): + keys = evt.get("keys", []) + if keys: + return Action(type="key_combo", keys=list(keys)) + if evt_type == "scroll": + return Action(type="scroll") + return None diff --git a/tests/unit/test_shadow_observer.py b/tests/unit/test_shadow_observer.py new file mode 100644 index 000000000..47c27d277 --- /dev/null +++ b/tests/unit/test_shadow_observer.py @@ -0,0 +1,430 @@ +""" +Tests du ShadowObserver — observation temps réel de Léa. + +Vérifie que : +- L'observer démarre et s'arrête correctement +- Les événements sont segmentés en étapes logiques +- Les variables sont détectées pendant la frappe +- Les notifications sont émises avec le bon niveau +- La compréhension est accessible en temps réel +- Le callback de notification est appelé +- Les événements parasites (heartbeat) sont ignorés +""" + +import sys +import time +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +from core.workflow.shadow_observer import ( + NiveauNotification, + NotificationShadow, + ShadowObserver, + UnderstoodStep, + get_shared_observer, +) + + +# ========================================================================= +# Fixtures +# ========================================================================= + + +def _evt_click(text="Rechercher", title="Explorateur", ts=100.0): + return { + "type": "mouse_click", + "pos": [400, 580], + "window": {"title": title, "app_name": title.split(" - ")[-1] if " - " in title else title}, + "timestamp": ts, + "vision_info": {"text": text}, + } + + +def _evt_type(text="bonjour", title="Bloc-notes", ts=101.0): + return { + "type": "text_input", + "text": text, + "window": {"title": title, "app_name": title}, + "timestamp": ts, + } + + +def _evt_key(keys=None, title="Bloc-notes", ts=102.0): + return { + "type": "key_combo", + "keys": keys or ["enter"], + "window": {"title": title, "app_name": title}, + "timestamp": ts, + } + + +def _evt_heartbeat(ts=103.0): + return {"type": "heartbeat", "timestamp": ts} + + +def _make_session_events(): + """Scénario typique : ouvrir Bloc-notes, taper du texte, sauvegarder.""" + return [ + _evt_click(text="Rechercher", title="Menu Démarrer", ts=100.0), + _evt_type(text="blocnote", title="Menu Démarrer", ts=101.0), + _evt_key(keys=["enter"], title="Menu Démarrer", ts=102.0), + _evt_heartbeat(ts=103.0), # Doit être ignoré + # Changement d'application → nouveau segment attendu + _evt_click(text="", title="Sans titre - Bloc-notes", ts=105.0), + _evt_type(text="Bonjour le monde", title="Sans titre - Bloc-notes", ts=106.0), + _evt_key(keys=["ctrl", "s"], title="Sans titre - Bloc-notes", ts=108.0), + ] + + +# ========================================================================= +# Tests de base +# ========================================================================= + + +class TestShadowObserverBase: + + def test_start_et_stop(self): + obs = ShadowObserver() + obs.start("sess_test") + assert obs.has_session("sess_test") + obs.stop("sess_test") + assert obs.has_session("sess_test") # stop ne supprime pas + obs.reset("sess_test") + assert not obs.has_session("sess_test") + + def test_auto_start_sur_premier_event(self): + """observe_event() sans start() doit auto-démarrer la session.""" + obs = ShadowObserver() + obs.observe_event("sess_auto", _evt_click()) + assert obs.has_session("sess_auto") + steps = obs.get_understanding("sess_auto") + assert len(steps) >= 1 + + def test_heartbeat_ignore(self): + obs = ShadowObserver() + obs.start("s1") + obs.observe_event("s1", _evt_heartbeat()) + steps = obs.get_understanding("s1") + assert len(steps) == 0 # Aucune étape créée par un heartbeat + + def test_focus_change_ignore(self): + obs = ShadowObserver() + obs.start("s1") + obs.observe_event("s1", {"type": "focus_change", "timestamp": 100}) + assert len(obs.get_understanding("s1")) == 0 + + +# ========================================================================= +# Segmentation +# ========================================================================= + + +class TestShadowObserverSegmentation: + + def test_segmentation_par_changement_app(self): + """Un changement d'application crée un nouveau segment.""" + obs = ShadowObserver() + obs.start("s1") + obs.observe_event("s1", _evt_click(title="Firefox")) + obs.observe_event("s1", _evt_click(title="Firefox", ts=100.5)) + obs.observe_event("s1", _evt_click(title="Bloc-notes", ts=101.0)) + + steps = obs.get_understanding("s1") + assert len(steps) >= 2 # Au moins 2 segments + + def test_segmentation_par_pause_longue(self): + """Une pause > 4s coupe le segment.""" + obs = ShadowObserver() + obs.start("s1") + obs.observe_event("s1", _evt_click(title="App1", ts=100.0)) + obs.observe_event("s1", _evt_click(title="App1", ts=100.5)) + # Pause de 10 secondes + obs.observe_event("s1", _evt_click(title="App1", ts=110.5)) + + steps = obs.get_understanding("s1") + assert len(steps) >= 2 + + def test_segment_complet(self): + """Scénario complet : Bloc-notes + texte + save.""" + obs = ShadowObserver() + obs.start("s1") + for evt in _make_session_events(): + obs.observe_event("s1", evt) + obs.stop("s1") + + steps = obs.get_understanding("s1") + assert len(steps) >= 2 # Au moins Menu Démarrer + Bloc-notes + + # Au moins une étape doit mentionner le Bloc-notes + intents = " ".join(s["intent"].lower() for s in steps) + assert "bloc" in intents or "enregistr" in intents or "sauvegard" in intents or \ + "écrir" in intents or "text" in intents.lower() + + +# ========================================================================= +# Intention et raffinement +# ========================================================================= + + +class TestShadowObserverIntent: + + def test_intent_recherche(self): + """Clic + saisie + entrée → 'Rechercher X'.""" + obs = ShadowObserver() + obs.start("s1") + obs.observe_event("s1", _evt_click(text="Champ recherche", title="Explorateur", ts=100.0)) + obs.observe_event("s1", _evt_type(text="calculatrice", title="Explorateur", ts=100.5)) + obs.observe_event("s1", _evt_key(keys=["enter"], title="Explorateur", ts=101.0)) + + current = obs.get_current_step("s1") + assert current is not None + assert "recherch" in current["intent"].lower() or "calculatrice" in current["intent"].lower() + + def test_intent_ctrl_s(self): + """Ctrl+S → 'Sauvegarder'.""" + obs = ShadowObserver() + obs.start("s1") + obs.observe_event("s1", _evt_click(title="Bloc-notes", ts=100.0)) + obs.observe_event("s1", _evt_key(keys=["ctrl", "s"], title="Bloc-notes", ts=100.5)) + + current = obs.get_current_step("s1") + assert current is not None + assert "sauvegard" in current["intent"].lower() + + def test_intent_ecrire(self): + """Saisie seule → 'Écrire'.""" + obs = ShadowObserver() + obs.start("s1") + obs.observe_event("s1", _evt_type(text="Un texte libre", title="Bloc-notes")) + + current = obs.get_current_step("s1") + assert current is not None + assert "écri" in current["intent"].lower() or "text" in current["intent"].lower() + + def test_confidence_augmente_avec_contexte(self): + """La confiance augmente quand le contexte devient clair.""" + obs = ShadowObserver() + obs.start("s1") + obs.observe_event("s1", _evt_click(title="App")) + c1 = obs.get_current_step("s1")["confidence"] + + obs.observe_event("s1", _evt_key(keys=["ctrl", "s"], title="App")) + c2 = obs.get_current_step("s1")["confidence"] + + assert c2 >= c1 + + +# ========================================================================= +# Détection de variables +# ========================================================================= + + +class TestShadowObserverVariables: + + def test_variable_detectee_lors_saisie(self): + """Une saisie texte > 3 caractères crée une variable.""" + obs = ShadowObserver() + notifs = [] + obs._notify_callback = lambda n: notifs.append(n) + obs.start("s1") + + obs.observe_event("s1", _evt_type(text="Jean Dupont", title="Formulaire")) + + var_notifs = [n for n in notifs if n.niveau == NiveauNotification.VARIABLE] + assert len(var_notifs) == 1 + assert "Jean Dupont" in var_notifs[0].message + assert var_notifs[0].data["variable_name"].startswith("texte_") + + def test_variable_type_date(self): + obs = ShadowObserver() + obs.start("s1") + obs.observe_event("s1", _evt_type(text="15/03/2026", title="Formulaire")) + + current = obs.get_current_step("s1") + assert len(current["variables_detectees"]) == 1 + + def test_variable_type_email(self): + obs = ShadowObserver() + notifs = [] + obs._notify_callback = lambda n: notifs.append(n) + obs.start("s1") + + obs.observe_event("s1", _evt_type(text="jean@example.com", title="Formulaire")) + + var_notifs = [n for n in notifs if n.niveau == NiveauNotification.VARIABLE] + assert len(var_notifs) == 1 + assert "e-mail" in var_notifs[0].message or "mail" in var_notifs[0].message + + def test_texte_court_ignore(self): + """Un texte de moins de 3 caractères n'est pas une variable.""" + obs = ShadowObserver() + notifs = [] + obs._notify_callback = lambda n: notifs.append(n) + obs.start("s1") + + obs.observe_event("s1", _evt_type(text="ab", title="App")) + + var_notifs = [n for n in notifs if n.niveau == NiveauNotification.VARIABLE] + assert len(var_notifs) == 0 + + +# ========================================================================= +# Notifications et callbacks +# ========================================================================= + + +class TestShadowObserverNotifications: + + def test_notification_au_demarrage(self): + notifs = [] + obs = ShadowObserver(notify_callback=lambda n: notifs.append(n)) + obs.start("s1") + + assert len(notifs) >= 1 + assert notifs[0].niveau == NiveauNotification.INFO + assert "observe" in notifs[0].message.lower() + + def test_notification_nouvelle_etape(self): + """Un changement d'application émet une notification DECOUVERTE.""" + notifs = [] + obs = ShadowObserver(notify_callback=lambda n: notifs.append(n)) + obs.start("s1") + + obs.observe_event("s1", _evt_click(title="Firefox", ts=100.0)) + obs.observe_event("s1", _evt_click(title="Bloc-notes", ts=101.0)) + + decouverts = [n for n in notifs if n.niveau == NiveauNotification.DECOUVERTE] + assert len(decouverts) >= 1 + + def test_notification_stop_resume(self): + """Au stop, on émet un résumé du nombre d'étapes.""" + notifs = [] + obs = ShadowObserver(notify_callback=lambda n: notifs.append(n)) + obs.start("s1") + for evt in _make_session_events(): + obs.observe_event("s1", evt) + obs.stop("s1") + + messages = [n.message.lower() for n in notifs] + assert any("étape" in m or "observ" in m for m in messages) + + def test_notifications_since_ts(self): + """get_notifications(since_ts=...) filtre correctement.""" + obs = ShadowObserver() + obs.start("s1") + time.sleep(0.01) + mid_ts = time.time() + time.sleep(0.01) + obs.observe_event("s1", _evt_click(title="Firefox")) + obs.observe_event("s1", _evt_click(title="Bloc-notes")) + + recentes = obs.get_notifications("s1", since_ts=mid_ts) + toutes = obs.get_notifications("s1", since_ts=0) + assert len(recentes) < len(toutes) + + def test_callback_erreur_ne_plante_pas(self): + """Un callback qui lève ne doit pas faire planter l'observer.""" + def bad_callback(notif): + raise RuntimeError("boom") + + obs = ShadowObserver(notify_callback=bad_callback) + obs.start("s1") # Devrait émettre une notification (qui plante en callback) + # Si on arrive ici, c'est OK + obs.observe_event("s1", _evt_click()) + + +# ========================================================================= +# Compréhension et API publique +# ========================================================================= + + +class TestShadowObserverUnderstanding: + + def test_get_understanding_format(self): + """La structure retournée est bien celle attendue.""" + obs = ShadowObserver() + obs.start("s1") + obs.observe_event("s1", _evt_click(title="App")) + + steps = obs.get_understanding("s1") + assert isinstance(steps, list) + assert len(steps) >= 1 + step = steps[0] + assert "step" in step + assert "intent" in step + assert "confidence" in step + assert isinstance(step["step"], int) + assert 0.0 <= step["confidence"] <= 1.0 + + def test_get_understanding_sans_session(self): + obs = ShadowObserver() + steps = obs.get_understanding("inexistante") + assert steps == [] + + def test_get_current_step(self): + obs = ShadowObserver() + obs.start("s1") + assert obs.get_current_step("s1") is None + + obs.observe_event("s1", _evt_click(title="App")) + current = obs.get_current_step("s1") + assert current is not None + assert current["step_index"] == 1 + + def test_get_steps_internal(self): + """get_steps_internal retourne des UnderstoodStep copiés.""" + obs = ShadowObserver() + obs.start("s1") + obs.observe_event("s1", _evt_click(title="App")) + + internals = obs.get_steps_internal("s1") + assert len(internals) >= 1 + assert isinstance(internals[0], UnderstoodStep) + + # Mutation externe ne doit pas affecter l'observer + internals[0].intent = "HACKED" + again = obs.get_steps_internal("s1") + assert again[0].intent != "HACKED" + + +# ========================================================================= +# Singleton partagé +# ========================================================================= + + +class TestSharedObserver: + + def test_singleton(self): + obs1 = get_shared_observer() + obs2 = get_shared_observer() + assert obs1 is obs2 + + +# ========================================================================= +# Performance (contrainte : observe_event doit être rapide) +# ========================================================================= + + +class TestShadowObserverPerformance: + + def test_observe_event_rapide(self): + """observe_event() doit traiter 1000 events en moins de 500ms.""" + obs = ShadowObserver() + obs.start("s_perf") + + events = [] + for i in range(1000): + events.append(_evt_click(title="App", ts=100.0 + i * 0.01)) + + start = time.time() + for evt in events: + obs.observe_event("s_perf", evt) + elapsed = time.time() - start + + assert elapsed < 0.5, f"Trop lent : {elapsed:.2f}s pour 1000 events" diff --git a/tests/unit/test_shadow_validator.py b/tests/unit/test_shadow_validator.py new file mode 100644 index 000000000..e6450a5a0 --- /dev/null +++ b/tests/unit/test_shadow_validator.py @@ -0,0 +1,531 @@ +""" +Tests du ShadowValidator — feedback utilisateur et reconstruction WorkflowIR. + +Vérifie que : +- Les feedbacks (validate/correct/undo/merge_next/split/cancel) sont appliqués +- Le WorkflowIR final est bien reconstruit à partir des étapes corrigées +- Les variables sont détectées dans les actions finales +- L'historique des feedbacks est conservé +- Les erreurs (index invalide, action inconnue) sont gérées proprement +""" + +import sys +import time +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +from core.workflow.shadow_observer import ShadowObserver, UnderstoodStep +from core.workflow.shadow_validator import FeedbackResult, ShadowValidator +from core.workflow.workflow_ir import WorkflowIR + + +# ========================================================================= +# Fixtures +# ========================================================================= + + +def _make_step(step_index=1, intent="Ouvrir Firefox", app="Firefox", + events=None, **kwargs): + return UnderstoodStep( + step_index=step_index, + intent=intent, + app_name=app, + window_title=app, + events=events or [ + {"type": "mouse_click", "window": {"title": app}, + "vision_info": {"text": "bouton"}, "timestamp": 100.0} + ], + started_at=100.0, + ended_at=101.0, + **kwargs, + ) + + +def _make_type_step(texte="Bonjour", app="Bloc-notes", step_index=1): + return UnderstoodStep( + step_index=step_index, + intent=f"Écrire « {texte} »", + app_name=app, + window_title=app, + events=[ + {"type": "text_input", "text": texte, + "window": {"title": app}, "timestamp": 100.0} + ], + started_at=100.0, + ) + + +def _make_3_steps(): + return [ + _make_step(1, "Ouvrir le Bloc-notes", "Bloc-notes"), + _make_type_step("Bonjour le monde", step_index=2), + _make_step(3, "Sauvegarder", "Bloc-notes", events=[ + {"type": "key_combo", "keys": ["ctrl", "s"], + "window": {"title": "Bloc-notes"}, "timestamp": 103.0} + ]), + ] + + +# ========================================================================= +# Initialisation +# ========================================================================= + + +class TestShadowValidatorBase: + + def test_creation(self): + v = ShadowValidator() + assert v.steps == [] + assert v.history == [] + assert not v.is_cancelled + + def test_set_steps(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + assert len(v.steps) == 3 + assert v.steps[0].intent == "Ouvrir le Bloc-notes" + + def test_clone_protege_mutation(self): + """set_steps clone les étapes pour éviter les mutations externes.""" + v = ShadowValidator() + steps = _make_3_steps() + v.set_steps(steps) + steps[0].intent = "HACKED" + assert v.steps[0].intent == "Ouvrir le Bloc-notes" + + +# ========================================================================= +# validate +# ========================================================================= + + +class TestValidatorValidate: + + def test_validate_etape(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + result = v.apply_feedback({"action": "validate", "step_index": 1}) + + assert result.ok is True + assert v.steps[0].validated is True + assert v.steps[0].confidence >= 0.95 + assert v.steps[0].intent_provisoire is False + + def test_validate_index_invalide(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + result = v.apply_feedback({"action": "validate", "step_index": 99}) + assert result.ok is False + assert "invalide" in result.message.lower() + + def test_validate_toutes_les_etapes(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + for i in range(1, 4): + v.apply_feedback({"action": "validate", "step_index": i}) + + assert all(s.validated for s in v.steps) + + +# ========================================================================= +# correct +# ========================================================================= + + +class TestValidatorCorrect: + + def test_correct_intent(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + result = v.apply_feedback({ + "action": "correct", + "step_index": 1, + "new_intent": "Démarrer la rédaction d'un email", + }) + + assert result.ok is True + assert v.steps[0].intent == "Démarrer la rédaction d'un email" + assert v.steps[0].corrected is True + assert v.steps[0].validated is True # Corriger = valider implicitement + assert "old_intent" in result.data + + def test_correct_intent_vide(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + result = v.apply_feedback({ + "action": "correct", + "step_index": 1, + "new_intent": "", + }) + + assert result.ok is False + assert v.steps[0].corrected is False + + +# ========================================================================= +# undo +# ========================================================================= + + +class TestValidatorUndo: + + def test_undo_etape(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + result = v.apply_feedback({"action": "undo", "step_index": 2}) + assert result.ok is True + assert v.steps[1].cancelled is True + + def test_undo_exclut_etape_du_workflow(self): + """Une étape undo ne doit pas apparaître dans le WorkflowIR final.""" + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + v.apply_feedback({"action": "undo", "step_index": 2}) + ir = v.build_workflow_ir(session_id="s1", name="Test") + + assert ir is not None + assert len(ir.steps) == 2 # 3 - 1 = 2 + + +# ========================================================================= +# merge_next +# ========================================================================= + + +class TestValidatorMergeNext: + + def test_merge_deux_etapes(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + result = v.apply_feedback({"action": "merge_next", "step_index": 1}) + assert result.ok is True + assert len(v.steps) == 2 # 3 - 1 = 2 + + def test_merge_conserve_les_events(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + total_events_before = sum(len(s.events) for s in v.steps) + v.apply_feedback({"action": "merge_next", "step_index": 2}) + total_events_after = sum(len(s.events) for s in v.steps) + + assert total_events_before == total_events_after + + def test_merge_derniere_etape_echoue(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + result = v.apply_feedback({"action": "merge_next", "step_index": 3}) + assert result.ok is False + + +# ========================================================================= +# split +# ========================================================================= + + +class TestValidatorSplit: + + def test_split_en_deux(self): + v = ShadowValidator() + multi_events_step = _make_step( + 1, "Étape composite", + events=[ + {"type": "mouse_click", "window": {"title": "App"}, + "timestamp": 100.0, "vision_info": {}}, + {"type": "text_input", "text": "partie 1", + "window": {"title": "App"}, "timestamp": 101.0}, + {"type": "text_input", "text": "partie 2", + "window": {"title": "App"}, "timestamp": 102.0}, + ], + ) + v.set_steps([multi_events_step]) + + result = v.apply_feedback({ + "action": "split", + "step_index": 1, + "at_event_index": 2, + }) + + assert result.ok is True + assert len(v.steps) == 2 + assert len(v.steps[0].events) == 2 + assert len(v.steps[1].events) == 1 + + def test_split_index_invalide(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + result = v.apply_feedback({ + "action": "split", + "step_index": 1, + "at_event_index": 99, + }) + assert result.ok is False + + +# ========================================================================= +# cancel +# ========================================================================= + + +class TestValidatorCancel: + + def test_cancel_workflow(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + result = v.apply_feedback({"action": "cancel"}) + assert result.ok is True + assert v.is_cancelled + + def test_cancel_build_retourne_none(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + v.apply_feedback({"action": "cancel"}) + + ir = v.build_workflow_ir(session_id="s1", name="Test") + assert ir is None + + +# ========================================================================= +# Action inconnue +# ========================================================================= + + +class TestValidatorUnknownAction: + + def test_action_inconnue(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + result = v.apply_feedback({"action": "do_magic"}) + assert result.ok is False + assert "inconnue" in result.message.lower() + + +# ========================================================================= +# Construction du WorkflowIR +# ========================================================================= + + +class TestValidatorBuild: + + def test_build_workflow_ir(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + ir = v.build_workflow_ir( + session_id="sess_test", + name="Mon workflow", + domain="generic", + ) + + assert ir is not None + assert isinstance(ir, WorkflowIR) + assert ir.name == "Mon workflow" + assert ir.learned_from == "sess_test" + assert len(ir.steps) == 3 + + def test_build_with_variables(self): + """Les textes saisis deviennent des variables dans le WorkflowIR.""" + v = ShadowValidator() + v.set_steps([ + _make_type_step("Jean Dupont", step_index=1), + _make_type_step("jean@example.com", app="Email", step_index=2), + ]) + + ir = v.build_workflow_ir(session_id="s1", name="Test") + assert len(ir.variables) == 2 + + # Les actions de type type doivent référencer les variables + for step in ir.steps: + for action in step.actions: + if action.type == "type": + assert action.variable is True + assert action.text.startswith("{") + + def test_build_respecte_corrections(self): + """Les intentions corrigées se retrouvent dans le WorkflowIR.""" + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + v.apply_feedback({ + "action": "correct", + "step_index": 1, + "new_intent": "Lancer l'application de prise de notes", + }) + + ir = v.build_workflow_ir(session_id="s1", name="Test") + assert ir.steps[0].intent == "Lancer l'application de prise de notes" + + def test_build_exclut_etapes_annulees(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + v.apply_feedback({"action": "undo", "step_index": 2}) + + ir = v.build_workflow_ir(session_id="s1", name="Test") + assert len(ir.steps) == 2 + + def test_build_require_all_validated(self): + """Avec require_all_validated, erreur si une étape n'est pas validée.""" + v = ShadowValidator() + v.set_steps(_make_3_steps()) + v.apply_feedback({"action": "validate", "step_index": 1}) + # Étapes 2 et 3 pas validées + + with pytest.raises(ValueError): + v.build_workflow_ir( + session_id="s1", name="Test", require_all_validated=True + ) + + def test_build_applications_detectees(self): + v = ShadowValidator() + v.set_steps([ + _make_step(1, "Ouvrir Firefox", "Firefox"), + _make_step(2, "Écrire", "Bloc-notes"), + ]) + ir = v.build_workflow_ir(session_id="s1", name="Test") + assert "Firefox" in ir.applications + assert "Bloc-notes" in ir.applications + + +# ========================================================================= +# Historique +# ========================================================================= + + +class TestValidatorHistory: + + def test_historique_feedbacks(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + v.apply_feedback({"action": "validate", "step_index": 1}) + v.apply_feedback({ + "action": "correct", "step_index": 2, + "new_intent": "Écrire le texte" + }) + v.apply_feedback({"action": "undo", "step_index": 3}) + + history = v.history + assert len(history) == 3 + assert history[0].action == "validate" + assert history[1].action == "correct" + assert history[2].action == "undo" + + def test_apply_feedbacks_batch(self): + v = ShadowValidator() + v.set_steps(_make_3_steps()) + + results = v.apply_feedbacks([ + {"action": "validate", "step_index": 1}, + {"action": "validate", "step_index": 2}, + {"action": "undo", "step_index": 3}, + ]) + + assert len(results) == 3 + assert all(r.ok for r in results) + + +# ========================================================================= +# Intégration ShadowObserver + ShadowValidator +# ========================================================================= + + +class TestShadowObserverValidatorIntegration: + + def test_observer_vers_validator(self): + """Flow complet : Observer → Validator → WorkflowIR.""" + obs = ShadowObserver() + obs.start("sess_flow") + + # Simuler des événements + obs.observe_event("sess_flow", { + "type": "mouse_click", + "window": {"title": "Menu Démarrer", "app_name": "Menu Démarrer"}, + "vision_info": {"text": "Rechercher"}, + "timestamp": 100.0, + }) + obs.observe_event("sess_flow", { + "type": "text_input", + "text": "blocnote", + "window": {"title": "Menu Démarrer", "app_name": "Menu Démarrer"}, + "timestamp": 100.5, + }) + obs.observe_event("sess_flow", { + "type": "key_combo", + "keys": ["enter"], + "window": {"title": "Menu Démarrer", "app_name": "Menu Démarrer"}, + "timestamp": 101.0, + }) + # Changement d'application + obs.observe_event("sess_flow", { + "type": "text_input", + "text": "Hello world", + "window": {"title": "Sans titre - Bloc-notes", "app_name": "Bloc-notes"}, + "timestamp": 105.0, + }) + obs.stop("sess_flow") + + # Récupérer les étapes + internals = obs.get_steps_internal("sess_flow") + assert len(internals) >= 2 + + # Passer au validator + validator = ShadowValidator() + validator.set_steps(internals) + + # Valider la première étape, corriger la seconde + validator.apply_feedback({"action": "validate", "step_index": 1}) + validator.apply_feedback({ + "action": "correct", + "step_index": 2, + "new_intent": "Écrire un texte de démonstration", + }) + + # Construire le WorkflowIR + ir = validator.build_workflow_ir( + session_id="sess_flow", + name="Flow de test", + domain="generic", + ) + + assert ir is not None + assert len(ir.steps) >= 2 + assert ir.steps[1].intent == "Écrire un texte de démonstration" + assert len(ir.variables) >= 1 # Au moins "blocnote" ou "Hello world" + + def test_undo_puis_build(self): + obs = ShadowObserver() + obs.start("sess_undo") + for i in range(3): + obs.observe_event("sess_undo", { + "type": "mouse_click", + "window": {"title": f"App{i}"}, + "vision_info": {"text": "bouton"}, + "timestamp": 100.0 + i * 6.0, # > 4s pour créer des segments + }) + obs.stop("sess_undo") + + validator = ShadowValidator() + validator.set_steps(obs.get_steps_internal("sess_undo")) + nb_before = len(validator.steps) + assert nb_before >= 2 + + validator.apply_feedback({"action": "undo", "step_index": 1}) + + ir = validator.build_workflow_ir(session_id="sess_undo", name="Test") + assert len(ir.steps) == nb_before - 1