Architecture 3 niveaux implémentée et testée (137 tests unitaires + 21 visuels) : MÉSO (acteur intelligent) : - P0 Critic : vérification sémantique post-action via gemma4 (replay_verifier.py) - P1 Observer : pré-analyse écran avant chaque action (api_stream.py /pre_analyze) - P2 Grounding/Policy : séparation localisation (grounding.py) et décision (policy.py) - P3 Recovery : rollback automatique Ctrl+Z/Escape/Alt+F4 (recovery.py) - P4 Learning : apprentissage runtime avec boucle de consolidation (replay_learner.py) MACRO (planificateur) : - TaskPlanner : comprend les ordres en langage naturel via gemma4 (task_planner.py) - Contexte métier TIM/CIM-10 pour les hôpitaux (domain_context.py) - Endpoint POST /api/v1/task pour l'exécution par instruction Traçabilité : - Audit trail complet avec 18 champs par action (audit_trail.py) - Endpoints GET /audit/history, /audit/summary, /audit/export (CSV) Grounding : - Fix parsing bbox_2d qwen2.5vl (pixels relatifs, pas grille 1000x1000) - Benchmarks visuels sur captures réelles (3 approches : baseline, zoom, Citrix) - Reproductibilité validée : variance < 0.008 sur 10 itérations Sécurité : - Tokens de production retirés du code source → .env.local - Secret key aléatoire si non configuré - Suppression logs qui leakent les tokens Résultats : 80% de replay (vs 12.5% avant), 100% détection visuelle Citrix JPEG Q20 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
597 lines
23 KiB
Python
597 lines
23 KiB
Python
# agent_v0/server_v1/task_planner.py
|
|
"""
|
|
TaskPlanner — Planificateur MACRO pour RPA Vision V3.
|
|
|
|
Responsabilité : comprendre un ordre en langage naturel et l'exécuter.
|
|
|
|
"Traite les dossiers de janvier" →
|
|
1. Comprendre l'instruction (gemma4)
|
|
2. Trouver le workflow appris correspondant
|
|
3. Identifier les paramètres/variables
|
|
4. Exécuter (replay avec substitution) ou planifier (actions libres)
|
|
|
|
C'est le niveau MACRO de l'architecture 3 niveaux :
|
|
MACRO (TaskPlanner) → décompose et orchestre
|
|
MÉSO (Policy/Observer/Critic) → décide et vérifie
|
|
MICRO (Grounding/Executor) → localise et clique
|
|
|
|
Ref: docs/PLAN_ACTEUR_V1.md — Phase 3 : Planificateur
|
|
Ref: docs/VISION_RPA_INTELLIGENT.md — "Il observe" → "Il devient autonome"
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class TaskPlan:
|
|
"""Plan d'exécution généré par le planificateur."""
|
|
instruction: str # Instruction originale de l'utilisateur
|
|
understood: bool = False # L'instruction a été comprise
|
|
workflow_match: str = "" # ID du workflow correspondant (si trouvé)
|
|
workflow_name: str = "" # Nom du workflow correspondant
|
|
match_confidence: float = 0.0 # Confiance du match (0-1)
|
|
parameters: Dict[str, Any] = field(default_factory=dict) # Variables extraites
|
|
is_loop: bool = False # Boucle sur une liste d'éléments
|
|
loop_source: str = "" # Source des éléments (écran, fichier, requête)
|
|
steps: List[Dict[str, Any]] = field(default_factory=list) # Actions planifiées
|
|
mode: str = "" # "replay" (workflow connu) ou "free" (actions générées)
|
|
error: str = ""
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"instruction": self.instruction,
|
|
"understood": self.understood,
|
|
"workflow_match": self.workflow_match,
|
|
"workflow_name": self.workflow_name,
|
|
"match_confidence": round(self.match_confidence, 3),
|
|
"parameters": self.parameters,
|
|
"is_loop": self.is_loop,
|
|
"loop_source": self.loop_source,
|
|
"steps_count": len(self.steps),
|
|
"mode": self.mode,
|
|
"error": self.error,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class TaskResult:
|
|
"""Résultat de l'exécution d'une tâche."""
|
|
instruction: str
|
|
success: bool
|
|
total_items: int = 1 # Nombre d'éléments traités (1 si pas de boucle)
|
|
completed_items: int = 0
|
|
failed_items: int = 0
|
|
results: List[Dict[str, Any]] = field(default_factory=list)
|
|
elapsed_s: float = 0.0
|
|
summary: str = ""
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"instruction": self.instruction,
|
|
"success": self.success,
|
|
"total_items": self.total_items,
|
|
"completed_items": self.completed_items,
|
|
"failed_items": self.failed_items,
|
|
"elapsed_s": round(self.elapsed_s, 1),
|
|
"summary": self.summary,
|
|
}
|
|
|
|
|
|
class TaskPlanner:
|
|
"""Planificateur MACRO — comprend les instructions et orchestre l'exécution.
|
|
|
|
Usage :
|
|
planner = TaskPlanner()
|
|
plan = planner.understand("traite les dossiers de janvier")
|
|
result = planner.execute(plan, replay_callback=launch_replay)
|
|
"""
|
|
|
|
def __init__(self, gemma4_port: str = "", domain_id: str = ""):
|
|
self._gemma4_port = gemma4_port or os.environ.get("GEMMA4_PORT", "11435")
|
|
self._gemma4_url = f"http://localhost:{self._gemma4_port}/api/chat"
|
|
self._domain_id = domain_id or os.environ.get("RPA_DOMAIN", "generic")
|
|
|
|
# Charger le contexte métier
|
|
try:
|
|
from .domain_context import get_domain_context
|
|
self._domain = get_domain_context(self._domain_id)
|
|
except Exception:
|
|
self._domain = None
|
|
|
|
def understand(
|
|
self,
|
|
instruction: str,
|
|
available_workflows: Optional[List[Dict[str, Any]]] = None,
|
|
screen_context: str = "",
|
|
) -> TaskPlan:
|
|
"""Comprendre une instruction en langage naturel.
|
|
|
|
Étape 1 : gemma4 analyse l'instruction et identifie :
|
|
- Le type de tâche (ouvrir, traiter, rechercher, etc.)
|
|
- Le workflow correspondant (s'il en existe un)
|
|
- Les paramètres/variables (nom, date, fichier, etc.)
|
|
- Si c'est une boucle (traiter TOUS les dossiers)
|
|
|
|
Args:
|
|
instruction: L'ordre de l'utilisateur ("traite les dossiers de janvier")
|
|
available_workflows: Liste des workflows connus [{name, description, session_id}]
|
|
screen_context: Description de l'écran actuel (pour le contexte)
|
|
"""
|
|
import requests as _requests
|
|
|
|
plan = TaskPlan(instruction=instruction)
|
|
|
|
# Construire la liste des workflows disponibles pour le prompt (top 10)
|
|
workflows_desc = "Aucun workflow enregistré."
|
|
if available_workflows:
|
|
top_workflows = available_workflows[:10]
|
|
lines = []
|
|
for i, wf in enumerate(top_workflows):
|
|
name = wf.get("name", wf.get("session_id", f"workflow_{i}"))
|
|
desc = wf.get("description", "")
|
|
sid = wf.get("session_id", "")
|
|
# Montrer la description métier pour aider le matching sémantique
|
|
label = f"{name}"
|
|
if desc:
|
|
label += f" — {desc}"
|
|
lines.append(f" {i+1}. {label} (id={sid})")
|
|
workflows_desc = "\n".join(lines)
|
|
|
|
# Contexte métier
|
|
domain_prompt = ""
|
|
if self._domain and self._domain.system_prompt:
|
|
domain_prompt = f"\nCONTEXTE MÉTIER :\n{self._domain.system_prompt}\n"
|
|
|
|
prompt = (
|
|
f"Tu es le PLANIFICATEUR d'un robot RPA (Léa). "
|
|
f"Analyse l'ordre utilisateur et identifie le workflow correspondant.\n"
|
|
f"{domain_prompt}\n"
|
|
f"WORKFLOWS DISPONIBLES :\n{workflows_desc}\n\n"
|
|
f"ORDRE : \"{instruction}\"\n\n"
|
|
f"RÈGLE DE MATCHING :\n"
|
|
f"- Compare l'INTENTION de l'ordre avec la DESCRIPTION de chaque workflow\n"
|
|
f"- \"Ouvre le bloc-notes\" correspond à un workflow décrit \"Ouvrir Bloc-notes via recherche\"\n"
|
|
f"- Un workflow qui utilise la même application EST un match même si les mots diffèrent\n"
|
|
f"- Si aucun workflow ne correspond, réponds WORKFLOW: AUCUN\n\n"
|
|
f"Réponds EXACTEMENT dans ce format (une ligne par champ) :\n"
|
|
f"COMPRIS: OUI\n"
|
|
f"WORKFLOW: <numéro> (ou AUCUN)\n"
|
|
f"CONFIANCE: <0.0 à 1.0>\n"
|
|
f"PARAMETRES: clé1=valeur1, clé2=valeur2 (ou AUCUN)\n"
|
|
f"BOUCLE: OUI ou NON\n"
|
|
f"SOURCE_BOUCLE: écran, fichier, ou aucun\n"
|
|
f"PLAN:\n"
|
|
f"1. première étape\n"
|
|
f"2. deuxième étape\n"
|
|
)
|
|
|
|
try:
|
|
resp = _requests.post(
|
|
self._gemma4_url,
|
|
json={
|
|
"model": "gemma4:e4b",
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
"stream": False,
|
|
"think": True,
|
|
"options": {"temperature": 0.2, "num_predict": 800},
|
|
},
|
|
timeout=120,
|
|
)
|
|
|
|
if not resp.ok:
|
|
plan.error = f"gemma4 HTTP {resp.status_code}"
|
|
return plan
|
|
|
|
content = resp.json().get("message", {}).get("content", "").strip()
|
|
logger.info(f"TaskPlanner: réponse gemma4 ({len(content)} chars)")
|
|
|
|
# Parser la réponse
|
|
plan = self._parse_understanding(plan, content, available_workflows)
|
|
|
|
except Exception as e:
|
|
plan.error = f"gemma4 erreur: {e}"
|
|
logger.warning(f"TaskPlanner: {plan.error}")
|
|
|
|
return plan
|
|
|
|
def _parse_understanding(
|
|
self,
|
|
plan: TaskPlan,
|
|
content: str,
|
|
available_workflows: Optional[List[Dict]] = None,
|
|
) -> TaskPlan:
|
|
"""Parser la réponse de gemma4 pour construire le plan.
|
|
|
|
Tolérant aux variations de format :
|
|
- "COMPRIS : OUI" ou "COMPRIS: oui" ou "**COMPRIS:** OUI"
|
|
- Numéros de workflow : "1", "1.", "#1", "Workflow 1"
|
|
- Paramètres : "clé=valeur" ou "clé: valeur" sur la même ligne ou les suivantes
|
|
"""
|
|
import re
|
|
|
|
# Nettoyer le markdown (gras, italique)
|
|
content_clean = re.sub(r'\*{1,2}([^*]+)\*{1,2}', r'\1', content)
|
|
|
|
in_params_section = False
|
|
in_plan_section = False
|
|
|
|
for line in content_clean.split("\n"):
|
|
line_clean = line.strip()
|
|
if not line_clean:
|
|
continue
|
|
upper = line_clean.upper()
|
|
|
|
# --- COMPRIS ---
|
|
if re.match(r'^COMPRIS\s*[:=]', upper):
|
|
val = re.split(r'[:=]', upper, 1)[1].strip()
|
|
plan.understood = "OUI" in val or "YES" in val or "TRUE" in val
|
|
in_params_section = False
|
|
in_plan_section = False
|
|
|
|
# --- WORKFLOW ---
|
|
elif re.match(r'^WORKFLOW\s*[:=]', upper):
|
|
val = line_clean.split(":", 1)[1].strip() if ":" in line_clean else line_clean.split("=", 1)[1].strip()
|
|
val_upper = val.upper().strip()
|
|
in_params_section = False
|
|
in_plan_section = False
|
|
if val_upper in ("AUCUN", "NONE", "NON", "N/A", "-", ""):
|
|
continue
|
|
# Extraire le numéro : "1", "1.", "#1", "Workflow 1", "1 (Bloc-notes)"
|
|
num_match = re.search(r'(\d+)', val)
|
|
if num_match and available_workflows:
|
|
idx = int(num_match.group(1)) - 1
|
|
if 0 <= idx < len(available_workflows):
|
|
wf = available_workflows[idx]
|
|
plan.workflow_match = wf.get("session_id", "")
|
|
plan.workflow_name = wf.get("name", "")
|
|
plan.match_confidence = 0.8
|
|
plan.mode = "replay"
|
|
|
|
# --- CONFIANCE ---
|
|
elif re.match(r'^CONFIANCE\s*[:=]', upper):
|
|
val = re.split(r'[:=]', line_clean, 1)[1].strip()
|
|
in_params_section = False
|
|
in_plan_section = False
|
|
# Extraire un float : "0.9", "0,9", "90%"
|
|
float_match = re.search(r'(\d+[.,]\d+)', val)
|
|
if float_match:
|
|
try:
|
|
plan.match_confidence = float(float_match.group(1).replace(",", "."))
|
|
except ValueError:
|
|
pass
|
|
elif "%" in val:
|
|
pct_match = re.search(r'(\d+)', val)
|
|
if pct_match:
|
|
plan.match_confidence = int(pct_match.group(1)) / 100.0
|
|
|
|
# --- PARAMETRES ---
|
|
elif re.match(r'^PARAM[EÈ]TRES?\s*[:=]', upper):
|
|
val = re.split(r'[:=]', line_clean, 1)[1].strip()
|
|
in_plan_section = False
|
|
val_upper = val.upper().strip()
|
|
if val_upper in ("AUCUN", "NONE", "NON", "N/A", "-"):
|
|
in_params_section = False
|
|
continue
|
|
# Vide = paramètres sur les lignes suivantes
|
|
in_params_section = True
|
|
if val and val_upper not in ("", ):
|
|
# Paramètres sur la même ligne : "clé1=val1, clé2=val2"
|
|
self._extract_params_from_line(val, plan)
|
|
|
|
# --- BOUCLE ---
|
|
elif re.match(r'^BOUCLE\s*[:=]', upper):
|
|
val = re.split(r'[:=]', upper, 1)[1].strip()
|
|
plan.is_loop = "OUI" in val or "YES" in val or "TRUE" in val
|
|
in_params_section = False
|
|
in_plan_section = False
|
|
|
|
# --- SOURCE_BOUCLE ---
|
|
elif re.match(r'^SOURCE[_ ]BOUCLE\s*[:=]', upper):
|
|
plan.loop_source = re.split(r'[:=]', line_clean, 1)[1].strip()
|
|
in_params_section = False
|
|
in_plan_section = False
|
|
|
|
# --- PLAN ---
|
|
elif re.match(r'^PLAN\s*[:=]?\s*$', upper) or upper == "PLAN:":
|
|
in_plan_section = True
|
|
in_params_section = False
|
|
|
|
# --- Lignes de contenu (paramètres d'abord, puis étapes) ---
|
|
elif in_params_section and ("=" in line_clean or ": " in line_clean):
|
|
self._extract_params_from_line(line_clean, plan)
|
|
|
|
elif in_plan_section and re.match(r'^(\d+[.)]\s+|- )', line_clean):
|
|
plan.steps.append({"description": line_clean})
|
|
|
|
elif re.match(r'^(\d+[.)]\s+|- )', line_clean) and not in_params_section:
|
|
# Étape numérotée en dehors d'une section explicite
|
|
plan.steps.append({"description": line_clean})
|
|
|
|
# Si pas de workflow trouvé mais compris → mode libre
|
|
if plan.understood and not plan.workflow_match:
|
|
plan.mode = "free"
|
|
|
|
return plan
|
|
|
|
@staticmethod
|
|
def _extract_params_from_line(text: str, plan: TaskPlan) -> None:
|
|
"""Extraire des paramètres clé=valeur ou clé: valeur d'une ligne."""
|
|
import re
|
|
text = text.strip().strip("- ")
|
|
# Ignorer les labels de section
|
|
if re.match(r'^(COMPRIS|WORKFLOW|BOUCLE|SOURCE|PLAN|CONFIANCE)', text.upper()):
|
|
return
|
|
# Essayer clé=valeur d'abord
|
|
if "=" in text:
|
|
for part in text.split(","):
|
|
part = part.strip()
|
|
if "=" in part:
|
|
k, v = part.split("=", 1)
|
|
k, v = k.strip().strip("- "), v.strip()
|
|
if k and v and v.upper() not in ("AUCUN", "NONE"):
|
|
plan.parameters[k] = v
|
|
# Sinon clé: valeur (mais pas les labels de section)
|
|
elif ": " in text:
|
|
k, v = text.split(": ", 1)
|
|
k, v = k.strip().strip("- "), v.strip()
|
|
if k and v and len(k) < 30 and v.upper() not in ("AUCUN", "NONE"):
|
|
plan.parameters[k] = v
|
|
|
|
def execute(
|
|
self,
|
|
plan: TaskPlan,
|
|
replay_callback=None,
|
|
machine_id: str = "default",
|
|
) -> TaskResult:
|
|
"""Exécuter un plan.
|
|
|
|
Deux modes :
|
|
1. "replay" : relancer un workflow enregistré avec substitution de variables
|
|
2. "free" : exécuter les actions planifiées par gemma4
|
|
|
|
Args:
|
|
plan: Le plan généré par understand()
|
|
replay_callback: Fonction qui lance un replay
|
|
signature: (session_id, machine_id, params) → replay_id
|
|
machine_id: Machine cible pour l'exécution
|
|
"""
|
|
t_start = time.time()
|
|
result = TaskResult(instruction=plan.instruction, success=False)
|
|
|
|
if not plan.understood:
|
|
result.summary = f"Instruction non comprise : {plan.error or 'réponse gemma4 invalide'}"
|
|
return result
|
|
|
|
if plan.mode == "replay" and plan.workflow_match:
|
|
# Mode replay : relancer un workflow connu
|
|
result = self._execute_replay(plan, replay_callback, machine_id)
|
|
|
|
elif plan.mode == "free" and plan.steps:
|
|
# Mode libre : actions planifiées par gemma4
|
|
result = self._execute_free(plan, replay_callback, machine_id)
|
|
|
|
else:
|
|
result.summary = "Pas de workflow correspondant et pas d'actions planifiées"
|
|
|
|
result.elapsed_s = time.time() - t_start
|
|
return result
|
|
|
|
def _execute_replay(
|
|
self,
|
|
plan: TaskPlan,
|
|
replay_callback,
|
|
machine_id: str,
|
|
) -> TaskResult:
|
|
"""Exécuter en mode replay (workflow connu)."""
|
|
result = TaskResult(instruction=plan.instruction, success=False)
|
|
|
|
if not replay_callback:
|
|
result.summary = "Pas de callback replay configuré"
|
|
return result
|
|
|
|
if plan.is_loop:
|
|
# Boucle : TODO — lister les éléments puis itérer
|
|
# Pour l'instant, exécution simple
|
|
logger.info(
|
|
f"TaskPlanner: boucle détectée mais pas encore implémentée, "
|
|
f"exécution simple du workflow {plan.workflow_name}"
|
|
)
|
|
|
|
try:
|
|
replay_id = replay_callback(
|
|
session_id=plan.workflow_match,
|
|
machine_id=machine_id,
|
|
params=plan.parameters,
|
|
)
|
|
result.success = True
|
|
result.completed_items = 1
|
|
result.total_items = 1
|
|
result.summary = (
|
|
f"Workflow '{plan.workflow_name}' lancé (replay={replay_id})"
|
|
f" avec paramètres {plan.parameters}" if plan.parameters else ""
|
|
)
|
|
result.results.append({
|
|
"replay_id": replay_id,
|
|
"workflow": plan.workflow_name,
|
|
"params": plan.parameters,
|
|
})
|
|
except Exception as e:
|
|
result.summary = f"Erreur lancement replay : {e}"
|
|
logger.error(f"TaskPlanner: {result.summary}")
|
|
|
|
return result
|
|
|
|
def _execute_free(
|
|
self,
|
|
plan: TaskPlan,
|
|
replay_callback,
|
|
machine_id: str,
|
|
) -> TaskResult:
|
|
"""Exécuter en mode libre (actions planifiées par gemma4)."""
|
|
result = TaskResult(instruction=plan.instruction, success=False)
|
|
|
|
# Convertir les étapes en actions replay
|
|
actions = self._steps_to_actions(plan.steps, plan.parameters)
|
|
|
|
if not actions:
|
|
result.summary = "Impossible de convertir le plan en actions exécutables"
|
|
return result
|
|
|
|
if replay_callback:
|
|
try:
|
|
replay_id = replay_callback(
|
|
actions=actions,
|
|
machine_id=machine_id,
|
|
task_description=plan.instruction,
|
|
)
|
|
result.success = True
|
|
result.completed_items = 1
|
|
result.summary = f"Plan libre exécuté ({len(actions)} actions, replay={replay_id})"
|
|
except Exception as e:
|
|
result.summary = f"Erreur exécution plan libre : {e}"
|
|
else:
|
|
result.summary = f"Plan prêt ({len(actions)} actions) mais pas de callback"
|
|
result.results = actions
|
|
|
|
return result
|
|
|
|
def _steps_to_actions(
|
|
self,
|
|
steps: List[Dict[str, Any]],
|
|
parameters: Dict[str, Any],
|
|
) -> List[Dict[str, Any]]:
|
|
"""Convertir les étapes textuelles en actions replay.
|
|
|
|
Utilise gemma4 pour traduire chaque étape en action structurée.
|
|
Les types d'actions supportés : click, type, key_combo, wait.
|
|
"""
|
|
import re
|
|
import requests as _requests
|
|
|
|
steps_text = "\n".join(
|
|
s.get("description", str(s)) for s in steps
|
|
)
|
|
|
|
prompt = (
|
|
"Convertis ces étapes RPA en actions JSON.\n\n"
|
|
f"ÉTAPES :\n{steps_text}\n\n"
|
|
f"PARAMÈTRES : {json.dumps(parameters, ensure_ascii=False)}\n\n"
|
|
"TYPES D'ACTIONS DISPONIBLES :\n"
|
|
'- Cliquer : {"type": "click", "target_spec": {"by_text": "texte du bouton"}}\n'
|
|
'- Taper du texte : {"type": "type", "text": "texte à taper"}\n'
|
|
'- Raccourci clavier : {"type": "key_combo", "keys": ["ctrl", "s"]}\n'
|
|
'- Attendre : {"type": "wait", "duration_ms": 2000}\n\n'
|
|
"RÈGLES :\n"
|
|
"- UNE action JSON par ligne\n"
|
|
"- Pas de commentaires, pas de texte autour, JUSTE le JSON\n"
|
|
"- Utilise les paramètres fournis dans les valeurs\n\n"
|
|
"ACTIONS :\n"
|
|
)
|
|
|
|
try:
|
|
resp = _requests.post(
|
|
self._gemma4_url,
|
|
json={
|
|
"model": "gemma4:e4b",
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
"stream": False,
|
|
"think": True,
|
|
"options": {"temperature": 0.1, "num_predict": 1500},
|
|
},
|
|
timeout=120,
|
|
)
|
|
|
|
if not resp.ok:
|
|
return []
|
|
|
|
content = resp.json().get("message", {}).get("content", "")
|
|
return self._parse_actions_json(content)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"TaskPlanner: conversion étapes échouée : {e}")
|
|
return []
|
|
|
|
@staticmethod
|
|
def _parse_actions_json(content: str) -> List[Dict[str, Any]]:
|
|
"""Parser des actions JSON depuis une réponse VLM.
|
|
|
|
Tolère :
|
|
- Un JSON par ligne
|
|
- Un tableau JSON [...]
|
|
- Du texte autour des JSON (markdown, commentaires)
|
|
- Des objets imbriqués (target_spec)
|
|
"""
|
|
import re
|
|
|
|
actions = []
|
|
valid_types = {"click", "type", "key_combo", "wait"}
|
|
|
|
# Stratégie 1 : essayer de parser comme un tableau JSON
|
|
array_match = re.search(r'\[[\s\S]*\]', content)
|
|
if array_match:
|
|
try:
|
|
parsed = json.loads(array_match.group())
|
|
if isinstance(parsed, list):
|
|
for item in parsed:
|
|
if isinstance(item, dict) and item.get("type") in valid_types:
|
|
if item["type"] == "click":
|
|
item["visual_mode"] = True
|
|
actions.append(item)
|
|
if actions:
|
|
return actions
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Stratégie 2 : extraire les objets JSON individuels (supporte imbrication)
|
|
# Trouver chaque { ... } en gérant les accolades imbriquées
|
|
i = 0
|
|
while i < len(content):
|
|
if content[i] == '{':
|
|
depth = 0
|
|
start = i
|
|
while i < len(content):
|
|
if content[i] == '{':
|
|
depth += 1
|
|
elif content[i] == '}':
|
|
depth -= 1
|
|
if depth == 0:
|
|
candidate = content[start:i+1]
|
|
try:
|
|
action = json.loads(candidate)
|
|
if isinstance(action, dict) and action.get("type") in valid_types:
|
|
if action["type"] == "click":
|
|
action["visual_mode"] = True
|
|
actions.append(action)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
break
|
|
i += 1
|
|
i += 1
|
|
|
|
return actions
|
|
|
|
def list_capabilities(
|
|
self,
|
|
available_workflows: List[Dict[str, Any]],
|
|
) -> str:
|
|
"""Lister ce que Léa sait faire (pour l'interface utilisateur)."""
|
|
if not available_workflows:
|
|
return "Léa n'a pas encore appris de workflows. Enregistrez-en un d'abord."
|
|
|
|
lines = ["Léa sait faire :"]
|
|
for wf in available_workflows:
|
|
name = wf.get("name", "?")
|
|
desc = wf.get("description", "")
|
|
lines.append(f" - {name}" + (f" ({desc})" if desc else ""))
|
|
|
|
lines.append("")
|
|
lines.append("Dites-lui ce que vous voulez faire en langage naturel.")
|
|
return "\n".join(lines)
|