""" Modèles de base standardisés avec Pydantic - Tâche 4 Contrats de données standardisés pour assurer la cohérence entre tous les composants : - BBox : Format exclusif (x, y, width, height) avec validation Pydantic - Timestamp : Objets datetime uniquement - IDs : Strings uniquement avec validation Auteur : Dom, Alice Kiro Date : 20 décembre 2024 """ from pydantic import BaseModel, Field, validator from typing import Tuple, Union, Dict, Any, Optional from datetime import datetime import uuid class BBox(BaseModel): """ Bounding box standardisée au format (x, y, width, height) Exigence 4.1 : Format exclusif (x, y, width, height) avec validation Pydantic """ x: int = Field(..., ge=0, description="Position X (coin supérieur gauche)") y: int = Field(..., ge=0, description="Position Y (coin supérieur gauche)") width: int = Field(..., gt=0, description="Largeur") height: int = Field(..., gt=0, description="Hauteur") @validator('x', 'y', pre=True) def validate_coordinates(cls, v): """Valider que les coordonnées sont non-négatives""" if isinstance(v, (int, float)): if v < 0: raise ValueError("Coordinates must be non-negative") return int(v) raise ValueError("Coordinates must be numeric") @validator('width', 'height', pre=True) def validate_dimensions(cls, v): """Valider que les dimensions sont positives""" if isinstance(v, (int, float)): if v <= 0: raise ValueError("Dimensions must be positive") return int(v) raise ValueError("Dimensions must be numeric") def __iter__(self): """Permet le unpacking: x, y, w, h = bbox""" return iter((self.x, self.y, self.width, self.height)) def __getitem__(self, index): """Permet l'accès par index: bbox[0], bbox[1], etc.""" return (self.x, self.y, self.width, self.height)[index] def __len__(self): return 4 def __eq__(self, other): if isinstance(other, BBox): return (self.x == other.x and self.y == other.y and self.width == other.width and self.height == other.height) if isinstance(other, (tuple, list)) and len(other) == 4: return (self.x, self.y, self.width, self.height) == tuple(other) return NotImplemented def to_tuple(self) -> Tuple[int, int, int, int]: """Conversion vers tuple (x, y, w, h)""" return (self.x, self.y, self.width, self.height) @classmethod def from_tuple(cls, bbox_tuple: Tuple[int, int, int, int]) -> 'BBox': """Création depuis tuple (x, y, w, h)""" if len(bbox_tuple) != 4: raise ValueError("BBox tuple must have exactly 4 elements") return cls(x=bbox_tuple[0], y=bbox_tuple[1], width=bbox_tuple[2], height=bbox_tuple[3]) @classmethod def from_xyxy(cls, x1: int, y1: int, x2: int, y2: int) -> 'BBox': """Conversion depuis format (x1, y1, x2, y2)""" return cls( x=min(x1, x2), y=min(y1, y2), width=abs(x2 - x1), height=abs(y2 - y1) ) def to_xyxy(self) -> Tuple[int, int, int, int]: """Conversion vers format (x1, y1, x2, y2)""" return (self.x, self.y, self.x + self.width, self.y + self.height) def center(self) -> Tuple[int, int]: """Calculer le centre de la bbox""" return (self.x + self.width // 2, self.y + self.height // 2) def area(self) -> int: """Calculer l'aire de la bbox""" return self.width * self.height def contains_point(self, x: int, y: int) -> bool: """Vérifier si un point est dans la bbox""" return (self.x <= x <= self.x + self.width and self.y <= y <= self.y + self.height) def intersects(self, other: 'BBox') -> bool: """Vérifier si cette bbox intersecte avec une autre""" return not (self.x + self.width < other.x or other.x + other.width < self.x or self.y + self.height < other.y or other.y + other.height < self.y) def intersection(self, other: 'BBox') -> Optional['BBox']: """Calculer l'intersection avec une autre bbox""" if not self.intersects(other): return None x1 = max(self.x, other.x) y1 = max(self.y, other.y) x2 = min(self.x + self.width, other.x + other.width) y2 = min(self.y + self.height, other.y + other.height) return BBox(x=x1, y=y1, width=x2-x1, height=y2-y1) def union(self, other: 'BBox') -> 'BBox': """Calculer l'union avec une autre bbox""" x1 = min(self.x, other.x) y1 = min(self.y, other.y) x2 = max(self.x + self.width, other.x + other.width) y2 = max(self.y + self.height, other.y + other.height) return BBox(x=x1, y=y1, width=x2-x1, height=y2-y1) class Point(BaseModel): """ Point 2D standardisé Représente un point avec coordonnées x, y """ x: int = Field(..., description="Coordonnée X") y: int = Field(..., description="Coordonnée Y") @validator('x', 'y', pre=True) def validate_coordinates(cls, v): """Valider que les coordonnées sont numériques""" if isinstance(v, (int, float)): return int(v) raise ValueError("Coordinates must be numeric") def to_tuple(self) -> Tuple[int, int]: """Conversion vers tuple (x, y)""" return (self.x, self.y) @classmethod def from_tuple(cls, point_tuple: Tuple[int, int]) -> 'Point': """Création depuis tuple (x, y)""" if len(point_tuple) != 2: raise ValueError("Point tuple must have exactly 2 elements") return cls(x=point_tuple[0], y=point_tuple[1]) def distance_to(self, other: 'Point') -> float: """Calculer la distance euclidienne vers un autre point""" return ((self.x - other.x) ** 2 + (self.y - other.y) ** 2) ** 0.5 def is_inside_bbox(self, bbox: BBox) -> bool: """Vérifier si ce point est dans une bbox""" return bbox.contains_point(self.x, self.y) class Timestamp(BaseModel): """ Timestamp standardisé avec datetime Exigence 4.2 : Objets datetime uniquement avec utilitaires de conversion """ value: datetime = Field(default_factory=datetime.now, description="Valeur datetime") @validator('value', pre=True) def validate_datetime(cls, v): """Valider et convertir vers datetime""" if isinstance(v, datetime): return v elif isinstance(v, str): try: return datetime.fromisoformat(v.replace('Z', '+00:00')) except ValueError: raise ValueError(f"Cannot parse datetime string: {v}") elif isinstance(v, (int, float)): try: return datetime.fromtimestamp(v) except (ValueError, OSError): raise ValueError(f"Cannot convert timestamp to datetime: {v}") else: raise ValueError(f"Cannot convert {type(v)} to datetime") def to_iso(self) -> str: """Conversion vers format ISO""" return self.value.isoformat() def to_timestamp(self) -> float: """Conversion vers timestamp Unix""" return self.value.timestamp() @classmethod def now(cls) -> 'Timestamp': """Créer un timestamp pour maintenant""" return cls(value=datetime.now()) @classmethod def from_iso(cls, iso_string: str) -> 'Timestamp': """Créer depuis string ISO""" return cls(value=datetime.fromisoformat(iso_string.replace('Z', '+00:00'))) @classmethod def from_timestamp(cls, timestamp: float) -> 'Timestamp': """Créer depuis timestamp Unix""" return cls(value=datetime.fromtimestamp(timestamp)) class StandardID(BaseModel): """ ID standardisé en string Exigence 4.3 : IDs en strings uniquement avec validation """ value: str = Field(..., min_length=1, description="Valeur de l'ID") @validator('value', pre=True) def validate_id(cls, v): """Valider et convertir vers string""" if isinstance(v, str): if not v.strip(): raise ValueError("ID cannot be empty") return v.strip() elif isinstance(v, (int, float)): return str(v) elif isinstance(v, uuid.UUID): return str(v) else: raise ValueError(f"Cannot convert {type(v)} to ID string") def __str__(self) -> str: return self.value def __eq__(self, other) -> bool: if isinstance(other, StandardID): return self.value == other.value elif isinstance(other, str): return self.value == other return False def __hash__(self) -> int: return hash(self.value) @classmethod def generate(cls) -> 'StandardID': """Générer un nouvel ID unique""" return cls(value=str(uuid.uuid4())) @classmethod def from_uuid(cls, uuid_obj: uuid.UUID) -> 'StandardID': """Créer depuis UUID""" return cls(value=str(uuid_obj)) # Utilitaires de conversion pour la migration class DataConverter: """ Utilitaires de conversion sûrs pour la migration vers les nouveaux contrats Exigence 4.4 : Assurer la compatibilité ascendante pendant la migration """ @staticmethod def ensure_bbox(bbox: Union[BBox, Tuple, list, Dict, Any]) -> BBox: """Assurer que bbox est au format BBox standardisé""" if isinstance(bbox, BBox): return bbox elif isinstance(bbox, (tuple, list)) and len(bbox) == 4: return BBox.from_tuple(tuple(bbox)) elif isinstance(bbox, dict): if all(k in bbox for k in ['x', 'y', 'width', 'height']): return BBox(**bbox) elif all(k in bbox for k in ['x1', 'y1', 'x2', 'y2']): return BBox.from_xyxy(bbox['x1'], bbox['y1'], bbox['x2'], bbox['y2']) else: raise ValueError(f"Cannot convert dict to BBox: missing required keys") else: raise ValueError(f"Cannot convert {type(bbox)} to BBox") @staticmethod def ensure_timestamp(timestamp: Union[Timestamp, datetime, str, int, float, Any]) -> Timestamp: """Assurer que timestamp est un objet Timestamp standardisé""" if isinstance(timestamp, Timestamp): return timestamp else: return Timestamp(value=timestamp) @staticmethod def ensure_id(id_value: Union[StandardID, str, int, float, uuid.UUID, Any]) -> StandardID: """Assurer que l'ID est un StandardID""" if isinstance(id_value, StandardID): return id_value else: return StandardID(value=id_value) @staticmethod def migrate_bbox_dict(data: Dict[str, Any], bbox_fields: list = None) -> Dict[str, Any]: """Migrer les champs bbox dans un dictionnaire""" if bbox_fields is None: bbox_fields = ['bbox', 'bounding_box', 'bounds'] migrated = data.copy() for field in bbox_fields: if field in migrated: try: bbox = DataConverter.ensure_bbox(migrated[field]) migrated[field] = bbox.dict() except Exception as e: # Log l'erreur mais continue la migration print(f"Warning: Could not migrate bbox field '{field}': {e}") return migrated @staticmethod def migrate_timestamp_dict(data: Dict[str, Any], timestamp_fields: list = None) -> Dict[str, Any]: """Migrer les champs timestamp dans un dictionnaire""" if timestamp_fields is None: timestamp_fields = ['timestamp', 'created_at', 'updated_at', 'captured_at'] migrated = data.copy() for field in timestamp_fields: if field in migrated: try: timestamp = DataConverter.ensure_timestamp(migrated[field]) migrated[field] = timestamp.value except Exception as e: # Log l'erreur mais continue la migration print(f"Warning: Could not migrate timestamp field '{field}': {e}") return migrated @staticmethod def migrate_id_dict(data: Dict[str, Any], id_fields: list = None) -> Dict[str, Any]: """Migrer les champs ID dans un dictionnaire""" if id_fields is None: id_fields = ['id', 'element_id', 'session_id', 'workflow_id', 'node_id', 'edge_id'] migrated = data.copy() for field in id_fields: if field in migrated: try: id_obj = DataConverter.ensure_id(migrated[field]) migrated[field] = id_obj.value except Exception as e: # Log l'erreur mais continue la migration print(f"Warning: Could not migrate ID field '{field}': {e}") return migrated # Aliases pour compatibilité BaseTimestamp = Timestamp BaseID = StandardID