//! Client HTTP pour la communication avec le serveur streaming. //! //! Gère l'envoi des heartbeats (screenshots périodiques), //! le polling des actions replay, et le rapport des résultats. //! Compatible avec l'API de agent_v0/server_v1/api_stream.py (port 5005). use crate::config::Config; use crate::sysinfo; use reqwest::blocking::{Client, RequestBuilder}; use serde::{Deserialize, Serialize}; /// Ajoute le header Authorization Bearer si un token est configure. /// /// Si `config.api_token` est vide, la requete est retournee telle quelle. pub fn with_auth(request: RequestBuilder, config: &Config) -> RequestBuilder { if config.api_token.is_empty() { request } else { request.header("Authorization", format!("Bearer {}", config.api_token)) } } /// Action de replay reçue du serveur. /// /// Format identique à celui du Python executor (agent_v1/core/executor.py). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Action { /// Identifiant unique de l'action #[serde(default)] pub action_id: String, /// Type d'action : "click", "type", "key_combo", "scroll", "wait" #[serde(rename = "type")] pub action_type: String, /// Coordonnée X normalisée (0.0 à 1.0) #[serde(default)] pub x_pct: f64, /// Coordonnée Y normalisée (0.0 à 1.0) #[serde(default)] pub y_pct: f64, /// Texte à taper (pour action "type") #[serde(default)] pub text: String, /// Liste de touches (pour action "key_combo") #[serde(default)] pub keys: Vec, /// Bouton de souris : "left", "right", "double" #[serde(default = "default_button")] pub button: String, /// Durée d'attente en ms (pour action "wait") #[serde(default = "default_duration")] pub duration_ms: u64, /// Delta de scroll (pour action "scroll") #[serde(default)] pub delta: i32, /// Mode visuel (résolution par le serveur) #[serde(default)] pub visual_mode: bool, /// Spécification de la cible visuelle #[serde(default)] pub target_spec: serde_json::Value, } fn default_button() -> String { "left".to_string() } fn default_duration() -> u64 { 500 } /// Résultat d'exécution d'une action. #[derive(Debug, Serialize, Deserialize)] pub struct ActionResult { pub action_id: String, pub success: bool, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, #[serde(skip_serializing_if = "Option::is_none")] pub screenshot: Option, } impl ActionResult { /// Crée un résultat d'erreur. pub fn error(action_id: &str, msg: &str) -> Self { ActionResult { action_id: action_id.to_string(), success: false, error: Some(msg.to_string()), screenshot: None, } } /// Crée un résultat de succès. pub fn ok(action_id: &str) -> Self { ActionResult { action_id: action_id.to_string(), success: true, error: None, screenshot: None, } } } /// Envoie un heartbeat (screenshot) au serveur streaming. /// /// POST /traces/stream/image avec le screenshot en multipart. /// Inclut les métadonnées système (DPI, résolution, fenêtre, moniteur) /// dans les query params pour que le serveur puisse les exploiter. /// Retourne true si l'envoi a réussi. pub fn send_heartbeat( client: &Client, config: &Config, jpeg_bytes: &[u8], session_id: &str, ) -> bool { let url = format!("{}/image", config.streaming_url()); let shot_id = format!("heartbeat_{}", chrono::Utc::now().timestamp()); // Collecter les métadonnées système let meta = sysinfo::get_screen_metadata(); let dpi_str = meta.dpi_scale.to_string(); let screen_w_str = meta.screen_resolution[0].to_string(); let screen_h_str = meta.screen_resolution[1].to_string(); let monitor_str = meta.monitor_index.to_string(); // Sérialiser window_bounds en JSON compact (ou "null") let wb_str = match meta.window_bounds { Some(wb) => format!("[{},{},{},{}]", wb[0], wb[1], wb[2], wb[3]), None => "null".to_string(), }; let part = reqwest::blocking::multipart::Part::bytes(jpeg_bytes.to_vec()) .file_name("screenshot.jpg") .mime_str("image/jpeg") .unwrap_or_else(|_| { reqwest::blocking::multipart::Part::bytes(jpeg_bytes.to_vec()) .file_name("screenshot.jpg") }); let form = reqwest::blocking::multipart::Form::new().part("file", part); let request = client .post(&url) .query(&[ ("session_id", session_id), ("shot_id", &shot_id), ("machine_id", &config.machine_id), ("dpi_scale", &dpi_str), ("screen_w", &screen_w_str), ("screen_h", &screen_h_str), ("monitor_index", &monitor_str), ("window_bounds", &wb_str), ]) .multipart(form) .timeout(std::time::Duration::from_secs(10)); match with_auth(request, config).send() { Ok(resp) => { if resp.status().is_success() { true } else { eprintln!( "[HEARTBEAT] Envoi echoue : HTTP {}", resp.status() ); false } } Err(e) => { // Log discret pour ne pas spammer la console eprintln!("[HEARTBEAT] Erreur reseau : {}", e); false } } } /// Réponse du serveur pour GET /replay/next #[derive(Debug, Deserialize)] struct ReplayNextResponse { action: Option, } /// Poll le serveur pour récupérer la prochaine action de replay. /// /// GET /traces/stream/replay/next?session_id=...&machine_id=... /// Retourne None si pas d'action en attente ou si le serveur est indisponible. pub fn poll_next_action(client: &Client, config: &Config) -> Option { let url = format!("{}/replay/next", config.streaming_url()); let session_id = config.agent_session_id(); let request = client .get(&url) .query(&[ ("session_id", session_id.as_str()), ("machine_id", config.machine_id.as_str()), ]) .timeout(std::time::Duration::from_secs(5)); let resp = with_auth(request, config).send().ok()?; if !resp.status().is_success() { return None; } let data: ReplayNextResponse = resp.json().ok()?; data.action } /// Informations résumées d'un workflow disponible. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowInfo { /// Identifiant unique du workflow pub workflow_id: String, /// Nom lisible du workflow #[serde(default)] pub name: String, /// Identifiant machine associé #[serde(default)] pub machine_id: String, /// Nombre de nœuds #[serde(default)] pub nodes: u32, /// Nombre de transitions #[serde(default)] pub edges: u32, } /// Réponse du serveur pour GET /traces/stream/workflows #[derive(Debug, Deserialize)] struct WorkflowsResponse { #[serde(default)] workflows: Vec, } /// Récupère la liste des workflows disponibles pour cette machine. /// /// GET /traces/stream/workflows?machine_id= /// Sauvegarde le résultat dans workflows.json à côté de l'exécutable. /// Retourne la liste (éventuellement depuis le cache local si le serveur est indisponible). pub fn fetch_workflows(client: &Client, config: &Config) -> Vec { let url = format!("{}/workflows", config.streaming_url()); let request = client .get(&url) .query(&[("machine_id", config.machine_id.as_str())]) .timeout(std::time::Duration::from_secs(5)); let workflows = match with_auth(request, config).send() { Ok(resp) if resp.status().is_success() => { match resp.json::() { Ok(data) => data.workflows, Err(e) => { eprintln!("[WORKFLOWS] Erreur parsing reponse : {}", e); Vec::new() } } } Ok(resp) => { eprintln!("[WORKFLOWS] Serveur HTTP {} — chargement cache local", resp.status()); return load_workflows_cache(); } Err(e) => { eprintln!("[WORKFLOWS] Serveur injoignable ({}) — chargement cache local", e); return load_workflows_cache(); } }; // Sauvegarder dans le cache local save_workflows_cache(&workflows); workflows } /// Chemin du fichier cache workflows.json (à côté de l'exécutable ou dans le dossier courant). fn workflows_cache_path() -> std::path::PathBuf { if let Ok(exe) = std::env::current_exe() { if let Some(dir) = exe.parent() { return dir.join("workflows.json"); } } std::path::PathBuf::from("workflows.json") } /// Sauvegarde les workflows dans le cache local. fn save_workflows_cache(workflows: &[WorkflowInfo]) { let path = workflows_cache_path(); match serde_json::to_string_pretty(workflows) { Ok(json) => { if let Err(e) = std::fs::write(&path, json) { eprintln!("[WORKFLOWS] Erreur ecriture cache {} : {}", path.display(), e); } } Err(e) => { eprintln!("[WORKFLOWS] Erreur serialisation cache : {}", e); } } } /// Charge les workflows depuis le cache local. fn load_workflows_cache() -> Vec { let path = workflows_cache_path(); match std::fs::read_to_string(&path) { Ok(content) => { match serde_json::from_str::>(&content) { Ok(workflows) => { println!("[WORKFLOWS] {} workflow(s) charges depuis le cache local", workflows.len()); workflows } Err(e) => { eprintln!("[WORKFLOWS] Erreur parsing cache : {}", e); Vec::new() } } } Err(_) => Vec::new(), // Pas de cache, pas d'erreur } } /// Rapporte le résultat d'une action au serveur. /// /// POST /traces/stream/replay/result avec le résultat en JSON. pub fn report_result(client: &Client, config: &Config, result: &ActionResult) -> bool { let url = format!("{}/replay/result", config.streaming_url()); let session_id = config.agent_session_id(); #[derive(Serialize)] struct Report<'a> { session_id: &'a str, action_id: &'a str, success: bool, error: &'a Option, screenshot: &'a Option, } let report = Report { session_id: &session_id, action_id: &result.action_id, success: result.success, error: &result.error, screenshot: &result.screenshot, }; let request = client .post(&url) .json(&report) .timeout(std::time::Duration::from_secs(10)); match with_auth(request, config).send() { Ok(resp) => { if resp.status().is_success() { if let Ok(data) = resp.json::() { let status = data.get("replay_status") .and_then(|v| v.as_str()) .unwrap_or("?"); let remaining = data.get("remaining_actions") .and_then(|v| v.as_i64()) .unwrap_or(-1); println!( " [RESULT] Rapporte : status={}, restant={}", status, remaining ); } true } else { eprintln!( " [RESULT] Rapport echoue : HTTP {}", resp.status() ); false } } Err(e) => { eprintln!(" [RESULT] Erreur reseau : {}", e); false } } }