Compare commits
2 Commits
ae3e2050c1
...
c2c40543e5
| Author | SHA1 | Date | |
|---|---|---|---|
| c2c40543e5 | |||
| d265cd3269 |
47
Pseudonymisation_Gui_V6.py
Normal file
47
Pseudonymisation_Gui_V6.py
Normal file
@@ -0,0 +1,47 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Point d'entrée de la GUI V6 de Pseudonymisation.
|
||||
|
||||
Usage :
|
||||
python Pseudonymisation_Gui_V6.py # lance la fenêtre
|
||||
python Pseudonymisation_Gui_V6.py --self-test # importe l'app, sort 0, sans fenêtre
|
||||
|
||||
Le mode ``--self-test`` vérifie que tout le socle GUI V6 s'importe correctement
|
||||
(utile en CI / build sans display). Il n'ouvre aucune fenêtre.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
|
||||
|
||||
def _self_test() -> int:
|
||||
"""Importe les modules du socle GUI V6 sans créer de fenêtre."""
|
||||
from gui_v6 import app, license_client, license_store, processing_runner, theme # noqa: F401
|
||||
from gui_v6.tabs import tab_about, tab_usage # noqa: F401
|
||||
|
||||
# Sanity check des contrats publics du socle.
|
||||
assert hasattr(app, "AnonymisationApp")
|
||||
assert hasattr(license_client, "LicenseClient")
|
||||
assert hasattr(license_client, "LicenseStatus")
|
||||
assert hasattr(license_store, "LicenseStore")
|
||||
assert hasattr(processing_runner, "ProcessingRunner")
|
||||
assert hasattr(tab_about, "AboutTab")
|
||||
assert hasattr(tab_usage, "UsageTab")
|
||||
print("GUI V6 self-test OK")
|
||||
return 0
|
||||
|
||||
|
||||
def main(argv=None) -> int:
|
||||
argv = list(sys.argv[1:] if argv is None else argv)
|
||||
if "--self-test" in argv:
|
||||
return _self_test()
|
||||
|
||||
from gui_v6.app import AnonymisationApp
|
||||
|
||||
application = AnonymisationApp()
|
||||
application.mainloop()
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
11
gui_v6/__init__.py
Normal file
11
gui_v6/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""Package GUI V6 de Pseudonymisation (refonte de la couche présentation).
|
||||
|
||||
Frontière propre : ce package n'embarque AUCUNE logique de détection. Il
|
||||
orchestre uniquement le moteur d'anonymisation existant et la licence.
|
||||
|
||||
Lot G1 (socle) : thème, client/stockage licence, shell minimal, onglet À propos.
|
||||
"""
|
||||
|
||||
__all__ = ["__version__"]
|
||||
|
||||
__version__ = "6.0.0-g1"
|
||||
97
gui_v6/app.py
Normal file
97
gui_v6/app.py
Normal file
@@ -0,0 +1,97 @@
|
||||
"""Shell minimal de la GUI V6 (lot G1).
|
||||
|
||||
Header + bandeau de statut licence + navigation 3 onglets
|
||||
(Utilisation, Configuration, À propos). Seul « À propos » est étoffé en G1 ;
|
||||
les deux autres sont des placeholders qui seront remplis en G2/G3.
|
||||
|
||||
Aucune logique de détection ici : ce module orchestre uniquement. La fenêtre
|
||||
n'est créée qu'à l'instanciation de :class:`AnonymisationApp` (import sûr).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import customtkinter as ctk
|
||||
|
||||
from gui_v6 import theme as theme_mod
|
||||
from gui_v6.license_client import LicenseClient, LicenseStatus
|
||||
from gui_v6.tabs.tab_about import AboutTab
|
||||
from gui_v6.tabs.tab_usage import UsageTab
|
||||
|
||||
_TABS = ("Utilisation", "Configuration", "À propos")
|
||||
|
||||
|
||||
class AnonymisationApp(ctk.CTk):
|
||||
"""Fenêtre principale (socle G1)."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
license_client: Optional[LicenseClient] = None,
|
||||
theme_name: str = theme_mod.DEFAULT_THEME,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self._theme_name = theme_name
|
||||
theme_mod.apply_theme(theme_name)
|
||||
|
||||
# Client licence : par défaut, lecture du statut local uniquement
|
||||
# (aucun appel réseau au démarrage). Injectable pour les tests.
|
||||
self._license_client = license_client or LicenseClient("http://localhost")
|
||||
status = self._safe_local_status()
|
||||
|
||||
self.title("Pseudonymisation de vos documents")
|
||||
self.geometry("960x640")
|
||||
|
||||
self._build_header(status)
|
||||
self._build_tabs(status)
|
||||
|
||||
# -- statut licence ---------------------------------------------------
|
||||
|
||||
def _safe_local_status(self) -> LicenseStatus:
|
||||
try:
|
||||
return self._license_client.local_status()
|
||||
except Exception:
|
||||
# Licence indisponible → dégradation silencieuse (mode bêta).
|
||||
return LicenseStatus.unavailable()
|
||||
|
||||
# -- construction UI --------------------------------------------------
|
||||
|
||||
def _build_header(self, status: LicenseStatus) -> None:
|
||||
header = ctk.CTkFrame(self, height=56)
|
||||
header.pack(fill="x", padx=12, pady=(12, 6))
|
||||
|
||||
ctk.CTkLabel(
|
||||
header,
|
||||
text="Pseudonymisation",
|
||||
font=ctk.CTkFont(size=16, weight="bold"),
|
||||
).pack(side="left", padx=12, pady=10)
|
||||
|
||||
color = theme_mod.status_color(self._theme_name, status.status)
|
||||
self._status_banner = ctk.CTkLabel(
|
||||
header, text=self._banner_text(status), text_color=color
|
||||
)
|
||||
self._status_banner.pack(side="right", padx=12, pady=10)
|
||||
|
||||
def _build_tabs(self, status: LicenseStatus) -> None:
|
||||
tabview = ctk.CTkTabview(self)
|
||||
tabview.pack(fill="both", expand=True, padx=12, pady=(6, 12))
|
||||
for name in _TABS:
|
||||
tabview.add(name)
|
||||
|
||||
self._usage = UsageTab(tabview.tab("Utilisation"))
|
||||
self._usage.pack(fill="both", expand=True)
|
||||
|
||||
self._about = AboutTab(
|
||||
tabview.tab("À propos"), status=status, theme_name=self._theme_name
|
||||
)
|
||||
self._about.pack(fill="both", expand=True)
|
||||
|
||||
# Placeholder G3.
|
||||
ctk.CTkLabel(
|
||||
tabview.tab("Configuration"),
|
||||
text="Onglet Configuration — disponible au lot G3.",
|
||||
).pack(padx=16, pady=16, anchor="w")
|
||||
|
||||
@staticmethod
|
||||
def _banner_text(status: LicenseStatus) -> str:
|
||||
return f"Licence : {status.status}"
|
||||
195
gui_v6/license_client.py
Normal file
195
gui_v6/license_client.py
Normal file
@@ -0,0 +1,195 @@
|
||||
"""Client licence de la GUI V6 (mockable, sans clé privée).
|
||||
|
||||
Contrat final aligné sur le portail ``app_aivanov`` :
|
||||
|
||||
- ``activate(token, machine_id)`` → ``POST /api/v1/activate``
|
||||
- ``check(license_ref, machine_id)`` → ``POST /api/v1/check``
|
||||
|
||||
Principes :
|
||||
|
||||
- La session HTTP est **injectable** (paramètre ``session``) : en test on
|
||||
fournit une fausse session, aucun appel réseau réel n'est effectué.
|
||||
- Le client **ne contient aucune clé privée** : la vérification de signature
|
||||
est faite côté serveur ; le client stocke seulement la licence signée reçue.
|
||||
- Serveur indisponible / réponse illisible → statut ``unavailable``, jamais
|
||||
d'exception remontée à l'appelant (la GUI ne doit pas crasher).
|
||||
- Aucun token n'est journalisé.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Optional, Protocol
|
||||
|
||||
from gui_v6.license_store import LicenseStore
|
||||
|
||||
# Statuts métier exposés à l'UI.
|
||||
ACTIVE = "active"
|
||||
GRACE = "grace"
|
||||
EXPIRED = "expired"
|
||||
REVOKED = "revoked"
|
||||
NONE = "none"
|
||||
UNAVAILABLE = "unavailable"
|
||||
INVALID = "invalid"
|
||||
|
||||
_VALID_STATES = {ACTIVE, GRACE}
|
||||
|
||||
|
||||
class _HttpResponse(Protocol):
|
||||
status_code: int
|
||||
|
||||
def json(self) -> Any: ...
|
||||
|
||||
|
||||
class _HttpSession(Protocol):
|
||||
def post(self, url: str, json: dict, timeout: float) -> _HttpResponse: ...
|
||||
|
||||
|
||||
@dataclass
|
||||
class LicenseStatus:
|
||||
"""État de licence destiné à l'affichage et aux décisions de la GUI."""
|
||||
|
||||
valid: bool
|
||||
status: str
|
||||
message: str = ""
|
||||
expires_at: Optional[str] = None
|
||||
grace_days: int = 0
|
||||
machine_id: Optional[str] = None
|
||||
license_ref: Optional[str] = None
|
||||
|
||||
@classmethod
|
||||
def none(cls, message: str = "Aucune licence active") -> "LicenseStatus":
|
||||
return cls(valid=False, status=NONE, message=message)
|
||||
|
||||
@classmethod
|
||||
def unavailable(
|
||||
cls, message: str = "Serveur de licence indisponible"
|
||||
) -> "LicenseStatus":
|
||||
return cls(valid=False, status=UNAVAILABLE, message=message)
|
||||
|
||||
@classmethod
|
||||
def invalid(cls, message: str = "Licence ou jeton invalide") -> "LicenseStatus":
|
||||
return cls(valid=False, status=INVALID, message=message)
|
||||
|
||||
@classmethod
|
||||
def from_payload(
|
||||
cls, payload: dict[str, Any], machine_id: Optional[str] = None
|
||||
) -> "LicenseStatus":
|
||||
"""Construit un statut depuis la réponse API ou un ancien payload plat.
|
||||
|
||||
Le portail renvoie ``{"state": "...", "license": {"payload": ...}}``.
|
||||
Les payloads plats ``{"status": "...", "license_ref": ...}`` restent
|
||||
acceptés pour garder les tests/unités locales simples et robustes.
|
||||
"""
|
||||
status = str(payload.get("state") or payload.get("status") or NONE)
|
||||
license_envelope = payload.get("license")
|
||||
signed_payload = (
|
||||
license_envelope.get("payload")
|
||||
if isinstance(license_envelope, dict)
|
||||
and isinstance(license_envelope.get("payload"), dict)
|
||||
else {}
|
||||
)
|
||||
source = signed_payload if signed_payload else payload
|
||||
try:
|
||||
grace = int(source.get("grace_days", payload.get("grace_days", 0)) or 0)
|
||||
except (TypeError, ValueError):
|
||||
grace = 0
|
||||
return cls(
|
||||
valid=status in _VALID_STATES,
|
||||
status=status,
|
||||
message=str(payload.get("message", "") or ""),
|
||||
expires_at=source.get("expires_at") or payload.get("expires_at"),
|
||||
grace_days=grace,
|
||||
machine_id=source.get("machine_id") or payload.get("machine_id") or machine_id,
|
||||
license_ref=source.get("license_ref") or payload.get("license_ref"),
|
||||
)
|
||||
|
||||
|
||||
class LicenseClient:
|
||||
"""Appelle le portail licence via une session HTTP injectable."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str,
|
||||
session: Optional[_HttpSession] = None,
|
||||
store: Optional[LicenseStore] = None,
|
||||
timeout: float = 10.0,
|
||||
) -> None:
|
||||
self._base_url = base_url.rstrip("/")
|
||||
self._session = session
|
||||
self._store = store if store is not None else LicenseStore()
|
||||
self._timeout = timeout
|
||||
|
||||
def _get_session(self) -> _HttpSession:
|
||||
if self._session is None:
|
||||
# Import paresseux : pas de dépendance dure si une session est injectée.
|
||||
import requests
|
||||
|
||||
self._session = requests.Session()
|
||||
return self._session
|
||||
|
||||
def _post(self, endpoint: str, payload: dict) -> Optional[_HttpResponse]:
|
||||
try:
|
||||
session = self._get_session()
|
||||
return session.post(
|
||||
f"{self._base_url}{endpoint}", json=payload, timeout=self._timeout
|
||||
)
|
||||
except Exception:
|
||||
# Réseau indisponible, DNS, timeout, requests absent… : pas de crash.
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _parse(response: Optional[_HttpResponse]) -> Optional[dict]:
|
||||
if response is None:
|
||||
return None
|
||||
try:
|
||||
data = response.json()
|
||||
except Exception:
|
||||
return None
|
||||
return data if isinstance(data, dict) else None
|
||||
|
||||
def activate(self, token: str, machine_id: str) -> LicenseStatus:
|
||||
"""Active une licence à partir d'un jeton. Stocke la licence si valide."""
|
||||
response = self._post(
|
||||
"/api/v1/activate", {"token": token, "machine_id": machine_id}
|
||||
)
|
||||
if response is None:
|
||||
return LicenseStatus.unavailable()
|
||||
if getattr(response, "status_code", 500) >= 400:
|
||||
payload = self._parse(response) or {}
|
||||
message = str(payload.get("detail") or payload.get("message") or "")
|
||||
return LicenseStatus.invalid(message or "Jeton refusé par le serveur")
|
||||
payload = self._parse(response)
|
||||
if payload is None:
|
||||
return LicenseStatus.unavailable()
|
||||
status = LicenseStatus.from_payload(payload, machine_id)
|
||||
if status.valid:
|
||||
# Stocke l'enveloppe signée reçue du serveur, jamais le jeton d'activation.
|
||||
self._store.save(payload)
|
||||
return status
|
||||
|
||||
def check(self, license_ref: str, machine_id: str) -> LicenseStatus:
|
||||
"""Vérifie l'état d'une licence existante auprès du portail."""
|
||||
response = self._post(
|
||||
"/api/v1/check", {"license_ref": license_ref, "machine_id": machine_id}
|
||||
)
|
||||
if response is None:
|
||||
return LicenseStatus.unavailable()
|
||||
if getattr(response, "status_code", 500) >= 400:
|
||||
payload = self._parse(response) or {}
|
||||
message = str(payload.get("detail") or payload.get("message") or "")
|
||||
return LicenseStatus.invalid(message or "Licence refusée par le serveur")
|
||||
payload = self._parse(response)
|
||||
if payload is None:
|
||||
return LicenseStatus.unavailable()
|
||||
status = LicenseStatus.from_payload(payload, machine_id)
|
||||
if status.valid:
|
||||
self._store.save(payload)
|
||||
return status
|
||||
|
||||
def local_status(self) -> LicenseStatus:
|
||||
"""État de licence depuis le stockage local, sans appel réseau."""
|
||||
data = self._store.load()
|
||||
if not data:
|
||||
return LicenseStatus.none()
|
||||
return LicenseStatus.from_payload(data)
|
||||
84
gui_v6/license_store.py
Normal file
84
gui_v6/license_store.py
Normal file
@@ -0,0 +1,84 @@
|
||||
"""Stockage local de la licence signée, hors dépôt.
|
||||
|
||||
La licence est stockée dans l'espace utilisateur, jamais dans le dépôt Git :
|
||||
|
||||
- Windows : ``%LOCALAPPDATA%/Aivanov/Anonymisation/license.json``
|
||||
- Linux/dev : ``~/.local/share/aivanov/anonymisation/license.json``
|
||||
|
||||
Règle de sécurité : ce module ne journalise jamais le contenu de la licence
|
||||
(le token signé est sensible). Aucune impression, aucun log du payload.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
_APP_VENDOR = "Aivanov"
|
||||
_APP_NAME = "Anonymisation"
|
||||
_LICENSE_FILENAME = "license.json"
|
||||
|
||||
|
||||
def default_license_path() -> Path:
|
||||
"""Retourne le chemin du fichier licence dans l'espace utilisateur.
|
||||
|
||||
Le répertoire n'est pas créé ici ; il l'est à la première écriture.
|
||||
"""
|
||||
if sys.platform.startswith("win"):
|
||||
base = os.environ.get("LOCALAPPDATA")
|
||||
root = Path(base) if base else Path.home() / "AppData" / "Local"
|
||||
return root / _APP_VENDOR / _APP_NAME / _LICENSE_FILENAME
|
||||
|
||||
# Linux / macOS / dev : respecte XDG_DATA_HOME si défini.
|
||||
base = os.environ.get("XDG_DATA_HOME")
|
||||
root = Path(base) if base else Path.home() / ".local" / "share"
|
||||
return root / _APP_VENDOR.lower() / _APP_NAME.lower() / _LICENSE_FILENAME
|
||||
|
||||
|
||||
class LicenseStore:
|
||||
"""Lit / écrit / supprime le fichier licence local (JSON).
|
||||
|
||||
Le chemin est injectable pour les tests (vrai fichier dans un tmp_path) ;
|
||||
par défaut il pointe vers l'emplacement utilisateur de la plateforme.
|
||||
"""
|
||||
|
||||
def __init__(self, path: Optional[Path] = None) -> None:
|
||||
self._path = Path(path) if path is not None else default_license_path()
|
||||
|
||||
@property
|
||||
def path(self) -> Path:
|
||||
return self._path
|
||||
|
||||
def exists(self) -> bool:
|
||||
return self._path.is_file()
|
||||
|
||||
def load(self) -> Optional[dict[str, Any]]:
|
||||
"""Retourne le payload licence, ou ``None`` si absent / illisible."""
|
||||
if not self._path.is_file():
|
||||
return None
|
||||
try:
|
||||
with self._path.open("r", encoding="utf-8") as handle:
|
||||
data = json.load(handle)
|
||||
except (json.JSONDecodeError, OSError, ValueError):
|
||||
return None
|
||||
return data if isinstance(data, dict) else None
|
||||
|
||||
def save(self, data: dict[str, Any]) -> Path:
|
||||
"""Écrit le payload licence de façon atomique. Crée le répertoire parent."""
|
||||
self._path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp = self._path.with_name(self._path.name + ".tmp")
|
||||
with tmp.open("w", encoding="utf-8") as handle:
|
||||
json.dump(data, handle, ensure_ascii=False, indent=2)
|
||||
os.replace(tmp, self._path)
|
||||
return self._path
|
||||
|
||||
def delete(self) -> bool:
|
||||
"""Supprime le fichier licence. Retourne True s'il existait."""
|
||||
try:
|
||||
self._path.unlink()
|
||||
return True
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
166
gui_v6/processing_runner.py
Normal file
166
gui_v6/processing_runner.py
Normal file
@@ -0,0 +1,166 @@
|
||||
"""Runner de traitement pour la GUI V6 (testable sans display ni moteur lourd).
|
||||
|
||||
Le runner orchestre l'anonymisation document par document via une fonction de
|
||||
traitement **injectable** :
|
||||
|
||||
- en production, le défaut appelle ``process_document`` du moteur (import paresseux,
|
||||
aucun manager NER chargé à l'import de ce module) ;
|
||||
- en test, on injecte une fausse fonction — aucun appel réseau, aucun modèle.
|
||||
|
||||
Il ne contient aucune logique de détection : il découvre les documents, construit
|
||||
les dossiers de sortie comme la V5 (``anonymise/`` sous la source, arborescence
|
||||
préservée), exécute le traitement, et expose progression / journal / résumé /
|
||||
arrêt coopératif (entre deux documents).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Callable, Optional, Sequence
|
||||
|
||||
from gui_batch_paths import build_batch_output_dir, list_supported_documents
|
||||
|
||||
# process_fn(doc_path, out_dir) -> dict de sortie (ignoré par le runner).
|
||||
ProcessFn = Callable[[Path, Path], dict]
|
||||
|
||||
# Repli si format_converter indisponible à l'exécution (ne sert qu'au listing).
|
||||
_FALLBACK_EXTENSIONS = (
|
||||
".pdf", ".docx", ".odt", ".rtf", ".txt", ".html", ".htm",
|
||||
".jpg", ".jpeg", ".png", ".tiff", ".tif", ".bmp",
|
||||
)
|
||||
|
||||
|
||||
def supported_extensions() -> tuple[str, ...]:
|
||||
"""Extensions supportées : depuis ``format_converter`` si dispo, sinon repli."""
|
||||
try:
|
||||
from format_converter import SUPPORTED_EXTENSIONS
|
||||
|
||||
return tuple(sorted(SUPPORTED_EXTENSIONS))
|
||||
except Exception:
|
||||
return _FALLBACK_EXTENSIONS
|
||||
|
||||
|
||||
def default_output_dir(input_path) -> Path:
|
||||
"""Dossier de sortie par défaut : ``anonymise/`` sous la source."""
|
||||
path = Path(input_path)
|
||||
base = path if path.is_dir() else path.parent
|
||||
return base / "anonymise"
|
||||
|
||||
|
||||
def discover_documents(input_path, extensions: Optional[Sequence[str]] = None) -> list[Path]:
|
||||
"""Liste les documents à traiter (fichier unique ou dossier récursif)."""
|
||||
path = Path(input_path)
|
||||
exts = tuple(extensions) if extensions is not None else supported_extensions()
|
||||
normalized = {e.lower() for e in exts}
|
||||
if path.is_file():
|
||||
return [path] if path.suffix.lower() in normalized else []
|
||||
if path.is_dir():
|
||||
return list_supported_documents(path, exts)
|
||||
return []
|
||||
|
||||
|
||||
@dataclass
|
||||
class RunSummary:
|
||||
"""Résultat d'un run : compteurs et erreurs par document."""
|
||||
|
||||
total: int = 0
|
||||
succeeded: int = 0
|
||||
failed: int = 0
|
||||
stopped: bool = False
|
||||
errors: list = field(default_factory=list) # list[tuple[str, str]] (nom, message)
|
||||
|
||||
@property
|
||||
def ok(self) -> bool:
|
||||
return self.failed == 0 and not self.stopped
|
||||
|
||||
|
||||
def _default_process_fn(doc_path: Path, out_dir: Path) -> dict:
|
||||
# Import paresseux : aucun manager NER chargé à l'import du runner.
|
||||
from anonymizer_core_refactored_onnx import process_document
|
||||
|
||||
return process_document(doc_path, out_dir)
|
||||
|
||||
|
||||
class ProcessingRunner:
|
||||
"""Exécute le traitement document par document, arrêt coopératif inclus."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
process_fn: Optional[ProcessFn] = None,
|
||||
extensions: Optional[Sequence[str]] = None,
|
||||
) -> None:
|
||||
self._process_fn = process_fn or _default_process_fn
|
||||
self._extensions = tuple(extensions) if extensions is not None else None
|
||||
self._lock = threading.Lock()
|
||||
self._running = False
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._running
|
||||
|
||||
def discover(self, input_path) -> list[Path]:
|
||||
return discover_documents(input_path, self._extensions)
|
||||
|
||||
def run(
|
||||
self,
|
||||
input_path,
|
||||
output_dir=None,
|
||||
*,
|
||||
on_progress: Optional[Callable[[int, int, str], None]] = None,
|
||||
on_log: Optional[Callable[[str], None]] = None,
|
||||
stop_event: Optional[threading.Event] = None,
|
||||
) -> RunSummary:
|
||||
"""Traite les documents de ``input_path``. Synchrone (lancer dans un thread pour l'UI).
|
||||
|
||||
Lève ``RuntimeError`` si un run est déjà en cours (anti double-lancement).
|
||||
"""
|
||||
with self._lock:
|
||||
if self._running:
|
||||
raise RuntimeError("Un traitement est déjà en cours.")
|
||||
self._running = True
|
||||
try:
|
||||
return self._run_impl(input_path, output_dir, on_progress, on_log, stop_event)
|
||||
finally:
|
||||
with self._lock:
|
||||
self._running = False
|
||||
|
||||
def _run_impl(self, input_path, output_dir, on_progress, on_log, stop_event) -> RunSummary:
|
||||
input_path = Path(input_path)
|
||||
docs = self.discover(input_path)
|
||||
out_root = Path(output_dir) if output_dir else default_output_dir(input_path)
|
||||
root_dir = input_path if input_path.is_dir() else input_path.parent
|
||||
summary = RunSummary(total=len(docs))
|
||||
|
||||
def log(message: str) -> None:
|
||||
if on_log:
|
||||
on_log(message)
|
||||
|
||||
if not docs:
|
||||
log("Aucun document supporté détecté.")
|
||||
return summary
|
||||
|
||||
for index, doc in enumerate(docs, start=1):
|
||||
if stop_event is not None and stop_event.is_set():
|
||||
summary.stopped = True
|
||||
log("Arrêt demandé — traitement interrompu entre deux documents.")
|
||||
break
|
||||
if on_progress:
|
||||
on_progress(index - 1, summary.total, doc.name)
|
||||
try:
|
||||
if input_path.is_dir():
|
||||
doc_out = build_batch_output_dir(root_dir, out_root, doc)
|
||||
else:
|
||||
doc_out = out_root
|
||||
doc_out.mkdir(parents=True, exist_ok=True)
|
||||
self._process_fn(doc, doc_out)
|
||||
summary.succeeded += 1
|
||||
log(f"OK : {doc.name}")
|
||||
except Exception as exc: # un échec n'interrompt pas le lot
|
||||
summary.failed += 1
|
||||
summary.errors.append((doc.name, str(exc)))
|
||||
log(f"ÉCHEC : {doc.name} — {exc}")
|
||||
if on_progress:
|
||||
on_progress(index, summary.total, doc.name)
|
||||
return summary
|
||||
1
gui_v6/tabs/__init__.py
Normal file
1
gui_v6/tabs/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Onglets de la GUI V6."""
|
||||
76
gui_v6/tabs/tab_about.py
Normal file
76
gui_v6/tabs/tab_about.py
Normal file
@@ -0,0 +1,76 @@
|
||||
"""Onglet « À propos » : version, et état de la licence.
|
||||
|
||||
Affiche le statut licence fourni par le client/stub. Dégrade proprement si la
|
||||
licence est indisponible (bandeau d'information, pas d'erreur bloquante).
|
||||
Les widgets ne sont créés qu'à l'instanciation (import sûr pour ``--self-test``).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import customtkinter as ctk
|
||||
|
||||
from gui_v6 import __version__ as GUI_VERSION
|
||||
from gui_v6 import theme as theme_mod
|
||||
from gui_v6.license_client import LicenseStatus
|
||||
|
||||
_STATUS_LABELS = {
|
||||
"active": "Licence active",
|
||||
"grace": "Licence en période de grâce",
|
||||
"expired": "Licence expirée",
|
||||
"revoked": "Poste révoqué",
|
||||
"invalid": "Licence invalide",
|
||||
"unavailable": "Serveur de licence indisponible",
|
||||
"none": "Aucune licence",
|
||||
}
|
||||
|
||||
|
||||
def _build_info() -> str:
|
||||
"""Version / commit du build, si disponible. Best-effort, sans casser l'UI."""
|
||||
try:
|
||||
import build_info # type: ignore
|
||||
|
||||
version = getattr(build_info, "VERSION", "?")
|
||||
commit = getattr(build_info, "COMMIT", "?")
|
||||
return f"Moteur {version} ({commit})"
|
||||
except Exception:
|
||||
return "Moteur : information de build indisponible"
|
||||
|
||||
|
||||
class AboutTab(ctk.CTkFrame):
|
||||
def __init__(
|
||||
self,
|
||||
master,
|
||||
status: Optional[LicenseStatus] = None,
|
||||
theme_name: str = theme_mod.DEFAULT_THEME,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
super().__init__(master, **kwargs)
|
||||
self._theme_name = theme_name
|
||||
|
||||
ctk.CTkLabel(
|
||||
self,
|
||||
text="Pseudonymisation de vos documents",
|
||||
font=ctk.CTkFont(size=18, weight="bold"),
|
||||
).pack(anchor="w", padx=16, pady=(16, 4))
|
||||
|
||||
ctk.CTkLabel(self, text=f"Interface V6 — {GUI_VERSION}").pack(
|
||||
anchor="w", padx=16
|
||||
)
|
||||
ctk.CTkLabel(self, text=_build_info()).pack(anchor="w", padx=16, pady=(0, 12))
|
||||
|
||||
self._status_label = ctk.CTkLabel(self, text="", anchor="w")
|
||||
self._status_label.pack(anchor="w", padx=16, pady=(0, 8))
|
||||
|
||||
self.set_status(status or LicenseStatus.none())
|
||||
|
||||
def set_status(self, status: LicenseStatus) -> None:
|
||||
label = _STATUS_LABELS.get(status.status, status.status)
|
||||
text = f"État licence : {label}"
|
||||
if status.expires_at:
|
||||
text += f" · expire le {status.expires_at}"
|
||||
if status.message:
|
||||
text += f"\n{status.message}"
|
||||
color = theme_mod.status_color(self._theme_name, status.status)
|
||||
self._status_label.configure(text=text, text_color=color)
|
||||
194
gui_v6/tabs/tab_usage.py
Normal file
194
gui_v6/tabs/tab_usage.py
Normal file
@@ -0,0 +1,194 @@
|
||||
"""Onglet « Utilisation » de la GUI V6.
|
||||
|
||||
Sélection fichier/dossier → choix sortie → lancement via le runner (dans un
|
||||
thread) → progression / journal / résumé. Aucun appel réseau au démarrage,
|
||||
aucune logique de détection : tout passe par :class:`ProcessingRunner`.
|
||||
|
||||
Les widgets ne sont créés qu'à l'instanciation (import sûr pour ``--self-test``).
|
||||
La communication thread worker → UI passe par une file drainée via ``after``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import queue
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from tkinter import filedialog
|
||||
|
||||
import customtkinter as ctk
|
||||
|
||||
from gui_v6.processing_runner import ProcessingRunner, default_output_dir
|
||||
|
||||
|
||||
class UsageTab(ctk.CTkFrame):
|
||||
def __init__(self, master, runner: ProcessingRunner | None = None, **kwargs):
|
||||
super().__init__(master, **kwargs)
|
||||
self._runner = runner or ProcessingRunner()
|
||||
self._input_path: Path | None = None
|
||||
self._output_dir: Path | None = None
|
||||
self._stop_event: threading.Event | None = None
|
||||
self._worker: threading.Thread | None = None
|
||||
self._events: "queue.Queue[tuple]" = queue.Queue()
|
||||
|
||||
self._build()
|
||||
self.after(120, self._drain_events)
|
||||
|
||||
# -- construction UI --------------------------------------------------
|
||||
|
||||
def _build(self) -> None:
|
||||
bar = ctk.CTkFrame(self)
|
||||
bar.pack(fill="x", padx=12, pady=(12, 6))
|
||||
ctk.CTkButton(bar, text="Choisir un fichier…", command=self._pick_file).pack(
|
||||
side="left", padx=(0, 8)
|
||||
)
|
||||
ctk.CTkButton(bar, text="Choisir un dossier…", command=self._pick_folder).pack(
|
||||
side="left"
|
||||
)
|
||||
|
||||
self._target_label = ctk.CTkLabel(self, text="Aucune source sélectionnée", anchor="w")
|
||||
self._target_label.pack(fill="x", padx=12, pady=(0, 4))
|
||||
|
||||
out_bar = ctk.CTkFrame(self)
|
||||
out_bar.pack(fill="x", padx=12, pady=(0, 6))
|
||||
ctk.CTkButton(out_bar, text="Dossier de sortie…", command=self._pick_output).pack(
|
||||
side="left", padx=(0, 8)
|
||||
)
|
||||
self._output_label = ctk.CTkLabel(out_bar, text="Sortie : (défaut anonymise/)", anchor="w")
|
||||
self._output_label.pack(side="left")
|
||||
|
||||
action = ctk.CTkFrame(self)
|
||||
action.pack(fill="x", padx=12, pady=(0, 6))
|
||||
self._run_btn = ctk.CTkButton(action, text="Lancer", command=self._start, state="disabled")
|
||||
self._run_btn.pack(side="left", padx=(0, 8))
|
||||
self._stop_btn = ctk.CTkButton(action, text="Arrêter", command=self._request_stop, state="disabled")
|
||||
self._stop_btn.pack(side="left")
|
||||
|
||||
self._progress = ctk.CTkProgressBar(self)
|
||||
self._progress.set(0.0)
|
||||
self._progress.pack(fill="x", padx=12, pady=(6, 4))
|
||||
|
||||
self._status_label = ctk.CTkLabel(self, text="Prêt.", anchor="w")
|
||||
self._status_label.pack(fill="x", padx=12)
|
||||
|
||||
self._log = ctk.CTkTextbox(self, height=180)
|
||||
self._log.pack(fill="both", expand=True, padx=12, pady=(6, 12))
|
||||
self._log.configure(state="disabled")
|
||||
|
||||
# -- sélection --------------------------------------------------------
|
||||
|
||||
def _pick_file(self) -> None:
|
||||
path = filedialog.askopenfilename(title="Choisir un document")
|
||||
if path:
|
||||
self._set_input(Path(path))
|
||||
|
||||
def _pick_folder(self) -> None:
|
||||
path = filedialog.askdirectory(title="Choisir un dossier")
|
||||
if path:
|
||||
self._set_input(Path(path))
|
||||
|
||||
def _pick_output(self) -> None:
|
||||
path = filedialog.askdirectory(title="Dossier de sortie")
|
||||
if path:
|
||||
self._output_dir = Path(path)
|
||||
self._output_label.configure(text=f"Sortie : {self._output_dir}")
|
||||
|
||||
def _set_input(self, path: Path) -> None:
|
||||
self._input_path = path
|
||||
count = len(self._runner.discover(path))
|
||||
if path.is_dir():
|
||||
self._target_label.configure(text=f"Dossier : {path} · {count} document(s) détecté(s)")
|
||||
else:
|
||||
self._target_label.configure(text=f"Fichier : {path.name}")
|
||||
self._output_label.configure(text=f"Sortie : (défaut {default_output_dir(path)})")
|
||||
self._run_btn.configure(state="normal" if count > 0 else "disabled")
|
||||
|
||||
# -- exécution --------------------------------------------------------
|
||||
|
||||
def _start(self) -> None:
|
||||
if self._input_path is None or self._runner.is_running:
|
||||
return
|
||||
self._stop_event = threading.Event()
|
||||
self._run_btn.configure(state="disabled")
|
||||
self._stop_btn.configure(state="normal")
|
||||
self._progress.set(0.0)
|
||||
self._clear_log()
|
||||
self._set_status("Traitement en cours…")
|
||||
|
||||
input_path, output_dir, stop = self._input_path, self._output_dir, self._stop_event
|
||||
|
||||
def work() -> None:
|
||||
try:
|
||||
summary = self._runner.run(
|
||||
input_path,
|
||||
output_dir,
|
||||
on_progress=lambda done, total, name: self._events.put(("progress", done, total, name)),
|
||||
on_log=lambda msg: self._events.put(("log", msg)),
|
||||
stop_event=stop,
|
||||
)
|
||||
self._events.put(("done", summary))
|
||||
except Exception as exc: # garde-fou : ne jamais laisser le thread tuer l'UI
|
||||
self._events.put(("error", str(exc)))
|
||||
|
||||
self._worker = threading.Thread(target=work, daemon=True)
|
||||
self._worker.start()
|
||||
|
||||
def _request_stop(self) -> None:
|
||||
if self._stop_event is not None:
|
||||
self._stop_event.set()
|
||||
self._set_status("Arrêt demandé…")
|
||||
self._stop_btn.configure(state="disabled")
|
||||
|
||||
# -- file d'événements worker → UI ------------------------------------
|
||||
|
||||
def _drain_events(self) -> None:
|
||||
try:
|
||||
while True:
|
||||
event = self._events.get_nowait()
|
||||
self._handle_event(event)
|
||||
except queue.Empty:
|
||||
pass
|
||||
self.after(120, self._drain_events)
|
||||
|
||||
def _handle_event(self, event: tuple) -> None:
|
||||
kind = event[0]
|
||||
if kind == "progress":
|
||||
_, done, total, name = event
|
||||
self._progress.set(done / total if total else 0.0)
|
||||
self._set_status(f"{done}/{total} — {name}")
|
||||
elif kind == "log":
|
||||
self._append_log(event[1])
|
||||
elif kind == "done":
|
||||
self._finish(event[1])
|
||||
elif kind == "error":
|
||||
self._append_log(f"Erreur : {event[1]}")
|
||||
self._finish(None)
|
||||
|
||||
def _finish(self, summary) -> None:
|
||||
self._stop_btn.configure(state="disabled")
|
||||
self._run_btn.configure(state="normal")
|
||||
if summary is None:
|
||||
self._set_status("Terminé avec erreur.")
|
||||
return
|
||||
if summary.stopped:
|
||||
self._set_status(f"Arrêté : {summary.succeeded}/{summary.total} traités.")
|
||||
else:
|
||||
self._progress.set(1.0)
|
||||
self._set_status(
|
||||
f"Terminé : {summary.succeeded} OK, {summary.failed} échec(s) sur {summary.total}."
|
||||
)
|
||||
|
||||
# -- helpers widgets --------------------------------------------------
|
||||
|
||||
def _set_status(self, text: str) -> None:
|
||||
self._status_label.configure(text=text)
|
||||
|
||||
def _clear_log(self) -> None:
|
||||
self._log.configure(state="normal")
|
||||
self._log.delete("1.0", "end")
|
||||
self._log.configure(state="disabled")
|
||||
|
||||
def _append_log(self, message: str) -> None:
|
||||
self._log.configure(state="normal")
|
||||
self._log.insert("end", message + "\n")
|
||||
self._log.see("end")
|
||||
self._log.configure(state="disabled")
|
||||
83
gui_v6/theme.py
Normal file
83
gui_v6/theme.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""Thèmes et palette de la GUI V6.
|
||||
|
||||
Mappe les tokens de couleur de la maquette ``docs/ui_mockup_v6.html`` vers
|
||||
customtkinter. Lot G1 : 4 thèmes de base + helper d'application. L'import de ce
|
||||
module ne crée aucun widget (compatible ``--self-test`` sans display).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict
|
||||
|
||||
# Tokens couleur par thème (bandeau statut licence, accents, surfaces).
|
||||
THEMES: Dict[str, dict] = {
|
||||
"clair": {
|
||||
"appearance": "light",
|
||||
"color_theme": "blue",
|
||||
"accent": "#2563eb",
|
||||
"ok": "#16a34a",
|
||||
"warn": "#d97706",
|
||||
"error": "#dc2626",
|
||||
},
|
||||
"sombre": {
|
||||
"appearance": "dark",
|
||||
"color_theme": "blue",
|
||||
"accent": "#3b82f6",
|
||||
"ok": "#22c55e",
|
||||
"warn": "#f59e0b",
|
||||
"error": "#ef4444",
|
||||
},
|
||||
"médical": {
|
||||
"appearance": "light",
|
||||
"color_theme": "green",
|
||||
"accent": "#0d9488",
|
||||
"ok": "#15803d",
|
||||
"warn": "#ca8a04",
|
||||
"error": "#b91c1c",
|
||||
},
|
||||
"contraste": {
|
||||
"appearance": "dark",
|
||||
"color_theme": "dark-blue",
|
||||
"accent": "#60a5fa",
|
||||
"ok": "#4ade80",
|
||||
"warn": "#fbbf24",
|
||||
"error": "#f87171",
|
||||
},
|
||||
}
|
||||
|
||||
DEFAULT_THEME = "clair"
|
||||
|
||||
# Couleurs du bandeau de statut licence selon l'état métier.
|
||||
STATUS_COLORS = {
|
||||
"active": "ok",
|
||||
"grace": "warn",
|
||||
"expired": "error",
|
||||
"revoked": "error",
|
||||
"invalid": "error",
|
||||
"unavailable": "warn",
|
||||
"none": "warn",
|
||||
}
|
||||
|
||||
|
||||
def theme_names() -> list[str]:
|
||||
return list(THEMES.keys())
|
||||
|
||||
|
||||
def get_theme(name: str) -> dict:
|
||||
return THEMES.get(name, THEMES[DEFAULT_THEME])
|
||||
|
||||
|
||||
def status_color(theme_name: str, status: str) -> str:
|
||||
"""Couleur hex pour un statut licence, dans le thème donné."""
|
||||
theme = get_theme(theme_name)
|
||||
return theme[STATUS_COLORS.get(status, "warn")]
|
||||
|
||||
|
||||
def apply_theme(name: str = DEFAULT_THEME) -> dict:
|
||||
"""Applique le thème à customtkinter. Import paresseux de ctk."""
|
||||
theme = get_theme(name)
|
||||
import customtkinter as ctk
|
||||
|
||||
ctk.set_appearance_mode(theme["appearance"])
|
||||
ctk.set_default_color_theme(theme["color_theme"])
|
||||
return theme
|
||||
@@ -34,3 +34,6 @@ python-doctr[torch]>=0.9.0
|
||||
|
||||
# (optionnel – si tu gardes spaCy dans d'autres chemins)
|
||||
# spacy==3.7.4
|
||||
|
||||
# GUI V6 (customtkinter) — interface refondue, embarquée dans l'EXE
|
||||
customtkinter==5.2.2
|
||||
|
||||
179
tests/unit/test_gui_v6_license_client.py
Normal file
179
tests/unit/test_gui_v6_license_client.py
Normal file
@@ -0,0 +1,179 @@
|
||||
"""Tests du client licence : session HTTP injectée, aucun appel réseau réel.
|
||||
|
||||
Le client accepte une session mockable. On vérifie : activation réussie +
|
||||
stockage local, refus serveur (4xx), serveur indisponible (exception réseau),
|
||||
réponse illisible, check, et lecture du statut local.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from gui_v6.license_client import LicenseClient, LicenseStatus
|
||||
from gui_v6.license_store import LicenseStore
|
||||
|
||||
|
||||
class FakeResponse:
|
||||
def __init__(self, status_code, payload=None, raise_on_json=False):
|
||||
self.status_code = status_code
|
||||
self._payload = payload
|
||||
self._raise_on_json = raise_on_json
|
||||
|
||||
def json(self):
|
||||
if self._raise_on_json:
|
||||
raise ValueError("not json")
|
||||
return self._payload
|
||||
|
||||
|
||||
class FakeSession:
|
||||
"""Session HTTP mockable : enregistre les appels, renvoie des réponses scriptées."""
|
||||
|
||||
def __init__(self, response=None, exc=None):
|
||||
self._response = response
|
||||
self._exc = exc
|
||||
self.calls = []
|
||||
|
||||
def post(self, url, json, timeout):
|
||||
self.calls.append({"url": url, "json": json, "timeout": timeout})
|
||||
if self._exc is not None:
|
||||
raise self._exc
|
||||
return self._response
|
||||
|
||||
|
||||
def _client(tmp_path, session):
|
||||
store = LicenseStore(tmp_path / "license.json")
|
||||
return LicenseClient("https://portail.example/", session=session, store=store), store
|
||||
|
||||
|
||||
def test_activate_success_stores_license(tmp_path):
|
||||
payload = {
|
||||
"state": "active",
|
||||
"license": {
|
||||
"payload": {
|
||||
"license_ref": "LIC-9",
|
||||
"expires_at": "2027-06-01",
|
||||
"grace_days": 7,
|
||||
"machine_id": "MID-0001",
|
||||
},
|
||||
"signature": "signed",
|
||||
"alg": "RSASSA-PSS-SHA256",
|
||||
},
|
||||
}
|
||||
session = FakeSession(FakeResponse(200, payload))
|
||||
client, store = _client(tmp_path, session)
|
||||
|
||||
status = client.activate("TOKEN-ABC", "MID-0001")
|
||||
|
||||
assert isinstance(status, LicenseStatus)
|
||||
assert status.valid is True
|
||||
assert status.status == "active"
|
||||
assert status.license_ref == "LIC-9"
|
||||
assert status.grace_days == 7
|
||||
# La licence signée est stockée localement.
|
||||
assert store.load() == payload
|
||||
# L'URL et le payload envoyés sont corrects.
|
||||
assert session.calls[0]["url"] == "https://portail.example/api/v1/activate"
|
||||
assert session.calls[0]["json"] == {"token": "TOKEN-ABC", "machine_id": "MID-0001"}
|
||||
|
||||
|
||||
def test_activate_grace_is_valid(tmp_path):
|
||||
session = FakeSession(
|
||||
FakeResponse(200, {"state": "grace", "license": {"payload": {"grace_days": 3}}})
|
||||
)
|
||||
client, _ = _client(tmp_path, session)
|
||||
status = client.activate("T", "M")
|
||||
assert status.valid is True
|
||||
assert status.status == "grace"
|
||||
|
||||
|
||||
def test_activate_rejected_4xx_is_invalid_and_not_stored(tmp_path):
|
||||
session = FakeSession(FakeResponse(403, {"detail": "token revoked"}))
|
||||
client, store = _client(tmp_path, session)
|
||||
|
||||
status = client.activate("BAD", "MID-1")
|
||||
|
||||
assert status.valid is False
|
||||
assert status.status == "invalid"
|
||||
assert "revoked" in status.message
|
||||
# Rien n'est stocké en cas de refus.
|
||||
assert store.load() is None
|
||||
|
||||
|
||||
def test_activate_server_unavailable_does_not_crash(tmp_path):
|
||||
session = FakeSession(exc=ConnectionError("server down"))
|
||||
client, store = _client(tmp_path, session)
|
||||
|
||||
status = client.activate("T", "M")
|
||||
|
||||
assert status.valid is False
|
||||
assert status.status == "unavailable"
|
||||
assert store.load() is None
|
||||
|
||||
|
||||
def test_activate_unreadable_response_is_unavailable(tmp_path):
|
||||
session = FakeSession(FakeResponse(200, raise_on_json=True))
|
||||
client, _ = _client(tmp_path, session)
|
||||
status = client.activate("T", "M")
|
||||
assert status.status == "unavailable"
|
||||
|
||||
|
||||
def test_check_active(tmp_path):
|
||||
payload = {"state": "active", "license": {"payload": {"license_ref": "LIC-1"}}}
|
||||
session = FakeSession(FakeResponse(200, payload))
|
||||
client, store = _client(tmp_path, session)
|
||||
|
||||
status = client.check("LIC-1", "MID-1")
|
||||
|
||||
assert status.valid is True
|
||||
assert session.calls[0]["url"] == "https://portail.example/api/v1/check"
|
||||
assert session.calls[0]["json"] == {"license_ref": "LIC-1", "machine_id": "MID-1"}
|
||||
assert store.load() == payload
|
||||
|
||||
|
||||
def test_check_expired_is_not_valid(tmp_path):
|
||||
session = FakeSession(FakeResponse(200, {"state": "expired"}))
|
||||
client, _ = _client(tmp_path, session)
|
||||
status = client.check("LIC-1", "MID-1")
|
||||
assert status.valid is False
|
||||
assert status.status == "expired"
|
||||
|
||||
|
||||
def test_check_revoked_without_license_payload(tmp_path):
|
||||
session = FakeSession(FakeResponse(200, {"state": "revoked"}))
|
||||
client, store = _client(tmp_path, session)
|
||||
|
||||
status = client.check("LIC-1", "MID-1")
|
||||
|
||||
assert status.valid is False
|
||||
assert status.status == "revoked"
|
||||
assert store.load() is None
|
||||
|
||||
|
||||
def test_check_server_unavailable(tmp_path):
|
||||
session = FakeSession(exc=TimeoutError("timeout"))
|
||||
client, _ = _client(tmp_path, session)
|
||||
status = client.check("LIC-1", "MID-1")
|
||||
assert status.status == "unavailable"
|
||||
|
||||
|
||||
def test_local_status_none_when_empty(tmp_path):
|
||||
session = FakeSession(FakeResponse(200, {}))
|
||||
client, _ = _client(tmp_path, session)
|
||||
status = client.local_status()
|
||||
assert status.valid is False
|
||||
assert status.status == "none"
|
||||
|
||||
|
||||
def test_local_status_reads_store(tmp_path):
|
||||
store = LicenseStore(tmp_path / "license.json")
|
||||
store.save({"state": "active", "license": {"payload": {"license_ref": "LIC-7"}}})
|
||||
client = LicenseClient("https://x", session=FakeSession(), store=store)
|
||||
status = client.local_status()
|
||||
assert status.valid is True
|
||||
assert status.license_ref == "LIC-7"
|
||||
|
||||
|
||||
def test_status_never_exposes_token():
|
||||
# Le statut ne porte pas de token : la repr ne peut pas le fuiter.
|
||||
status = LicenseStatus.from_payload({"status": "active", "license_ref": "LIC-1"})
|
||||
assert "token" not in repr(status).lower()
|
||||
91
tests/unit/test_gui_v6_license_store.py
Normal file
91
tests/unit/test_gui_v6_license_store.py
Normal file
@@ -0,0 +1,91 @@
|
||||
"""Tests du stockage local de licence (vrais fichiers, aucun mock).
|
||||
|
||||
Couvre : résolution du chemin par plateforme, round-trip save/load, création
|
||||
du répertoire parent, suppression, robustesse au JSON corrompu.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from gui_v6.license_store import LicenseStore, default_license_path
|
||||
|
||||
|
||||
def test_default_path_windows(monkeypatch):
|
||||
monkeypatch.setattr("sys.platform", "win32")
|
||||
monkeypatch.setenv("LOCALAPPDATA", r"C:\Users\dom\AppData\Local")
|
||||
path = default_license_path()
|
||||
parts = path.as_posix()
|
||||
assert parts.endswith("Aivanov/Anonymisation/license.json")
|
||||
|
||||
|
||||
def test_default_path_linux(monkeypatch):
|
||||
monkeypatch.setattr("sys.platform", "linux")
|
||||
monkeypatch.delenv("XDG_DATA_HOME", raising=False)
|
||||
monkeypatch.setattr(Path, "home", classmethod(lambda cls: Path("/home/tester")))
|
||||
path = default_license_path()
|
||||
assert path == Path("/home/tester/.local/share/aivanov/anonymisation/license.json")
|
||||
|
||||
|
||||
def test_default_path_linux_respects_xdg(monkeypatch):
|
||||
monkeypatch.setattr("sys.platform", "linux")
|
||||
monkeypatch.setenv("XDG_DATA_HOME", "/custom/data")
|
||||
path = default_license_path()
|
||||
assert path == Path("/custom/data/aivanov/anonymisation/license.json")
|
||||
|
||||
|
||||
def test_save_creates_parent_dir_and_roundtrips(tmp_path):
|
||||
target = tmp_path / "nested" / "dir" / "license.json"
|
||||
store = LicenseStore(target)
|
||||
assert not store.exists()
|
||||
|
||||
payload = {"status": "active", "license_ref": "LIC-123", "expires_at": "2027-01-01"}
|
||||
store.save(payload)
|
||||
|
||||
assert target.is_file()
|
||||
assert store.exists()
|
||||
assert store.load() == payload
|
||||
# Vérifie aussi le contenu sur disque (vrai fichier JSON).
|
||||
on_disk = json.loads(target.read_text(encoding="utf-8"))
|
||||
assert on_disk["license_ref"] == "LIC-123"
|
||||
|
||||
|
||||
def test_load_absent_returns_none(tmp_path):
|
||||
store = LicenseStore(tmp_path / "absent.json")
|
||||
assert store.load() is None
|
||||
|
||||
|
||||
def test_load_corrupted_returns_none(tmp_path):
|
||||
target = tmp_path / "license.json"
|
||||
target.write_text("{ this is not valid json", encoding="utf-8")
|
||||
store = LicenseStore(target)
|
||||
assert store.load() is None
|
||||
|
||||
|
||||
def test_load_non_object_returns_none(tmp_path):
|
||||
target = tmp_path / "license.json"
|
||||
target.write_text("[1, 2, 3]", encoding="utf-8")
|
||||
store = LicenseStore(target)
|
||||
assert store.load() is None
|
||||
|
||||
|
||||
def test_delete(tmp_path):
|
||||
target = tmp_path / "license.json"
|
||||
store = LicenseStore(target)
|
||||
store.save({"status": "active"})
|
||||
assert store.delete() is True
|
||||
assert not target.exists()
|
||||
# Suppression idempotente : un second delete ne lève pas.
|
||||
assert store.delete() is False
|
||||
|
||||
|
||||
def test_save_overwrites(tmp_path):
|
||||
store = LicenseStore(tmp_path / "license.json")
|
||||
store.save({"status": "active", "v": 1})
|
||||
store.save({"status": "grace", "v": 2})
|
||||
loaded = store.load()
|
||||
assert loaded["status"] == "grace"
|
||||
assert loaded["v"] == 2
|
||||
165
tests/unit/test_gui_v6_processing_runner.py
Normal file
165
tests/unit/test_gui_v6_processing_runner.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""Tests du runner G2 : process_fn injectée, vrais fichiers tmp, aucun moteur réel."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from gui_v6.processing_runner import (
|
||||
ProcessingRunner,
|
||||
RunSummary,
|
||||
default_output_dir,
|
||||
discover_documents,
|
||||
)
|
||||
|
||||
_EXTS = (".pdf", ".txt")
|
||||
|
||||
|
||||
def _touch(path: Path) -> Path:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text("x", encoding="utf-8")
|
||||
return path
|
||||
|
||||
|
||||
# -- découverte & chemins --------------------------------------------------
|
||||
|
||||
def test_discover_single_file(tmp_path):
|
||||
f = _touch(tmp_path / "doc.pdf")
|
||||
assert discover_documents(f, _EXTS) == [f]
|
||||
|
||||
|
||||
def test_discover_single_file_unsupported(tmp_path):
|
||||
f = _touch(tmp_path / "doc.xyz")
|
||||
assert discover_documents(f, _EXTS) == []
|
||||
|
||||
|
||||
def test_discover_folder_sorted_and_skips_output(tmp_path):
|
||||
_touch(tmp_path / "b.pdf")
|
||||
_touch(tmp_path / "a.pdf")
|
||||
_touch(tmp_path / "note.txt")
|
||||
_touch(tmp_path / "anonymise" / "already.pdf") # sous-arbre de sortie ignoré
|
||||
found = discover_documents(tmp_path, _EXTS)
|
||||
names = [p.name for p in found]
|
||||
assert names == ["a.pdf", "b.pdf", "note.txt"]
|
||||
|
||||
|
||||
def test_default_output_dir_file_and_dir(tmp_path):
|
||||
f = _touch(tmp_path / "doc.pdf")
|
||||
assert default_output_dir(f) == tmp_path / "anonymise"
|
||||
assert default_output_dir(tmp_path) == tmp_path / "anonymise"
|
||||
|
||||
|
||||
# -- exécution -------------------------------------------------------------
|
||||
|
||||
def test_run_processes_all_docs(tmp_path):
|
||||
_touch(tmp_path / "a.pdf")
|
||||
_touch(tmp_path / "b.pdf")
|
||||
calls = []
|
||||
runner = ProcessingRunner(process_fn=lambda d, o: calls.append((d, o)) or {}, extensions=_EXTS)
|
||||
|
||||
summary = runner.run(tmp_path)
|
||||
|
||||
assert isinstance(summary, RunSummary)
|
||||
assert summary.total == 2
|
||||
assert summary.succeeded == 2
|
||||
assert summary.failed == 0
|
||||
assert summary.ok is True
|
||||
assert len(calls) == 2
|
||||
# Le dossier de sortie par défaut a été créé.
|
||||
assert (tmp_path / "anonymise").is_dir()
|
||||
|
||||
|
||||
def test_run_single_file_uses_output_dir(tmp_path):
|
||||
f = _touch(tmp_path / "doc.pdf")
|
||||
out = tmp_path / "sortie"
|
||||
seen = {}
|
||||
runner = ProcessingRunner(process_fn=lambda d, o: seen.update(doc=d, out=o) or {}, extensions=_EXTS)
|
||||
|
||||
summary = runner.run(f, output_dir=out)
|
||||
|
||||
assert summary.total == 1 and summary.succeeded == 1
|
||||
assert seen["doc"] == f
|
||||
assert seen["out"] == out
|
||||
assert out.is_dir()
|
||||
|
||||
|
||||
def test_run_continues_after_failure(tmp_path):
|
||||
_touch(tmp_path / "a.pdf")
|
||||
_touch(tmp_path / "boom.pdf")
|
||||
_touch(tmp_path / "c.pdf")
|
||||
|
||||
def proc(doc, out):
|
||||
if doc.name == "boom.pdf":
|
||||
raise RuntimeError("explosion")
|
||||
return {}
|
||||
|
||||
runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
|
||||
summary = runner.run(tmp_path)
|
||||
|
||||
assert summary.total == 3
|
||||
assert summary.succeeded == 2
|
||||
assert summary.failed == 1
|
||||
assert summary.ok is False
|
||||
assert summary.errors[0][0] == "boom.pdf"
|
||||
assert "explosion" in summary.errors[0][1]
|
||||
|
||||
|
||||
def test_run_empty_folder(tmp_path):
|
||||
logs = []
|
||||
runner = ProcessingRunner(process_fn=lambda d, o: {}, extensions=_EXTS)
|
||||
summary = runner.run(tmp_path, on_log=logs.append)
|
||||
assert summary.total == 0
|
||||
assert any("Aucun document" in m for m in logs)
|
||||
|
||||
|
||||
def test_stop_event_interrupts_between_docs(tmp_path):
|
||||
for name in ("a.pdf", "b.pdf", "c.pdf"):
|
||||
_touch(tmp_path / name)
|
||||
stop = threading.Event()
|
||||
processed = []
|
||||
|
||||
def proc(doc, out):
|
||||
processed.append(doc.name)
|
||||
stop.set() # demande l'arrêt après le 1er document
|
||||
return {}
|
||||
|
||||
runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
|
||||
summary = runner.run(tmp_path, stop_event=stop)
|
||||
|
||||
assert summary.stopped is True
|
||||
assert summary.succeeded == 1
|
||||
assert len(processed) == 1 # arrêt effectif entre deux documents
|
||||
|
||||
|
||||
def test_progress_callbacks(tmp_path):
|
||||
_touch(tmp_path / "a.pdf")
|
||||
_touch(tmp_path / "b.pdf")
|
||||
events = []
|
||||
runner = ProcessingRunner(process_fn=lambda d, o: {}, extensions=_EXTS)
|
||||
runner.run(tmp_path, on_progress=lambda done, total, name: events.append((done, total)))
|
||||
assert (2, 2) in events # progression finale atteinte
|
||||
|
||||
|
||||
def test_no_double_run(tmp_path):
|
||||
_touch(tmp_path / "a.pdf")
|
||||
started = threading.Event()
|
||||
release = threading.Event()
|
||||
result = {}
|
||||
|
||||
def proc(doc, out):
|
||||
started.set()
|
||||
release.wait(timeout=2)
|
||||
return {}
|
||||
|
||||
runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
|
||||
worker = threading.Thread(target=lambda: runner.run(tmp_path))
|
||||
worker.start()
|
||||
assert started.wait(timeout=2)
|
||||
# Pendant le run, un second lancement est refusé.
|
||||
with pytest.raises(RuntimeError):
|
||||
runner.run(tmp_path)
|
||||
release.set()
|
||||
worker.join(timeout=2)
|
||||
assert runner.is_running is False
|
||||
Reference in New Issue
Block a user