diff --git a/crates/provider/src/docker/client.rs b/crates/provider/src/docker/client.rs index 557f10df92420029a954f2d13ee9e81a39b0b9ec..28a7f68d168b1b1b54d48385845f376cfa23e8e9 100644 --- a/crates/provider/src/docker/client.rs +++ b/crates/provider/src/docker/client.rs @@ -1,9 +1,10 @@ use std::{collections::HashMap, path::Path}; use anyhow::anyhow; +use futures::future::try_join_all; use serde::{Deserialize, Deserializer}; use tokio::process::Command; -use tracing::trace; +use tracing::{info, trace}; use crate::types::{ExecutionResult, Port}; @@ -28,6 +29,7 @@ pub struct ContainerRunOptions { entrypoint: Option<String>, port_mapping: HashMap<Port, Port>, rm: bool, + detach: bool, } enum Container { @@ -115,6 +117,7 @@ impl ContainerRunOptions { entrypoint: None, port_mapping: HashMap::default(), rm: false, + detach: true, // add -d flag by default } } @@ -168,6 +171,11 @@ impl ContainerRunOptions { self.rm = true; self } + + pub fn detach(mut self, choice: bool) -> Self { + self.detach = choice; + self + } } impl DockerClient { @@ -233,7 +241,11 @@ impl DockerClient { pub async fn container_run(&self, options: ContainerRunOptions) -> Result<String> { let mut cmd = self.client_command(); - cmd.args(["run", "-d", "--platform", "linux/amd64"]); + cmd.args(["run", "--platform", "linux/amd64"]); + + if options.detach { + cmd.arg("-d"); + } Self::apply_cmd_options(&mut cmd, &options); @@ -425,9 +437,12 @@ impl DockerClient { }) .collect(); - for name in container_names { - self.container_rm(&name).await?; - } + info!("{:?}", container_names); + let futures = container_names + .iter() + .map(|name| self.container_rm(name)) + .collect::<Vec<_>>(); + try_join_all(futures).await?; Ok(()) } diff --git a/crates/provider/src/docker/namespace.rs b/crates/provider/src/docker/namespace.rs index ddea343940426481dffbdc059bbf9fef3f9ed4ea..b48627bda6450d700f34265b235cb22455b705fc 100644 --- a/crates/provider/src/docker/namespace.rs +++ b/crates/provider/src/docker/namespace.rs @@ 
-2,6 +2,7 @@ use std::{ collections::HashMap, path::{Path, PathBuf}, sync::{Arc, Weak}, + thread, }; use anyhow::anyhow; @@ -178,6 +179,7 @@ where async fn initialize_helper_binaries_volume(&self) -> Result<(), ProviderError> { let helper_binaries_volume_name = format!("{}-helper-binaries", self.name); + let zombie_wrapper_volume_name = format!("{}-zombie-wrapper", self.name); self.docker_client .create_volume(&helper_binaries_volume_name) @@ -186,15 +188,23 @@ where // download binaries to volume self.docker_client - .container_create( + .container_run( ContainerRunOptions::new( "alpine:latest", vec!["ash", "/scripts/helper-binaries-downloader.sh"], ) - .volume_mounts(HashMap::from([( - helper_binaries_volume_name.as_str(), - "/helpers", - )])) + .volume_mounts(HashMap::from([ + ( + helper_binaries_volume_name.as_str(), + "/helpers", + ), + ( + zombie_wrapper_volume_name.as_ref(), + "/scripts", + ) + ])) + // wait until complete + .detach(false) .rm(), ) .await @@ -216,9 +226,18 @@ where Ok(()) } - pub async fn delete_on_drop(&self, delete_on_drop: bool) { + pub async fn set_delete_on_drop(&self, delete_on_drop: bool) { *self.delete_on_drop.lock().await = delete_on_drop; } + + pub async fn delete_on_drop(&self) -> bool { + if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() { + *delete_on_drop + } else { + // if we can't lock just remove the ns + true + } + } } #[async_trait] @@ -239,7 +258,11 @@ where } async fn detach(&self) { - self.delete_on_drop(false).await; + self.set_delete_on_drop(false).await; + } + + async fn is_detached(&self) -> bool { + self.delete_on_drop().await } async fn nodes(&self) -> HashMap<String, DynNode> { @@ -389,15 +412,28 @@ where if *delete_on_drop { let client = self.docker_client.clone(); let provider = self.provider.upgrade(); - futures::executor::block_on(async move { - trace!("🧟 deleting ns {ns_name} from cluster"); - let _ = client.namespaced_containers_rm(&ns_name).await; - if let Some(provider) = provider { - 
provider.namespaces.write().await.remove(&ns_name); - } - trace!("✅ deleted"); + let handler = thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async move { + trace!("🧟 deleting ns {ns_name} from cluster"); + let _ = client.namespaced_containers_rm(&ns_name).await; + trace!("✅ deleted"); + }); }); + + if handler.join().is_ok() { + if let Some(provider) = provider { + if let Ok(mut p) = provider.namespaces.try_write() { + p.remove(&self.name); + } else { + warn!( + "⚠️ Can not acquire write lock to the provider, ns {} not removed", + self.name + ); + } + } + } } else { trace!("⚠️ leaking ns {ns_name} in cluster"); } diff --git a/crates/provider/src/docker/provider.rs b/crates/provider/src/docker/provider.rs index 6f72276e4b473edaf9c815fcffd64fc328b6d496..03957f9ba5e8d6b3d945303aea6bc6d2e80cff92 100644 --- a/crates/provider/src/docker/provider.rs +++ b/crates/provider/src/docker/provider.rs @@ -29,12 +29,12 @@ where impl<FS> DockerProvider<FS> where - FS: FileSystem + Send + Sync + Clone, + FS: FileSystem + Send + Sync + Clone + 'static, { pub async fn new(filesystem: FS) -> Arc<Self> { let docker_client = DockerClient::new().await.unwrap(); - Arc::new_cyclic(|weak| DockerProvider { + let provider = Arc::new_cyclic(|weak| DockerProvider { weak: weak.clone(), capabilities: ProviderCapabilities { requires_image: true, @@ -46,7 +46,23 @@ where docker_client, filesystem, namespaces: RwLock::new(HashMap::new()), - }) + }); + + let cloned_provider = provider.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + for (_, ns) in cloned_provider.namespaces().await { + if ns.is_detached().await { + // best effort + let _ = ns.destroy().await; + } + } + + // exit the process (130, SIGINT) + std::process::exit(130) + }); + + provider } pub fn tmp_dir(mut self, tmp_dir: impl Into<PathBuf>) -> Self { diff --git a/crates/provider/src/kubernetes/namespace.rs b/crates/provider/src/kubernetes/namespace.rs index 
c8792555efacfdfdc18c57e8d7489289cea7c4a1..62d0ae94ba364ee5e8df92cdea019b8a03bed39d 100644 --- a/crates/provider/src/kubernetes/namespace.rs +++ b/crates/provider/src/kubernetes/namespace.rs @@ -321,9 +321,18 @@ where Ok(()) } - pub async fn delete_on_drop(&self, delete_on_drop: bool) { + pub async fn set_delete_on_drop(&self, delete_on_drop: bool) { *self.delete_on_drop.lock().await = delete_on_drop; } + + pub async fn delete_on_drop(&self) -> bool { + if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() { + *delete_on_drop + } else { + // if we can't lock just remove the ns + true + } + } } impl<FS> Drop for KubernetesNamespace<FS> @@ -370,7 +379,11 @@ where } async fn detach(&self) { - self.delete_on_drop(false).await; + self.set_delete_on_drop(false).await; + } + + async fn is_detached(&self) -> bool { + self.delete_on_drop().await } async fn nodes(&self) -> HashMap<String, DynNode> { diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index 68563b85afacecf245677eb7d2082c951d48d4ed..2e87cfb19a49357b1ab5f1dbdef45e5adcbc4ab4 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -144,6 +144,11 @@ pub trait ProviderNamespace { warn!("Detach is not implemented for {}", self.name()); } + async fn is_detached(&self) -> bool { + // false by default + false + } + async fn nodes(&self) -> HashMap<String, DynNode>; async fn get_node_available_args(