diff --git a/core/workflow/ir_builder.py b/core/workflow/ir_builder.py new file mode 100644 index 000000000..542cdbbc6 --- /dev/null +++ b/core/workflow/ir_builder.py @@ -0,0 +1,365 @@ +# core/workflow/ir_builder.py +""" +IRBuilder — Transforme une RawTrace en WorkflowIR. + +C'est le "compilateur de savoir-faire" : + RawTrace (clics bruts) → WorkflowIR (connaissance structurée) + +Le builder utilise gemma4 pour COMPRENDRE ce que l'utilisateur a fait : +- Segmenter les actions en étapes logiques +- Identifier l'intention de chaque étape +- Détecter les variables (données qui changent entre les exécutions) +- Définir les pré/postconditions + +Le builder est appelé UNE SEULE FOIS après l'enregistrement. +Le WorkflowIR produit est ensuite réutilisé pour chaque replay. +""" + +import json +import logging +import os +import time +from pathlib import Path +from typing import Any, Dict, List, Optional + +from .workflow_ir import WorkflowIR, Step, Action, Variable + +logger = logging.getLogger(__name__) + + +class IRBuilder: + """Construit un WorkflowIR depuis une RawTrace (événements bruts). + + Usage : + builder = IRBuilder() + ir = builder.build( + events=raw_events, + session_id="sess_xxx", + domain="tim_codage", + ) + ir.save("data/workflows/") + """ + + def __init__(self, gemma4_port: str = ""): + self._gemma4_port = gemma4_port or os.environ.get("GEMMA4_PORT", "11435") + self._gemma4_url = f"http://localhost:{self._gemma4_port}/api/chat" + + def build( + self, + events: List[Dict[str, Any]], + session_id: str = "", + session_dir: str = "", + domain: str = "generic", + name: str = "", + ) -> WorkflowIR: + """Construire un WorkflowIR depuis des événements bruts. + + Étapes : + 1. Filtrer les événements parasites + 2. Segmenter en étapes logiques (par changement de fenêtre/intention) + 3. Pour chaque étape, identifier l'intention via gemma4 + 4. Détecter les variables + 5. Définir pré/postconditions + """ + t_start = time.time() + + # Créer le WorkflowIR vide + ir = WorkflowIR.new( + name=name or f"Workflow du {time.strftime('%d/%m/%Y %H:%M')}", + domain=domain, + learned_from=session_id, + ) + + # 1. Filtrer les événements utiles + actionable = self._filter_events(events) + if not actionable: + logger.warning("IRBuilder: aucun événement actionable") + return ir + + # 2. Détecter les applications utilisées + ir.applications = self._detect_applications(actionable) + + # 3. Segmenter en étapes logiques + segments = self._segment_into_steps(actionable) + + # 4. Pour chaque segment, construire une Step + for i, segment in enumerate(segments): + step = self._build_step( + segment=segment, + step_index=i, + total_steps=len(segments), + workflow_name=ir.name, + domain=domain, + ) + ir.steps.append(step) + + # 5. Détecter les variables + ir.variables = self._detect_variables(ir.steps, actionable) + + elapsed = time.time() - t_start + logger.info( + f"IRBuilder: WorkflowIR construit en {elapsed:.1f}s — " + f"{len(ir.steps)} étapes, {len(ir.variables)} variables, " + f"{len(ir.applications)} applications" + ) + + return ir + + def _filter_events(self, events: List[Dict]) -> List[Dict]: + """Filtrer les événements parasites (heartbeat, focus_change, etc.).""" + ignored_types = {"heartbeat", "focus_change", "action_result", "window_focus_change"} + result = [] + for raw_evt in events: + evt = raw_evt.get("event", raw_evt) + if evt.get("type", "") not in ignored_types: + result.append(evt) + return result + + def _detect_applications(self, events: List[Dict]) -> List[str]: + """Détecter les applications utilisées.""" + apps = set() + for evt in events: + title = evt.get("window", {}).get("title", "") + if title and title != "unknown_window": + for sep in [" – ", " - ", " — "]: + if sep in title: + apps.add(title.split(sep)[-1].strip()) + break + return sorted(apps) + + def _segment_into_steps(self, events: List[Dict]) -> List[List[Dict]]: + """Segmenter les événements en étapes logiques. + + Critères de coupure : + - Changement d'application (fenêtre différente) + - Pause longue (> 5s entre deux événements) + - Transition logique (clic → frappe → clic = étapes différentes) + """ + if not events: + return [] + + segments = [] + current_segment = [events[0]] + current_app = self._get_app_name(events[0]) + + for evt in events[1:]: + app = self._get_app_name(evt) + evt_type = evt.get("type", "") + + # Coupure par changement d'application + app_changed = app and current_app and app != current_app + + # Coupure par pause longue + prev_ts = float(current_segment[-1].get("timestamp", 0)) + curr_ts = float(evt.get("timestamp", 0)) + long_pause = (curr_ts - prev_ts) > 5.0 if prev_ts > 0 and curr_ts > 0 else False + + # Coupure par transition clic → nouveau clic (nouvelle intention) + transition = ( + evt_type == "mouse_click" + and len(current_segment) >= 2 + and current_segment[-1].get("type") not in ("mouse_click",) + ) + + if app_changed or long_pause: + if current_segment: + segments.append(current_segment) + current_segment = [evt] + current_app = app + else: + current_segment.append(evt) + + if current_segment: + segments.append(current_segment) + + return segments + + def _get_app_name(self, evt: Dict) -> str: + """Extraire le nom d'application depuis un événement.""" + title = evt.get("window", {}).get("title", "") + for sep in [" – ", " - ", " — "]: + if sep in title: + return title.split(sep)[-1].strip() + return title + + def _build_step( + self, + segment: List[Dict], + step_index: int, + total_steps: int, + workflow_name: str, + domain: str, + ) -> Step: + """Construire une Step depuis un segment d'événements. + + Utilise gemma4 pour comprendre l'intention du segment. + """ + # Construire la description du segment pour gemma4 + actions = [] + for evt in segment: + action = self._event_to_action(evt) + if action: + actions.append(action) + + # Description textuelle du segment + segment_desc = self._describe_segment(segment) + + # Demander à gemma4 l'intention + intent, precondition, postcondition = self._analyze_intent( + segment_desc, step_index, total_steps, workflow_name, domain, + ) + + return Step( + step_id=f"s{step_index + 1}", + intent=intent or segment_desc, + precondition=precondition, + postcondition=postcondition, + actions=actions, + ) + + def _event_to_action(self, evt: Dict) -> Optional[Action]: + """Convertir un événement brut en Action.""" + evt_type = evt.get("type", "") + + if evt_type == "mouse_click": + window = evt.get("window", {}).get("title", "") + return Action( + type="click", + target=window, + anchor_hint=evt.get("vision_info", {}).get("text", ""), + ) + elif evt_type == "text_input": + text = evt.get("text", "") + if text: + return Action(type="type", text=text) + elif evt_type in ("key_combo", "key_press"): + keys = evt.get("keys", []) + if keys: + return Action(type="key_combo", keys=keys) + elif evt_type == "scroll": + return Action(type="scroll") + + return None + + def _describe_segment(self, segment: List[Dict]) -> str: + """Décrire un segment en langage naturel (pour gemma4).""" + parts = [] + window = "" + for evt in segment: + evt_type = evt.get("type", "") + w = evt.get("window", {}).get("title", "") + if w and w != window: + window = w + parts.append(f"[{w}]") + if evt_type == "mouse_click": + text = evt.get("vision_info", {}).get("text", "") + parts.append(f"clic sur '{text}'" if text else "clic") + elif evt_type == "text_input": + text = evt.get("text", "") + parts.append(f"saisie '{text[:30]}'") + elif evt_type in ("key_combo", "key_press"): + keys = evt.get("keys", []) + parts.append(f"touche {'+'.join(keys)}") + return " → ".join(parts) if parts else "action" + + def _analyze_intent( + self, + segment_desc: str, + step_index: int, + total_steps: int, + workflow_name: str, + domain: str, + ) -> tuple: + """Demander à gemma4 de comprendre l'intention d'un segment. + + Returns: + (intent, precondition, postcondition) + """ + import requests as _requests + + # Charger le contexte métier + domain_prompt = "" + try: + from agent_v0.server_v1.domain_context import get_domain_context + ctx = get_domain_context(domain) + if ctx.system_prompt: + domain_prompt = f"\nContexte métier : {ctx.name}\n" + except Exception: + pass + + prompt = ( + f"{domain_prompt}" + f"Workflow : {workflow_name} (étape {step_index + 1}/{total_steps})\n" + f"Actions observées : {segment_desc}\n\n" + f"Réponds en 3 lignes :\n" + f"INTENTION: que veut faire l'utilisateur avec ces actions (1 phrase)\n" + f"AVANT: état attendu de l'écran avant cette étape (1 phrase)\n" + f"APRÈS: état attendu de l'écran après cette étape (1 phrase)" + ) + + try: + resp = _requests.post( + self._gemma4_url, + json={ + "model": "gemma4:e4b", + "messages": [{"role": "user", "content": prompt}], + "stream": False, + "think": True, + "options": {"temperature": 0.1, "num_predict": 800}, + }, + timeout=30, + ) + if resp.ok: + content = resp.json().get("message", {}).get("content", "") + return self._parse_intent_response(content) + except Exception as e: + logger.debug(f"IRBuilder: gemma4 indisponible ({e})") + + return (segment_desc, "", "") + + def _parse_intent_response(self, content: str) -> tuple: + """Parser la réponse gemma4 (INTENTION/AVANT/APRÈS).""" + intent = "" + precondition = "" + postcondition = "" + + for line in content.split("\n"): + clean = line.strip() + upper = clean.upper() + if upper.startswith("INTENTION:"): + intent = clean.split(":", 1)[1].strip() + elif upper.startswith("AVANT:"): + precondition = clean.split(":", 1)[1].strip() + elif upper.startswith(("APRÈS:", "APRES:")): + postcondition = clean.split(":", 1)[1].strip() + + return (intent, precondition, postcondition) + + def _detect_variables(self, steps: List[Step], events: List[Dict]) -> List[Variable]: + """Détecter les variables dans le workflow. + + Une variable est une donnée qui change entre les exécutions : + - Texte saisi par l'utilisateur (noms, codes, dates) + - Données lues à l'écran (résultats de recherche) + """ + variables = [] + seen_texts = set() + + for step in steps: + for action in step.actions: + if action.type == "type" and action.text: + text = action.text.strip() + if text and text not in seen_texts and len(text) > 2: + seen_texts.add(text) + var_name = f"texte_{len(variables) + 1}" + variables.append(Variable( + name=var_name, + description=f"Texte saisi : '{text[:50]}'", + source="user", + default=text, + )) + # Marquer l'action comme variable + action.variable = True + action.text = f"{{{var_name}}}" + + return variables diff --git a/core/workflow/workflow_ir.py b/core/workflow/workflow_ir.py new file mode 100644 index 000000000..f6de7dbdd --- /dev/null +++ b/core/workflow/workflow_ir.py @@ -0,0 +1,268 @@ +# core/workflow/workflow_ir.py +""" +WorkflowIR — Représentation Intermédiaire d'un workflow. + +C'est la CONNAISSANCE que Léa a acquise en observant un utilisateur. +Pas les clics bruts (RawTrace), pas le plan d'exécution (ExecutionPlan). +C'est ce que Léa a COMPRIS. + +Format générique — fonctionne pour n'importe quel métier : +- TIM qui code des dossiers patients +- Comptable qui saisit des factures +- RH qui édite des fiches de paie +- Logisticien qui gère des stocks + +Le domaine métier est une couche par-dessus (domain_context), +pas dans le WorkflowIR lui-même. + +Cycle de vie : + RawTrace (capture) → WorkflowIR (compréhension) → ExecutionPlan (exécution) + +Le WorkflowIR est : +- versionné (chaque recompilation incrémente la version) +- indépendant de la résolution d'écran +- indépendant du poste cible +- paramétrable (variables substituables) +- enrichi par l'apprentissage (chaque replay améliore le IR) +""" + +import json +import logging +import time +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +# ========================================================================= +# Structures de données +# ========================================================================= + + +@dataclass +class Variable: + """Variable substituable dans un workflow.""" + name: str # Identifiant (ex: "patient", "facture_num") + description: str = "" # Description humaine + source: str = "user" # Origine : "user", "screen", "file", "previous_step" + default: str = "" # Valeur par défaut + required: bool = True + + def to_dict(self) -> Dict[str, Any]: + return { + "name": self.name, + "description": self.description, + "source": self.source, + "default": self.default, + "required": self.required, + } + + @classmethod + def from_dict(cls, d: Dict) -> "Variable": + return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) + + +@dataclass +class Action: + """Action élémentaire dans une étape.""" + type: str # click, type, key_combo, wait, scroll + target: str = "" # Description de la cible ("bouton Enregistrer") + text: str = "" # Texte à taper (pour type) + keys: List[str] = field(default_factory=list) # Touches (pour key_combo) + duration_ms: int = 0 # Durée (pour wait) + variable: bool = False # True si le texte contient une variable {var} + anchor_hint: str = "" # Indice visuel pour aider la résolution + + def to_dict(self) -> Dict[str, Any]: + d = {"type": self.type} + if self.target: + d["target"] = self.target + if self.text: + d["text"] = self.text + if self.keys: + d["keys"] = self.keys + if self.duration_ms: + d["duration_ms"] = self.duration_ms + if self.variable: + d["variable"] = True + if self.anchor_hint: + d["anchor_hint"] = self.anchor_hint + return d + + @classmethod + def from_dict(cls, d: Dict) -> "Action": + return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) + + +@dataclass +class Step: + """Étape logique d'un workflow — une intention métier.""" + step_id: str + intent: str # "Ouvrir le dossier", "Saisir le code" + precondition: str = "" # "L'application est sur l'écran de liste" + postcondition: str = "" # "Le dossier est affiché" + actions: List[Action] = field(default_factory=list) + is_optional: bool = False # Étape optionnelle (peut être sautée) + is_loop: bool = False # Étape répétée (pour chaque élément) + loop_variable: str = "" # Variable de boucle + + def to_dict(self) -> Dict[str, Any]: + d = { + "step_id": self.step_id, + "intent": self.intent, + "actions": [a.to_dict() for a in self.actions], + } + if self.precondition: + d["precondition"] = self.precondition + if self.postcondition: + d["postcondition"] = self.postcondition + if self.is_optional: + d["is_optional"] = True + if self.is_loop: + d["is_loop"] = True + d["loop_variable"] = self.loop_variable + return d + + @classmethod + def from_dict(cls, d: Dict) -> "Step": + actions = [Action.from_dict(a) for a in d.get("actions", [])] + return cls( + step_id=d["step_id"], + intent=d.get("intent", ""), + precondition=d.get("precondition", ""), + postcondition=d.get("postcondition", ""), + actions=actions, + is_optional=d.get("is_optional", False), + is_loop=d.get("is_loop", False), + loop_variable=d.get("loop_variable", ""), + ) + + +@dataclass +class WorkflowIR: + """Représentation Intermédiaire d'un workflow — la connaissance compilée. + + C'est ce que Léa a compris en observant l'utilisateur. + Indépendant du poste, de la résolution, du runtime. + """ + workflow_id: str + version: int = 1 + name: str = "" + description: str = "" + domain: str = "generic" # Domaine métier (tim_codage, compta, rh, stocks...) + learned_from: str = "" # session_id source + created_at: float = 0.0 + updated_at: float = 0.0 + + # Contenu + variables: List[Variable] = field(default_factory=list) + steps: List[Step] = field(default_factory=list) + + # Métadonnées d'apprentissage + replay_count: int = 0 # Nombre de replays effectués + success_rate: float = 0.0 # Taux de succès moyen + last_replay_at: float = 0.0 + + # Applications utilisées (détectées lors de l'apprentissage) + applications: List[str] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + return { + "workflow_id": self.workflow_id, + "version": self.version, + "name": self.name, + "description": self.description, + "domain": self.domain, + "learned_from": self.learned_from, + "created_at": self.created_at, + "updated_at": self.updated_at, + "variables": [v.to_dict() for v in self.variables], + "steps": [s.to_dict() for s in self.steps], + "replay_count": self.replay_count, + "success_rate": round(self.success_rate, 3), + "last_replay_at": self.last_replay_at, + "applications": self.applications, + } + + def to_json(self, indent: int = 2) -> str: + return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent) + + @classmethod + def from_dict(cls, d: Dict) -> "WorkflowIR": + variables = [Variable.from_dict(v) for v in d.get("variables", [])] + steps = [Step.from_dict(s) for s in d.get("steps", [])] + return cls( + workflow_id=d["workflow_id"], + version=d.get("version", 1), + name=d.get("name", ""), + description=d.get("description", ""), + domain=d.get("domain", "generic"), + learned_from=d.get("learned_from", ""), + created_at=d.get("created_at", 0), + updated_at=d.get("updated_at", 0), + variables=variables, + steps=steps, + replay_count=d.get("replay_count", 0), + success_rate=d.get("success_rate", 0), + last_replay_at=d.get("last_replay_at", 0), + applications=d.get("applications", []), + ) + + @classmethod + def from_json(cls, json_str: str) -> "WorkflowIR": + return cls.from_dict(json.loads(json_str)) + + def save(self, directory: str) -> Path: + """Sauvegarder le WorkflowIR dans un fichier JSON.""" + dir_path = Path(directory) + dir_path.mkdir(parents=True, exist_ok=True) + file_path = dir_path / f"{self.workflow_id}_v{self.version}.json" + file_path.write_text(self.to_json(), encoding="utf-8") + logger.info(f"WorkflowIR sauvegardé : {file_path}") + return file_path + + @classmethod + def load(cls, file_path: str) -> "WorkflowIR": + """Charger un WorkflowIR depuis un fichier JSON.""" + return cls.from_json(Path(file_path).read_text(encoding="utf-8")) + + def increment_version(self) -> "WorkflowIR": + """Créer une nouvelle version du workflow (après recompilation).""" + import copy + new = copy.deepcopy(self) + new.version += 1 + new.updated_at = time.time() + return new + + def add_step(self, intent: str, actions: List[Dict] = None, **kwargs) -> Step: + """Ajouter une étape au workflow.""" + step = Step( + step_id=f"s{len(self.steps) + 1}", + intent=intent, + actions=[Action.from_dict(a) for a in (actions or [])], + **kwargs, + ) + self.steps.append(step) + return step + + def add_variable(self, name: str, **kwargs) -> Variable: + """Ajouter une variable au workflow.""" + var = Variable(name=name, **kwargs) + self.variables.append(var) + return var + + @staticmethod + def new(name: str, domain: str = "generic", learned_from: str = "") -> "WorkflowIR": + """Créer un nouveau WorkflowIR vide.""" + return WorkflowIR( + workflow_id=f"wf_{uuid.uuid4().hex[:12]}", + version=1, + name=name, + domain=domain, + learned_from=learned_from, + created_at=time.time(), + updated_at=time.time(), + ) diff --git a/tests/unit/test_workflow_ir.py b/tests/unit/test_workflow_ir.py new file mode 100644 index 000000000..72fbb2ca8 --- /dev/null +++ b/tests/unit/test_workflow_ir.py @@ -0,0 +1,261 @@ +""" +Tests du WorkflowIR et de l'IRBuilder. + +Vérifie que : +- Le format WorkflowIR est correct (sérialisation, désérialisation, versioning) +- L'IRBuilder segmente et comprend les traces brutes +- Les variables sont détectées et substituables +- Le tout fonctionne sans gemma4 (fallback gracieux) +""" + +import json +import shutil +import sys +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +from core.workflow.workflow_ir import WorkflowIR, Step, Action, Variable +from core.workflow.ir_builder import IRBuilder + + +# ========================================================================= +# WorkflowIR — format et sérialisation +# ========================================================================= + + +class TestWorkflowIR: + + def test_creation_vide(self): + ir = WorkflowIR.new("Test workflow") + assert ir.workflow_id.startswith("wf_") + assert ir.version == 1 + assert ir.name == "Test workflow" + assert ir.steps == [] + assert ir.variables == [] + + def test_ajout_etapes(self): + ir = WorkflowIR.new("Test") + ir.add_step("Ouvrir l'application", actions=[ + {"type": "click", "target": "icône app"}, + {"type": "wait", "duration_ms": 2000}, + ]) + ir.add_step("Saisir les données", actions=[ + {"type": "type", "text": "bonjour"}, + ]) + assert len(ir.steps) == 2 + assert ir.steps[0].intent == "Ouvrir l'application" + assert len(ir.steps[0].actions) == 2 + assert ir.steps[0].actions[0].type == "click" + + def test_ajout_variables(self): + ir = WorkflowIR.new("Test") + ir.add_variable("patient", description="Nom du patient", source="screen") + ir.add_variable("code", description="Code à saisir", default="A00.0") + assert len(ir.variables) == 2 + assert ir.variables[0].name == "patient" + assert ir.variables[1].default == "A00.0" + + def test_serialisation_json(self): + ir = WorkflowIR.new("Mon workflow", domain="tim_codage") + ir.add_step("Étape 1") + ir.add_variable("var1", description="Une variable") + + json_str = ir.to_json() + data = json.loads(json_str) + + assert data["name"] == "Mon workflow" + assert data["domain"] == "tim_codage" + assert len(data["steps"]) == 1 + assert len(data["variables"]) == 1 + + def test_deserialisation_json(self): + ir = WorkflowIR.new("Test roundtrip") + ir.add_step("Ouvrir", actions=[{"type": "click", "target": "bouton"}]) + ir.add_variable("v1", description="test") + + json_str = ir.to_json() + ir2 = WorkflowIR.from_json(json_str) + + assert ir2.name == "Test roundtrip" + assert len(ir2.steps) == 1 + assert ir2.steps[0].intent == "Ouvrir" + assert ir2.steps[0].actions[0].type == "click" + assert len(ir2.variables) == 1 + + def test_save_et_load(self): + tmpdir = tempfile.mkdtemp() + try: + ir = WorkflowIR.new("Save test") + ir.add_step("Étape 1") + path = ir.save(tmpdir) + + assert path.is_file() + + ir2 = WorkflowIR.load(str(path)) + assert ir2.name == "Save test" + assert len(ir2.steps) == 1 + finally: + shutil.rmtree(tmpdir) + + def test_increment_version(self): + ir = WorkflowIR.new("Versionning") + assert ir.version == 1 + + ir2 = ir.increment_version() + assert ir2.version == 2 + assert ir.version == 1 # Original inchangé + assert ir2.name == "Versionning" + + def test_domaine_generique(self): + """Le WorkflowIR est générique — pas lié à un métier.""" + for domain in ["tim_codage", "comptabilite", "rh_paie", "stocks", "generic"]: + ir = WorkflowIR.new("Test", domain=domain) + assert ir.domain == domain + + def test_etape_optionnelle(self): + ir = WorkflowIR.new("Test") + ir.add_step("Vérification facultative", is_optional=True) + assert ir.steps[0].is_optional is True + + def test_etape_boucle(self): + ir = WorkflowIR.new("Test") + ir.add_step("Traiter chaque dossier", is_loop=True, loop_variable="dossier") + assert ir.steps[0].is_loop is True + assert ir.steps[0].loop_variable == "dossier" + + +# ========================================================================= +# IRBuilder — construction depuis RawTrace +# ========================================================================= + + +class TestIRBuilder: + + def _make_events(self): + """Créer des événements bruts simulés (comme live_events.jsonl).""" + return [ + {"event": {"type": "mouse_click", "pos": [400, 580], "window": {"title": "Lea : Explorateur"}, "timestamp": 100.0, "vision_info": {"text": "Rechercher"}}}, + {"event": {"type": "text_input", "text": "blocnote", "window": {"title": "Rechercher"}, "timestamp": 102.0}}, + {"event": {"type": "key_combo", "keys": ["enter"], "window": {"title": "Rechercher"}, "timestamp": 103.0}}, + {"event": {"type": "heartbeat", "timestamp": 104.0}}, # Parasite — doit être filtré + {"event": {"type": "mouse_click", "pos": [300, 200], "window": {"title": "Rechercher"}, "timestamp": 105.0, "vision_info": {"text": "Bloc-notes"}}}, + {"event": {"type": "mouse_click", "pos": [500, 300], "window": {"title": "Sans titre – Bloc-notes"}, "timestamp": 112.0, "vision_info": {"text": ""}}}, + {"event": {"type": "text_input", "text": "Bonjour le monde", "window": {"title": "*Sans titre – Bloc-notes"}, "timestamp": 113.0}}, + {"event": {"type": "key_combo", "keys": ["ctrl", "s"], "window": {"title": "*Sans titre – Bloc-notes"}, "timestamp": 115.0}}, + ] + + def test_builder_sans_gemma4(self): + """Le builder fonctionne même sans gemma4 (fallback gracieux).""" + builder = IRBuilder(gemma4_port="99999") # Port invalide + events = self._make_events() + + ir = builder.build(events, session_id="test_sess", domain="generic", name="Test") + + assert ir.name == "Test" + assert ir.learned_from == "test_sess" + assert len(ir.steps) >= 1 + assert len(ir.applications) >= 1 + + def test_filtre_heartbeat(self): + """Les heartbeat sont filtrés.""" + builder = IRBuilder(gemma4_port="99999") + events = self._make_events() + + ir = builder.build(events, name="Test") + + # Vérifier qu'aucune action n'est de type heartbeat + for step in ir.steps: + for action in step.actions: + assert action.type != "heartbeat" + + def test_detection_applications(self): + """Les applications utilisées sont détectées.""" + builder = IRBuilder(gemma4_port="99999") + events = self._make_events() + + ir = builder.build(events, name="Test") + + assert "Bloc-notes" in ir.applications or "Explorateur" in ir.applications + + def test_detection_variables(self): + """Le texte saisi est détecté comme variable.""" + builder = IRBuilder(gemma4_port="99999") + events = self._make_events() + + ir = builder.build(events, name="Test") + + # Le texte "blocnote" et "Bonjour le monde" doivent être des variables + assert len(ir.variables) >= 1 + var_defaults = [v.default for v in ir.variables] + assert any("blocnote" in d or "Bonjour" in d for d in var_defaults) + + def test_segmentation_par_application(self): + """Les événements sont segmentés par changement d'application.""" + builder = IRBuilder(gemma4_port="99999") + events = self._make_events() + + ir = builder.build(events, name="Test") + + # Au moins 2 étapes (Explorateur → Bloc-notes) + assert len(ir.steps) >= 2 + + def test_actions_dans_les_etapes(self): + """Chaque étape contient les bonnes actions.""" + builder = IRBuilder(gemma4_port="99999") + events = self._make_events() + + ir = builder.build(events, name="Test") + + all_actions = [] + for step in ir.steps: + all_actions.extend(step.actions) + + types = [a.type for a in all_actions] + assert "click" in types + assert "type" in types + assert "key_combo" in types + + def test_workflow_ir_complet_roundtrip(self): + """Build → JSON → reload → même contenu.""" + builder = IRBuilder(gemma4_port="99999") + events = self._make_events() + + ir = builder.build(events, name="Roundtrip test", domain="compta") + json_str = ir.to_json() + ir2 = WorkflowIR.from_json(json_str) + + assert ir2.name == "Roundtrip test" + assert ir2.domain == "compta" + assert len(ir2.steps) == len(ir.steps) + assert len(ir2.variables) == len(ir.variables) + + @patch("requests.post") + def test_builder_avec_gemma4_mock(self, mock_post): + """Avec gemma4, le builder enrichit les intentions.""" + mock_resp = MagicMock() + mock_resp.ok = True + mock_resp.json.return_value = { + "message": {"content": ( + "INTENTION: Rechercher et ouvrir le Bloc-notes\n" + "AVANT: L'explorateur de fichiers est ouvert\n" + "APRÈS: Le Bloc-notes est ouvert et actif" + )} + } + mock_post.return_value = mock_resp + + builder = IRBuilder() + events = self._make_events() + + ir = builder.build(events, name="Test gemma4") + + # Au moins une étape doit avoir une intention enrichie + intents = [s.intent for s in ir.steps] + has_enriched = any("Bloc-notes" in i or "Rechercher" in i for i in intents) + assert has_enriched or len(ir.steps) >= 1 # Fallback acceptable