feat: WorkflowIR — représentation intermédiaire du savoir-faire

Format canonique entre RawTrace (capture) et ExecutionPlan (exécution).
C'est ce que Léa a COMPRIS en observant l'utilisateur.

- WorkflowIR : steps, variables, intentions, pré/postconditions
- IRBuilder : transforme les événements bruts en WorkflowIR via gemma4
- Générique : fonctionne pour TIM, compta, RH, stocks — le domaine est une couche par-dessus
- Versionné, sérialisable JSON, save/load
- Détection automatique des variables (texte saisi → substituable)
- 18 tests (format, sérialisation, builder, segmentation, variables)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-09 21:50:32 +02:00
parent 4509038bf0
commit cc673755f7
3 changed files with 894 additions and 0 deletions

365
core/workflow/ir_builder.py Normal file
View File

@@ -0,0 +1,365 @@
# core/workflow/ir_builder.py
"""
IRBuilder — Transforme une RawTrace en WorkflowIR.
C'est le "compilateur de savoir-faire" :
RawTrace (clics bruts) → WorkflowIR (connaissance structurée)
Le builder utilise gemma4 pour COMPRENDRE ce que l'utilisateur a fait :
- Segmenter les actions en étapes logiques
- Identifier l'intention de chaque étape
- Détecter les variables (données qui changent entre les exécutions)
- Définir les pré/postconditions
Le builder est appelé UNE SEULE FOIS après l'enregistrement.
Le WorkflowIR produit est ensuite réutilisé pour chaque replay.
"""
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from .workflow_ir import WorkflowIR, Step, Action, Variable
logger = logging.getLogger(__name__)
class IRBuilder:
"""Construit un WorkflowIR depuis une RawTrace (événements bruts).
Usage :
builder = IRBuilder()
ir = builder.build(
events=raw_events,
session_id="sess_xxx",
domain="tim_codage",
)
ir.save("data/workflows/")
"""
def __init__(self, gemma4_port: str = ""):
self._gemma4_port = gemma4_port or os.environ.get("GEMMA4_PORT", "11435")
self._gemma4_url = f"http://localhost:{self._gemma4_port}/api/chat"
def build(
self,
events: List[Dict[str, Any]],
session_id: str = "",
session_dir: str = "",
domain: str = "generic",
name: str = "",
) -> WorkflowIR:
"""Construire un WorkflowIR depuis des événements bruts.
Étapes :
1. Filtrer les événements parasites
2. Segmenter en étapes logiques (par changement de fenêtre/intention)
3. Pour chaque étape, identifier l'intention via gemma4
4. Détecter les variables
5. Définir pré/postconditions
"""
t_start = time.time()
# Créer le WorkflowIR vide
ir = WorkflowIR.new(
name=name or f"Workflow du {time.strftime('%d/%m/%Y %H:%M')}",
domain=domain,
learned_from=session_id,
)
# 1. Filtrer les événements utiles
actionable = self._filter_events(events)
if not actionable:
logger.warning("IRBuilder: aucun événement actionable")
return ir
# 2. Détecter les applications utilisées
ir.applications = self._detect_applications(actionable)
# 3. Segmenter en étapes logiques
segments = self._segment_into_steps(actionable)
# 4. Pour chaque segment, construire une Step
for i, segment in enumerate(segments):
step = self._build_step(
segment=segment,
step_index=i,
total_steps=len(segments),
workflow_name=ir.name,
domain=domain,
)
ir.steps.append(step)
# 5. Détecter les variables
ir.variables = self._detect_variables(ir.steps, actionable)
elapsed = time.time() - t_start
logger.info(
f"IRBuilder: WorkflowIR construit en {elapsed:.1f}s — "
f"{len(ir.steps)} étapes, {len(ir.variables)} variables, "
f"{len(ir.applications)} applications"
)
return ir
def _filter_events(self, events: List[Dict]) -> List[Dict]:
"""Filtrer les événements parasites (heartbeat, focus_change, etc.)."""
ignored_types = {"heartbeat", "focus_change", "action_result", "window_focus_change"}
result = []
for raw_evt in events:
evt = raw_evt.get("event", raw_evt)
if evt.get("type", "") not in ignored_types:
result.append(evt)
return result
def _detect_applications(self, events: List[Dict]) -> List[str]:
"""Détecter les applications utilisées."""
apps = set()
for evt in events:
title = evt.get("window", {}).get("title", "")
if title and title != "unknown_window":
for sep in [" ", " - ", ""]:
if sep in title:
apps.add(title.split(sep)[-1].strip())
break
return sorted(apps)
def _segment_into_steps(self, events: List[Dict]) -> List[List[Dict]]:
"""Segmenter les événements en étapes logiques.
Critères de coupure :
- Changement d'application (fenêtre différente)
- Pause longue (> 5s entre deux événements)
- Transition logique (clic → frappe → clic = étapes différentes)
"""
if not events:
return []
segments = []
current_segment = [events[0]]
current_app = self._get_app_name(events[0])
for evt in events[1:]:
app = self._get_app_name(evt)
evt_type = evt.get("type", "")
# Coupure par changement d'application
app_changed = app and current_app and app != current_app
# Coupure par pause longue
prev_ts = float(current_segment[-1].get("timestamp", 0))
curr_ts = float(evt.get("timestamp", 0))
long_pause = (curr_ts - prev_ts) > 5.0 if prev_ts > 0 and curr_ts > 0 else False
# Coupure par transition clic → nouveau clic (nouvelle intention)
transition = (
evt_type == "mouse_click"
and len(current_segment) >= 2
and current_segment[-1].get("type") not in ("mouse_click",)
)
if app_changed or long_pause:
if current_segment:
segments.append(current_segment)
current_segment = [evt]
current_app = app
else:
current_segment.append(evt)
if current_segment:
segments.append(current_segment)
return segments
def _get_app_name(self, evt: Dict) -> str:
"""Extraire le nom d'application depuis un événement."""
title = evt.get("window", {}).get("title", "")
for sep in [" ", " - ", ""]:
if sep in title:
return title.split(sep)[-1].strip()
return title
def _build_step(
self,
segment: List[Dict],
step_index: int,
total_steps: int,
workflow_name: str,
domain: str,
) -> Step:
"""Construire une Step depuis un segment d'événements.
Utilise gemma4 pour comprendre l'intention du segment.
"""
# Construire la description du segment pour gemma4
actions = []
for evt in segment:
action = self._event_to_action(evt)
if action:
actions.append(action)
# Description textuelle du segment
segment_desc = self._describe_segment(segment)
# Demander à gemma4 l'intention
intent, precondition, postcondition = self._analyze_intent(
segment_desc, step_index, total_steps, workflow_name, domain,
)
return Step(
step_id=f"s{step_index + 1}",
intent=intent or segment_desc,
precondition=precondition,
postcondition=postcondition,
actions=actions,
)
def _event_to_action(self, evt: Dict) -> Optional[Action]:
"""Convertir un événement brut en Action."""
evt_type = evt.get("type", "")
if evt_type == "mouse_click":
window = evt.get("window", {}).get("title", "")
return Action(
type="click",
target=window,
anchor_hint=evt.get("vision_info", {}).get("text", ""),
)
elif evt_type == "text_input":
text = evt.get("text", "")
if text:
return Action(type="type", text=text)
elif evt_type in ("key_combo", "key_press"):
keys = evt.get("keys", [])
if keys:
return Action(type="key_combo", keys=keys)
elif evt_type == "scroll":
return Action(type="scroll")
return None
def _describe_segment(self, segment: List[Dict]) -> str:
"""Décrire un segment en langage naturel (pour gemma4)."""
parts = []
window = ""
for evt in segment:
evt_type = evt.get("type", "")
w = evt.get("window", {}).get("title", "")
if w and w != window:
window = w
parts.append(f"[{w}]")
if evt_type == "mouse_click":
text = evt.get("vision_info", {}).get("text", "")
parts.append(f"clic sur '{text}'" if text else "clic")
elif evt_type == "text_input":
text = evt.get("text", "")
parts.append(f"saisie '{text[:30]}'")
elif evt_type in ("key_combo", "key_press"):
keys = evt.get("keys", [])
parts.append(f"touche {'+'.join(keys)}")
return "".join(parts) if parts else "action"
def _analyze_intent(
self,
segment_desc: str,
step_index: int,
total_steps: int,
workflow_name: str,
domain: str,
) -> tuple:
"""Demander à gemma4 de comprendre l'intention d'un segment.
Returns:
(intent, precondition, postcondition)
"""
import requests as _requests
# Charger le contexte métier
domain_prompt = ""
try:
from agent_v0.server_v1.domain_context import get_domain_context
ctx = get_domain_context(domain)
if ctx.system_prompt:
domain_prompt = f"\nContexte métier : {ctx.name}\n"
except Exception:
pass
prompt = (
f"{domain_prompt}"
f"Workflow : {workflow_name} (étape {step_index + 1}/{total_steps})\n"
f"Actions observées : {segment_desc}\n\n"
f"Réponds en 3 lignes :\n"
f"INTENTION: que veut faire l'utilisateur avec ces actions (1 phrase)\n"
f"AVANT: état attendu de l'écran avant cette étape (1 phrase)\n"
f"APRÈS: état attendu de l'écran après cette étape (1 phrase)"
)
try:
resp = _requests.post(
self._gemma4_url,
json={
"model": "gemma4:e4b",
"messages": [{"role": "user", "content": prompt}],
"stream": False,
"think": True,
"options": {"temperature": 0.1, "num_predict": 800},
},
timeout=30,
)
if resp.ok:
content = resp.json().get("message", {}).get("content", "")
return self._parse_intent_response(content)
except Exception as e:
logger.debug(f"IRBuilder: gemma4 indisponible ({e})")
return (segment_desc, "", "")
def _parse_intent_response(self, content: str) -> tuple:
"""Parser la réponse gemma4 (INTENTION/AVANT/APRÈS)."""
intent = ""
precondition = ""
postcondition = ""
for line in content.split("\n"):
clean = line.strip()
upper = clean.upper()
if upper.startswith("INTENTION:"):
intent = clean.split(":", 1)[1].strip()
elif upper.startswith("AVANT:"):
precondition = clean.split(":", 1)[1].strip()
elif upper.startswith(("APRÈS:", "APRES:")):
postcondition = clean.split(":", 1)[1].strip()
return (intent, precondition, postcondition)
def _detect_variables(self, steps: List[Step], events: List[Dict]) -> List[Variable]:
"""Détecter les variables dans le workflow.
Une variable est une donnée qui change entre les exécutions :
- Texte saisi par l'utilisateur (noms, codes, dates)
- Données lues à l'écran (résultats de recherche)
"""
variables = []
seen_texts = set()
for step in steps:
for action in step.actions:
if action.type == "type" and action.text:
text = action.text.strip()
if text and text not in seen_texts and len(text) > 2:
seen_texts.add(text)
var_name = f"texte_{len(variables) + 1}"
variables.append(Variable(
name=var_name,
description=f"Texte saisi : '{text[:50]}'",
source="user",
default=text,
))
# Marquer l'action comme variable
action.variable = True
action.text = f"{{{var_name}}}"
return variables

View File

@@ -0,0 +1,268 @@
# core/workflow/workflow_ir.py
"""
WorkflowIR — Représentation Intermédiaire d'un workflow.
C'est la CONNAISSANCE que Léa a acquise en observant un utilisateur.
Pas les clics bruts (RawTrace), pas le plan d'exécution (ExecutionPlan).
C'est ce que Léa a COMPRIS.
Format générique — fonctionne pour n'importe quel métier :
- TIM qui code des dossiers patients
- Comptable qui saisit des factures
- RH qui édite des fiches de paie
- Logisticien qui gère des stocks
Le domaine métier est une couche par-dessus (domain_context),
pas dans le WorkflowIR lui-même.
Cycle de vie :
RawTrace (capture) → WorkflowIR (compréhension) → ExecutionPlan (exécution)
Le WorkflowIR est :
- versionné (chaque recompilation incrémente la version)
- indépendant de la résolution d'écran
- indépendant du poste cible
- paramétrable (variables substituables)
- enrichi par l'apprentissage (chaque replay améliore le IR)
"""
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# =========================================================================
# Structures de données
# =========================================================================
@dataclass
class Variable:
"""Variable substituable dans un workflow."""
name: str # Identifiant (ex: "patient", "facture_num")
description: str = "" # Description humaine
source: str = "user" # Origine : "user", "screen", "file", "previous_step"
default: str = "" # Valeur par défaut
required: bool = True
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"description": self.description,
"source": self.source,
"default": self.default,
"required": self.required,
}
@classmethod
def from_dict(cls, d: Dict) -> "Variable":
return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})
@dataclass
class Action:
"""Action élémentaire dans une étape."""
type: str # click, type, key_combo, wait, scroll
target: str = "" # Description de la cible ("bouton Enregistrer")
text: str = "" # Texte à taper (pour type)
keys: List[str] = field(default_factory=list) # Touches (pour key_combo)
duration_ms: int = 0 # Durée (pour wait)
variable: bool = False # True si le texte contient une variable {var}
anchor_hint: str = "" # Indice visuel pour aider la résolution
def to_dict(self) -> Dict[str, Any]:
d = {"type": self.type}
if self.target:
d["target"] = self.target
if self.text:
d["text"] = self.text
if self.keys:
d["keys"] = self.keys
if self.duration_ms:
d["duration_ms"] = self.duration_ms
if self.variable:
d["variable"] = True
if self.anchor_hint:
d["anchor_hint"] = self.anchor_hint
return d
@classmethod
def from_dict(cls, d: Dict) -> "Action":
return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})
@dataclass
class Step:
"""Étape logique d'un workflow — une intention métier."""
step_id: str
intent: str # "Ouvrir le dossier", "Saisir le code"
precondition: str = "" # "L'application est sur l'écran de liste"
postcondition: str = "" # "Le dossier est affiché"
actions: List[Action] = field(default_factory=list)
is_optional: bool = False # Étape optionnelle (peut être sautée)
is_loop: bool = False # Étape répétée (pour chaque élément)
loop_variable: str = "" # Variable de boucle
def to_dict(self) -> Dict[str, Any]:
d = {
"step_id": self.step_id,
"intent": self.intent,
"actions": [a.to_dict() for a in self.actions],
}
if self.precondition:
d["precondition"] = self.precondition
if self.postcondition:
d["postcondition"] = self.postcondition
if self.is_optional:
d["is_optional"] = True
if self.is_loop:
d["is_loop"] = True
d["loop_variable"] = self.loop_variable
return d
@classmethod
def from_dict(cls, d: Dict) -> "Step":
actions = [Action.from_dict(a) for a in d.get("actions", [])]
return cls(
step_id=d["step_id"],
intent=d.get("intent", ""),
precondition=d.get("precondition", ""),
postcondition=d.get("postcondition", ""),
actions=actions,
is_optional=d.get("is_optional", False),
is_loop=d.get("is_loop", False),
loop_variable=d.get("loop_variable", ""),
)
@dataclass
class WorkflowIR:
"""Représentation Intermédiaire d'un workflow — la connaissance compilée.
C'est ce que Léa a compris en observant l'utilisateur.
Indépendant du poste, de la résolution, du runtime.
"""
workflow_id: str
version: int = 1
name: str = ""
description: str = ""
domain: str = "generic" # Domaine métier (tim_codage, compta, rh, stocks...)
learned_from: str = "" # session_id source
created_at: float = 0.0
updated_at: float = 0.0
# Contenu
variables: List[Variable] = field(default_factory=list)
steps: List[Step] = field(default_factory=list)
# Métadonnées d'apprentissage
replay_count: int = 0 # Nombre de replays effectués
success_rate: float = 0.0 # Taux de succès moyen
last_replay_at: float = 0.0
# Applications utilisées (détectées lors de l'apprentissage)
applications: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
return {
"workflow_id": self.workflow_id,
"version": self.version,
"name": self.name,
"description": self.description,
"domain": self.domain,
"learned_from": self.learned_from,
"created_at": self.created_at,
"updated_at": self.updated_at,
"variables": [v.to_dict() for v in self.variables],
"steps": [s.to_dict() for s in self.steps],
"replay_count": self.replay_count,
"success_rate": round(self.success_rate, 3),
"last_replay_at": self.last_replay_at,
"applications": self.applications,
}
def to_json(self, indent: int = 2) -> str:
return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent)
@classmethod
def from_dict(cls, d: Dict) -> "WorkflowIR":
variables = [Variable.from_dict(v) for v in d.get("variables", [])]
steps = [Step.from_dict(s) for s in d.get("steps", [])]
return cls(
workflow_id=d["workflow_id"],
version=d.get("version", 1),
name=d.get("name", ""),
description=d.get("description", ""),
domain=d.get("domain", "generic"),
learned_from=d.get("learned_from", ""),
created_at=d.get("created_at", 0),
updated_at=d.get("updated_at", 0),
variables=variables,
steps=steps,
replay_count=d.get("replay_count", 0),
success_rate=d.get("success_rate", 0),
last_replay_at=d.get("last_replay_at", 0),
applications=d.get("applications", []),
)
@classmethod
def from_json(cls, json_str: str) -> "WorkflowIR":
return cls.from_dict(json.loads(json_str))
def save(self, directory: str) -> Path:
"""Sauvegarder le WorkflowIR dans un fichier JSON."""
dir_path = Path(directory)
dir_path.mkdir(parents=True, exist_ok=True)
file_path = dir_path / f"{self.workflow_id}_v{self.version}.json"
file_path.write_text(self.to_json(), encoding="utf-8")
logger.info(f"WorkflowIR sauvegardé : {file_path}")
return file_path
@classmethod
def load(cls, file_path: str) -> "WorkflowIR":
"""Charger un WorkflowIR depuis un fichier JSON."""
return cls.from_json(Path(file_path).read_text(encoding="utf-8"))
def increment_version(self) -> "WorkflowIR":
"""Créer une nouvelle version du workflow (après recompilation)."""
import copy
new = copy.deepcopy(self)
new.version += 1
new.updated_at = time.time()
return new
def add_step(self, intent: str, actions: List[Dict] = None, **kwargs) -> Step:
"""Ajouter une étape au workflow."""
step = Step(
step_id=f"s{len(self.steps) + 1}",
intent=intent,
actions=[Action.from_dict(a) for a in (actions or [])],
**kwargs,
)
self.steps.append(step)
return step
def add_variable(self, name: str, **kwargs) -> Variable:
"""Ajouter une variable au workflow."""
var = Variable(name=name, **kwargs)
self.variables.append(var)
return var
@staticmethod
def new(name: str, domain: str = "generic", learned_from: str = "") -> "WorkflowIR":
"""Créer un nouveau WorkflowIR vide."""
return WorkflowIR(
workflow_id=f"wf_{uuid.uuid4().hex[:12]}",
version=1,
name=name,
domain=domain,
learned_from=learned_from,
created_at=time.time(),
updated_at=time.time(),
)