Architecture 3 niveaux implémentée et testée (137 tests unitaires + 21 visuels) : MÉSO (acteur intelligent) : - P0 Critic : vérification sémantique post-action via gemma4 (replay_verifier.py) - P1 Observer : pré-analyse écran avant chaque action (api_stream.py /pre_analyze) - P2 Grounding/Policy : séparation localisation (grounding.py) et décision (policy.py) - P3 Recovery : rollback automatique Ctrl+Z/Escape/Alt+F4 (recovery.py) - P4 Learning : apprentissage runtime avec boucle de consolidation (replay_learner.py) MACRO (planificateur) : - TaskPlanner : comprend les ordres en langage naturel via gemma4 (task_planner.py) - Contexte métier TIM/CIM-10 pour les hôpitaux (domain_context.py) - Endpoint POST /api/v1/task pour l'exécution par instruction Traçabilité : - Audit trail complet avec 18 champs par action (audit_trail.py) - Endpoints GET /audit/history, /audit/summary, /audit/export (CSV) Grounding : - Fix parsing bbox_2d qwen2.5vl (pixels relatifs, pas grille 1000x1000) - Benchmarks visuels sur captures réelles (3 approches : baseline, zoom, Citrix) - Reproductibilité validée : variance < 0.008 sur 10 itérations Sécurité : - Tokens de production retirés du code source → .env.local - Secret key aléatoire si non configuré - Suppression logs qui leakent les tokens Résultats : 80% de replay (vs 12.5% avant), 100% détection visuelle Citrix JPEG Q20 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
347 lines
12 KiB
Python
347 lines
12 KiB
Python
# agent_v0/server_v1/replay_learner.py
|
|
"""
|
|
Module Learning — apprentissage à partir des résultats de replay.
|
|
|
|
Responsabilité : "Chaque replay qui échoue enrichit notre base de connaissances."
|
|
|
|
Stocke les résultats structurés de chaque action (succès/échec, méthode,
|
|
screenshots, correction appliquée) pour :
|
|
1. Améliorer les décisions futures (Policy)
|
|
2. Affiner les stratégies de grounding (quel méthode marche pour quel écran)
|
|
3. Détecter les patterns récurrents d'échec
|
|
4. Alimenter le fine-tuning futur du VLM
|
|
|
|
Format inspiré du cahier des charges (docs/VISION_RPA_INTELLIGENT.md) :
|
|
{
|
|
"screenshot_before": "base64...",
|
|
"action": {"type": "click", "target": "Bouton Valider", ...},
|
|
"screenshot_after": "base64...",
|
|
"success": true,
|
|
"resolution_method": "som_text_match",
|
|
"correction": null,
|
|
"human_validated": false
|
|
}
|
|
|
|
Ref: docs/VISION_RPA_INTELLIGENT.md — Boucle d'apprentissage (section 4)
|
|
Ref: docs/PLAN_ACTEUR_V1.md — Phase 3 : apprentissage continu
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass, field, asdict
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Répertoire par défaut pour le stockage des résultats d'apprentissage
|
|
_DEFAULT_LEARNING_DIR = os.environ.get(
|
|
"RPA_LEARNING_DIR", "data/learning/replay_results"
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class ActionOutcome:
|
|
"""Résultat structuré d'une action de replay."""
|
|
# Identifiants
|
|
session_id: str
|
|
action_id: str
|
|
action_type: str # click, type, key_combo
|
|
timestamp: float = 0.0 # Epoch
|
|
|
|
# Contexte
|
|
target_description: str = "" # "Clic sur 'Enregistrer' dans Bloc-notes"
|
|
intention: str = "" # "Sauvegarder le fichier"
|
|
window_title: str = ""
|
|
|
|
# Résolution
|
|
resolution_method: str = "" # server_som, anchor_template, vlm_direct...
|
|
resolution_score: float = 0.0
|
|
resolution_elapsed_ms: float = 0.0
|
|
|
|
# Résultat
|
|
success: bool = False
|
|
error: str = ""
|
|
warning: str = ""
|
|
|
|
# Vérification (Critic)
|
|
pixel_verified: Optional[bool] = None
|
|
semantic_verified: Optional[bool] = None
|
|
critic_detail: str = ""
|
|
|
|
# Recovery
|
|
recovery_action: str = "" # undo, escape, close, none
|
|
recovery_success: bool = False
|
|
|
|
# Screenshots (chemins relatifs, pas base64 — trop lourd)
|
|
screenshot_before_path: str = ""
|
|
screenshot_after_path: str = ""
|
|
|
|
# Correction humaine (feedback loop)
|
|
human_validated: bool = False
|
|
human_correction: str = "" # Description de la correction
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return asdict(self)
|
|
|
|
|
|
class ReplayLearner:
|
|
"""Apprentissage à partir des résultats de replay.
|
|
|
|
Stocke chaque action dans un fichier JSONL par session.
|
|
Fournit des requêtes pour améliorer les décisions futures.
|
|
|
|
Usage côté serveur (api_stream.py) :
|
|
learner = ReplayLearner()
|
|
learner.record(outcome)
|
|
|
|
Usage côté Policy :
|
|
history = learner.query_similar(target_description, window_title)
|
|
# → "La dernière fois, template matching a échoué mais SoM a trouvé"
|
|
"""
|
|
|
|
def __init__(self, learning_dir: str = ""):
|
|
self.learning_dir = Path(learning_dir or _DEFAULT_LEARNING_DIR)
|
|
self.learning_dir.mkdir(parents=True, exist_ok=True)
|
|
# Cache mémoire des derniers résultats (pour requêtes rapides)
|
|
self._recent: List[ActionOutcome] = []
|
|
self._max_recent = 500
|
|
|
|
def record(self, outcome: ActionOutcome) -> None:
|
|
"""Enregistrer le résultat d'une action.
|
|
|
|
Écrit en append dans un fichier JSONL par session.
|
|
Garde aussi en mémoire pour les requêtes rapides.
|
|
"""
|
|
if not outcome.timestamp:
|
|
outcome.timestamp = time.time()
|
|
|
|
# Fichier JSONL par session
|
|
session_file = self.learning_dir / f"{outcome.session_id}.jsonl"
|
|
try:
|
|
with open(session_file, "a") as f:
|
|
f.write(json.dumps(outcome.to_dict(), ensure_ascii=False) + "\n")
|
|
except Exception as e:
|
|
logger.warning(f"Learning: échec écriture {session_file}: {e}")
|
|
|
|
# Cache mémoire
|
|
self._recent.append(outcome)
|
|
if len(self._recent) > self._max_recent:
|
|
self._recent = self._recent[-self._max_recent:]
|
|
|
|
# Log résumé
|
|
status = "OK" if outcome.success else "ÉCHEC"
|
|
logger.info(
|
|
f"Learning: {status} {outcome.action_type} "
|
|
f"'{outcome.target_description[:40]}' "
|
|
f"[{outcome.resolution_method}] "
|
|
f"critic={'OK' if outcome.semantic_verified else 'NON' if outcome.semantic_verified is False else '?'}"
|
|
)
|
|
|
|
def record_from_replay_result(
|
|
self,
|
|
session_id: str,
|
|
action: Dict[str, Any],
|
|
result: Dict[str, Any],
|
|
verification: Optional[Dict] = None,
|
|
) -> None:
|
|
"""Enregistrer depuis les structures existantes du replay.
|
|
|
|
Convertit le format action/result du replay en ActionOutcome.
|
|
Appelé depuis api_stream.py après chaque action de replay.
|
|
"""
|
|
target_spec = action.get("target_spec", {})
|
|
outcome = ActionOutcome(
|
|
session_id=session_id,
|
|
action_id=action.get("action_id", ""),
|
|
action_type=action.get("type", ""),
|
|
target_description=target_spec.get("by_text", ""),
|
|
intention=action.get("intention", ""),
|
|
window_title=target_spec.get("window_title", ""),
|
|
resolution_method=result.get("resolution_method", ""),
|
|
resolution_score=result.get("resolution_score", 0.0),
|
|
resolution_elapsed_ms=result.get("resolution_elapsed_ms", 0.0),
|
|
success=result.get("success", False),
|
|
error=result.get("error", ""),
|
|
warning=result.get("warning", ""),
|
|
)
|
|
|
|
if verification:
|
|
outcome.pixel_verified = verification.get("verified")
|
|
outcome.semantic_verified = verification.get("semantic_verified")
|
|
outcome.critic_detail = verification.get("semantic_detail", "")
|
|
|
|
self.record(outcome)
|
|
|
|
def query_similar(
|
|
self,
|
|
target_description: str = "",
|
|
window_title: str = "",
|
|
limit: int = 10,
|
|
) -> List[Dict[str, Any]]:
|
|
"""Chercher des résultats similaires dans l'historique.
|
|
|
|
Recherche par correspondance textuelle sur la description de cible
|
|
et le titre de fenêtre. Retourne les plus récents en premier.
|
|
|
|
Utile pour le Policy : "qu'est-ce qui a marché avant pour cette cible ?"
|
|
"""
|
|
results = []
|
|
target_lower = target_description.lower()
|
|
window_lower = window_title.lower()
|
|
|
|
for outcome in reversed(self._recent):
|
|
score = 0
|
|
if target_lower and target_lower in outcome.target_description.lower():
|
|
score += 2
|
|
if window_lower and window_lower in outcome.window_title.lower():
|
|
score += 1
|
|
if score > 0:
|
|
results.append({
|
|
"outcome": outcome.to_dict(),
|
|
"relevance": score,
|
|
})
|
|
if len(results) >= limit:
|
|
break
|
|
|
|
return sorted(results, key=lambda x: x["relevance"], reverse=True)
|
|
|
|
def best_strategy_for(
|
|
self,
|
|
target_description: str = "",
|
|
window_title: str = "",
|
|
) -> Optional[str]:
|
|
"""Quelle méthode de grounding a le mieux marché pour cette cible ?
|
|
|
|
Consulte l'historique et retourne la méthode qui a le plus haut
|
|
taux de succès pour des cibles similaires. C'est la boucle
|
|
d'apprentissage : les replays passés améliorent les suivants.
|
|
|
|
Returns:
|
|
Nom de la meilleure méthode (ex: "som_text_match") ou None
|
|
"""
|
|
similar = self.query_similar(target_description, window_title, limit=20)
|
|
if not similar:
|
|
return None
|
|
|
|
# Compter les succès par méthode
|
|
method_stats: Dict[str, List[int]] = {} # method → [successes, total]
|
|
for entry in similar:
|
|
outcome = entry["outcome"]
|
|
method = outcome.get("resolution_method", "")
|
|
if not method:
|
|
continue
|
|
if method not in method_stats:
|
|
method_stats[method] = [0, 0]
|
|
method_stats[method][1] += 1
|
|
if outcome.get("success"):
|
|
method_stats[method][0] += 1
|
|
|
|
if not method_stats:
|
|
return None
|
|
|
|
# Retourner la méthode avec le meilleur taux de succès (minimum 2 occurrences)
|
|
best = None
|
|
best_rate = 0.0
|
|
for method, (successes, total) in method_stats.items():
|
|
if total >= 2: # Au moins 2 essais pour être significatif
|
|
rate = successes / total
|
|
if rate > best_rate:
|
|
best_rate = rate
|
|
best = method
|
|
|
|
if best:
|
|
logger.info(
|
|
f"Learning: meilleure stratégie pour '{target_description[:30]}' → "
|
|
f"{best} ({best_rate:.0%} sur {method_stats[best][1]} essais)"
|
|
)
|
|
|
|
return best
|
|
|
|
def consolidate_workflow(
|
|
self,
|
|
actions: list,
|
|
session_id: str = "",
|
|
) -> int:
|
|
"""Consolider un workflow avec les apprentissages passés.
|
|
|
|
Pour chaque action du workflow, vérifie si l'historique suggère
|
|
une meilleure stratégie de résolution. Si oui, l'ajoute en
|
|
hint dans le target_spec de l'action.
|
|
|
|
Modifie les actions in-place. Retourne le nombre d'actions enrichies.
|
|
|
|
C'est la cross-pollination : un replay qui a réussi "Enregistrer"
|
|
via som_text améliore tous les futurs workflows qui cliquent sur "Enregistrer".
|
|
"""
|
|
enriched = 0
|
|
for action in actions:
|
|
if action.get("type") != "click":
|
|
continue
|
|
target_spec = action.get("target_spec", {})
|
|
by_text = target_spec.get("by_text", "")
|
|
window = target_spec.get("window_title", "")
|
|
if not by_text:
|
|
continue
|
|
|
|
best = self.best_strategy_for(by_text, window)
|
|
if best:
|
|
target_spec["_learned_strategy"] = best
|
|
enriched += 1
|
|
|
|
if enriched:
|
|
logger.info(
|
|
f"Consolidation : {enriched} actions enrichies par l'apprentissage "
|
|
f"(session {session_id})"
|
|
)
|
|
return enriched
|
|
|
|
def get_stats(self) -> Dict[str, Any]:
|
|
"""Statistiques globales des résultats de replay."""
|
|
if not self._recent:
|
|
return {"total": 0}
|
|
|
|
total = len(self._recent)
|
|
successes = sum(1 for o in self._recent if o.success)
|
|
methods = {}
|
|
for o in self._recent:
|
|
m = o.resolution_method or "unknown"
|
|
if m not in methods:
|
|
methods[m] = {"total": 0, "success": 0}
|
|
methods[m]["total"] += 1
|
|
if o.success:
|
|
methods[m]["success"] += 1
|
|
|
|
return {
|
|
"total": total,
|
|
"success_rate": round(successes / total, 3) if total > 0 else 0,
|
|
"methods": {
|
|
m: {
|
|
"total": v["total"],
|
|
"success_rate": round(v["success"] / v["total"], 3) if v["total"] > 0 else 0,
|
|
}
|
|
for m, v in methods.items()
|
|
},
|
|
}
|
|
|
|
def load_session(self, session_id: str) -> List[ActionOutcome]:
|
|
"""Charger tous les résultats d'une session depuis le fichier JSONL."""
|
|
session_file = self.learning_dir / f"{session_id}.jsonl"
|
|
if not session_file.is_file():
|
|
return []
|
|
|
|
outcomes = []
|
|
try:
|
|
with open(session_file) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line:
|
|
data = json.loads(line)
|
|
outcomes.append(ActionOutcome(**data))
|
|
except Exception as e:
|
|
logger.warning(f"Learning: échec lecture {session_file}: {e}")
|
|
|
|
return outcomes
|