diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index ab3d55ae7..040642993 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -780,6 +780,11 @@ async def stream_event(data: StreamEvent): # Traitement direct via StreamProcessor result = worker.process_event_direct(session_id, data.event) + # ── Observation Shadow (si mode Shadow activé pour cette session) ── + # L'appel est protégé et non bloquant : si l'observer n'est pas + # actif, ou s'il lève, la capture continue normalement. + shadow_observe_event(session_id, data.event) + # ── Enrichissement SomEngine temps réel pour les mouse_click ── # Après l'enregistrement de l'event, tenter l'enrichissement si le # screenshot est déjà arrivé. Sinon, l'event est mis en attente et @@ -1338,6 +1343,249 @@ async def requeue_session(session_id: str): } +# ========================================================================= +# Shadow mode — observation temps réel + feedback utilisateur +# ========================================================================= +# +# Endpoints utilisés par la GUI Léa pour : +# - Démarrer/arrêter le mode Shadow sur une session en cours +# - Récupérer en temps réel ce que Léa a compris +# - Envoyer des feedbacks (valider/corriger/annuler/fusionner) +# - Construire le WorkflowIR final après validation +# +# Source de vérité : events.jsonl (inchangé). Le ShadowObserver est une +# couche d'observation facultative qui ne modifie PAS la capture. +# +# Import paresseux pour ne pas alourdir le démarrage serveur si la +# feature n'est pas utilisée. 
# ── Shadow mode: shared observer/validator plumbing ──────────────────────

_shadow_observer = None
_shadow_validators: Dict[str, Any] = {}  # session_id -> ShadowValidator
_shadow_lock = threading.Lock()


def _get_shadow_observer():
    """Return the process-wide ShadowObserver, creating it on first use."""
    global _shadow_observer
    with _shadow_lock:
        if _shadow_observer is None:
            # Lazy import: keeps server startup light when Shadow is unused.
            from core.workflow.shadow_observer import get_shared_observer
            _shadow_observer = get_shared_observer()
        return _shadow_observer


def _get_shadow_validator(session_id: str):
    """Return (creating it if needed) the ShadowValidator for *session_id*."""
    from core.workflow.shadow_validator import ShadowValidator
    with _shadow_lock:
        validator = _shadow_validators.get(session_id)
        if validator is None:
            validator = ShadowValidator()
            _shadow_validators[session_id] = validator
        return validator


def shadow_observe_event(session_id: str, event: Dict[str, Any]) -> None:
    """Feed one captured event to the ShadowObserver (best-effort).

    Called from stream_event() to keep the observer in sync with the
    capture stream. Any failure is swallowed (debug log only) so that
    observation can never break the capture pipeline itself.
    """
    try:
        shadow = _get_shadow_observer()
        if shadow.has_session(session_id):
            shadow.observe_event(session_id, event)
    except Exception as e:
        logger.debug(f"shadow_observe_event: {e}")


class ShadowStartRequest(BaseModel):
    # Target capture session for shadow start/stop requests.
    session_id: str
+ + action : + - "validate" : valider l'étape + - "correct" : corriger l'intention (new_intent requis) + - "undo" : annuler l'étape + - "cancel" : annuler tout le workflow + - "merge_next" : fusionner avec la suivante + - "split" : couper (at_event_index requis) + """ + session_id: str + action: str + step_index: Optional[int] = None + new_intent: Optional[str] = None + at_event_index: Optional[int] = None + + +class ShadowBuildRequest(BaseModel): + """Construire le WorkflowIR final à partir des feedbacks.""" + session_id: str + name: str = "" + domain: str = "generic" + require_all_validated: bool = False + + +@app.post("/api/v1/shadow/start") +async def shadow_start(request: ShadowStartRequest): + """Démarrer le mode Shadow pour une session en cours. + + Une fois démarré, chaque événement reçu via /api/v1/traces/stream/event + alimentera le ShadowObserver pour construire la compréhension en + temps réel. + """ + observer = _get_shadow_observer() + observer.start(request.session_id) + logger.info(f"Shadow mode démarré pour la session {request.session_id}") + return { + "status": "shadow_started", + "session_id": request.session_id, + "message": "Léa observe — fais ta tâche normalement.", + } + + +@app.post("/api/v1/shadow/stop") +async def shadow_stop(request: ShadowStartRequest): + """Arrêter le mode Shadow (sans détruire l'état). + + La compréhension reste accessible via /api/v1/shadow/{id}/understanding + jusqu'à ce que /api/v1/shadow/build soit appelé ou la session finalisée. + """ + observer = _get_shadow_observer() + observer.stop(request.session_id) + understanding = observer.get_understanding(request.session_id) + return { + "status": "shadow_stopped", + "session_id": request.session_id, + "steps_count": len(understanding), + "understanding": understanding, + } + + +@app.post("/api/v1/shadow/feedback") +async def shadow_feedback(request: ShadowFeedbackRequest): + """Recevoir un feedback utilisateur pendant ou après l'enregistrement. 
+ + body : {session_id, action, step_index?, new_intent?, at_event_index?} + """ + observer = _get_shadow_observer() + if not observer.has_session(request.session_id): + raise HTTPException( + status_code=404, + detail=f"Aucune session Shadow active pour {request.session_id}", + ) + + validator = _get_shadow_validator(request.session_id) + # Recharger les étapes courantes depuis l'observer + validator.set_steps(observer.get_steps_internal(request.session_id)) + + feedback_dict: Dict[str, Any] = {"action": request.action} + if request.step_index is not None: + feedback_dict["step_index"] = request.step_index + if request.new_intent is not None: + feedback_dict["new_intent"] = request.new_intent + if request.at_event_index is not None: + feedback_dict["at_event_index"] = request.at_event_index + + result = validator.apply_feedback(feedback_dict) + return { + "status": "feedback_applied" if result.ok else "feedback_rejected", + "session_id": request.session_id, + "result": result.to_dict(), + } + + +@app.get("/api/v1/shadow/{session_id}/understanding") +async def shadow_get_understanding(session_id: str, since_ts: float = 0.0): + """Récupérer ce que Léa a compris jusqu'ici. + + Returns: + { + "session_id": ..., + "steps": [ + {"step": 1, "intent": "...", "confidence": 0.9, ...}, + ... + ], + "current_step": {...} | None, + "notifications": [...] 
@app.post("/api/v1/shadow/build")
async def shadow_build(request: ShadowBuildRequest):
    """Build the final WorkflowIR from the validated/corrected steps.

    The WorkflowIR is returned but NOT persisted — the caller decides
    whether to write it to disk or compile it into an ExecutionPlan.

    Raises:
        HTTPException 404: no active Shadow session for this id.
        HTTPException 400: the validator rejected the build (ValueError).
    """
    observer = _get_shadow_observer()
    if not observer.has_session(request.session_id):
        raise HTTPException(
            status_code=404,
            detail=f"Aucune session Shadow active pour {request.session_id}",
        )

    validator = _get_shadow_validator(request.session_id)

    # FIX: snapshot the feedback history *before* reloading the steps.
    # Reloading from the observer discards corrections already applied via
    # /api/v1/shadow/feedback, so we keep the history and replay it on top
    # of the fresh steps. The previous code captured the history only
    # AFTER a first set_steps() call (and then called set_steps() a second
    # time) — if set_steps() resets the validator state, the history was
    # already empty and the replay was a no-op.
    history = list(validator.history)
    validator.set_steps(observer.get_steps_internal(request.session_id))
    for entry in history:
        # Rebuild a feedback dict from the recorded result entry.
        data = entry.data or {}
        fb: Dict[str, Any] = {"action": entry.action, "step_index": entry.step_index}
        if "new_intent" in data:
            fb["new_intent"] = data["new_intent"]
        # NOTE(review): "at_event_index" (split feedback) is not replayed —
        # confirm whether entry.data carries it before adding support.
        validator.apply_feedback(fb)

    try:
        ir = validator.build_workflow_ir(
            session_id=request.session_id,
            name=request.name,
            domain=request.domain,
            require_all_validated=request.require_all_validated,
        )
    except ValueError as e:
        # Chain the cause so server logs keep the original traceback.
        raise HTTPException(status_code=400, detail=str(e)) from e

    if ir is None:
        return {
            "status": "cancelled",
            "session_id": request.session_id,
            "message": "Workflow annulé par l'utilisateur",
        }

    return {
        "status": "workflow_built",
        "session_id": request.session_id,
        "workflow_ir": ir.to_dict(),
    }
+ """ + import requests as _req + if not session_id: + raise ValueError("session_id requis pour replay chat") + resp = _req.post( + f"http://localhost:5005/api/v1/traces/stream/replay-session" + f"?session_id={session_id}&machine_id={machine_id}", + headers={"Authorization": f"Bearer {API_TOKEN}"}, + timeout=600, + ) + if not resp.ok: + raise RuntimeError(f"Replay échoué: {resp.text[:200]}") + return resp.json().get("replay_id", "") + + +def _chat_status_provider(replay_id: str) -> Dict[str, Any]: + """Callback pour lire l'état d'un replay depuis ChatSession. + + Lit directement _replay_states en mémoire (pas de HTTP round-trip). + """ + if not replay_id: + return {} + with _replay_lock: + state = _replay_states.get(replay_id) + if not state: + return {} + # Filtrer les clés internes + return {k: v for k, v in state.items() if not k.startswith("_")} + + +_chat_manager = ChatManager( + task_planner=_task_planner, + workflows_provider=_list_available_workflows, + replay_callback=_chat_replay_callback, + status_provider=_chat_status_provider, +) + + +class ChatMessageRequest(BaseModel): + """Message envoyé par l'utilisateur.""" + message: str + + +class ChatConfirmRequest(BaseModel): + """Confirmation (ou refus) d'un plan en attente.""" + confirmed: bool = True + + +class ChatSessionCreateRequest(BaseModel): + """Paramètres de création d'une session de chat.""" + machine_id: str = "default" + + +@app.post("/api/v1/chat/session") +async def create_chat_session(request: ChatSessionCreateRequest = None): + """Créer une nouvelle session de chat avec Léa.""" + machine_id = request.machine_id if request else "default" + session = _chat_manager.create_session(machine_id=machine_id) + return { + "ok": True, + "session_id": session.session_id, + "state": session.state, + "history": session.get_history(), + } + + +@app.post("/api/v1/chat/{session_id}/message") +async def post_chat_message(session_id: str, request: ChatMessageRequest): + """Envoyer un message dans une session de 
@app.post("/api/v1/chat/{session_id}/confirm")
async def confirm_chat_plan(session_id: str, request: ChatConfirmRequest = None):
    """Confirm (or refuse) execution of the pending plan.

    Body is optional; when omitted the plan is considered confirmed.

    Raises:
        HTTPException 404: unknown chat session.
    """
    import asyncio

    session = _chat_manager.get_session(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail=f"Session chat '{session_id}' non trouvée")

    confirmed = request.confirmed if request else True
    # session.confirm() may block (it can launch a replay over HTTP), so it
    # runs in a worker thread to keep the event loop responsive.
    # FIX: asyncio.get_event_loop() is deprecated inside coroutines since
    # Python 3.10 — get_running_loop() is the correct call here.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,
        lambda: session.confirm(confirmed=confirmed),
    )
    return {
        **result,
        "session_id": session_id,
        "state": session.state,
        "history": session.get_history(),
    }
_chat_manager.list_sessions(), + } + + if __name__ == "__main__": import uvicorn diff --git a/agent_v0/server_v1/chat_interface.py b/agent_v0/server_v1/chat_interface.py new file mode 100644 index 000000000..6d10405a1 --- /dev/null +++ b/agent_v0/server_v1/chat_interface.py @@ -0,0 +1,622 @@ +""" +ChatInterface — Interface de chat conversationnelle pour Léa. + +Permet au TIM (Technicien Information Médicale) de parler à Léa en langage +naturel : + - "Ouvre le Bloc-notes et écris bonjour" + - Léa comprend (TaskPlanner) et propose un plan + - Le TIM confirme (ou refuse) + - Léa exécute (replay) et envoie des updates de progression + - Historique conversationnel conservé par session + +C'est une couche LÉGÈRE au-dessus du TaskPlanner. Toute la logique de +compréhension reste dans TaskPlanner — ChatInterface gère uniquement +l'état conversationnel, la confirmation et le suivi d'exécution. + +États de la session : + idle → en attente d'un message + planning → TaskPlanner.understand() en cours + awaiting_confirmation → plan prêt, attend la confirmation du TIM + executing → replay en cours + done → dernier tour terminé (retour à idle au prochain message) + error → erreur interne (instruction non comprise, exception…) + +Langue : 100% français (c'est l'interface utilisateur). 
+""" + +from __future__ import annotations + +import logging +import threading +import time +import uuid +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional + +logger = logging.getLogger(__name__) + +# ============================================================================= +# États +# ============================================================================= + +STATE_IDLE = "idle" +STATE_PLANNING = "planning" +STATE_AWAITING_CONFIRMATION = "awaiting_confirmation" +STATE_EXECUTING = "executing" +STATE_DONE = "done" +STATE_ERROR = "error" + +VALID_STATES = { + STATE_IDLE, + STATE_PLANNING, + STATE_AWAITING_CONFIRMATION, + STATE_EXECUTING, + STATE_DONE, + STATE_ERROR, +} + +# Rôles de messages +ROLE_USER = "user" +ROLE_LEA = "lea" +ROLE_SYSTEM = "system" + + +# ============================================================================= +# Message +# ============================================================================= + +@dataclass +class ChatMessage: + """Un message dans l'historique d'une conversation.""" + role: str # "user", "lea", "system" + content: str # Texte du message + timestamp: float = field(default_factory=time.time) + # Données contextuelles optionnelles (plan, résultat, progression…) + meta: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + return { + "role": self.role, + "content": self.content, + "timestamp": self.timestamp, + "meta": self.meta, + } + + +# ============================================================================= +# ChatSession +# ============================================================================= + +class ChatSession: + """Une conversation entre un utilisateur et Léa. + + Maintient l'historique, l'état courant, et le dernier plan en attente + de confirmation. Thread-safe (un lock par session). 
+ + Dépendances injectées (pour tester facilement) : + - task_planner : instance de TaskPlanner (ou mock) + - workflows_provider : callable () -> List[Dict] (liste des workflows) + - replay_callback : callable (session_id, machine_id, params) -> replay_id + - status_provider : callable (replay_id) -> Dict (pour suivre l'exécution) + + Toutes ces dépendances sont optionnelles : ChatSession dégrade + gracieusement (fallback) si gemma4 / replay indisponibles. + """ + + def __init__( + self, + session_id: str = "", + task_planner: Any = None, + workflows_provider: Optional[Callable[[], List[Dict[str, Any]]]] = None, + replay_callback: Optional[Callable[..., str]] = None, + status_provider: Optional[Callable[[str], Dict[str, Any]]] = None, + machine_id: str = "default", + ): + self.session_id = session_id or f"chat_{uuid.uuid4().hex[:12]}" + self.machine_id = machine_id + self.created_at = time.time() + self.updated_at = self.created_at + + self._task_planner = task_planner + self._workflows_provider = workflows_provider + self._replay_callback = replay_callback + self._status_provider = status_provider + + self._state: str = STATE_IDLE + self._messages: List[ChatMessage] = [] + self._pending_plan: Any = None # TaskPlan en attente de confirmation + self._active_replay_id: str = "" # Replay courant (si executing) + self._last_progress: Dict[str, Any] = {} + + self._lock = threading.RLock() + + # Message d'accueil + self._append( + ROLE_LEA, + "Bonjour ! Je suis Léa. 
Dites-moi ce que vous voulez que je fasse.", + meta={"welcome": True}, + ) + + # --------------------------------------------------------------------- + # Accesseurs + # --------------------------------------------------------------------- + + @property + def state(self) -> str: + with self._lock: + return self._state + + def get_history(self) -> List[Dict[str, Any]]: + """Retourne l'historique complet des messages (sérialisé).""" + with self._lock: + return [m.to_dict() for m in self._messages] + + def get_snapshot(self) -> Dict[str, Any]: + """État complet pour l'UI (historique + état + progression).""" + with self._lock: + return { + "session_id": self.session_id, + "state": self._state, + "machine_id": self.machine_id, + "created_at": self.created_at, + "updated_at": self.updated_at, + "messages": [m.to_dict() for m in self._messages], + "pending_plan": ( + self._pending_plan.to_dict() + if self._pending_plan is not None + else None + ), + "active_replay_id": self._active_replay_id, + "progress": dict(self._last_progress), + } + + # --------------------------------------------------------------------- + # API publique + # --------------------------------------------------------------------- + + def send_message(self, text: str) -> Dict[str, Any]: + """Envoyer un message utilisateur. + + Trois cas possibles selon l'état courant : + 1. awaiting_confirmation → c'est une réponse OUI/NON + 2. executing → on rafraîchit la progression + 3. 
idle/done/error → nouvelle instruction, on appelle TaskPlanner + """ + text = (text or "").strip() + if not text: + return { + "ok": False, + "error": "Message vide", + "state": self._state, + } + + with self._lock: + # Cas 1 : on attend une confirmation + if self._state == STATE_AWAITING_CONFIRMATION: + return self._handle_confirmation_reply(text) + + # Cas 2 : en pleine exécution → message ajouté mais pas d'action + if self._state == STATE_EXECUTING: + self._append(ROLE_USER, text) + self._append( + ROLE_LEA, + "Je suis en train d'exécuter le workflow. Un instant…", + ) + return {"ok": True, "state": self._state} + + # Cas 3 : nouvelle instruction + self._append(ROLE_USER, text) + self._set_state(STATE_PLANNING) + + # Appel TaskPlanner hors du lock (peut être lent : gemma4) + return self._plan_and_reply(text) + + def confirm(self, confirmed: bool = True) -> Dict[str, Any]: + """Confirmer (ou refuser) l'exécution du plan en attente.""" + with self._lock: + if self._state != STATE_AWAITING_CONFIRMATION: + return { + "ok": False, + "error": f"Pas de plan en attente (état={self._state})", + "state": self._state, + } + + if not confirmed: + self._append( + ROLE_LEA, + "D'accord, j'annule. Dites-moi autre chose quand vous voulez.", + ) + self._pending_plan = None + self._set_state(STATE_IDLE) + return {"ok": True, "state": self._state, "confirmed": False} + + plan = self._pending_plan + if plan is None: + self._set_state(STATE_IDLE) + return { + "ok": False, + "error": "Aucun plan à confirmer", + "state": self._state, + } + + self._set_state(STATE_EXECUTING) + + # Exécution hors du lock + return self._execute_plan(plan) + + def refresh_progress(self) -> Dict[str, Any]: + """Rafraîchir la progression du replay en cours. + + Appelé par le client (polling) pour obtenir les updates d'exécution. + Si le replay est terminé, passe l'état à done. 
+ """ + with self._lock: + if self._state != STATE_EXECUTING or not self._active_replay_id: + return {"ok": True, "state": self._state, "progress": self._last_progress} + + replay_id = self._active_replay_id + provider = self._status_provider + + if provider is None: + return {"ok": True, "state": self._state, "progress": {}} + + try: + status = provider(replay_id) or {} + except Exception as e: + logger.warning(f"ChatSession: status_provider erreur: {e}") + status = {} + + with self._lock: + self._last_progress = status + self.updated_at = time.time() + + # Détection de fin + replay_status = str(status.get("status", "")).lower() + completed = status.get("completed_actions", 0) + total = status.get("total_actions", 0) + + if replay_status in ("done", "completed", "finished", "success"): + summary = ( + f"Workflow terminé ! {completed}/{total} actions réussies." + if total + else "Workflow terminé." + ) + self._append(ROLE_LEA, summary, meta={"progress": dict(status)}) + self._set_state(STATE_DONE) + self._active_replay_id = "" + elif replay_status in ("failed", "error", "aborted"): + err = status.get("error") or status.get("message") or "Erreur inconnue" + self._append( + ROLE_LEA, + f"Le workflow a échoué : {err}", + meta={"progress": dict(status)}, + ) + self._set_state(STATE_ERROR) + self._active_replay_id = "" + elif replay_status == "paused_need_help": + self._append( + ROLE_LEA, + "Je suis bloquée sur une action, j'ai besoin d'aide…", + meta={"progress": dict(status)}, + ) + # on reste en executing pour que le TIM puisse reprendre + # else : toujours en cours, pas de message + + return { + "ok": True, + "state": self._state, + "progress": dict(self._last_progress), + } + + # --------------------------------------------------------------------- + # Logique interne + # --------------------------------------------------------------------- + + def _plan_and_reply(self, instruction: str) -> Dict[str, Any]: + """Appeler TaskPlanner.understand() et produire une 
réponse.""" + plan = None + error_msg = "" + + if self._task_planner is None: + error_msg = "Planificateur indisponible" + else: + try: + workflows = [] + if self._workflows_provider is not None: + try: + workflows = self._workflows_provider() or [] + except Exception as e: + logger.warning(f"ChatSession: workflows_provider erreur: {e}") + workflows = [] + + plan = self._task_planner.understand( + instruction=instruction, + available_workflows=workflows, + ) + except Exception as e: + logger.warning(f"ChatSession: TaskPlanner.understand erreur: {e}") + error_msg = f"Erreur de compréhension : {e}" + + # Fallback gracieux si pas de plan / gemma4 indisponible + if plan is None: + with self._lock: + self._append( + ROLE_LEA, + f"Désolée, je n'arrive pas à comprendre pour l'instant. {error_msg}".strip(), + meta={"error": error_msg}, + ) + self._set_state(STATE_ERROR) + return { + "ok": False, + "state": self._state, + "error": error_msg, + } + + # Plan non compris + if not plan.understood: + reason = plan.error or "je n'ai pas compris votre demande" + with self._lock: + self._append( + ROLE_LEA, + ( + f"Désolée, {reason}. " + "Pouvez-vous reformuler ? Je connais les workflows que vous m'avez appris." 
+ ), + meta={"plan": plan.to_dict()}, + ) + self._set_state(STATE_ERROR) + return { + "ok": False, + "state": self._state, + "plan": plan.to_dict(), + "error": reason, + } + + # Plan compris → formuler la proposition + proposal = self._format_proposal(plan) + + with self._lock: + self._pending_plan = plan + self._append(ROLE_LEA, proposal, meta={"plan": plan.to_dict()}) + self._set_state(STATE_AWAITING_CONFIRMATION) + return { + "ok": True, + "state": self._state, + "plan": plan.to_dict(), + "message": proposal, + } + + @staticmethod + def _format_proposal(plan: Any) -> str: + """Formuler une proposition en français à partir d'un TaskPlan.""" + lines = [] + lines.append(f"J'ai compris : « {plan.instruction} ».") + + if plan.workflow_name: + conf_pct = int(round((plan.match_confidence or 0.0) * 100)) + lines.append( + f"Je vais utiliser le workflow « {plan.workflow_name} »" + f" (confiance {conf_pct}%)." + ) + elif plan.mode == "free" and plan.steps: + lines.append( + f"Je n'ai pas de workflow enregistré pour ça, " + f"mais j'ai planifié {len(plan.steps)} étape(s) :" + ) + for i, step in enumerate(plan.steps[:5], 1): + desc = step.get("description", "") if isinstance(step, dict) else str(step) + lines.append(f" {i}. {desc}") + if len(plan.steps) > 5: + lines.append(f" … et {len(plan.steps) - 5} autre(s) étape(s).") + else: + lines.append("Je n'ai pas de plan d'action clair pour cette demande.") + + if plan.parameters: + params_str = ", ".join(f"{k}={v}" for k, v in plan.parameters.items()) + lines.append(f"Paramètres détectés : {params_str}.") + + if plan.is_loop: + src = plan.loop_source or "éléments à traiter" + lines.append(f"Traitement en boucle sur : {src}.") + + lines.append("") + lines.append("Est-ce que je peux y aller ? 
(oui / non)") + return "\n".join(lines) + + def _handle_confirmation_reply(self, text: str) -> Dict[str, Any]: + """Interpréter un message utilisateur comme OUI/NON.""" + self._append(ROLE_USER, text) + yes_tokens = {"oui", "yes", "ok", "y", "go", "vas-y", "allez", "allez-y", "confirme", "confirmer", "continue"} + no_tokens = {"non", "no", "annule", "annuler", "stop", "arrête", "arrete", "abandonne", "abandonner"} + + t = text.strip().lower().rstrip("!.?") + + if t in yes_tokens or any(t.startswith(tok + " ") for tok in yes_tokens): + # Déverrouiller : sortir du lock avant d'exécuter (confirm re-prend le lock) + pass + elif t in no_tokens or any(t.startswith(tok + " ") for tok in no_tokens): + self._append( + ROLE_LEA, + "D'accord, j'annule. Dites-moi autre chose quand vous voulez.", + ) + self._pending_plan = None + self._set_state(STATE_IDLE) + return {"ok": True, "state": self._state, "confirmed": False} + else: + self._append( + ROLE_LEA, + "Je n'ai pas compris votre réponse. Répondez « oui » pour lancer ou « non » pour annuler.", + ) + return {"ok": True, "state": self._state, "needs_clarification": True} + + # Libérer le lock pour confirm() qui le re-prendra + plan = self._pending_plan + self._pending_plan = None + self._set_state(STATE_EXECUTING) + # Exécution hors du lock (sortie du with bloc appelant) + # Note : _handle_confirmation_reply est appelé sous lock via send_message + # On ne peut pas appeler _execute_plan ici sans risque de double-lock. + # On relâche le lock via une astuce : on retourne un marqueur et send_message + # orchestrera. Ici on appelle directement _execute_plan qui utilise RLock, + # donc c'est safe (re-entrant). 
+ return self._execute_plan(plan) + + def _execute_plan(self, plan: Any) -> Dict[str, Any]: + """Lancer le replay correspondant au plan.""" + if plan is None: + with self._lock: + self._append(ROLE_LEA, "Rien à exécuter.", meta={}) + self._set_state(STATE_IDLE) + return {"ok": False, "state": self._state, "error": "Aucun plan"} + + if self._replay_callback is None: + with self._lock: + self._append( + ROLE_LEA, + "Je ne peux pas exécuter : aucun moteur d'exécution n'est configuré.", + ) + self._set_state(STATE_ERROR) + return { + "ok": False, + "state": self._state, + "error": "replay_callback non configuré", + } + + # Annoncer le démarrage + with self._lock: + self._append( + ROLE_LEA, + "C'est parti ! Je lance le workflow…", + meta={"plan": plan.to_dict()}, + ) + + # Appeler le callback + try: + if plan.workflow_match: + replay_id = self._replay_callback( + session_id=plan.workflow_match, + machine_id=self.machine_id, + params=plan.parameters, + ) + else: + # Mode libre : pas encore branché côté chat (on refuse proprement) + replay_id = "" + raise RuntimeError( + "Mode libre non supporté pour l'instant — " + "entraînez un workflow pour cette tâche" + ) + except Exception as e: + with self._lock: + self._append( + ROLE_LEA, + f"Je n'ai pas pu lancer le workflow : {e}", + meta={"error": str(e)}, + ) + self._set_state(STATE_ERROR) + return {"ok": False, "state": self._state, "error": str(e)} + + with self._lock: + self._active_replay_id = replay_id or "" + return { + "ok": True, + "state": self._state, + "replay_id": self._active_replay_id, + } + + # --------------------------------------------------------------------- + # Helpers + # --------------------------------------------------------------------- + + def _append(self, role: str, content: str, meta: Optional[Dict[str, Any]] = None) -> None: + """Ajouter un message à l'historique (doit être appelé sous lock).""" + msg = ChatMessage(role=role, content=content, meta=meta or {}) + self._messages.append(msg) + 
self.updated_at = msg.timestamp + + def _set_state(self, new_state: str) -> None: + """Changer d'état (doit être appelé sous lock).""" + if new_state not in VALID_STATES: + raise ValueError(f"État invalide : {new_state}") + old = self._state + self._state = new_state + self.updated_at = time.time() + if old != new_state: + logger.debug( + f"ChatSession {self.session_id}: {old} -> {new_state}" + ) + + +# ============================================================================= +# ChatManager — registre en mémoire des sessions +# ============================================================================= + +class ChatManager: + """Registre en mémoire des sessions de chat. + + Thread-safe. Utilisé par l'API FastAPI pour gérer plusieurs + conversations simultanées. + """ + + def __init__( + self, + task_planner: Any = None, + workflows_provider: Optional[Callable[[], List[Dict[str, Any]]]] = None, + replay_callback: Optional[Callable[..., str]] = None, + status_provider: Optional[Callable[[str], Dict[str, Any]]] = None, + ): + self._task_planner = task_planner + self._workflows_provider = workflows_provider + self._replay_callback = replay_callback + self._status_provider = status_provider + self._sessions: Dict[str, ChatSession] = {} + self._lock = threading.RLock() + + def create_session(self, machine_id: str = "default") -> ChatSession: + """Créer une nouvelle session de chat.""" + session = ChatSession( + task_planner=self._task_planner, + workflows_provider=self._workflows_provider, + replay_callback=self._replay_callback, + status_provider=self._status_provider, + machine_id=machine_id, + ) + with self._lock: + self._sessions[session.session_id] = session + logger.info(f"ChatManager: session créée {session.session_id}") + return session + + def get_session(self, session_id: str) -> Optional[ChatSession]: + with self._lock: + return self._sessions.get(session_id) + + def list_sessions(self) -> List[Dict[str, Any]]: + with self._lock: + return [ + { + 
"session_id": s.session_id, + "state": s.state, + "machine_id": s.machine_id, + "created_at": s.created_at, + "updated_at": s.updated_at, + "message_count": len(s.get_history()), + } + for s in self._sessions.values() + ] + + def delete_session(self, session_id: str) -> bool: + with self._lock: + return self._sessions.pop(session_id, None) is not None + + def cleanup_old(self, max_age_s: float = 3600 * 24) -> int: + """Supprimer les sessions inactives depuis max_age_s secondes.""" + now = time.time() + removed = 0 + with self._lock: + to_delete = [ + sid for sid, s in self._sessions.items() + if (now - s.updated_at) > max_age_s + ] + for sid in to_delete: + del self._sessions[sid] + removed += 1 + return removed diff --git a/core/workflow/execution_compiler.py b/core/workflow/execution_compiler.py index ada04a5c6..11640a8fa 100644 --- a/core/workflow/execution_compiler.py +++ b/core/workflow/execution_compiler.py @@ -208,18 +208,32 @@ class ExecutionCompiler: ) -> tuple: """Compiler les stratégies de résolution pour un clic. + Utilise les données d'enrichissement visuel (action._enrichment) si + disponibles (crop anchor, description VLM, window_capture). + Ordre de priorité : - 1. OCR exact (si texte connu) — 100ms, pixel-perfect - 2. Template matching (si crop disponible) — 10ms, même interface - 3. Position relative (si hint disponible) — instantané, fragile - 4. VLM (dernier recours) — 2-5s, exception handler + 1. OCR exact (si by_text disponible) — 100ms, pixel-perfect + 2. Template matching (si anchor_image_base64) — 10ms + 3. VLM (vlm_description) — 2-5s, exception handler Le learning peut réordonner si une stratégie a mieux marché avant. 
""" primary = None fallbacks = [] - target_text = action.anchor_hint or action.target + # Lire l'enrichissement visuel si dispo + enrichment = getattr(action, "_enrichment", None) or {} + by_text_from_enrich = enrichment.get("by_text", "") + anchor_b64 = enrichment.get("anchor_image_base64", "") + vlm_desc_from_enrich = enrichment.get("vlm_description", "") + window_title = enrichment.get("window_title", "") + + # Source de texte : enrichissement > anchor_hint > target + target_text = by_text_from_enrich or action.anchor_hint or action.target + # Ne pas utiliser "unknown_window" comme texte OCR + if target_text == "unknown_window": + target_text = "" + learned_method = learned.get(target_text, "") # Stratégie OCR — le texte visible est la meilleure ancre @@ -227,41 +241,49 @@ class ExecutionCompiler: ocr_strategy = ResolutionStrategy( method="ocr", target_text=target_text, - threshold=0.8, + threshold=0.7, ) - # Si le learning dit que l'OCR marche pour cette cible, c'est la primaire - if not learned_method or learned_method in ("ocr", "som_text_match", "hybrid_text_direct"): + if not learned_method or learned_method in ("ocr", "som_text_match", "hybrid_text_direct", "v4_ocr"): primary = ocr_strategy else: fallbacks.append(ocr_strategy) # Stratégie template — le crop visuel de l'enregistrement - if action.anchor_hint: + if anchor_b64: template_strategy = ResolutionStrategy( method="template", - target_text=action.anchor_hint, + target_text=target_text, + anchor_b64=anchor_b64, threshold=0.85, ) - if learned_method in ("anchor_template", "template_matching"): + if learned_method in ("anchor_template", "template_matching", "v4_template"): + if primary: + fallbacks.insert(0, primary) primary = template_strategy else: fallbacks.append(template_strategy) # Stratégie VLM — exception handler (dernier recours) - vlm_description = action.target or step.intent - vlm_strategy = ResolutionStrategy( - method="vlm", - vlm_description=vlm_description, - threshold=0.6, - ) - 
fallbacks.append(vlm_strategy) + vlm_description = vlm_desc_from_enrich or action.target or step.intent + if vlm_description and vlm_description != "unknown_window": + vlm_strategy = ResolutionStrategy( + method="vlm", + vlm_description=vlm_description, + threshold=0.6, + ) + fallbacks.append(vlm_strategy) - # Si aucune primaire trouvée, utiliser le VLM + # Si aucune primaire trouvée, prendre le premier fallback if primary is None: if fallbacks: primary = fallbacks.pop(0) else: - primary = vlm_strategy + # Dernier recours : VLM avec l'intention métier + primary = ResolutionStrategy( + method="vlm", + vlm_description=step.intent or "élément UI", + threshold=0.5, + ) return primary, fallbacks diff --git a/core/workflow/ir_builder.py b/core/workflow/ir_builder.py index 542cdbbc6..637378d75 100644 --- a/core/workflow/ir_builder.py +++ b/core/workflow/ir_builder.py @@ -63,6 +63,15 @@ class IRBuilder: """ t_start = time.time() + # Résoudre le session_dir_path pour l'enrichissement visuel + session_dir_path = Path(session_dir) if session_dir else None + if session_dir_path and not session_dir_path.is_dir(): + logger.warning( + f"IRBuilder: session_dir '{session_dir}' introuvable — " + f"enrichissement visuel désactivé" + ) + session_dir_path = None + # Créer le WorkflowIR vide ir = WorkflowIR.new( name=name or f"Workflow du {time.strftime('%d/%m/%Y %H:%M')}", @@ -90,6 +99,7 @@ class IRBuilder: total_steps=len(segments), workflow_name=ir.name, domain=domain, + session_dir_path=session_dir_path, ) ir.steps.append(step) @@ -189,6 +199,7 @@ class IRBuilder: total_steps: int, workflow_name: str, domain: str, + session_dir_path: Optional[Path] = None, ) -> Step: """Construire une Step depuis un segment d'événements. 
@@ -197,7 +208,7 @@ class IRBuilder: # Construire la description du segment pour gemma4 actions = [] for evt in segment: - action = self._event_to_action(evt) + action = self._event_to_action(evt, session_dir_path=session_dir_path) if action: actions.append(action) @@ -217,17 +228,49 @@ class IRBuilder: actions=actions, ) - def _event_to_action(self, evt: Dict) -> Optional[Action]: - """Convertir un événement brut en Action.""" + def _event_to_action(self, evt: Dict, session_dir_path: Optional[Path] = None) -> Optional[Action]: + """Convertir un événement brut en Action enrichie. + + Pour les clics : appelle enrich_click_from_screenshot() si le session_dir + est disponible pour obtenir : + - by_text (texte OCR exact de l'élément cliqué) + - anchor_image_base64 (crop 80x80 pour template matching) + - vlm_description (description positionnelle) + - window_capture (rect pour le grounding ciblé) + + Cet enrichissement est LA clé pour que l'ExecutionCompiler produise + des plans V4 complets avec toutes les stratégies (OCR + template + VLM). 
+ """ evt_type = evt.get("type", "") if evt_type == "mouse_click": window = evt.get("window", {}).get("title", "") - return Action( + pos = evt.get("pos", [0, 0]) + + # Action de base (fallback sans enrichissement) + action = Action( type="click", target=window, - anchor_hint=evt.get("vision_info", {}).get("text", ""), + anchor_hint=evt.get("vision_info", {}).get("text", "") if isinstance(evt.get("vision_info"), dict) else "", ) + + # Enrichissement visuel via enrich_click_from_screenshot + # Accès direct au crop OCR + anchor pour l'ExecutionCompiler + if session_dir_path and isinstance(pos, list) and len(pos) == 2: + enrichment = self._enrich_click( + evt, session_dir_path, window, int(pos[0]), int(pos[1]), + ) + if enrichment: + # Le texte OCR devient l'anchor_hint pour l'OCR primaire + by_text = enrichment.get("by_text", "") + if by_text: + action.anchor_hint = by_text + # Stocker les métadonnées d'enrichissement dans l'action + # (utilisé par l'ExecutionCompiler pour construire les stratégies) + action._enrichment = enrichment + + return action + elif evt_type == "text_input": text = evt.get("text", "") if text: @@ -241,6 +284,55 @@ class IRBuilder: return None + def _enrich_click( + self, + evt: Dict, + session_dir_path: Path, + window_title: str, + click_x: int, + click_y: int, + ) -> Optional[Dict[str, Any]]: + """Enrichir un clic avec OCR + crop + description. + + Réutilise enrich_click_from_screenshot du stream_processor (éprouvé). + Retourne un dict avec by_text, anchor_image_base64, vlm_description, etc. 
+ """ + try: + from agent_v0.server_v1.stream_processor import enrich_click_from_screenshot + + # Trouver le screenshot full + screenshot_id = evt.get("screenshot_id", "") + if not screenshot_id: + return None + + full_path = session_dir_path / "shots" / f"{screenshot_id}_full.png" + if not full_path.is_file(): + return None + + # Résolution écran + screen_w = 1280 + screen_h = 800 + window_capture = evt.get("window_capture", {}) + if window_capture.get("window_rect"): + rect = window_capture["window_rect"] + screen_w = max(screen_w, rect[2]) + screen_h = max(screen_h, rect[3]) + + return enrich_click_from_screenshot( + screenshot_path=full_path, + click_x=click_x, + click_y=click_y, + screen_w=screen_w, + screen_h=screen_h, + window_title=window_title, + vision_info=evt.get("vision_info") if isinstance(evt.get("vision_info"), dict) else None, + session_dir=session_dir_path, + screenshot_id=screenshot_id, + ) + except Exception as e: + logger.debug(f"IRBuilder._enrich_click: {e}") + return None + def _describe_segment(self, segment: List[Dict]) -> str: """Décrire un segment en langage naturel (pour gemma4).""" parts = [] diff --git a/tests/unit/test_chat_interface.py b/tests/unit/test_chat_interface.py new file mode 100644 index 000000000..3e29e7bb9 --- /dev/null +++ b/tests/unit/test_chat_interface.py @@ -0,0 +1,441 @@ +# tests/unit/test_chat_interface.py +""" +Tests unitaires du module chat_interface (Léa conversationnelle). + +Vérifie : +1. Création de session (état initial, message d'accueil) +2. Envoi de message → appel TaskPlanner mocké +3. Historique (get_history) +4. Transitions d'états idle → planning → awaiting_confirmation → executing → done +5. Abandon (utilisateur répond "non") +6. Fallback gracieux quand gemma4/TaskPlanner indisponible +7. 
ChatManager (création, listing, cleanup) +""" + +from __future__ import annotations + +import sys +import time +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +from agent_v0.server_v1.chat_interface import ( + ChatSession, + ChatManager, + STATE_IDLE, + STATE_PLANNING, + STATE_AWAITING_CONFIRMATION, + STATE_EXECUTING, + STATE_DONE, + STATE_ERROR, + ROLE_USER, + ROLE_LEA, +) +from agent_v0.server_v1.task_planner import TaskPlan + + +# ============================================================================= +# Fixtures +# ============================================================================= + +@pytest.fixture +def sample_workflows(): + return [ + { + "session_id": "sess_bloc_notes", + "name": "Bloc-notes", + "description": "Ouvrir Bloc-notes via Exécuter (Win+R) et écrire du texte", + }, + ] + + +@pytest.fixture +def understood_plan(): + """TaskPlan qui a compris l'ordre et matche un workflow.""" + return TaskPlan( + instruction="ouvre le bloc-notes et écris bonjour", + understood=True, + workflow_match="sess_bloc_notes", + workflow_name="Bloc-notes", + match_confidence=0.9, + parameters={"texte": "bonjour"}, + is_loop=False, + mode="replay", + ) + + +@pytest.fixture +def unknown_plan(): + """TaskPlan qui n'a pas compris.""" + return TaskPlan( + instruction="fais le café", + understood=False, + error="aucun workflow ne correspond", + ) + + +@pytest.fixture +def mock_task_planner(understood_plan): + planner = MagicMock() + planner.understand.return_value = understood_plan + return planner + + +@pytest.fixture +def mock_replay_callback(): + return MagicMock(return_value="replay_abc123") + + +@pytest.fixture +def mock_status_provider(): + """Retourne un dict par défaut 'running' — peut être modifié dans les tests.""" + return MagicMock(return_value={ + "status": "running", + "completed_actions": 1, + "total_actions": 
5, + }) + + +@pytest.fixture +def session(mock_task_planner, sample_workflows, mock_replay_callback, mock_status_provider): + return ChatSession( + task_planner=mock_task_planner, + workflows_provider=lambda: sample_workflows, + replay_callback=mock_replay_callback, + status_provider=mock_status_provider, + ) + + +# ============================================================================= +# Tests création session +# ============================================================================= + +class TestSessionCreation: + def test_session_id_generated(self): + s = ChatSession() + assert s.session_id.startswith("chat_") + + def test_initial_state_is_idle(self): + s = ChatSession() + assert s.state == STATE_IDLE + + def test_welcome_message_present(self): + s = ChatSession() + history = s.get_history() + assert len(history) == 1 + assert history[0]["role"] == ROLE_LEA + assert "Bonjour" in history[0]["content"] or "Léa" in history[0]["content"] + + def test_session_id_custom(self): + s = ChatSession(session_id="custom_42") + assert s.session_id == "custom_42" + + +# ============================================================================= +# Tests send_message +# ============================================================================= + +class TestSendMessage: + def test_empty_message_rejected(self, session): + result = session.send_message("") + assert result["ok"] is False + + def test_send_message_calls_planner(self, session, mock_task_planner): + session.send_message("ouvre le bloc-notes") + mock_task_planner.understand.assert_called_once() + call = mock_task_planner.understand.call_args + assert call.kwargs["instruction"] == "ouvre le bloc-notes" + # workflows_provider a été appelé et passé + assert "available_workflows" in call.kwargs + assert len(call.kwargs["available_workflows"]) == 1 + + def test_send_message_transitions_to_awaiting_confirmation(self, session): + result = session.send_message("ouvre le bloc-notes") + assert result["ok"] is 
True + assert session.state == STATE_AWAITING_CONFIRMATION + assert result["state"] == STATE_AWAITING_CONFIRMATION + + def test_user_message_added_to_history(self, session): + session.send_message("ouvre le bloc-notes") + history = session.get_history() + user_msgs = [m for m in history if m["role"] == ROLE_USER] + assert len(user_msgs) == 1 + assert user_msgs[0]["content"] == "ouvre le bloc-notes" + + def test_lea_proposal_added_to_history(self, session): + session.send_message("ouvre le bloc-notes") + history = session.get_history() + lea_msgs = [m for m in history if m["role"] == ROLE_LEA] + # Bienvenue + proposition + assert len(lea_msgs) == 2 + proposal = lea_msgs[-1]["content"] + assert "Bloc-notes" in proposal + assert "oui" in proposal.lower() or "y aller" in proposal.lower() + + def test_proposal_contains_confidence(self, session): + session.send_message("ouvre le bloc-notes") + history = session.get_history() + proposal = history[-1]["content"] + # 0.9 → 90% + assert "90" in proposal + + def test_proposal_contains_parameters(self, session): + session.send_message("ouvre le bloc-notes") + history = session.get_history() + proposal = history[-1]["content"] + assert "texte" in proposal + assert "bonjour" in proposal + + +# ============================================================================= +# Tests get_history +# ============================================================================= + +class TestGetHistory: + def test_history_returns_list_of_dicts(self, session): + history = session.get_history() + assert isinstance(history, list) + assert all(isinstance(m, dict) for m in history) + + def test_history_message_structure(self, session): + history = session.get_history() + msg = history[0] + assert "role" in msg + assert "content" in msg + assert "timestamp" in msg + assert "meta" in msg + + def test_history_grows_with_messages(self, session): + initial = len(session.get_history()) + session.send_message("ouvre le bloc-notes") + assert 
len(session.get_history()) > initial + + +# ============================================================================= +# Tests transitions d'états +# ============================================================================= + +class TestStateTransitions: + def test_full_happy_path(self, session, mock_task_planner, mock_replay_callback): + """idle → planning → awaiting_confirmation → executing → done.""" + # Départ : idle + assert session.state == STATE_IDLE + + # Envoi message → planning → awaiting_confirmation + session.send_message("ouvre le bloc-notes") + assert session.state == STATE_AWAITING_CONFIRMATION + + # Confirmation → executing + result = session.confirm(confirmed=True) + assert result["ok"] is True + assert session.state == STATE_EXECUTING + mock_replay_callback.assert_called_once() + call = mock_replay_callback.call_args + assert call.kwargs["session_id"] == "sess_bloc_notes" + + # Simulation : replay terminé → done + session._status_provider.return_value = { + "status": "done", + "completed_actions": 5, + "total_actions": 5, + } + session.refresh_progress() + assert session.state == STATE_DONE + + def test_confirm_via_message_oui(self, session, mock_replay_callback): + """Le TIM peut répondre 'oui' en message au lieu d'un bouton.""" + session.send_message("ouvre le bloc-notes") + assert session.state == STATE_AWAITING_CONFIRMATION + + session.send_message("oui") + assert session.state == STATE_EXECUTING + mock_replay_callback.assert_called_once() + + def test_refusal_via_confirm_false(self, session, mock_replay_callback): + """confirm(False) → retour à idle, pas d'exécution.""" + session.send_message("ouvre le bloc-notes") + result = session.confirm(confirmed=False) + assert result["ok"] is True + assert result["confirmed"] is False + assert session.state == STATE_IDLE + mock_replay_callback.assert_not_called() + + def test_refusal_via_message_non(self, session, mock_replay_callback): + """Le TIM répond 'non' → annulation.""" + 
session.send_message("ouvre le bloc-notes") + session.send_message("non") + assert session.state == STATE_IDLE + mock_replay_callback.assert_not_called() + # Le message d'annulation doit être dans l'historique + history = session.get_history() + assert any("annule" in m["content"].lower() for m in history) + + def test_ambiguous_confirmation_reply(self, session): + """Réponse ambiguë pendant awaiting_confirmation → demande de clarification.""" + session.send_message("ouvre le bloc-notes") + result = session.send_message("peut-être") + assert session.state == STATE_AWAITING_CONFIRMATION + assert result.get("needs_clarification") is True + + def test_failed_replay_transitions_to_error(self, session): + """replay_callback lève une exception → état error.""" + session._replay_callback = MagicMock(side_effect=RuntimeError("boom")) + session.send_message("ouvre le bloc-notes") + result = session.confirm(confirmed=True) + assert result["ok"] is False + assert session.state == STATE_ERROR + + def test_replay_failure_from_status(self, session): + """Le replay rapporte 'failed' → état error.""" + session.send_message("ouvre le bloc-notes") + session.confirm(confirmed=True) + assert session.state == STATE_EXECUTING + + session._status_provider.return_value = { + "status": "failed", + "error": "element introuvable", + } + session.refresh_progress() + assert session.state == STATE_ERROR + + +# ============================================================================= +# Tests fallback / résilience +# ============================================================================= + +class TestResilience: + def test_no_task_planner_graceful(self): + """Sans TaskPlanner, on reste gracieux.""" + s = ChatSession(task_planner=None) + result = s.send_message("test") + assert result["ok"] is False + assert s.state == STATE_ERROR + # Message d'erreur présent dans l'historique + history = s.get_history() + assert any("désolée" in m["content"].lower() or "indisponible" in 
m["content"].lower() + for m in history) + + def test_task_planner_exception_graceful(self, mock_replay_callback): + """TaskPlanner lève une exception (gemma4 down) → état error propre.""" + planner = MagicMock() + planner.understand.side_effect = RuntimeError("gemma4 offline") + + s = ChatSession( + task_planner=planner, + workflows_provider=lambda: [], + replay_callback=mock_replay_callback, + ) + result = s.send_message("test") + assert result["ok"] is False + assert s.state == STATE_ERROR + + def test_instruction_not_understood(self, unknown_plan, mock_replay_callback): + """Plan.understood = False → message d'erreur explicite.""" + planner = MagicMock() + planner.understand.return_value = unknown_plan + + s = ChatSession( + task_planner=planner, + workflows_provider=lambda: [], + replay_callback=mock_replay_callback, + ) + result = s.send_message("fais le café") + assert result["ok"] is False + assert s.state == STATE_ERROR + history = s.get_history() + assert any("reformuler" in m["content"].lower() for m in history) + + def test_no_replay_callback(self, mock_task_planner, sample_workflows): + """Sans replay_callback, on refuse l'exécution proprement.""" + s = ChatSession( + task_planner=mock_task_planner, + workflows_provider=lambda: sample_workflows, + replay_callback=None, + ) + s.send_message("ouvre le bloc-notes") + result = s.confirm(confirmed=True) + assert result["ok"] is False + assert s.state == STATE_ERROR + + +# ============================================================================= +# Tests snapshot +# ============================================================================= + +class TestSnapshot: + def test_snapshot_structure(self, session): + snap = session.get_snapshot() + assert "session_id" in snap + assert "state" in snap + assert "messages" in snap + assert "pending_plan" in snap + assert "active_replay_id" in snap + assert "progress" in snap + + def test_snapshot_includes_pending_plan_when_awaiting(self, session): + 
session.send_message("ouvre le bloc-notes") + snap = session.get_snapshot() + assert snap["state"] == STATE_AWAITING_CONFIRMATION + assert snap["pending_plan"] is not None + assert snap["pending_plan"]["workflow_name"] == "Bloc-notes" + + def test_snapshot_no_pending_plan_in_idle(self, session): + snap = session.get_snapshot() + assert snap["pending_plan"] is None + + +# ============================================================================= +# Tests ChatManager +# ============================================================================= + +class TestChatManager: + def test_create_session(self, mock_task_planner, sample_workflows): + mgr = ChatManager( + task_planner=mock_task_planner, + workflows_provider=lambda: sample_workflows, + ) + s = mgr.create_session() + assert s is not None + assert s.session_id in [x["session_id"] for x in mgr.list_sessions()] + + def test_get_session(self, mock_task_planner): + mgr = ChatManager(task_planner=mock_task_planner) + s = mgr.create_session() + retrieved = mgr.get_session(s.session_id) + assert retrieved is s + + def test_get_session_not_found(self): + mgr = ChatManager() + assert mgr.get_session("unknown") is None + + def test_delete_session(self, mock_task_planner): + mgr = ChatManager(task_planner=mock_task_planner) + s = mgr.create_session() + assert mgr.delete_session(s.session_id) is True + assert mgr.get_session(s.session_id) is None + + def test_cleanup_old_sessions(self, mock_task_planner): + mgr = ChatManager(task_planner=mock_task_planner) + s = mgr.create_session() + # Simuler une session très ancienne + s.updated_at = time.time() - 100000 + removed = mgr.cleanup_old(max_age_s=3600) + assert removed == 1 + assert mgr.get_session(s.session_id) is None + + def test_list_sessions_structure(self, mock_task_planner): + mgr = ChatManager(task_planner=mock_task_planner) + mgr.create_session(machine_id="pc-01") + sessions = mgr.list_sessions() + assert len(sessions) == 1 + s = sessions[0] + assert "session_id" 
in s + assert "state" in s + assert "machine_id" in s + assert s["machine_id"] == "pc-01" diff --git a/web_dashboard/app.py b/web_dashboard/app.py index 0f6580be0..008c0c9d1 100644 --- a/web_dashboard/app.py +++ b/web_dashboard/app.py @@ -1876,7 +1876,7 @@ def load_system_config(): "version": "1.0.0", "services": {}, "llm": {"provider": "ollama", "base_url": "http://localhost:11434", "model": "qwen2.5:7b"}, - "vlm": {"provider": "ollama", "base_url": "http://localhost:11434", "model": "qwen2.5vl:7b"}, + "vlm": {"provider": "ollama", "base_url": "http://localhost:11434", "model": "gemma4:e4b"}, "detection": {"owl_model": "google/owlv2-base-patch16-ensemble", "confidence_threshold": 0.3}, "database": {"type": "sqlite", "path": "data/training/workflows.db"}, "security": {"enable_encryption": True, "require_authentication": False} @@ -2371,6 +2371,93 @@ def proxy_streaming(endpoint): return jsonify({'error': str(e)}), 500 +# ============================================================================= +# Chat conversationnel — Léa +# ============================================================================= + +CHAT_BASE_URL = 'http://localhost:5005/api/v1/chat' + + +@app.route('/chat') +def chat_page(): + """Page de chat conversationnel avec Léa.""" + return render_template('chat.html') + + +@app.route('/api/chat/session', methods=['POST']) +def proxy_chat_session(): + """Proxy : créer une session de chat côté serveur streaming.""" + return _proxy_chat( + method='POST', + path='/session', + payload=request.get_json(silent=True) or {}, + ) + + +@app.route('/api/chat//message', methods=['POST']) +def proxy_chat_message(session_id): + """Proxy : envoyer un message dans une session.""" + return _proxy_chat( + method='POST', + path=f'/{session_id}/message', + payload=request.get_json(silent=True) or {}, + ) + + +@app.route('/api/chat//history', methods=['GET']) +def proxy_chat_history(session_id): + """Proxy : récupérer l'historique.""" + return _proxy_chat(method='GET', 
path=f'/{session_id}/history') + + +@app.route('/api/chat//confirm', methods=['POST']) +def proxy_chat_confirm(session_id): + """Proxy : confirmer l'exécution d'un plan.""" + return _proxy_chat( + method='POST', + path=f'/{session_id}/confirm', + payload=request.get_json(silent=True) or {}, + ) + + +def _proxy_chat(method, path, payload=None): + """Helper pour proxyfier les requêtes vers le serveur streaming (:5005).""" + import urllib.request + import urllib.error + + url = f'{CHAT_BASE_URL}{path}' + headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + } + # Token Bearer (lu depuis l'env — même token que le serveur streaming) + token = os.environ.get('RPA_API_TOKEN', '') + if token: + headers['Authorization'] = f'Bearer {token}' + + try: + data_bytes = None + if payload is not None and method != 'GET': + data_bytes = json.dumps(payload).encode('utf-8') + req = urllib.request.Request(url, data=data_bytes, headers=headers, method=method) + with urllib.request.urlopen(req, timeout=15) as response: + body = response.read().decode('utf-8') + try: + return jsonify(json.loads(body)) + except json.JSONDecodeError: + return body, response.status, {'Content-Type': 'application/json'} + except urllib.error.HTTPError as e: + try: + detail = json.loads(e.read().decode('utf-8')) + except Exception: + detail = {'error': str(e)} + return jsonify(detail), e.code + except urllib.error.URLError as e: + return jsonify({'error': f'Serveur chat inaccessible : {e}'}), 502 + except Exception as e: + return jsonify({'error': str(e)}), 500 + + # ============================================================================= # Main # ============================================================================= diff --git a/web_dashboard/static/js/chat.js b/web_dashboard/static/js/chat.js new file mode 100644 index 000000000..8e8f05cce --- /dev/null +++ b/web_dashboard/static/js/chat.js @@ -0,0 +1,240 @@ +// chat.js — Client Léa conversationnelle +// Logique 
minimaliste : pas de framework, fetch + polling. + +const API_BASE = "/api/chat"; // Proxyfié par le dashboard Flask vers :5005 + +let sessionId = null; +let pollTimer = null; +let lastMessageCount = 0; +let currentState = "idle"; + +const STATE_LABELS = { + idle: "En attente", + planning: "Léa réfléchit…", + awaiting_confirmation: "En attente de confirmation", + executing: "Léa exécute le workflow…", + done: "Terminé", + error: "Erreur", +}; + +// ----------------------------------------------------------------------------- +// Initialisation +// ----------------------------------------------------------------------------- + +async function initChat() { + try { + const resp = await fetch(`${API_BASE}/session`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ machine_id: "default" }), + }); + if (!resp.ok) throw new Error(`HTTP ${resp.status}`); + const data = await resp.json(); + sessionId = data.session_id; + currentState = data.state || "idle"; + updateStatus(currentState); + renderMessages(data.history || []); + document.getElementById("sessionInfo").textContent = `Session ${sessionId}`; + startPolling(); + } catch (err) { + console.error("Impossible de créer la session chat :", err); + showSystemMessage(`Impossible de créer la session chat : ${err.message}. 
Vérifiez que le serveur streaming (5005) est démarré.`); + } +} + +// ----------------------------------------------------------------------------- +// Envoi de messages +// ----------------------------------------------------------------------------- + +async function sendMessage() { + const input = document.getElementById("composerInput"); + const text = (input.value || "").trim(); + if (!text || !sessionId) return; + + const sendBtn = document.getElementById("sendBtn"); + sendBtn.disabled = true; + input.value = ""; + autosizeTextarea(); + + // Affichage optimiste + appendMessage({ + role: "user", + content: text, + timestamp: Date.now() / 1000, + }); + + try { + updateStatus("planning"); + const resp = await fetch(`${API_BASE}/${sessionId}/message`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ message: text }), + }); + if (!resp.ok) throw new Error(`HTTP ${resp.status}`); + const data = await resp.json(); + currentState = data.state || "idle"; + updateStatus(currentState); + renderMessages(data.history || []); + } catch (err) { + console.error("Erreur envoi message :", err); + showSystemMessage(`Erreur : ${err.message}`); + updateStatus("error"); + } finally { + sendBtn.disabled = false; + input.focus(); + } +} + +async function confirmPlan(confirmed) { + if (!sessionId) return; + const confirmBar = document.getElementById("confirmBar"); + confirmBar.classList.remove("visible"); + + try { + const resp = await fetch(`${API_BASE}/${sessionId}/confirm`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ confirmed }), + }); + if (!resp.ok) throw new Error(`HTTP ${resp.status}`); + const data = await resp.json(); + currentState = data.state || "idle"; + updateStatus(currentState); + renderMessages(data.history || []); + } catch (err) { + console.error("Erreur confirmation :", err); + showSystemMessage(`Erreur confirmation : ${err.message}`); + } +} + +// 
// Polling
// -----------------------------------------------------------------------------

/** (Re)start the 2 s history-polling loop, clearing any previous timer. */
function startPolling() {
  if (pollTimer) clearInterval(pollTimer);
  pollTimer = setInterval(pollHistory, 2000);
}

/**
 * Fetch the session snapshot and refresh the status, the message list and
 * the confirmation bar. Network errors are silent: the next tick retries.
 */
async function pollHistory() {
  if (!sessionId) return;
  try {
    const resp = await fetch(`${API_BASE}/${sessionId}/history`);
    if (!resp.ok) return;
    const data = await resp.json();
    const snap = data.snapshot || {};
    currentState = snap.state || "idle";
    updateStatus(currentState, snap.progress || {});
    const messages = snap.messages || [];
    // Cheap change detection: only re-render when the count moved.
    // NOTE(review): an edited last message with an unchanged count is
    // missed — acceptable for an append-only chat log.
    if (messages.length !== lastMessageCount) {
      renderMessages(messages);
    }
    // BUG FIX: the confirm bar used to be synced only inside
    // renderMessages(), which runs only when the message count changes;
    // a state change arriving without a new message never showed (or
    // hid) the bar. Sync it on every tick instead.
    syncConfirmBar();
  } catch (err) {
    // Silent — we retry on the next tick.
  }
}

// -----------------------------------------------------------------------------
// Rendering
// -----------------------------------------------------------------------------

/** Rebuild the message list from scratch and scroll to the bottom. */
function renderMessages(messages) {
  const container = document.getElementById("messages");
  container.innerHTML = "";
  messages.forEach(msg => appendMessage(msg, false));
  lastMessageCount = messages.length;
  container.scrollTop = container.scrollHeight;
  syncConfirmBar();
}

/** Show the plan-confirmation bar iff Léa awaits user confirmation. */
function syncConfirmBar() {
  const confirmBar = document.getElementById("confirmBar");
  confirmBar.classList.toggle("visible", currentState === "awaiting_confirmation");
}

/**
 * Append one message bubble (user / lea / system) to the list.
 * @param {{role: string, content?: string, timestamp?: number}} msg
 * @param {boolean} [autoscroll=true] - scroll to the bottom after insertion.
 */
function appendMessage(msg, autoscroll = true) {
  const container = document.getElementById("messages");
  const div = document.createElement("div");
  div.className = `message ${msg.role}`;

  const avatar = document.createElement("div");
  avatar.className = "avatar";
  if (msg.role === "user") avatar.textContent = "Vous";
  else if (msg.role === "lea") avatar.textContent = "L";
  else avatar.textContent = "i";

  const bubbleWrap = document.createElement("div");
  const bubble = document.createElement("div");
  bubble.className = "bubble";
  // textContent (not innerHTML) so message content cannot inject HTML.
  bubble.textContent = msg.content || "";
  bubbleWrap.appendChild(bubble);

  const ts = document.createElement("div");
  ts.className = "timestamp";
  try {
    // Backend timestamps are epoch seconds; Date wants milliseconds.
    const d = new Date((msg.timestamp || 0) * 1000);
    ts.textContent = d.toLocaleTimeString("fr-FR");
  } catch (e) { ts.textContent = ""; }
  bubbleWrap.appendChild(ts);

  div.appendChild(avatar);
  div.appendChild(bubbleWrap);
  container.appendChild(div);

  if (autoscroll) container.scrollTop = container.scrollHeight;
}

/** Render an informational/system bubble carrying the given text. */
function showSystemMessage(text) {
  appendMessage({
    role: "system",
    content: text,
    timestamp: Date.now() / 1000,
  });
}

/**
 * Update the status dot/label and block the composer while Léa is busy.
 * @param {string} state - session state (idle, planning, executing, …).
 * @param {{total_actions?: number, completed_actions?: number}} [progress]
 */
function updateStatus(state, progress = {}) {
  const dot = document.getElementById("statusDot");
  const txt = document.getElementById("statusText");
  dot.className = `status-dot ${state}`;
  let label = STATE_LABELS[state] || state;

  if (state === "executing" && progress && progress.total_actions) {
    const done = progress.completed_actions || 0;
    const total = progress.total_actions || 0;
    label = `Léa exécute… ${done}/${total}`;
  }

  txt.textContent = label;

  // Block typing while Léa is planning or executing.
  const input = document.getElementById("composerInput");
  const sendBtn = document.getElementById("sendBtn");
  const blocked = (state === "planning" || state === "executing");
  input.disabled = blocked;
  sendBtn.disabled = blocked;
}

// -----------------------------------------------------------------------------
// Composer UX
// -----------------------------------------------------------------------------

/** Enter sends the message; Shift+Enter inserts a newline. */
function handleKeydown(event) {
  if (event.key === "Enter" && !event.shiftKey) {
    event.preventDefault();
    sendMessage();
  }
}

/** Grow the textarea with its content, capped at 120 px. */
function autosizeTextarea() {
  const input = document.getElementById("composerInput");
  input.style.height = "auto";
  input.style.height = Math.min(input.scrollHeight, 120) + "px";
}

// Bootstrap: wire up the composer auto-resize and open the chat session
// once the DOM is ready.
document.addEventListener("DOMContentLoaded", () => {
  const input = document.getElementById("composerInput");
  input.addEventListener("input", autosizeTextarea);
  initChat();
});
diff --git a/web_dashboard/templates/chat.html b/web_dashboard/templates/chat.html
new file mode 100644
index 000000000..8afb805d4
--- /dev/null
+++ b/web_dashboard/templates/chat.html
@@ -0,0 +1,309 @@
+
+
+
+
+
+ Léa — Chat RPA Vision V3
+
+
+
+

Léa — Assistant RPA Vision V3

+ Retour au dashboard +
+ +
+
+
+ État : + + + En attente + +
+
Aucune session
+
+ +
+ +
+
Léa propose un plan. Confirmer l'exécution ?
+
+ + +
+
+ +
+ + +
+
+ + + +