#!/usr/bin/env python3
"""Automatic improvement of the matching system.

Analyzes failed-match reports and proposes/applies automatic improvements:
- updating node prototypes (from "near miss" embeddings)
- creating new nodes for repeatedly unrecognized states
- adjusting the similarity threshold
"""

import argparse
import json
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np


class MatchingAutoImprover:
    """Automatic improver for the matching system.

    Reads per-failure report directories (``report.json`` plus optional
    ``state_embedding.npy`` / ``screenshot.png``), derives a list of
    improvement proposals, and — unless ``dry_run`` — applies them to the
    workflows directory and the matching configuration.
    """

    def __init__(
        self,
        failed_matches_dir: str = "data/failed_matches",
        workflows_dir: str = "data/workflows",
        dry_run: bool = True,
    ):
        """
        Args:
            failed_matches_dir: Root directory of failed-match report folders.
            workflows_dir: Directory where node prototypes/examples are stored.
            dry_run: When True (default), only simulate — nothing is written.
        """
        self.failed_matches_dir = Path(failed_matches_dir)
        self.workflows_dir = Path(workflows_dir)
        self.dry_run = dry_run
        # Accumulated improvement proposals (dicts keyed by 'type').
        self.improvements: List[Dict[str, Any]] = []

    def analyze_and_improve(self, min_confidence: float = 0.75) -> List[Dict[str, Any]]:
        """Analyze failure reports and build the list of improvement proposals.

        Args:
            min_confidence: Minimum similarity for a failure to count as a
                "near miss" eligible for a prototype update.

        Returns:
            The list of improvement dicts (also stored on ``self.improvements``).
        """
        print("\n🔍 Analyse des échecs de matching...")

        reports = self._load_all_reports()
        if not reports:
            print("⚠️ Aucun échec à analyser")
            return []

        print(f"✓ {len(reports)} rapports chargés")

        self.improvements = []

        # 1. Prototypes to refresh (near misses just under the threshold)
        self._identify_prototype_updates(reports, min_confidence)

        # 2. Brand-new nodes for states far from every prototype
        self._identify_new_nodes(reports)

        # 3. Threshold adjustments when many failures cluster near it
        self._identify_threshold_adjustments(reports)

        return self.improvements

    def _load_all_reports(self) -> List[Dict[str, Any]]:
        """Load every ``report.json`` under the failed-matches directory.

        Each loaded report gains a ``_dir`` key (its Path) so later steps can
        locate the sibling embedding/screenshot files. Unreadable or invalid
        reports are skipped.
        """
        if not self.failed_matches_dir.exists():
            return []

        reports = []
        for match_dir in self.failed_matches_dir.iterdir():
            if not match_dir.is_dir():
                continue

            report_path = match_dir / "report.json"
            if report_path.exists():
                try:
                    with open(report_path, 'r') as f:
                        report = json.load(f)
                        report['_dir'] = match_dir
                        reports.append(report)
                # Narrowed from a bare except: only skip I/O or parse errors.
                except (OSError, json.JSONDecodeError):
                    continue

        return reports

    def _identify_prototype_updates(self, reports: List[Dict], min_confidence: float):
        """Collect nodes whose prototype should absorb near-miss embeddings.

        A "near miss" is a failure whose best similarity lands in
        ``[min_confidence, threshold)``. Nodes with at least 3 near misses get
        an ``UPDATE_PROTOTYPE`` proposal.
        """
        # Group near misses by the best-matching node's id.
        node_near_misses = {}

        for report in reports:
            similarities = report['matching_results'].get('similarities', [])
            if not similarities:
                continue

            # similarities[0] is assumed to be the best match — TODO confirm
            # the producer sorts descending.
            best = similarities[0]
            confidence = best['similarity']

            threshold = report['matching_results']['threshold']
            if min_confidence <= confidence < threshold:
                node_id = best['node_id']
                if node_id not in node_near_misses:
                    node_near_misses[node_id] = []
                node_near_misses[node_id].append({
                    'report': report,
                    'confidence': confidence,
                    'embedding_path': report['_dir'] / "state_embedding.npy"
                })

        # Propose an update only when the evidence repeats (>= 3 near misses).
        for node_id, misses in node_near_misses.items():
            if len(misses) >= 3:
                self.improvements.append({
                    'type': 'UPDATE_PROTOTYPE',
                    'node_id': node_id,
                    'node_label': misses[0]['report']['matching_results']['similarities'][0]['node_label'],
                    'near_miss_count': len(misses),
                    'avg_confidence': sum(m['confidence'] for m in misses) / len(misses),
                    'embeddings': [m['embedding_path'] for m in misses]
                })

    def _identify_new_nodes(self, reports: List[Dict]):
        """Collect states so dissimilar (confidence < 0.70) that they likely
        represent unknown screens, grouped by window title.

        Windows seen at least twice get a ``CREATE_NODE`` proposal.
        """
        new_states = []
        for report in reports:
            confidence = report['matching_results']['best_confidence']
            if confidence < 0.70:
                new_states.append({
                    'report': report,
                    'confidence': confidence,
                    'screenshot': report['_dir'] / "screenshot.png",
                    'embedding': report['_dir'] / "state_embedding.npy",
                    'window_title': report['state']['window_title']
                })

        if new_states:
            # Group by window title; untitled states fall under 'unknown'.
            by_window = {}
            for state in new_states:
                window = state['window_title'] or 'unknown'
                if window not in by_window:
                    by_window[window] = []
                by_window[window].append(state)

            for window, states in by_window.items():
                if len(states) >= 2:  # require at least 2 occurrences
                    self.improvements.append({
                        'type': 'CREATE_NODE',
                        'window_title': window,
                        'occurrence_count': len(states),
                        'avg_confidence': sum(s['confidence'] for s in states) / len(states),
                        'screenshots': [s['screenshot'] for s in states],
                        'embeddings': [s['embedding'] for s in states]
                    })

    def _identify_threshold_adjustments(self, reports: List[Dict]):
        """Propose lowering the threshold when >30% of failures sit within
        0.05 below it (suggesting the threshold is slightly too strict)."""
        confidences = [r['matching_results']['best_confidence'] for r in reports]
        if not confidences:
            return

        sorted_conf = sorted(confidences)
        # 90th-percentile confidence (index int(n*0.9) is always < n).
        p90 = sorted_conf[int(len(sorted_conf) * 0.9)]
        current_threshold = reports[0]['matching_results']['threshold']

        near_threshold = sum(
            1 for c in confidences
            if current_threshold - 0.05 <= c < current_threshold
        )

        if near_threshold > len(confidences) * 0.3:  # more than 30%
            # Never recommend below 0.70; leave a 0.02 margin under p90.
            recommended = max(0.70, p90 - 0.02)
            self.improvements.append({
                'type': 'ADJUST_THRESHOLD',
                'current_threshold': current_threshold,
                'recommended_threshold': recommended,
                'reason': f"{near_threshold} échecs proches du seuil ({near_threshold/len(confidences)*100:.1f}%)",
                'p90_confidence': p90
            })

    def apply_improvements(self, improvements: Optional[List[Dict[str, Any]]] = None):
        """Apply (or, in dry-run mode, simulate) the given improvements.

        Args:
            improvements: Proposals to apply; defaults to ``self.improvements``.
        """
        if improvements is None:
            improvements = self.improvements

        if not improvements:
            print("\n⚠️ Aucune amélioration à appliquer")
            return

        print(f"\n{'🔧 SIMULATION' if self.dry_run else '🔧 APPLICATION'} DES AMÉLIORATIONS")
        print("="*70)

        for i, improvement in enumerate(improvements, 1):
            print(f"\n{i}. {improvement['type']}")

            if improvement['type'] == 'UPDATE_PROTOTYPE':
                self._apply_prototype_update(improvement)
            elif improvement['type'] == 'CREATE_NODE':
                self._apply_node_creation(improvement)
            elif improvement['type'] == 'ADJUST_THRESHOLD':
                self._apply_threshold_adjustment(improvement)

        if self.dry_run:
            print("\n💡 Mode simulation - Aucune modification appliquée")
            print(" Relancez avec --apply pour appliquer les changements")

    def _apply_prototype_update(self, improvement: Dict):
        """Recompute a node's prototype as the mean of its near-miss embeddings."""
        print(f" Node: {improvement['node_label']} (ID: {improvement['node_id']})")
        print(f" Near misses: {improvement['near_miss_count']}")
        print(f" Confiance moyenne: {improvement['avg_confidence']:.3f}")

        if not self.dry_run:
            embeddings = []
            for emb_path in improvement['embeddings']:
                if Path(emb_path).exists():
                    embeddings.append(np.load(emb_path))

            if embeddings:
                # New prototype = element-wise mean of the collected embeddings.
                new_prototype = np.mean(embeddings, axis=0)

                # NOTE(review): storage layout to be adapted to your structure.
                prototype_path = self.workflows_dir / f"node_{improvement['node_id']}_prototype.npy"
                np.save(prototype_path, new_prototype)
                print(f" ✓ Prototype mis à jour: {prototype_path}")
        else:
            print(f" → Mettrait à jour le prototype avec {len(improvement['embeddings'])} embeddings")

    def _apply_node_creation(self, improvement: Dict):
        """Create a new node directory with example screenshots and a prototype."""
        print(f" Fenêtre: {improvement['window_title']}")
        print(f" Occurrences: {improvement['occurrence_count']}")
        print(f" Confiance moyenne: {improvement['avg_confidence']:.3f}")

        if not self.dry_run:
            # Timestamped node id; layout to be adapted to your structure.
            node_id = f"node_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            node_dir = self.workflows_dir / node_id
            node_dir.mkdir(parents=True, exist_ok=True)

            # Copy example screenshots into the node directory.
            for i, screenshot in enumerate(improvement['screenshots']):
                if Path(screenshot).exists():
                    shutil.copy(screenshot, node_dir / f"example_{i}.png")

            # Prototype = mean of the available embeddings.
            embeddings = []
            for emb_path in improvement['embeddings']:
                if Path(emb_path).exists():
                    embeddings.append(np.load(emb_path))

            if embeddings:
                prototype = np.mean(embeddings, axis=0)
                np.save(node_dir / "prototype.npy", prototype)

            print(f" ✓ Node créé: {node_dir}")
        else:
            print(f" → Créerait un nouveau node avec {improvement['occurrence_count']} exemples")

    def _apply_threshold_adjustment(self, improvement: Dict):
        """Write the recommended threshold into the matching configuration."""
        print(f" Seuil actuel: {improvement['current_threshold']:.3f}")
        print(f" Seuil recommandé: {improvement['recommended_threshold']:.3f}")
        print(f" Raison: {improvement['reason']}")

        if not self.dry_run:
            # NOTE(review): config path to be adapted to your deployment.
            config_path = Path("config/matching_config.json")
            if config_path.exists():
                with open(config_path, 'r') as f:
                    config = json.load(f)

                config['similarity_threshold'] = improvement['recommended_threshold']

                with open(config_path, 'w') as f:
                    json.dump(config, f, indent=2)

                print(f" ✓ Configuration mise à jour: {config_path}")
        else:
            print(f" → Mettrait à jour le seuil dans la configuration")

    def print_summary(self):
        """Print a per-type summary of all pending improvement proposals."""
        print("\n" + "="*70)
        print("RÉSUMÉ DES AMÉLIORATIONS PROPOSÉES")
        print("="*70)

        by_type = {}
        for imp in self.improvements:
            imp_type = imp['type']
            if imp_type not in by_type:
                by_type[imp_type] = []
            by_type[imp_type].append(imp)

        for imp_type, imps in by_type.items():
            print(f"\n{imp_type}: {len(imps)}")
            for imp in imps:
                if imp_type == 'UPDATE_PROTOTYPE':
                    print(f" • {imp['node_label']}: {imp['near_miss_count']} near misses")
                elif imp_type == 'CREATE_NODE':
                    print(f" • {imp['window_title']}: {imp['occurrence_count']} occurrences")
                elif imp_type == 'ADJUST_THRESHOLD':
                    print(f" • {imp['current_threshold']:.3f} → {imp['recommended_threshold']:.3f}")


def main():
    """CLI entry point: analyze failures, print a summary, apply/simulate fixes."""
    parser = argparse.ArgumentParser(
        description="Amélioration automatique du système de matching"
    )
    parser.add_argument(
        '--apply',
        action='store_true',
        help="Appliquer les améliorations (sinon mode simulation)"
    )
    parser.add_argument(
        '--min-confidence',
        type=float,
        default=0.75,
        help="Confiance minimum pour mise à jour (défaut: 0.75)"
    )

    args = parser.parse_args()

    improver = MatchingAutoImprover(dry_run=not args.apply)

    improvements = improver.analyze_and_improve(min_confidence=args.min_confidence)

    if not improvements:
        print("\n✅ Aucune amélioration nécessaire")
        return 0

    improver.print_summary()

    improver.apply_improvements()

    return 0


if __name__ == '__main__':
    sys.exit(main())