feat: Léa apprentissage — mode Shadow amélioré (observation + validation)

Aspect 3/4 Léa : Léa montre ce qu'elle comprend pendant l'enregistrement.

ShadowObserver (observation temps réel) :
- Segmentation incrémentale en UnderstoodStep (changement app, pause, Ctrl+S)
- Détection de variables pendant la saisie (typage : date, email, code, texte)
- Notifications 4 niveaux : INFO, DECOUVERTE, QUESTION, VARIABLE
- Heartbeat périodique, hook gemma4 optionnel (asynchrone)
- Thread-safe (RLock), singleton partagé
- Performance : 1000 events en < 500ms

ShadowValidator (feedback utilisateur) :
- 6 actions : validate, correct, undo, cancel, merge_next, split
- Reconstruit un WorkflowIR propre avec variables substituées
- Historique complet des feedbacks

5 endpoints REST /api/v1/shadow/* :
- start, stop, feedback, understanding, build

Hook non-bloquant dans stream_event() (try/except, no-op si inactif).
Mode optionnel : pas d'impact tant que shadow/start n'est pas appelé.

54 tests (26 observer + 28 validator), 0 régression.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-10 09:04:37 +02:00
parent 42d49dd8bd
commit 172167f6c0
4 changed files with 2122 additions and 0 deletions

View File

@@ -0,0 +1,693 @@
# core/workflow/shadow_observer.py
"""
ShadowObserver — Observation en temps réel de ce que Léa comprend.
C'est le "mode Shadow amélioré" : pendant que l'utilisateur enregistre
une démonstration, Léa lui dit ce qu'elle comprend au fur et à mesure.
Contrairement à l'IRBuilder (qui analyse TOUT à la fin en appelant gemma4),
le ShadowObserver travaille en incrémental :
- À chaque événement reçu, il met à jour sa compréhension locale.
- Il segmente dès qu'un critère de coupure est détecté.
- Il émet des notifications légères ("Léa a compris : tu viens d'ouvrir le
Bloc-notes") via un callback.
- Il détecte les variables (texte saisi) pendant la frappe.
Le ShadowObserver n'est pas la source de vérité — c'est une couche
d'observation. La source de vérité reste `live_events.jsonl`.
Le WorkflowIR final est toujours reconstruit par l'IRBuilder après
validation, mais la compréhension temps réel accélère la boucle de
rétroaction avec l'utilisateur.
Usage :
def on_notify(event):
print(f"[{event.niveau}] {event.message}")
observer = ShadowObserver(notify_callback=on_notify)
observer.start("sess_abc")
observer.observe_event(event1)
observer.observe_event(event2)
...
comprehension = observer.get_understanding()
# → [{"step": 1, "intent": "Ouvrir le Bloc-notes", "confidence": 0.8}, ...]
observer.stop()
Contraintes :
- 100% asynchrone côté performance : la méthode observe_event() ne doit
jamais bloquer la capture (pas d'appel réseau synchrone).
- Optionnel : activable via paramètre, ne modifie pas la capture existante.
- 100% français dans les messages utilisateur.
"""
from __future__ import annotations
import logging
import threading
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
logger = logging.getLogger(__name__)
# =========================================================================
# Types d'événements observationnels
# =========================================================================
class NiveauNotification(str, Enum):
"""Niveau d'importance d'une notification.
- INFO : information passive ("Léa observe...")
- DECOUVERTE : Léa vient de comprendre quelque chose de nouveau
- QUESTION : Léa aimerait une confirmation (non bloquant)
- VARIABLE : une variable a été détectée
"""
INFO = "info"
DECOUVERTE = "decouverte"
QUESTION = "question"
VARIABLE = "variable"
@dataclass
class NotificationShadow:
"""Notification émise par le ShadowObserver vers la GUI utilisateur."""
notif_id: str
niveau: NiveauNotification
message: str # Texte affichable à l'utilisateur (français)
session_id: str
step_index: int = -1 # Index de l'étape concernée, -1 si global
data: Dict[str, Any] = field(default_factory=dict)
timestamp: float = 0.0
def to_dict(self) -> Dict[str, Any]:
return {
"notif_id": self.notif_id,
"niveau": self.niveau.value,
"message": self.message,
"session_id": self.session_id,
"step_index": self.step_index,
"data": self.data,
"timestamp": self.timestamp,
}
@dataclass
class UnderstoodStep:
"""Étape logique comprise en temps réel par le ShadowObserver.
C'est une version simplifiée de `Step` (core.workflow.workflow_ir),
optimisée pour la construction incrémentale. Elle sera convertie
en `Step` final par le ShadowValidator après validation.
"""
step_index: int
intent: str # Intention humaine (ex: "Ouvrir le Bloc-notes")
intent_provisoire: bool = True # True tant que gemma4 n'a pas confirmé
confidence: float = 0.5 # Score de confiance (0..1)
app_name: str = "" # Application principale
window_title: str = "" # Titre de la fenêtre au début du segment
events: List[Dict[str, Any]] = field(default_factory=list)
variables_detectees: List[str] = field(default_factory=list)
started_at: float = 0.0
ended_at: float = 0.0
validated: bool = False # L'utilisateur a validé l'étape
corrected: bool = False # L'utilisateur a corrigé l'intention
cancelled: bool = False # L'utilisateur a annulé l'étape
def to_dict(self) -> Dict[str, Any]:
return {
"step_index": self.step_index,
"intent": self.intent,
"intent_provisoire": self.intent_provisoire,
"confidence": round(self.confidence, 3),
"app_name": self.app_name,
"window_title": self.window_title,
"events_count": len(self.events),
"variables_detectees": list(self.variables_detectees),
"started_at": self.started_at,
"ended_at": self.ended_at,
"validated": self.validated,
"corrected": self.corrected,
"cancelled": self.cancelled,
}
# =========================================================================
# Observer
# =========================================================================
# Constantes de segmentation (en secondes). On évite de re-déclarer les
# constantes de l'IRBuilder car l'observation est incrémentale — on peut
# se permettre des seuils plus courts pour plus de réactivité.
_SEUIL_PAUSE_LONGUE_S = 4.0
_SEUIL_CONFIANCE_BASE = 0.5
_SEUIL_CONFIANCE_APP_CHANGE = 0.8
# Types d'événements ignorés
_EVENT_TYPES_IGNORES = {
"heartbeat",
"focus_change",
"action_result",
"window_focus_change",
}
class ShadowObserver:
"""Observe les événements en temps réel et met à jour la compréhension.
Thread-safe : peut être appelé depuis plusieurs threads (capture,
API, worker).
Le callback `notify_callback` est appelé de manière synchrone mais les
notifications sont extrêmement légères (juste un dataclass) — elles
sont destinées à être envoyées via WebSocket/HTTP long-poll depuis la
couche API.
"""
NotifyCallback = Callable[[NotificationShadow], None]
def __init__(
self,
notify_callback: Optional[NotifyCallback] = None,
*,
enable_gemma4: bool = False,
gemma4_callback: Optional[Callable[[UnderstoodStep], None]] = None,
):
"""
Args:
notify_callback: Fonction appelée à chaque notification
(doit être rapide, pas d'IO bloquant).
enable_gemma4: Si True, une tâche asynchrone peut enrichir
les intentions via gemma4 (non bloquant). En pratique,
on laisse le caller le brancher via `gemma4_callback`.
gemma4_callback: Fonction appelée en arrière-plan pour
enrichir une étape (via gemma4 ou autre LLM). Non bloquant.
"""
self._notify_callback = notify_callback
self._enable_gemma4 = enable_gemma4
self._gemma4_callback = gemma4_callback
self._lock = threading.RLock()
self._sessions: Dict[str, Dict[str, Any]] = {}
# ----- Cycle de vie --------------------------------------------------
def start(self, session_id: str) -> None:
"""Démarrer l'observation d'une session."""
with self._lock:
self._sessions[session_id] = {
"steps": [], # List[UnderstoodStep]
"current_step": None, # Optional[UnderstoodStep]
"last_event_ts": 0.0,
"last_notif_ts": 0.0,
"total_events": 0,
"notifications": [], # Historique des notifications
"started_at": time.time(),
"stopped_at": 0.0,
}
self._notifier(
session_id,
NiveauNotification.INFO,
"Léa t'observe. Fais ta tâche normalement, je vais apprendre.",
)
def stop(self, session_id: str) -> None:
"""Arrêter l'observation et finaliser le segment en cours."""
with self._lock:
state = self._sessions.get(session_id)
if not state:
return
current = state.get("current_step")
if current is not None and current.events:
current.ended_at = state["last_event_ts"] or time.time()
state["steps"].append(current)
state["current_step"] = None
state["stopped_at"] = time.time()
nb_steps = len(self.get_understanding(session_id))
if nb_steps > 0:
self._notifier(
session_id,
NiveauNotification.DECOUVERTE,
f"J'ai observé {nb_steps} étape(s). Tu veux que je te les "
f"montre pour validation ?",
)
def reset(self, session_id: str) -> None:
"""Supprimer l'état d'une session (après finalisation)."""
with self._lock:
self._sessions.pop(session_id, None)
# ----- Observation ---------------------------------------------------
def observe_event(self, session_id: str, event: Dict[str, Any]) -> None:
"""Observer un nouvel événement pendant la capture.
Cette méthode est appelée à chaque événement reçu par le serveur.
Elle doit être RAPIDE (pas d'IO réseau synchrone).
"""
evt_type = event.get("type", "")
if evt_type in _EVENT_TYPES_IGNORES:
return
with self._lock:
state = self._sessions.get(session_id)
if not state:
# Auto-start si pas encore démarré (robustesse)
self.start(session_id)
state = self._sessions[session_id]
state["total_events"] += 1
# 1. Décider si on démarre un nouveau segment
current = state.get("current_step")
should_cut, cut_reason = self._should_cut(state, event)
if should_cut and current is not None:
current.ended_at = state["last_event_ts"] or time.time()
state["steps"].append(current)
self._emit_step_closed(session_id, current, cut_reason)
current = None
state["current_step"] = None
if current is None:
step_index = len(state["steps"]) + 1
current = UnderstoodStep(
step_index=step_index,
intent=self._initial_intent(event),
intent_provisoire=True,
confidence=_SEUIL_CONFIANCE_BASE,
app_name=self._get_app_name(event),
window_title=self._get_window_title(event),
started_at=float(event.get("timestamp", 0)) or time.time(),
)
state["current_step"] = current
# 2. Ajouter l'événement au segment courant
current.events.append(event)
ts = float(event.get("timestamp", 0)) or time.time()
state["last_event_ts"] = ts
# 3. Rafraîchir l'intent provisoire à partir du contexte accumulé
current.intent = self._refine_intent(current, event)
# 4. Détection de variable pendant la frappe
if evt_type == "text_input":
self._handle_text_input(session_id, current, event)
# 5. Émission périodique d'un résumé (toutes les 5s)
self._maybe_emit_heartbeat(session_id, state)
# ----- API publique --------------------------------------------------
def get_understanding(
self, session_id: str, include_current: bool = True
) -> List[Dict[str, Any]]:
"""Récupérer ce que Léa a compris jusqu'ici.
Returns:
Liste de dicts au format :
[{"step": 1, "intent": "Ouvrir le Bloc-notes",
"confidence": 0.9, "app": "Bloc-notes",
"events_count": 4, ...}, ...]
"""
with self._lock:
state = self._sessions.get(session_id)
if not state:
return []
steps = list(state["steps"])
if include_current and state.get("current_step") is not None:
steps = steps + [state["current_step"]]
out = []
for step in steps:
d = step.to_dict()
d["step"] = d.pop("step_index")
out.append(d)
return out
def get_notifications(
self, session_id: str, since_ts: float = 0.0
) -> List[Dict[str, Any]]:
"""Récupérer les notifications émises depuis un timestamp."""
with self._lock:
state = self._sessions.get(session_id)
if not state:
return []
return [
n.to_dict() for n in state["notifications"]
if n.timestamp >= since_ts
]
def get_current_step(
self, session_id: str
) -> Optional[Dict[str, Any]]:
"""Retourner l'étape en cours de construction."""
with self._lock:
state = self._sessions.get(session_id)
if not state:
return None
current = state.get("current_step")
if current is None:
return None
return current.to_dict()
def get_steps_internal(
self, session_id: str, include_current: bool = True
) -> List[UnderstoodStep]:
"""Version interne : retourne les objets `UnderstoodStep`.
Utilisé par le ShadowValidator pour reconstruire un WorkflowIR.
"""
with self._lock:
state = self._sessions.get(session_id)
if not state:
return []
steps = list(state["steps"])
if include_current and state.get("current_step") is not None:
steps = steps + [state["current_step"]]
# Retourner des copies pour éviter les mutations externes
return [self._copy_step(s) for s in steps]
def has_session(self, session_id: str) -> bool:
with self._lock:
return session_id in self._sessions
# ----- Internals : segmentation --------------------------------------
def _should_cut(
self, state: Dict[str, Any], event: Dict[str, Any]
) -> tuple:
"""Décider si l'événement doit démarrer un nouveau segment.
Returns:
(should_cut, reason)
"""
current = state.get("current_step")
if current is None or not current.events:
return (False, "")
# Coupure : changement d'application
new_app = self._get_app_name(event)
if new_app and current.app_name and new_app != current.app_name:
return (True, "changement_application")
# Coupure : pause longue entre deux événements
prev_ts = float(current.events[-1].get("timestamp", 0))
curr_ts = float(event.get("timestamp", 0))
if prev_ts > 0 and curr_ts > 0:
if (curr_ts - prev_ts) > _SEUIL_PAUSE_LONGUE_S:
return (True, "pause_longue")
# Coupure : key_combo « lourd » type ctrl+s (sauvegarde) → fin logique
evt_type = event.get("type", "")
if evt_type in ("key_combo", "key_press"):
keys = [str(k).lower() for k in event.get("keys", [])]
if "ctrl" in keys and any(k in keys for k in ("s", "enter")):
# On accroche le key_combo à l'étape courante, puis on coupe
# APRÈS — retourner False ici, la coupure se fera au prochain
# événement. C'est voulu.
return (False, "")
return (False, "")
def _initial_intent(self, event: Dict[str, Any]) -> str:
"""Intention provisoire d'un tout nouveau segment."""
app = self._get_app_name(event) or self._get_window_title(event)
evt_type = event.get("type", "")
if evt_type == "mouse_click":
hint = event.get("vision_info", {}).get("text", "")
if hint:
return f"Cliquer sur « {hint} »"
if app:
return f"Interagir avec {app}"
return "Cliquer quelque part"
if evt_type == "text_input":
text = event.get("text", "")[:40]
return f"Saisir du texte" + (f" « {text} »" if text else "")
if evt_type in ("key_combo", "key_press"):
keys = event.get("keys", [])
return f"Appuyer sur {'+'.join(keys)}" if keys else "Raccourci clavier"
return f"Action dans {app}" if app else "Action"
def _refine_intent(
self, step: UnderstoodStep, event: Dict[str, Any]
) -> str:
"""Raffiner l'intention au fur et à mesure qu'on voit plus d'événements.
Heuristiques simples — pas de gemma4 ici pour rester rapide.
"""
types = [e.get("type", "") for e in step.events]
has_click = "mouse_click" in types
has_type = "text_input" in types
has_key = any(t in ("key_combo", "key_press") for t in types)
app = step.app_name or self._get_window_title(event)
# Cas 1 : clic + saisie + entrée → "Rechercher X"
if has_click and has_type:
texts = [e.get("text", "") for e in step.events if e.get("type") == "text_input"]
if texts and any("enter" in [k.lower() for k in e.get("keys", [])]
for e in step.events if e.get("type") in ("key_combo", "key_press")):
premier_texte = next((t for t in texts if t), "")
if premier_texte:
step.confidence = min(0.85, step.confidence + 0.05)
return f"Rechercher « {premier_texte[:30]} »"
# Cas 2 : saisie seule → "Écrire du texte"
if has_type and not has_click:
texts = [e.get("text", "") for e in step.events if e.get("type") == "text_input"]
premier_texte = next((t for t in texts if t), "")
if premier_texte:
return f"Écrire « {premier_texte[:40]} »"
return "Écrire du texte"
# Cas 3 : ctrl+s → "Sauvegarder"
if has_key:
for e in step.events:
if e.get("type") in ("key_combo", "key_press"):
keys = [str(k).lower() for k in e.get("keys", [])]
if "ctrl" in keys and "s" in keys:
step.confidence = min(0.9, step.confidence + 0.1)
return f"Sauvegarder{' dans ' + app if app else ''}"
if "ctrl" in keys and "c" in keys:
return f"Copier{' depuis ' + app if app else ''}"
if "ctrl" in keys and "v" in keys:
return f"Coller{' dans ' + app if app else ''}"
# Cas 4 : clic seul + app identifiable
if has_click and app:
hint = ""
for e in step.events:
if e.get("type") == "mouse_click":
hint = e.get("vision_info", {}).get("text", "")
if hint:
break
if hint:
return f"Cliquer sur « {hint} » dans {app}"
return f"Interagir avec {app}"
return step.intent
def _handle_text_input(
self,
session_id: str,
step: UnderstoodStep,
event: Dict[str, Any],
) -> None:
"""Détecter et notifier une variable lors d'une saisie texte."""
text = (event.get("text") or "").strip()
if not text or len(text) < 3:
return
# Déduire un nom de variable provisoire
var_name = f"texte_{len(step.variables_detectees) + 1}"
step.variables_detectees.append(var_name)
# Heuristique : détecter le type plausible
var_type = self._guess_variable_type(text)
self._notifier(
session_id,
NiveauNotification.VARIABLE,
f"Variable détectée : tu as tapé « {text[:40]} » — c'est {var_type} ?",
step_index=step.step_index,
data={
"variable_name": var_name,
"value": text,
"variable_type": var_type,
},
)
def _guess_variable_type(self, text: str) -> str:
"""Deviner le type d'une variable à partir de sa valeur."""
t = text.strip()
# Date (basique)
if len(t) == 10 and t[2] in "/-" and t[5] in "/-":
return "une date"
if t.isdigit():
return "un numéro"
if "@" in t and "." in t:
return "une adresse e-mail"
if len(t) <= 10 and t.replace(" ", "").replace("-", "").isalnum() and not any(c.islower() for c in t):
return "un code"
if " " in t and len(t) > 10:
return "un texte libre"
return "un texte"
# ----- Internals : notifications -------------------------------------
def _notifier(
self,
session_id: str,
niveau: NiveauNotification,
message: str,
*,
step_index: int = -1,
data: Optional[Dict[str, Any]] = None,
) -> None:
"""Créer et émettre une notification."""
notif = NotificationShadow(
notif_id=uuid.uuid4().hex[:12],
niveau=niveau,
message=message,
session_id=session_id,
step_index=step_index,
data=data or {},
timestamp=time.time(),
)
with self._lock:
state = self._sessions.get(session_id)
if state is not None:
state["notifications"].append(notif)
state["last_notif_ts"] = notif.timestamp
if self._notify_callback is not None:
try:
self._notify_callback(notif)
except Exception as e:
logger.debug(f"ShadowObserver: callback a échoué : {e}")
def _emit_step_closed(
self,
session_id: str,
step: UnderstoodStep,
reason: str,
) -> None:
"""Émettre une notification quand une étape est fermée."""
raison_humaine = {
"changement_application": "tu es passé à une autre application",
"pause_longue": "tu as fait une pause",
}.get(reason, "")
suffixe = f" ({raison_humaine})" if raison_humaine else ""
self._notifier(
session_id,
NiveauNotification.DECOUVERTE,
f"Nouvelle étape comprise : {step.intent}{suffixe}",
step_index=step.step_index,
data={"step": step.to_dict()},
)
if self._enable_gemma4 and self._gemma4_callback is not None:
# Non bloquant : on délègue au caller (qui peut utiliser un thread)
try:
self._gemma4_callback(self._copy_step(step))
except Exception as e:
logger.debug(f"ShadowObserver: gemma4_callback a échoué : {e}")
def _maybe_emit_heartbeat(
self,
session_id: str,
state: Dict[str, Any],
) -> None:
"""Émettre un résumé périodique (toutes les 5s env.)."""
now = time.time()
last = state.get("last_notif_ts", 0)
if now - last < 5.0:
return
nb_steps = len(state["steps"])
if state.get("current_step") is not None:
nb_steps += 1
if nb_steps == 0:
return
self._notifier(
session_id,
NiveauNotification.INFO,
f"J'ai compris {nb_steps} étape(s) jusqu'ici.",
data={"steps_count": nb_steps},
)
# ----- Utilitaires ---------------------------------------------------
@staticmethod
def _get_app_name(event: Dict[str, Any]) -> str:
"""Extraire le nom d'application depuis un événement."""
window = event.get("window") or {}
if isinstance(window, dict):
title = window.get("title", "")
app_name = window.get("app_name", "")
else:
title = event.get("window_title", "")
app_name = ""
# Préférer app_name si disponible
if app_name and app_name != "unknown":
return app_name
# Sinon, extraire depuis le titre
for sep in [" ", " - ", ""]:
if sep in title:
return title.split(sep)[-1].strip()
return title.strip() if title else ""
@staticmethod
def _get_window_title(event: Dict[str, Any]) -> str:
window = event.get("window") or {}
if isinstance(window, dict):
return window.get("title", "") or ""
return event.get("window_title", "") or ""
@staticmethod
def _copy_step(step: UnderstoodStep) -> UnderstoodStep:
"""Copie superficielle pour éviter les fuites de mutation."""
return UnderstoodStep(
step_index=step.step_index,
intent=step.intent,
intent_provisoire=step.intent_provisoire,
confidence=step.confidence,
app_name=step.app_name,
window_title=step.window_title,
events=list(step.events),
variables_detectees=list(step.variables_detectees),
started_at=step.started_at,
ended_at=step.ended_at,
validated=step.validated,
corrected=step.corrected,
cancelled=step.cancelled,
)
# =========================================================================
# Singleton partagé (optionnel)
# =========================================================================
_shared_observer: Optional[ShadowObserver] = None
_shared_lock = threading.Lock()
def get_shared_observer() -> ShadowObserver:
"""Observer partagé pour l'API (lazy init)."""
global _shared_observer
with _shared_lock:
if _shared_observer is None:
_shared_observer = ShadowObserver()
return _shared_observer

View File

@@ -0,0 +1,468 @@
# core/workflow/shadow_validator.py
"""
ShadowValidator — Applique les feedbacks utilisateur et reconstruit un WorkflowIR.
Le ShadowObserver observe et comprend en temps réel. Le ShadowValidator,
lui, prend les décisions de l'utilisateur (valider, corriger, annuler,
combiner) et reconstruit un WorkflowIR final « propre » qui sera
persisté et exécutable par le runtime.
Opérations supportées :
- validate(step_index) : marquer l'étape comme validée
- correct(step_index, new_intent) : corriger l'intention
- undo(step_index) : annuler l'étape (elle sera exclue du WorkflowIR)
- merge_with_next(step_index) : fusionner avec l'étape suivante
- cancel() : annuler tout le workflow
- split(step_index, at_event_index) : couper une étape en deux (bonus)
Le validator ne touche PAS aux événements bruts (events.jsonl) — il
travaille sur la liste des `UnderstoodStep` fournie par le ShadowObserver.
Une fois toutes les actions appliquées, `build_workflow_ir()` produit
un WorkflowIR exécutable à partir des étapes validées/corrigées.
Usage :
validator = ShadowValidator()
validator.set_steps(observer.get_steps_internal(session_id))
validator.apply_feedback({"action": "validate", "step_index": 1})
validator.apply_feedback({
"action": "correct",
"step_index": 2,
"new_intent": "Sauvegarder le document",
})
validator.apply_feedback({"action": "undo", "step_index": 3})
ir = validator.build_workflow_ir(
session_id="sess_abc",
name="Mon workflow",
domain="generic",
)
ir.save("data/workflows/")
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from .shadow_observer import UnderstoodStep
from .workflow_ir import Action, Step, Variable, WorkflowIR
logger = logging.getLogger(__name__)
# Actions supportées par le feedback
FEEDBACK_ACTIONS = {
"validate",
"correct",
"undo",
"cancel",
"merge_next",
"split",
}
@dataclass
class FeedbackResult:
"""Résultat d'une opération de feedback."""
ok: bool
action: str
step_index: int
message: str
data: Dict[str, Any]
def to_dict(self) -> Dict[str, Any]:
return {
"ok": self.ok,
"action": self.action,
"step_index": self.step_index,
"message": self.message,
"data": dict(self.data),
}
class ShadowValidator:
"""Applique les feedbacks utilisateur et produit un WorkflowIR."""
def __init__(self) -> None:
self._steps: List[UnderstoodStep] = []
self._cancelled_workflow: bool = False
self._history: List[FeedbackResult] = []
# ----- API -----------------------------------------------------------
def set_steps(self, steps: List[UnderstoodStep]) -> None:
"""Initialiser le validator avec la liste des étapes observées."""
self._steps = [self._clone(s) for s in steps]
self._cancelled_workflow = False
self._history = []
@property
def steps(self) -> List[UnderstoodStep]:
"""Vue en lecture des étapes courantes."""
return list(self._steps)
@property
def history(self) -> List[FeedbackResult]:
"""Historique des feedbacks appliqués."""
return list(self._history)
@property
def is_cancelled(self) -> bool:
return self._cancelled_workflow
def apply_feedback(self, feedback: Dict[str, Any]) -> FeedbackResult:
"""Appliquer un feedback utilisateur.
Le `feedback` est un dict au format :
{
"action": "validate" | "correct" | "undo" | "cancel" | "merge_next" | "split",
"step_index": 1, # Index 1-based (comme dans get_understanding)
"new_intent": "...", # Pour correct
"at_event_index": 3, # Pour split
}
Returns:
FeedbackResult
"""
action = (feedback.get("action") or "").strip()
if action not in FEEDBACK_ACTIONS:
return self._record(FeedbackResult(
ok=False, action=action, step_index=-1,
message=f"Action inconnue : « {action} »",
data={"supported": sorted(FEEDBACK_ACTIONS)},
))
if action == "cancel":
return self._do_cancel()
step_index = int(feedback.get("step_index", -1))
if not self._is_valid_step_index(step_index):
return self._record(FeedbackResult(
ok=False, action=action, step_index=step_index,
message=f"Index d'étape invalide : {step_index}",
data={"nb_steps": len(self._steps)},
))
if action == "validate":
return self._do_validate(step_index)
if action == "correct":
return self._do_correct(step_index, feedback.get("new_intent", ""))
if action == "undo":
return self._do_undo(step_index)
if action == "merge_next":
return self._do_merge_next(step_index)
if action == "split":
return self._do_split(
step_index, int(feedback.get("at_event_index", -1))
)
return self._record(FeedbackResult(
ok=False, action=action, step_index=step_index,
message="Action non implémentée", data={},
))
def apply_feedbacks(
self, feedbacks: List[Dict[str, Any]]
) -> List[FeedbackResult]:
"""Appliquer plusieurs feedbacks dans l'ordre."""
return [self.apply_feedback(f) for f in feedbacks]
# ----- Opérations ---------------------------------------------------
def _do_validate(self, step_index: int) -> FeedbackResult:
step = self._get_step(step_index)
step.validated = True
step.intent_provisoire = False
step.confidence = max(step.confidence, 0.95)
return self._record(FeedbackResult(
ok=True, action="validate", step_index=step_index,
message=f"Étape {step_index} validée : {step.intent}",
data={"intent": step.intent},
))
def _do_correct(
self, step_index: int, new_intent: str
) -> FeedbackResult:
new_intent = (new_intent or "").strip()
if not new_intent:
return self._record(FeedbackResult(
ok=False, action="correct", step_index=step_index,
message="Nouvelle intention vide",
data={},
))
step = self._get_step(step_index)
old_intent = step.intent
step.intent = new_intent
step.corrected = True
step.validated = True # Corriger = implicitement valider
step.intent_provisoire = False
step.confidence = 1.0
return self._record(FeedbackResult(
ok=True, action="correct", step_index=step_index,
message=f"Étape {step_index} corrigée : « {old_intent} » → « {new_intent} »",
data={"old_intent": old_intent, "new_intent": new_intent},
))
def _do_undo(self, step_index: int) -> FeedbackResult:
step = self._get_step(step_index)
step.cancelled = True
return self._record(FeedbackResult(
ok=True, action="undo", step_index=step_index,
message=f"Étape {step_index} annulée : {step.intent}",
data={"intent": step.intent},
))
def _do_merge_next(self, step_index: int) -> FeedbackResult:
"""Fusionner l'étape avec la suivante."""
if step_index >= len(self._steps):
return self._record(FeedbackResult(
ok=False, action="merge_next", step_index=step_index,
message="Aucune étape suivante à fusionner",
data={},
))
step = self._get_step(step_index)
next_step = self._get_step(step_index + 1)
merged = UnderstoodStep(
step_index=step.step_index,
intent=step.intent if len(step.intent) >= len(next_step.intent) else next_step.intent,
intent_provisoire=False,
confidence=max(step.confidence, next_step.confidence),
app_name=step.app_name or next_step.app_name,
window_title=step.window_title or next_step.window_title,
events=list(step.events) + list(next_step.events),
variables_detectees=list(step.variables_detectees)
+ list(next_step.variables_detectees),
started_at=step.started_at or next_step.started_at,
ended_at=next_step.ended_at or step.ended_at,
validated=True,
corrected=step.corrected or next_step.corrected,
cancelled=False,
)
# Remplacer [step, next_step] par [merged]
idx0 = step_index - 1 # 1-based → 0-based
self._steps.pop(idx0 + 1) # next_step
self._steps[idx0] = merged
self._renumber()
return self._record(FeedbackResult(
ok=True, action="merge_next", step_index=step_index,
message=f"Étapes {step_index} et {step_index + 1} fusionnées",
data={"intent": merged.intent},
))
def _do_split(
self, step_index: int, at_event_index: int
) -> FeedbackResult:
"""Couper une étape en deux au niveau de l'événement at_event_index.
`at_event_index` est 0-based parmi les events de l'étape.
"""
step = self._get_step(step_index)
if at_event_index <= 0 or at_event_index >= len(step.events):
return self._record(FeedbackResult(
ok=False, action="split", step_index=step_index,
message=f"Index de coupe invalide : {at_event_index}",
data={"nb_events": len(step.events)},
))
left_events = step.events[:at_event_index]
right_events = step.events[at_event_index:]
left = UnderstoodStep(
step_index=step.step_index,
intent=step.intent + " (1/2)",
intent_provisoire=True,
confidence=step.confidence * 0.9,
app_name=step.app_name,
window_title=step.window_title,
events=left_events,
started_at=step.started_at,
)
right = UnderstoodStep(
step_index=step.step_index + 1,
intent=step.intent + " (2/2)",
intent_provisoire=True,
confidence=step.confidence * 0.9,
app_name=step.app_name,
window_title=step.window_title,
events=right_events,
started_at=float(right_events[0].get("timestamp", 0))
if right_events else step.started_at,
ended_at=step.ended_at,
)
idx0 = step_index - 1
self._steps[idx0] = left
self._steps.insert(idx0 + 1, right)
self._renumber()
return self._record(FeedbackResult(
ok=True, action="split", step_index=step_index,
message=f"Étape {step_index} coupée en 2",
data={"nb_steps": len(self._steps)},
))
def _do_cancel(self) -> FeedbackResult:
self._cancelled_workflow = True
return self._record(FeedbackResult(
ok=True, action="cancel", step_index=-1,
message="Workflow annulé",
data={},
))
# ----- Construction du WorkflowIR -----------------------------------
def build_workflow_ir(
self,
session_id: str = "",
name: str = "",
domain: str = "generic",
*,
require_all_validated: bool = False,
) -> Optional[WorkflowIR]:
"""Construire un WorkflowIR à partir des étapes corrigées.
Args:
session_id: Identifiant de la session source.
name: Nom du workflow.
domain: Domaine métier.
require_all_validated: Si True, lève une erreur si au moins
une étape n'a pas été validée explicitement.
Returns:
WorkflowIR ou None si le workflow a été annulé.
"""
if self._cancelled_workflow:
logger.info("ShadowValidator: workflow annulé, pas de build")
return None
ir = WorkflowIR.new(
name=name or f"Workflow du {time.strftime('%d/%m/%Y %H:%M')}",
domain=domain,
learned_from=session_id,
)
variables: List[Variable] = []
seen_texts = set()
applications: set = set()
for step in self._steps:
if step.cancelled:
continue
if require_all_validated and not step.validated:
raise ValueError(
f"Étape {step.step_index} non validée : {step.intent}"
)
if step.app_name:
applications.add(step.app_name)
actions = []
for evt in step.events:
action = self._event_to_action(evt)
if action is None:
continue
# Détection de variable (texte saisi)
if action.type == "type" and action.text:
text = action.text.strip()
if text and text not in seen_texts and len(text) > 2:
seen_texts.add(text)
var_name = f"texte_{len(variables) + 1}"
variables.append(Variable(
name=var_name,
description=f"Texte saisi : « {text[:50]} »",
source="user",
default=text,
))
action.variable = True
action.text = "{" + var_name + "}"
actions.append(action)
ir_step = Step(
step_id=f"s{len(ir.steps) + 1}",
intent=step.intent,
actions=actions,
)
ir.steps.append(ir_step)
ir.variables = variables
ir.applications = sorted(applications)
ir.updated_at = time.time()
logger.info(
f"ShadowValidator: WorkflowIR construit — {len(ir.steps)} étapes, "
f"{len(ir.variables)} variables"
)
return ir
# ----- Utilitaires --------------------------------------------------
def _is_valid_step_index(self, step_index: int) -> bool:
return 1 <= step_index <= len(self._steps)
def _get_step(self, step_index: int) -> UnderstoodStep:
return self._steps[step_index - 1]
def _renumber(self) -> None:
for i, s in enumerate(self._steps, start=1):
s.step_index = i
def _record(self, result: FeedbackResult) -> FeedbackResult:
self._history.append(result)
return result
@staticmethod
def _clone(step: UnderstoodStep) -> UnderstoodStep:
return UnderstoodStep(
step_index=step.step_index,
intent=step.intent,
intent_provisoire=step.intent_provisoire,
confidence=step.confidence,
app_name=step.app_name,
window_title=step.window_title,
events=list(step.events),
variables_detectees=list(step.variables_detectees),
started_at=step.started_at,
ended_at=step.ended_at,
validated=step.validated,
corrected=step.corrected,
cancelled=step.cancelled,
)
@staticmethod
def _event_to_action(evt: Dict[str, Any]) -> Optional[Action]:
"""Convertir un événement brut en Action (miroir de IRBuilder)."""
evt_type = evt.get("type", "")
if evt_type == "mouse_click":
window = evt.get("window") or {}
if isinstance(window, dict):
target = window.get("title", "")
else:
target = evt.get("window_title", "")
return Action(
type="click",
target=target or "",
anchor_hint=(evt.get("vision_info") or {}).get("text", ""),
)
if evt_type == "text_input":
text = evt.get("text", "")
if text:
return Action(type="type", text=text)
if evt_type in ("key_combo", "key_press"):
keys = evt.get("keys", [])
if keys:
return Action(type="key_combo", keys=list(keys))
if evt_type == "scroll":
return Action(type="scroll")
return None

View File

@@ -0,0 +1,430 @@
"""
Tests du ShadowObserver — observation temps réel de Léa.
Vérifie que :
- L'observer démarre et s'arrête correctement
- Les événements sont segmentés en étapes logiques
- Les variables sont détectées pendant la frappe
- Les notifications sont émises avec le bon niveau
- La compréhension est accessible en temps réel
- Le callback de notification est appelé
- Les événements parasites (heartbeat) sont ignorés
"""
import sys
import time
from pathlib import Path
from unittest.mock import MagicMock
import pytest
_ROOT = str(Path(__file__).resolve().parents[2])
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
from core.workflow.shadow_observer import (
NiveauNotification,
NotificationShadow,
ShadowObserver,
UnderstoodStep,
get_shared_observer,
)
# =========================================================================
# Fixtures
# =========================================================================
def _evt_click(text="Rechercher", title="Explorateur", ts=100.0):
return {
"type": "mouse_click",
"pos": [400, 580],
"window": {"title": title, "app_name": title.split(" - ")[-1] if " - " in title else title},
"timestamp": ts,
"vision_info": {"text": text},
}
def _evt_type(text="bonjour", title="Bloc-notes", ts=101.0):
return {
"type": "text_input",
"text": text,
"window": {"title": title, "app_name": title},
"timestamp": ts,
}
def _evt_key(keys=None, title="Bloc-notes", ts=102.0):
return {
"type": "key_combo",
"keys": keys or ["enter"],
"window": {"title": title, "app_name": title},
"timestamp": ts,
}
def _evt_heartbeat(ts=103.0):
return {"type": "heartbeat", "timestamp": ts}
def _make_session_events():
"""Scénario typique : ouvrir Bloc-notes, taper du texte, sauvegarder."""
return [
_evt_click(text="Rechercher", title="Menu Démarrer", ts=100.0),
_evt_type(text="blocnote", title="Menu Démarrer", ts=101.0),
_evt_key(keys=["enter"], title="Menu Démarrer", ts=102.0),
_evt_heartbeat(ts=103.0), # Doit être ignoré
# Changement d'application → nouveau segment attendu
_evt_click(text="", title="Sans titre - Bloc-notes", ts=105.0),
_evt_type(text="Bonjour le monde", title="Sans titre - Bloc-notes", ts=106.0),
_evt_key(keys=["ctrl", "s"], title="Sans titre - Bloc-notes", ts=108.0),
]
# =========================================================================
# Tests de base
# =========================================================================
class TestShadowObserverBase:
def test_start_et_stop(self):
obs = ShadowObserver()
obs.start("sess_test")
assert obs.has_session("sess_test")
obs.stop("sess_test")
assert obs.has_session("sess_test") # stop ne supprime pas
obs.reset("sess_test")
assert not obs.has_session("sess_test")
def test_auto_start_sur_premier_event(self):
"""observe_event() sans start() doit auto-démarrer la session."""
obs = ShadowObserver()
obs.observe_event("sess_auto", _evt_click())
assert obs.has_session("sess_auto")
steps = obs.get_understanding("sess_auto")
assert len(steps) >= 1
def test_heartbeat_ignore(self):
obs = ShadowObserver()
obs.start("s1")
obs.observe_event("s1", _evt_heartbeat())
steps = obs.get_understanding("s1")
assert len(steps) == 0 # Aucune étape créée par un heartbeat
def test_focus_change_ignore(self):
obs = ShadowObserver()
obs.start("s1")
obs.observe_event("s1", {"type": "focus_change", "timestamp": 100})
assert len(obs.get_understanding("s1")) == 0
# =========================================================================
# Segmentation
# =========================================================================
class TestShadowObserverSegmentation:
def test_segmentation_par_changement_app(self):
"""Un changement d'application crée un nouveau segment."""
obs = ShadowObserver()
obs.start("s1")
obs.observe_event("s1", _evt_click(title="Firefox"))
obs.observe_event("s1", _evt_click(title="Firefox", ts=100.5))
obs.observe_event("s1", _evt_click(title="Bloc-notes", ts=101.0))
steps = obs.get_understanding("s1")
assert len(steps) >= 2 # Au moins 2 segments
def test_segmentation_par_pause_longue(self):
"""Une pause > 4s coupe le segment."""
obs = ShadowObserver()
obs.start("s1")
obs.observe_event("s1", _evt_click(title="App1", ts=100.0))
obs.observe_event("s1", _evt_click(title="App1", ts=100.5))
# Pause de 10 secondes
obs.observe_event("s1", _evt_click(title="App1", ts=110.5))
steps = obs.get_understanding("s1")
assert len(steps) >= 2
def test_segment_complet(self):
"""Scénario complet : Bloc-notes + texte + save."""
obs = ShadowObserver()
obs.start("s1")
for evt in _make_session_events():
obs.observe_event("s1", evt)
obs.stop("s1")
steps = obs.get_understanding("s1")
assert len(steps) >= 2 # Au moins Menu Démarrer + Bloc-notes
# Au moins une étape doit mentionner le Bloc-notes
intents = " ".join(s["intent"].lower() for s in steps)
assert "bloc" in intents or "enregistr" in intents or "sauvegard" in intents or \
"écrir" in intents or "text" in intents.lower()
# =========================================================================
# Intention et raffinement
# =========================================================================
class TestShadowObserverIntent:
def test_intent_recherche(self):
"""Clic + saisie + entrée → 'Rechercher X'."""
obs = ShadowObserver()
obs.start("s1")
obs.observe_event("s1", _evt_click(text="Champ recherche", title="Explorateur", ts=100.0))
obs.observe_event("s1", _evt_type(text="calculatrice", title="Explorateur", ts=100.5))
obs.observe_event("s1", _evt_key(keys=["enter"], title="Explorateur", ts=101.0))
current = obs.get_current_step("s1")
assert current is not None
assert "recherch" in current["intent"].lower() or "calculatrice" in current["intent"].lower()
def test_intent_ctrl_s(self):
"""Ctrl+S → 'Sauvegarder'."""
obs = ShadowObserver()
obs.start("s1")
obs.observe_event("s1", _evt_click(title="Bloc-notes", ts=100.0))
obs.observe_event("s1", _evt_key(keys=["ctrl", "s"], title="Bloc-notes", ts=100.5))
current = obs.get_current_step("s1")
assert current is not None
assert "sauvegard" in current["intent"].lower()
def test_intent_ecrire(self):
"""Saisie seule → 'Écrire'."""
obs = ShadowObserver()
obs.start("s1")
obs.observe_event("s1", _evt_type(text="Un texte libre", title="Bloc-notes"))
current = obs.get_current_step("s1")
assert current is not None
assert "écri" in current["intent"].lower() or "text" in current["intent"].lower()
def test_confidence_augmente_avec_contexte(self):
"""La confiance augmente quand le contexte devient clair."""
obs = ShadowObserver()
obs.start("s1")
obs.observe_event("s1", _evt_click(title="App"))
c1 = obs.get_current_step("s1")["confidence"]
obs.observe_event("s1", _evt_key(keys=["ctrl", "s"], title="App"))
c2 = obs.get_current_step("s1")["confidence"]
assert c2 >= c1
# =========================================================================
# Détection de variables
# =========================================================================
class TestShadowObserverVariables:
def test_variable_detectee_lors_saisie(self):
"""Une saisie texte > 3 caractères crée une variable."""
obs = ShadowObserver()
notifs = []
obs._notify_callback = lambda n: notifs.append(n)
obs.start("s1")
obs.observe_event("s1", _evt_type(text="Jean Dupont", title="Formulaire"))
var_notifs = [n for n in notifs if n.niveau == NiveauNotification.VARIABLE]
assert len(var_notifs) == 1
assert "Jean Dupont" in var_notifs[0].message
assert var_notifs[0].data["variable_name"].startswith("texte_")
def test_variable_type_date(self):
obs = ShadowObserver()
obs.start("s1")
obs.observe_event("s1", _evt_type(text="15/03/2026", title="Formulaire"))
current = obs.get_current_step("s1")
assert len(current["variables_detectees"]) == 1
def test_variable_type_email(self):
obs = ShadowObserver()
notifs = []
obs._notify_callback = lambda n: notifs.append(n)
obs.start("s1")
obs.observe_event("s1", _evt_type(text="jean@example.com", title="Formulaire"))
var_notifs = [n for n in notifs if n.niveau == NiveauNotification.VARIABLE]
assert len(var_notifs) == 1
assert "e-mail" in var_notifs[0].message or "mail" in var_notifs[0].message
def test_texte_court_ignore(self):
"""Un texte de moins de 3 caractères n'est pas une variable."""
obs = ShadowObserver()
notifs = []
obs._notify_callback = lambda n: notifs.append(n)
obs.start("s1")
obs.observe_event("s1", _evt_type(text="ab", title="App"))
var_notifs = [n for n in notifs if n.niveau == NiveauNotification.VARIABLE]
assert len(var_notifs) == 0
# =========================================================================
# Notifications et callbacks
# =========================================================================
class TestShadowObserverNotifications:
def test_notification_au_demarrage(self):
notifs = []
obs = ShadowObserver(notify_callback=lambda n: notifs.append(n))
obs.start("s1")
assert len(notifs) >= 1
assert notifs[0].niveau == NiveauNotification.INFO
assert "observe" in notifs[0].message.lower()
def test_notification_nouvelle_etape(self):
"""Un changement d'application émet une notification DECOUVERTE."""
notifs = []
obs = ShadowObserver(notify_callback=lambda n: notifs.append(n))
obs.start("s1")
obs.observe_event("s1", _evt_click(title="Firefox", ts=100.0))
obs.observe_event("s1", _evt_click(title="Bloc-notes", ts=101.0))
decouverts = [n for n in notifs if n.niveau == NiveauNotification.DECOUVERTE]
assert len(decouverts) >= 1
def test_notification_stop_resume(self):
"""Au stop, on émet un résumé du nombre d'étapes."""
notifs = []
obs = ShadowObserver(notify_callback=lambda n: notifs.append(n))
obs.start("s1")
for evt in _make_session_events():
obs.observe_event("s1", evt)
obs.stop("s1")
messages = [n.message.lower() for n in notifs]
assert any("étape" in m or "observ" in m for m in messages)
def test_notifications_since_ts(self):
"""get_notifications(since_ts=...) filtre correctement."""
obs = ShadowObserver()
obs.start("s1")
time.sleep(0.01)
mid_ts = time.time()
time.sleep(0.01)
obs.observe_event("s1", _evt_click(title="Firefox"))
obs.observe_event("s1", _evt_click(title="Bloc-notes"))
recentes = obs.get_notifications("s1", since_ts=mid_ts)
toutes = obs.get_notifications("s1", since_ts=0)
assert len(recentes) < len(toutes)
def test_callback_erreur_ne_plante_pas(self):
"""Un callback qui lève ne doit pas faire planter l'observer."""
def bad_callback(notif):
raise RuntimeError("boom")
obs = ShadowObserver(notify_callback=bad_callback)
obs.start("s1") # Devrait émettre une notification (qui plante en callback)
# Si on arrive ici, c'est OK
obs.observe_event("s1", _evt_click())
# =========================================================================
# Compréhension et API publique
# =========================================================================
class TestShadowObserverUnderstanding:
def test_get_understanding_format(self):
"""La structure retournée est bien celle attendue."""
obs = ShadowObserver()
obs.start("s1")
obs.observe_event("s1", _evt_click(title="App"))
steps = obs.get_understanding("s1")
assert isinstance(steps, list)
assert len(steps) >= 1
step = steps[0]
assert "step" in step
assert "intent" in step
assert "confidence" in step
assert isinstance(step["step"], int)
assert 0.0 <= step["confidence"] <= 1.0
def test_get_understanding_sans_session(self):
obs = ShadowObserver()
steps = obs.get_understanding("inexistante")
assert steps == []
def test_get_current_step(self):
obs = ShadowObserver()
obs.start("s1")
assert obs.get_current_step("s1") is None
obs.observe_event("s1", _evt_click(title="App"))
current = obs.get_current_step("s1")
assert current is not None
assert current["step_index"] == 1
def test_get_steps_internal(self):
"""get_steps_internal retourne des UnderstoodStep copiés."""
obs = ShadowObserver()
obs.start("s1")
obs.observe_event("s1", _evt_click(title="App"))
internals = obs.get_steps_internal("s1")
assert len(internals) >= 1
assert isinstance(internals[0], UnderstoodStep)
# Mutation externe ne doit pas affecter l'observer
internals[0].intent = "HACKED"
again = obs.get_steps_internal("s1")
assert again[0].intent != "HACKED"
# =========================================================================
# Singleton partagé
# =========================================================================
class TestSharedObserver:
def test_singleton(self):
obs1 = get_shared_observer()
obs2 = get_shared_observer()
assert obs1 is obs2
# =========================================================================
# Performance (contrainte : observe_event doit être rapide)
# =========================================================================
class TestShadowObserverPerformance:
def test_observe_event_rapide(self):
"""observe_event() doit traiter 1000 events en moins de 500ms."""
obs = ShadowObserver()
obs.start("s_perf")
events = []
for i in range(1000):
events.append(_evt_click(title="App", ts=100.0 + i * 0.01))
start = time.time()
for evt in events:
obs.observe_event("s_perf", evt)
elapsed = time.time() - start
assert elapsed < 0.5, f"Trop lent : {elapsed:.2f}s pour 1000 events"

View File

@@ -0,0 +1,531 @@
"""
Tests du ShadowValidator — feedback utilisateur et reconstruction WorkflowIR.
Vérifie que :
- Les feedbacks (validate/correct/undo/merge_next/split/cancel) sont appliqués
- Le WorkflowIR final est bien reconstruit à partir des étapes corrigées
- Les variables sont détectées dans les actions finales
- L'historique des feedbacks est conservé
- Les erreurs (index invalide, action inconnue) sont gérées proprement
"""
import sys
import time
from pathlib import Path
from unittest.mock import MagicMock
import pytest
_ROOT = str(Path(__file__).resolve().parents[2])
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
from core.workflow.shadow_observer import ShadowObserver, UnderstoodStep
from core.workflow.shadow_validator import FeedbackResult, ShadowValidator
from core.workflow.workflow_ir import WorkflowIR
# =========================================================================
# Fixtures
# =========================================================================
def _make_step(step_index=1, intent="Ouvrir Firefox", app="Firefox",
events=None, **kwargs):
return UnderstoodStep(
step_index=step_index,
intent=intent,
app_name=app,
window_title=app,
events=events or [
{"type": "mouse_click", "window": {"title": app},
"vision_info": {"text": "bouton"}, "timestamp": 100.0}
],
started_at=100.0,
ended_at=101.0,
**kwargs,
)
def _make_type_step(texte="Bonjour", app="Bloc-notes", step_index=1):
return UnderstoodStep(
step_index=step_index,
intent=f"Écrire « {texte} »",
app_name=app,
window_title=app,
events=[
{"type": "text_input", "text": texte,
"window": {"title": app}, "timestamp": 100.0}
],
started_at=100.0,
)
def _make_3_steps():
return [
_make_step(1, "Ouvrir le Bloc-notes", "Bloc-notes"),
_make_type_step("Bonjour le monde", step_index=2),
_make_step(3, "Sauvegarder", "Bloc-notes", events=[
{"type": "key_combo", "keys": ["ctrl", "s"],
"window": {"title": "Bloc-notes"}, "timestamp": 103.0}
]),
]
# =========================================================================
# Initialisation
# =========================================================================
class TestShadowValidatorBase:
def test_creation(self):
v = ShadowValidator()
assert v.steps == []
assert v.history == []
assert not v.is_cancelled
def test_set_steps(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
assert len(v.steps) == 3
assert v.steps[0].intent == "Ouvrir le Bloc-notes"
def test_clone_protege_mutation(self):
"""set_steps clone les étapes pour éviter les mutations externes."""
v = ShadowValidator()
steps = _make_3_steps()
v.set_steps(steps)
steps[0].intent = "HACKED"
assert v.steps[0].intent == "Ouvrir le Bloc-notes"
# =========================================================================
# validate
# =========================================================================
class TestValidatorValidate:
def test_validate_etape(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
result = v.apply_feedback({"action": "validate", "step_index": 1})
assert result.ok is True
assert v.steps[0].validated is True
assert v.steps[0].confidence >= 0.95
assert v.steps[0].intent_provisoire is False
def test_validate_index_invalide(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
result = v.apply_feedback({"action": "validate", "step_index": 99})
assert result.ok is False
assert "invalide" in result.message.lower()
def test_validate_toutes_les_etapes(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
for i in range(1, 4):
v.apply_feedback({"action": "validate", "step_index": i})
assert all(s.validated for s in v.steps)
# =========================================================================
# correct
# =========================================================================
class TestValidatorCorrect:
def test_correct_intent(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
result = v.apply_feedback({
"action": "correct",
"step_index": 1,
"new_intent": "Démarrer la rédaction d'un email",
})
assert result.ok is True
assert v.steps[0].intent == "Démarrer la rédaction d'un email"
assert v.steps[0].corrected is True
assert v.steps[0].validated is True # Corriger = valider implicitement
assert "old_intent" in result.data
def test_correct_intent_vide(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
result = v.apply_feedback({
"action": "correct",
"step_index": 1,
"new_intent": "",
})
assert result.ok is False
assert v.steps[0].corrected is False
# =========================================================================
# undo
# =========================================================================
class TestValidatorUndo:
def test_undo_etape(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
result = v.apply_feedback({"action": "undo", "step_index": 2})
assert result.ok is True
assert v.steps[1].cancelled is True
def test_undo_exclut_etape_du_workflow(self):
"""Une étape undo ne doit pas apparaître dans le WorkflowIR final."""
v = ShadowValidator()
v.set_steps(_make_3_steps())
v.apply_feedback({"action": "undo", "step_index": 2})
ir = v.build_workflow_ir(session_id="s1", name="Test")
assert ir is not None
assert len(ir.steps) == 2 # 3 - 1 = 2
# =========================================================================
# merge_next
# =========================================================================
class TestValidatorMergeNext:
def test_merge_deux_etapes(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
result = v.apply_feedback({"action": "merge_next", "step_index": 1})
assert result.ok is True
assert len(v.steps) == 2 # 3 - 1 = 2
def test_merge_conserve_les_events(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
total_events_before = sum(len(s.events) for s in v.steps)
v.apply_feedback({"action": "merge_next", "step_index": 2})
total_events_after = sum(len(s.events) for s in v.steps)
assert total_events_before == total_events_after
def test_merge_derniere_etape_echoue(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
result = v.apply_feedback({"action": "merge_next", "step_index": 3})
assert result.ok is False
# =========================================================================
# split
# =========================================================================
class TestValidatorSplit:
def test_split_en_deux(self):
v = ShadowValidator()
multi_events_step = _make_step(
1, "Étape composite",
events=[
{"type": "mouse_click", "window": {"title": "App"},
"timestamp": 100.0, "vision_info": {}},
{"type": "text_input", "text": "partie 1",
"window": {"title": "App"}, "timestamp": 101.0},
{"type": "text_input", "text": "partie 2",
"window": {"title": "App"}, "timestamp": 102.0},
],
)
v.set_steps([multi_events_step])
result = v.apply_feedback({
"action": "split",
"step_index": 1,
"at_event_index": 2,
})
assert result.ok is True
assert len(v.steps) == 2
assert len(v.steps[0].events) == 2
assert len(v.steps[1].events) == 1
def test_split_index_invalide(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
result = v.apply_feedback({
"action": "split",
"step_index": 1,
"at_event_index": 99,
})
assert result.ok is False
# =========================================================================
# cancel
# =========================================================================
class TestValidatorCancel:
def test_cancel_workflow(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
result = v.apply_feedback({"action": "cancel"})
assert result.ok is True
assert v.is_cancelled
def test_cancel_build_retourne_none(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
v.apply_feedback({"action": "cancel"})
ir = v.build_workflow_ir(session_id="s1", name="Test")
assert ir is None
# =========================================================================
# Action inconnue
# =========================================================================
class TestValidatorUnknownAction:
def test_action_inconnue(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
result = v.apply_feedback({"action": "do_magic"})
assert result.ok is False
assert "inconnue" in result.message.lower()
# =========================================================================
# Construction du WorkflowIR
# =========================================================================
class TestValidatorBuild:
def test_build_workflow_ir(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
ir = v.build_workflow_ir(
session_id="sess_test",
name="Mon workflow",
domain="generic",
)
assert ir is not None
assert isinstance(ir, WorkflowIR)
assert ir.name == "Mon workflow"
assert ir.learned_from == "sess_test"
assert len(ir.steps) == 3
def test_build_with_variables(self):
"""Les textes saisis deviennent des variables dans le WorkflowIR."""
v = ShadowValidator()
v.set_steps([
_make_type_step("Jean Dupont", step_index=1),
_make_type_step("jean@example.com", app="Email", step_index=2),
])
ir = v.build_workflow_ir(session_id="s1", name="Test")
assert len(ir.variables) == 2
# Les actions de type type doivent référencer les variables
for step in ir.steps:
for action in step.actions:
if action.type == "type":
assert action.variable is True
assert action.text.startswith("{")
def test_build_respecte_corrections(self):
"""Les intentions corrigées se retrouvent dans le WorkflowIR."""
v = ShadowValidator()
v.set_steps(_make_3_steps())
v.apply_feedback({
"action": "correct",
"step_index": 1,
"new_intent": "Lancer l'application de prise de notes",
})
ir = v.build_workflow_ir(session_id="s1", name="Test")
assert ir.steps[0].intent == "Lancer l'application de prise de notes"
def test_build_exclut_etapes_annulees(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
v.apply_feedback({"action": "undo", "step_index": 2})
ir = v.build_workflow_ir(session_id="s1", name="Test")
assert len(ir.steps) == 2
def test_build_require_all_validated(self):
"""Avec require_all_validated, erreur si une étape n'est pas validée."""
v = ShadowValidator()
v.set_steps(_make_3_steps())
v.apply_feedback({"action": "validate", "step_index": 1})
# Étapes 2 et 3 pas validées
with pytest.raises(ValueError):
v.build_workflow_ir(
session_id="s1", name="Test", require_all_validated=True
)
def test_build_applications_detectees(self):
v = ShadowValidator()
v.set_steps([
_make_step(1, "Ouvrir Firefox", "Firefox"),
_make_step(2, "Écrire", "Bloc-notes"),
])
ir = v.build_workflow_ir(session_id="s1", name="Test")
assert "Firefox" in ir.applications
assert "Bloc-notes" in ir.applications
# =========================================================================
# Historique
# =========================================================================
class TestValidatorHistory:
def test_historique_feedbacks(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
v.apply_feedback({"action": "validate", "step_index": 1})
v.apply_feedback({
"action": "correct", "step_index": 2,
"new_intent": "Écrire le texte"
})
v.apply_feedback({"action": "undo", "step_index": 3})
history = v.history
assert len(history) == 3
assert history[0].action == "validate"
assert history[1].action == "correct"
assert history[2].action == "undo"
def test_apply_feedbacks_batch(self):
v = ShadowValidator()
v.set_steps(_make_3_steps())
results = v.apply_feedbacks([
{"action": "validate", "step_index": 1},
{"action": "validate", "step_index": 2},
{"action": "undo", "step_index": 3},
])
assert len(results) == 3
assert all(r.ok for r in results)
# =========================================================================
# Intégration ShadowObserver + ShadowValidator
# =========================================================================
class TestShadowObserverValidatorIntegration:
def test_observer_vers_validator(self):
"""Flow complet : Observer → Validator → WorkflowIR."""
obs = ShadowObserver()
obs.start("sess_flow")
# Simuler des événements
obs.observe_event("sess_flow", {
"type": "mouse_click",
"window": {"title": "Menu Démarrer", "app_name": "Menu Démarrer"},
"vision_info": {"text": "Rechercher"},
"timestamp": 100.0,
})
obs.observe_event("sess_flow", {
"type": "text_input",
"text": "blocnote",
"window": {"title": "Menu Démarrer", "app_name": "Menu Démarrer"},
"timestamp": 100.5,
})
obs.observe_event("sess_flow", {
"type": "key_combo",
"keys": ["enter"],
"window": {"title": "Menu Démarrer", "app_name": "Menu Démarrer"},
"timestamp": 101.0,
})
# Changement d'application
obs.observe_event("sess_flow", {
"type": "text_input",
"text": "Hello world",
"window": {"title": "Sans titre - Bloc-notes", "app_name": "Bloc-notes"},
"timestamp": 105.0,
})
obs.stop("sess_flow")
# Récupérer les étapes
internals = obs.get_steps_internal("sess_flow")
assert len(internals) >= 2
# Passer au validator
validator = ShadowValidator()
validator.set_steps(internals)
# Valider la première étape, corriger la seconde
validator.apply_feedback({"action": "validate", "step_index": 1})
validator.apply_feedback({
"action": "correct",
"step_index": 2,
"new_intent": "Écrire un texte de démonstration",
})
# Construire le WorkflowIR
ir = validator.build_workflow_ir(
session_id="sess_flow",
name="Flow de test",
domain="generic",
)
assert ir is not None
assert len(ir.steps) >= 2
assert ir.steps[1].intent == "Écrire un texte de démonstration"
assert len(ir.variables) >= 1 # Au moins "blocnote" ou "Hello world"
def test_undo_puis_build(self):
obs = ShadowObserver()
obs.start("sess_undo")
for i in range(3):
obs.observe_event("sess_undo", {
"type": "mouse_click",
"window": {"title": f"App{i}"},
"vision_info": {"text": "bouton"},
"timestamp": 100.0 + i * 6.0, # > 4s pour créer des segments
})
obs.stop("sess_undo")
validator = ShadowValidator()
validator.set_steps(obs.get_steps_internal("sess_undo"))
nb_before = len(validator.steps)
assert nb_before >= 2
validator.apply_feedback({"action": "undo", "step_index": 1})
ir = validator.build_workflow_ir(session_id="sess_undo", name="Test")
assert len(ir.steps) == nb_before - 1