Refonte majeure du système Agent Chat et ajout de nombreux modules : - Chat unifié : suppression du dual Workflows/Agent Libre, tout passe par /api/chat avec résolution en 3 niveaux (workflow → geste → "montre-moi") - GestureCatalog : 38 raccourcis clavier universels Windows avec matching sémantique, substitution automatique dans les replays, et endpoint /api/gestures - Mode Copilot : exécution pas-à-pas des workflows avec validation humaine via WebSocket (approve/skip/abort) avant chaque action - Léa UI (agent_v0/lea_ui/) : interface PyQt5 pour Windows avec overlay transparent pour feedback visuel pendant le replay - Data Extraction (core/extraction/) : moteur d'extraction visuelle de données (OCR + VLM → SQLite), avec schémas YAML et export CSV/Excel - ReplayVerifier (agent_v0/server_v1/) : vérification post-action par comparaison de screenshots, avec logique de retry (max 3) - IntentParser durci : meilleur fallback regex, type GREETING, patterns améliorés - Dashboard : nouvelles pages gestures, streaming, extractions - Tests : 63 tests GestureCatalog, 47 tests extraction, corrections tests existants - Dépréciation : /api/agent/plan et /api/agent/execute retournent HTTP 410, suppression du code hardcodé _plan_to_replay_actions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
364 lines
13 KiB
Python
364 lines
13 KiB
Python
"""
|
|
Modèles de base standardisés avec Pydantic - Tâche 4
|
|
|
|
Contrats de données standardisés pour assurer la cohérence entre tous les composants :
|
|
- BBox : Format exclusif (x, y, width, height) avec validation Pydantic
|
|
- Timestamp : Objets datetime uniquement
|
|
- IDs : Strings uniquement avec validation
|
|
|
|
Auteur : Dom, Alice Kiro
|
|
Date : 20 décembre 2024
|
|
"""
|
|
|
|
from pydantic import BaseModel, Field, validator
|
|
from typing import Tuple, Union, Dict, Any, Optional
|
|
from datetime import datetime
|
|
import uuid
|
|
|
|
|
|
class BBox(BaseModel):
|
|
"""
|
|
Bounding box standardisée au format (x, y, width, height)
|
|
|
|
Exigence 4.1 : Format exclusif (x, y, width, height) avec validation Pydantic
|
|
"""
|
|
x: int = Field(..., ge=0, description="Position X (coin supérieur gauche)")
|
|
y: int = Field(..., ge=0, description="Position Y (coin supérieur gauche)")
|
|
width: int = Field(..., gt=0, description="Largeur")
|
|
height: int = Field(..., gt=0, description="Hauteur")
|
|
|
|
@validator('x', 'y', pre=True)
|
|
def validate_coordinates(cls, v):
|
|
"""Valider que les coordonnées sont non-négatives"""
|
|
if isinstance(v, (int, float)):
|
|
if v < 0:
|
|
raise ValueError("Coordinates must be non-negative")
|
|
return int(v)
|
|
raise ValueError("Coordinates must be numeric")
|
|
|
|
@validator('width', 'height', pre=True)
|
|
def validate_dimensions(cls, v):
|
|
"""Valider que les dimensions sont positives"""
|
|
if isinstance(v, (int, float)):
|
|
if v <= 0:
|
|
raise ValueError("Dimensions must be positive")
|
|
return int(v)
|
|
raise ValueError("Dimensions must be numeric")
|
|
|
|
def __iter__(self):
|
|
"""Permet le unpacking: x, y, w, h = bbox"""
|
|
return iter((self.x, self.y, self.width, self.height))
|
|
|
|
def __getitem__(self, index):
|
|
"""Permet l'accès par index: bbox[0], bbox[1], etc."""
|
|
return (self.x, self.y, self.width, self.height)[index]
|
|
|
|
def __len__(self):
|
|
return 4
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, BBox):
|
|
return (self.x == other.x and self.y == other.y and
|
|
self.width == other.width and self.height == other.height)
|
|
if isinstance(other, (tuple, list)) and len(other) == 4:
|
|
return (self.x, self.y, self.width, self.height) == tuple(other)
|
|
return NotImplemented
|
|
|
|
def to_tuple(self) -> Tuple[int, int, int, int]:
|
|
"""Conversion vers tuple (x, y, w, h)"""
|
|
return (self.x, self.y, self.width, self.height)
|
|
|
|
@classmethod
|
|
def from_tuple(cls, bbox_tuple: Tuple[int, int, int, int]) -> 'BBox':
|
|
"""Création depuis tuple (x, y, w, h)"""
|
|
if len(bbox_tuple) != 4:
|
|
raise ValueError("BBox tuple must have exactly 4 elements")
|
|
return cls(x=bbox_tuple[0], y=bbox_tuple[1], width=bbox_tuple[2], height=bbox_tuple[3])
|
|
|
|
@classmethod
|
|
def from_xyxy(cls, x1: int, y1: int, x2: int, y2: int) -> 'BBox':
|
|
"""Conversion depuis format (x1, y1, x2, y2)"""
|
|
return cls(
|
|
x=min(x1, x2),
|
|
y=min(y1, y2),
|
|
width=abs(x2 - x1),
|
|
height=abs(y2 - y1)
|
|
)
|
|
|
|
def to_xyxy(self) -> Tuple[int, int, int, int]:
|
|
"""Conversion vers format (x1, y1, x2, y2)"""
|
|
return (self.x, self.y, self.x + self.width, self.y + self.height)
|
|
|
|
def center(self) -> Tuple[int, int]:
|
|
"""Calculer le centre de la bbox"""
|
|
return (self.x + self.width // 2, self.y + self.height // 2)
|
|
|
|
def area(self) -> int:
|
|
"""Calculer l'aire de la bbox"""
|
|
return self.width * self.height
|
|
|
|
def contains_point(self, x: int, y: int) -> bool:
|
|
"""Vérifier si un point est dans la bbox"""
|
|
return (self.x <= x <= self.x + self.width and
|
|
self.y <= y <= self.y + self.height)
|
|
|
|
def intersects(self, other: 'BBox') -> bool:
|
|
"""Vérifier si cette bbox intersecte avec une autre"""
|
|
return not (self.x + self.width < other.x or
|
|
other.x + other.width < self.x or
|
|
self.y + self.height < other.y or
|
|
other.y + other.height < self.y)
|
|
|
|
def intersection(self, other: 'BBox') -> Optional['BBox']:
|
|
"""Calculer l'intersection avec une autre bbox"""
|
|
if not self.intersects(other):
|
|
return None
|
|
|
|
x1 = max(self.x, other.x)
|
|
y1 = max(self.y, other.y)
|
|
x2 = min(self.x + self.width, other.x + other.width)
|
|
y2 = min(self.y + self.height, other.y + other.height)
|
|
|
|
return BBox(x=x1, y=y1, width=x2-x1, height=y2-y1)
|
|
|
|
def union(self, other: 'BBox') -> 'BBox':
|
|
"""Calculer l'union avec une autre bbox"""
|
|
x1 = min(self.x, other.x)
|
|
y1 = min(self.y, other.y)
|
|
x2 = max(self.x + self.width, other.x + other.width)
|
|
y2 = max(self.y + self.height, other.y + other.height)
|
|
|
|
return BBox(x=x1, y=y1, width=x2-x1, height=y2-y1)
|
|
|
|
|
|
class Point(BaseModel):
|
|
"""
|
|
Point 2D standardisé
|
|
|
|
Représente un point avec coordonnées x, y
|
|
"""
|
|
x: int = Field(..., description="Coordonnée X")
|
|
y: int = Field(..., description="Coordonnée Y")
|
|
|
|
@validator('x', 'y', pre=True)
|
|
def validate_coordinates(cls, v):
|
|
"""Valider que les coordonnées sont numériques"""
|
|
if isinstance(v, (int, float)):
|
|
return int(v)
|
|
raise ValueError("Coordinates must be numeric")
|
|
|
|
def to_tuple(self) -> Tuple[int, int]:
|
|
"""Conversion vers tuple (x, y)"""
|
|
return (self.x, self.y)
|
|
|
|
@classmethod
|
|
def from_tuple(cls, point_tuple: Tuple[int, int]) -> 'Point':
|
|
"""Création depuis tuple (x, y)"""
|
|
if len(point_tuple) != 2:
|
|
raise ValueError("Point tuple must have exactly 2 elements")
|
|
return cls(x=point_tuple[0], y=point_tuple[1])
|
|
|
|
def distance_to(self, other: 'Point') -> float:
|
|
"""Calculer la distance euclidienne vers un autre point"""
|
|
return ((self.x - other.x) ** 2 + (self.y - other.y) ** 2) ** 0.5
|
|
|
|
def is_inside_bbox(self, bbox: BBox) -> bool:
|
|
"""Vérifier si ce point est dans une bbox"""
|
|
return bbox.contains_point(self.x, self.y)
|
|
|
|
|
|
class Timestamp(BaseModel):
|
|
"""
|
|
Timestamp standardisé avec datetime
|
|
|
|
Exigence 4.2 : Objets datetime uniquement avec utilitaires de conversion
|
|
"""
|
|
value: datetime = Field(default_factory=datetime.now, description="Valeur datetime")
|
|
|
|
@validator('value', pre=True)
|
|
def validate_datetime(cls, v):
|
|
"""Valider et convertir vers datetime"""
|
|
if isinstance(v, datetime):
|
|
return v
|
|
elif isinstance(v, str):
|
|
try:
|
|
return datetime.fromisoformat(v.replace('Z', '+00:00'))
|
|
except ValueError:
|
|
raise ValueError(f"Cannot parse datetime string: {v}")
|
|
elif isinstance(v, (int, float)):
|
|
try:
|
|
return datetime.fromtimestamp(v)
|
|
except (ValueError, OSError):
|
|
raise ValueError(f"Cannot convert timestamp to datetime: {v}")
|
|
else:
|
|
raise ValueError(f"Cannot convert {type(v)} to datetime")
|
|
|
|
def to_iso(self) -> str:
|
|
"""Conversion vers format ISO"""
|
|
return self.value.isoformat()
|
|
|
|
def to_timestamp(self) -> float:
|
|
"""Conversion vers timestamp Unix"""
|
|
return self.value.timestamp()
|
|
|
|
@classmethod
|
|
def now(cls) -> 'Timestamp':
|
|
"""Créer un timestamp pour maintenant"""
|
|
return cls(value=datetime.now())
|
|
|
|
@classmethod
|
|
def from_iso(cls, iso_string: str) -> 'Timestamp':
|
|
"""Créer depuis string ISO"""
|
|
return cls(value=datetime.fromisoformat(iso_string.replace('Z', '+00:00')))
|
|
|
|
@classmethod
|
|
def from_timestamp(cls, timestamp: float) -> 'Timestamp':
|
|
"""Créer depuis timestamp Unix"""
|
|
return cls(value=datetime.fromtimestamp(timestamp))
|
|
|
|
|
|
class StandardID(BaseModel):
|
|
"""
|
|
ID standardisé en string
|
|
|
|
Exigence 4.3 : IDs en strings uniquement avec validation
|
|
"""
|
|
value: str = Field(..., min_length=1, description="Valeur de l'ID")
|
|
|
|
@validator('value', pre=True)
|
|
def validate_id(cls, v):
|
|
"""Valider et convertir vers string"""
|
|
if isinstance(v, str):
|
|
if not v.strip():
|
|
raise ValueError("ID cannot be empty")
|
|
return v.strip()
|
|
elif isinstance(v, (int, float)):
|
|
return str(v)
|
|
elif isinstance(v, uuid.UUID):
|
|
return str(v)
|
|
else:
|
|
raise ValueError(f"Cannot convert {type(v)} to ID string")
|
|
|
|
def __str__(self) -> str:
|
|
return self.value
|
|
|
|
def __eq__(self, other) -> bool:
|
|
if isinstance(other, StandardID):
|
|
return self.value == other.value
|
|
elif isinstance(other, str):
|
|
return self.value == other
|
|
return False
|
|
|
|
def __hash__(self) -> int:
|
|
return hash(self.value)
|
|
|
|
@classmethod
|
|
def generate(cls) -> 'StandardID':
|
|
"""Générer un nouvel ID unique"""
|
|
return cls(value=str(uuid.uuid4()))
|
|
|
|
@classmethod
|
|
def from_uuid(cls, uuid_obj: uuid.UUID) -> 'StandardID':
|
|
"""Créer depuis UUID"""
|
|
return cls(value=str(uuid_obj))
|
|
|
|
|
|
# Utilitaires de conversion pour la migration
|
|
class DataConverter:
|
|
"""
|
|
Utilitaires de conversion sûrs pour la migration vers les nouveaux contrats
|
|
|
|
Exigence 4.4 : Assurer la compatibilité ascendante pendant la migration
|
|
"""
|
|
|
|
@staticmethod
|
|
def ensure_bbox(bbox: Union[BBox, Tuple, list, Dict, Any]) -> BBox:
|
|
"""Assurer que bbox est au format BBox standardisé"""
|
|
if isinstance(bbox, BBox):
|
|
return bbox
|
|
elif isinstance(bbox, (tuple, list)) and len(bbox) == 4:
|
|
return BBox.from_tuple(tuple(bbox))
|
|
elif isinstance(bbox, dict):
|
|
if all(k in bbox for k in ['x', 'y', 'width', 'height']):
|
|
return BBox(**bbox)
|
|
elif all(k in bbox for k in ['x1', 'y1', 'x2', 'y2']):
|
|
return BBox.from_xyxy(bbox['x1'], bbox['y1'], bbox['x2'], bbox['y2'])
|
|
else:
|
|
raise ValueError(f"Cannot convert dict to BBox: missing required keys")
|
|
else:
|
|
raise ValueError(f"Cannot convert {type(bbox)} to BBox")
|
|
|
|
@staticmethod
|
|
def ensure_timestamp(timestamp: Union[Timestamp, datetime, str, int, float, Any]) -> Timestamp:
|
|
"""Assurer que timestamp est un objet Timestamp standardisé"""
|
|
if isinstance(timestamp, Timestamp):
|
|
return timestamp
|
|
else:
|
|
return Timestamp(value=timestamp)
|
|
|
|
@staticmethod
|
|
def ensure_id(id_value: Union[StandardID, str, int, float, uuid.UUID, Any]) -> StandardID:
|
|
"""Assurer que l'ID est un StandardID"""
|
|
if isinstance(id_value, StandardID):
|
|
return id_value
|
|
else:
|
|
return StandardID(value=id_value)
|
|
|
|
@staticmethod
|
|
def migrate_bbox_dict(data: Dict[str, Any], bbox_fields: list = None) -> Dict[str, Any]:
|
|
"""Migrer les champs bbox dans un dictionnaire"""
|
|
if bbox_fields is None:
|
|
bbox_fields = ['bbox', 'bounding_box', 'bounds']
|
|
|
|
migrated = data.copy()
|
|
for field in bbox_fields:
|
|
if field in migrated:
|
|
try:
|
|
bbox = DataConverter.ensure_bbox(migrated[field])
|
|
migrated[field] = bbox.dict()
|
|
except Exception as e:
|
|
# Log l'erreur mais continue la migration
|
|
print(f"Warning: Could not migrate bbox field '{field}': {e}")
|
|
|
|
return migrated
|
|
|
|
@staticmethod
|
|
def migrate_timestamp_dict(data: Dict[str, Any], timestamp_fields: list = None) -> Dict[str, Any]:
|
|
"""Migrer les champs timestamp dans un dictionnaire"""
|
|
if timestamp_fields is None:
|
|
timestamp_fields = ['timestamp', 'created_at', 'updated_at', 'captured_at']
|
|
|
|
migrated = data.copy()
|
|
for field in timestamp_fields:
|
|
if field in migrated:
|
|
try:
|
|
timestamp = DataConverter.ensure_timestamp(migrated[field])
|
|
migrated[field] = timestamp.value
|
|
except Exception as e:
|
|
# Log l'erreur mais continue la migration
|
|
print(f"Warning: Could not migrate timestamp field '{field}': {e}")
|
|
|
|
return migrated
|
|
|
|
@staticmethod
|
|
def migrate_id_dict(data: Dict[str, Any], id_fields: list = None) -> Dict[str, Any]:
|
|
"""Migrer les champs ID dans un dictionnaire"""
|
|
if id_fields is None:
|
|
id_fields = ['id', 'element_id', 'session_id', 'workflow_id', 'node_id', 'edge_id']
|
|
|
|
migrated = data.copy()
|
|
for field in id_fields:
|
|
if field in migrated:
|
|
try:
|
|
id_obj = DataConverter.ensure_id(migrated[field])
|
|
migrated[field] = id_obj.value
|
|
except Exception as e:
|
|
# Log l'erreur mais continue la migration
|
|
print(f"Warning: Could not migrate ID field '{field}': {e}")
|
|
|
|
return migrated
|
|
|
|
|
|
# Aliases pour compatibilité
|
|
BaseTimestamp = Timestamp
|
|
BaseID = StandardID |