"""
|
|
SessionAnalyzer - Analyse de la qualité des sessions d'entraînement
|
|
|
|
Ce module analyse:
|
|
- Qualité des screenshots (contraste, flou, artefacts)
|
|
- Cohérence du timing des actions
|
|
- Détection de doublons
|
|
- Génération de recommandations
|
|
"""
|
|
|
|
import logging
|
|
from typing import List, Dict, Optional, Any, Tuple
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
import numpy as np
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================================================
# Dataclasses
# =============================================================================

@dataclass
class FrameQuality:
    """Quality of a single frame"""
    frame_index: int
    contrast_score: float   # Contrast score (0-1)
    sharpness_score: float  # Sharpness score (0-1)
    artifact_score: float   # Artifact score (0 = good, 1 = bad)
    overall_score: float    # Overall score (0-1)
    issues: List[str] = field(default_factory=list)

    @property
    def is_acceptable(self) -> bool:
        return self.overall_score >= 0.6

@dataclass
class TimingAnalysis:
    """Analysis of action timing"""
    mean_interval: float        # Mean interval between actions (ms)
    std_interval: float         # Standard deviation of the intervals
    outlier_indices: List[int]  # Indices of problematic transitions
    is_consistent: bool         # True if the timing is consistent

    def to_dict(self) -> Dict[str, Any]:
        return {
            "mean_interval": self.mean_interval,
            "std_interval": self.std_interval,
            "outlier_count": len(self.outlier_indices),
            "is_consistent": self.is_consistent
        }

@dataclass
class DuplicateAnalysis:
    """Analysis of duplicate frames"""
    duplicate_pairs: List[Tuple[int, int]]  # Pairs of similar frames
    duplicate_ratio: float                  # Ratio of duplicates
    suggested_removal: List[int]            # Frame indices suggested for removal

    def to_dict(self) -> Dict[str, Any]:
        return {
            "duplicate_count": len(self.duplicate_pairs),
            "duplicate_ratio": self.duplicate_ratio,
            "suggested_removal_count": len(self.suggested_removal)
        }

@dataclass
class SessionQualityReport:
    """Complete quality report for a session"""
    session_id: str
    overall_score: float                   # Overall score (0-1)
    frame_qualities: List[FrameQuality]    # Per-frame quality
    timing_analysis: TimingAnalysis        # Timing analysis
    duplicate_analysis: DuplicateAnalysis  # Duplicate analysis
    recommendations: List[str]             # Recommendations
    is_acceptable: bool                    # Whether the session is acceptable
    created_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "session_id": self.session_id,
            "overall_score": self.overall_score,
            "frame_count": len(self.frame_qualities),
            "acceptable_frames": sum(1 for f in self.frame_qualities if f.is_acceptable),
            "timing": self.timing_analysis.to_dict(),
            "duplicates": self.duplicate_analysis.to_dict(),
            "recommendations": self.recommendations,
            "is_acceptable": self.is_acceptable,
            "created_at": self.created_at.isoformat()
        }

@dataclass
class SessionAnalyzerConfig:
    """Configuration for the session analyzer"""
    # Image quality thresholds
    min_contrast: float = 0.3
    min_sharpness: float = 0.4
    max_artifact_ratio: float = 0.1

    # Timing thresholds
    timing_outlier_factor: float = 2.0  # Standard-deviation factor for outliers
    min_action_interval_ms: float = 100
    max_action_interval_ms: float = 10000

    # Duplicate thresholds
    duplicate_similarity_threshold: float = 0.95
    max_duplicate_ratio: float = 0.3

    # Global thresholds
    min_acceptable_frames_ratio: float = 0.8
    min_overall_score: float = 0.6

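# Illustrative sketch (not part of the original module): stricter thresholds can
# be obtained by overriding individual fields; everything else keeps the defaults
# listed above, e.g.
#
#     strict_config = SessionAnalyzerConfig(min_contrast=0.4,
#                                           min_sharpness=0.5,
#                                           max_duplicate_ratio=0.2)
#     analyzer = SessionAnalyzer(strict_config)
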
# =============================================================================
# Session Analyzer
# =============================================================================

class SessionAnalyzer:
    """
    Quality analyzer for training sessions.

    Evaluates:
    - Screenshot quality (contrast, sharpness, artifacts)
    - Consistency of action timing
    - Presence of duplicate frames
    - Generates improvement recommendations

    Example:
        >>> analyzer = SessionAnalyzer()
        >>> report = analyzer.analyze_session(session)
        >>> if not report.is_acceptable:
        ...     print(report.recommendations)
    """

    def __init__(self, config: Optional[SessionAnalyzerConfig] = None):
        """
        Initialize the analyzer.

        Args:
            config: Configuration (defaults are used if None)
        """
        self.config = config or SessionAnalyzerConfig()
        logger.info("SessionAnalyzer initialized")

    def analyze_session(
        self,
        session: Any,
        screenshots: Optional[List[np.ndarray]] = None
    ) -> SessionQualityReport:
        """
        Analyze the quality of a complete session.

        Args:
            session: RawSession to analyze
            screenshots: Screenshot images (loaded from disk if None)

        Returns:
            SessionQualityReport with metrics and recommendations
        """
        session_id = getattr(session, 'session_id', 'unknown')
        logger.info(f"Analyzing session {session_id}")

        recommendations = []

        # Load screenshots if needed
        if screenshots is None:
            screenshots = self._load_screenshots(session)

        # 1. Analyze frame quality
        frame_qualities = self._analyze_frame_qualities(screenshots)

        # Check for problematic frames
        problematic_frames = [f for f in frame_qualities if not f.is_acceptable]
        if problematic_frames:
            recommendations.append(
                f"{len(problematic_frames)} frames have insufficient quality. "
                f"Consider re-recording with better lighting/resolution."
            )

        # 2. Analyze timing
        timing_analysis = self._analyze_timing(session)

        if not timing_analysis.is_consistent:
            recommendations.append(
                f"Inconsistent timing detected ({len(timing_analysis.outlier_indices)} problematic transitions). "
                f"Check for abnormally long pauses or overly fast actions."
            )

        # 3. Analyze duplicates
        duplicate_analysis = self._analyze_duplicates(screenshots)

        if duplicate_analysis.duplicate_ratio > self.config.max_duplicate_ratio:
            recommendations.append(
                f"Too many similar screenshots ({duplicate_analysis.duplicate_ratio:.1%}). "
                f"Reduce the capture frequency or optimize the workflow."
            )

        # 4. Compute the overall score
        overall_score = self._compute_overall_score(
            frame_qualities, timing_analysis, duplicate_analysis
        )

        # 5. Decide whether the session is acceptable
        acceptable_frames_ratio = sum(1 for f in frame_qualities if f.is_acceptable) / max(len(frame_qualities), 1)
        is_acceptable = (
            overall_score >= self.config.min_overall_score and
            acceptable_frames_ratio >= self.config.min_acceptable_frames_ratio and
            timing_analysis.is_consistent
        )

        if is_acceptable:
            recommendations.append("✓ Session quality is acceptable for training")
        else:
            recommendations.append("⚠ Session needs improvements before use")

        report = SessionQualityReport(
            session_id=session_id,
            overall_score=overall_score,
            frame_qualities=frame_qualities,
            timing_analysis=timing_analysis,
            duplicate_analysis=duplicate_analysis,
            recommendations=recommendations,
            is_acceptable=is_acceptable
        )

        logger.info(f"Analysis complete: score={overall_score:.3f}, acceptable={is_acceptable}")
        return report

    def _analyze_frame_qualities(
        self,
        screenshots: List[np.ndarray]
    ) -> List[FrameQuality]:
        """Analyze the quality of each frame."""
        qualities = []

        for i, img in enumerate(screenshots):
            if img is None:
                qualities.append(FrameQuality(
                    frame_index=i,
                    contrast_score=0.0,
                    sharpness_score=0.0,
                    artifact_score=1.0,
                    overall_score=0.0,
                    issues=["Image not loaded"]
                ))
                continue

            issues = []

            # Compute contrast
            contrast = self._compute_contrast(img)
            if contrast < self.config.min_contrast:
                issues.append(f"Low contrast ({contrast:.2f})")

            # Compute sharpness
            sharpness = self._compute_sharpness(img)
            if sharpness < self.config.min_sharpness:
                issues.append(f"Blurry image ({sharpness:.2f})")

            # Detect artifacts
            artifact_score = self._detect_artifacts(img)
            if artifact_score > self.config.max_artifact_ratio:
                issues.append(f"Artifacts detected ({artifact_score:.2f})")

            # Overall score
            overall = (contrast + sharpness + (1 - artifact_score)) / 3

            qualities.append(FrameQuality(
                frame_index=i,
                contrast_score=contrast,
                sharpness_score=sharpness,
                artifact_score=artifact_score,
                overall_score=overall,
                issues=issues
            ))

        return qualities

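    # Worked example (illustrative): a frame with contrast 0.7, sharpness 0.5 and
    # artifact score 0.1 gets overall = (0.7 + 0.5 + 0.9) / 3 = 0.70, which clears
    # the 0.6 acceptability threshold used by FrameQuality.is_acceptable.
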
    def _compute_contrast(self, img: np.ndarray) -> float:
        """Compute the contrast score of an image."""
        if img is None or img.size == 0:
            return 0.0

        # Convert to grayscale if needed
        if len(img.shape) == 3:
            gray = np.mean(img, axis=2)
        else:
            gray = img

        # Normalized standard deviation of pixel intensities
        std = np.std(gray)
        max_std = 127.5  # Theoretical maximum for an 8-bit image

        return min(1.0, std / max_std)

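    # Worked example (illustrative): this is an RMS-style contrast measure. A
    # perfectly uniform screenshot has std = 0 and scores 0.0; an 8-bit image that
    # is half black (0) and half white (255) has std = 127.5 and scores 1.0.
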
    def _compute_sharpness(self, img: np.ndarray) -> float:
        """Compute the sharpness score of an image."""
        if img is None or img.size == 0:
            return 0.0

        # Convert to grayscale
        if len(img.shape) == 3:
            gray = np.mean(img, axis=2)
        else:
            # Cast to float to avoid uint8 overflow in the Laplacian below
            gray = img.astype(float)

        # Compute the Laplacian (sharpness measure)
        # Simple approximation without OpenCV
        laplacian = np.zeros_like(gray)
        laplacian[1:-1, 1:-1] = (
            gray[:-2, 1:-1] + gray[2:, 1:-1] +
            gray[1:-1, :-2] + gray[1:-1, 2:] -
            4 * gray[1:-1, 1:-1]
        )

        variance = np.var(laplacian)

        # Normalize (empirical constant)
        return min(1.0, variance / 500)

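    # Note (illustrative): the slice arithmetic above is equivalent to convolving
    # the grayscale image with the 4-neighbour Laplacian kernel
    #     [[0,  1, 0],
    #      [1, -4, 1],
    #      [0,  1, 0]]
    # so the score is a variance-of-Laplacian blur measure: sharp, high-frequency
    # content yields a large variance (score saturates at 1.0), while a blurred
    # capture yields a value near 0 after division by the empirical constant 500.
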
    def _detect_artifacts(self, img: np.ndarray) -> float:
        """Detect artifacts in an image."""
        if img is None or img.size == 0:
            return 1.0

        # Detect abnormally uniform regions (compression artifacts)
        if len(img.shape) == 3:
            gray = np.mean(img, axis=2)
        else:
            # Cast to float to avoid uint8 wraparound in np.diff below
            gray = img.astype(float)

        # Compute local gradients
        grad_x = np.abs(np.diff(gray, axis=1))
        grad_y = np.abs(np.diff(gray, axis=0))

        # Regions with a very low gradient are potential artifacts
        low_grad_x = np.sum(grad_x < 1) / grad_x.size
        low_grad_y = np.sum(grad_y < 1) / grad_y.size

        artifact_ratio = (low_grad_x + low_grad_y) / 2

        # Normalize (a certain amount of uniform area is normal)
        return max(0, artifact_ratio - 0.5) * 2

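    # Worked example (illustrative): if 70% of horizontal and 70% of vertical
    # pixel gradients are below 1 intensity level, artifact_ratio = 0.7 and the
    # returned score is (0.7 - 0.5) * 2 = 0.4. Anything at or below 50% uniform
    # area is treated as normal and scores 0.
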
    def _analyze_timing(self, session: Any) -> TimingAnalysis:
        """Analyze the timing of the actions."""
        events = getattr(session, 'events', [])

        if len(events) < 2:
            return TimingAnalysis(
                mean_interval=0,
                std_interval=0,
                outlier_indices=[],
                is_consistent=True
            )

        # Compute intervals between consecutive events
        intervals = []
        for i in range(1, len(events)):
            t1 = getattr(events[i-1], 't', 0)
            t2 = getattr(events[i], 't', 0)
            interval = (t2 - t1) * 1000  # Convert to ms
            intervals.append(interval)

        if not intervals:
            return TimingAnalysis(
                mean_interval=0,
                std_interval=0,
                outlier_indices=[],
                is_consistent=True
            )

        intervals = np.array(intervals)
        mean_interval = np.mean(intervals)
        std_interval = np.std(intervals)

        # Detect outliers (more than timing_outlier_factor standard deviations
        # from the mean, or outside the allowed interval range)
        outlier_indices = []
        threshold = self.config.timing_outlier_factor * std_interval

        for i, interval in enumerate(intervals):
            if abs(interval - mean_interval) > threshold:
                outlier_indices.append(i)
            elif interval < self.config.min_action_interval_ms:
                outlier_indices.append(i)
            elif interval > self.config.max_action_interval_ms:
                outlier_indices.append(i)

        # Consistent if fewer than 10% of intervals are outliers
        is_consistent = len(outlier_indices) / len(intervals) < 0.1

        return TimingAnalysis(
            mean_interval=float(mean_interval),
            std_interval=float(std_interval),
            outlier_indices=outlier_indices,
            is_consistent=is_consistent
        )

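    # Worked example (illustrative): with mean_interval = 500 ms, std_interval =
    # 200 ms and timing_outlier_factor = 2.0, any interval deviating from the mean
    # by more than 400 ms is flagged, as is any interval under
    # min_action_interval_ms (100 ms) or over max_action_interval_ms (10 s).
    # The timing is reported as consistent only if fewer than 10% of intervals
    # are flagged.
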
    def _analyze_duplicates(
        self,
        screenshots: List[np.ndarray]
    ) -> DuplicateAnalysis:
        """Analyze duplicates among the screenshots."""
        if len(screenshots) < 2:
            return DuplicateAnalysis(
                duplicate_pairs=[],
                duplicate_ratio=0.0,
                suggested_removal=[]
            )

        duplicate_pairs = []

        # Compare each pair of consecutive screenshots
        for i in range(len(screenshots) - 1):
            if screenshots[i] is None or screenshots[i+1] is None:
                continue

            similarity = self._compute_image_similarity(
                screenshots[i], screenshots[i+1]
            )

            if similarity >= self.config.duplicate_similarity_threshold:
                duplicate_pairs.append((i, i+1))

        # Compute the duplicate ratio
        duplicate_ratio = len(duplicate_pairs) / max(len(screenshots) - 1, 1)

        # Suggest removals (keep the first frame of each group)
        suggested_removal = []
        for pair in duplicate_pairs:
            if pair[1] not in suggested_removal:
                suggested_removal.append(pair[1])

        return DuplicateAnalysis(
            duplicate_pairs=duplicate_pairs,
            duplicate_ratio=duplicate_ratio,
            suggested_removal=suggested_removal
        )

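    # Worked example (illustrative): for a run of three near-identical screenshots
    # the consecutive pairs (3, 4) and (4, 5) are both flagged, and the suggested
    # removals are frames 4 and 5 (the first frame of the run, frame 3, is kept).
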
    def _compute_image_similarity(
        self,
        img1: np.ndarray,
        img2: np.ndarray
    ) -> float:
        """Compute the similarity between two images."""
        if img1 is None or img2 is None:
            return 0.0

        # Shapes must match; differently sized images are treated as dissimilar
        if img1.shape != img2.shape:
            return 0.0

        # Normalized absolute difference
        diff = np.abs(img1.astype(float) - img2.astype(float))
        max_diff = 255.0 * img1.size

        similarity = 1.0 - (np.sum(diff) / max_diff)
        return similarity

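    # Worked example (illustrative): identical frames give similarity 1.0. For
    # 8-bit images, a mean absolute per-pixel difference of 12.75 (5% of 255)
    # gives similarity 0.95, exactly the default duplicate_similarity_threshold.
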
    def _compute_overall_score(
        self,
        frame_qualities: List[FrameQuality],
        timing_analysis: TimingAnalysis,
        duplicate_analysis: DuplicateAnalysis
    ) -> float:
        """Compute the overall session score."""
        scores = []

        # Mean frame score
        if frame_qualities:
            frame_score = np.mean([f.overall_score for f in frame_qualities])
            scores.append(frame_score)

        # Timing score
        if timing_analysis.is_consistent:
            timing_score = 1.0
        else:
            outlier_ratio = len(timing_analysis.outlier_indices) / max(1, len(frame_qualities) - 1)
            timing_score = max(0, 1.0 - outlier_ratio)
        scores.append(timing_score)

        # Duplicate score
        duplicate_score = max(0, 1.0 - duplicate_analysis.duplicate_ratio / self.config.max_duplicate_ratio)
        scores.append(duplicate_score)

        return float(np.mean(scores)) if scores else 0.0

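    # Worked example (illustrative): with a mean frame score of 0.8, consistent
    # timing (1.0) and a duplicate ratio of 0.15 against max_duplicate_ratio = 0.3
    # (duplicate score 0.5), the overall score is mean(0.8, 1.0, 0.5) ≈ 0.77.
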
    def _load_screenshots(self, session: Any) -> List[np.ndarray]:
        """Load the screenshots of a session."""
        screenshots = []

        session_screenshots = getattr(session, 'screenshots', [])

        for screenshot in session_screenshots:
            try:
                path = getattr(screenshot, 'relative_path', None)
                if path and Path(path).exists():
                    # Load with PIL (fall back to None if Pillow is unavailable)
                    try:
                        from PIL import Image
                        img = Image.open(path)
                        screenshots.append(np.array(img))
                    except ImportError:
                        screenshots.append(None)
                else:
                    screenshots.append(None)
            except Exception as e:
                logger.warning(f"Error loading screenshot: {e}")
                screenshots.append(None)

        return screenshots

    def get_config(self) -> SessionAnalyzerConfig:
        """Return the current configuration."""
        return self.config

# =============================================================================
# Utility functions
# =============================================================================

def create_session_analyzer(
    min_contrast: float = 0.3,
    max_duplicate_ratio: float = 0.3
) -> SessionAnalyzer:
    """
    Create an analyzer with a custom configuration.

    Args:
        min_contrast: Minimum acceptable contrast
        max_duplicate_ratio: Maximum acceptable duplicate ratio

    Returns:
        Configured SessionAnalyzer
    """
    config = SessionAnalyzerConfig(
        min_contrast=min_contrast,
        max_duplicate_ratio=max_duplicate_ratio
    )
    return SessionAnalyzer(config)

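# Minimal usage sketch (illustrative, not part of the original module). The demo
# session built with types.SimpleNamespace is hypothetical; it only mimics the
# attributes this analyzer reads (`session_id`, `events` with a `t` timestamp in
# seconds, and `screenshots`). Real sessions come from the recording pipeline.
if __name__ == "__main__":
    import types

    rng = np.random.default_rng(0)
    # Five synthetic 160x120 RGB screenshots
    frames = [
        rng.integers(0, 256, size=(120, 160, 3), dtype=np.uint8)
        for _ in range(5)
    ]
    # Six events spaced 0.5 s apart -> consistent 500 ms intervals
    events = [types.SimpleNamespace(t=0.5 * i) for i in range(6)]
    demo_session = types.SimpleNamespace(
        session_id="demo", events=events, screenshots=[]
    )

    analyzer = create_session_analyzer(min_contrast=0.2)
    report = analyzer.analyze_session(demo_session, screenshots=frames)
    print(report.to_dict())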