diff --git a/crates/orchestrator/src/spawner.rs b/crates/orchestrator/src/spawner.rs
index c5e9dd782753c51887a271c5bfa020a4a5f20026..af355cd09ac395be3b451d670d921050f0690cfc 100644
--- a/crates/orchestrator/src/spawner.rs
+++ b/crates/orchestrator/src/spawner.rs
@@ -1,9 +1,9 @@
-use std::path::PathBuf;
+use std::{collections::HashMap, path::PathBuf};
 
 use anyhow::Context;
 use configuration::shared::constants::THIS_IS_A_BUG;
 use provider::{
-    constants::{LOCALHOST, NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR},
+    constants::{LOCALHOST, NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR, P2P_PORT},
     shared::helpers::running_in_ci,
     types::{SpawnNodeOptions, TransferedFile},
     DynNamespace,
@@ -149,6 +149,21 @@ where
         args.join(" ")
     );
 
+    let ports = if ctx.ns.capabilities().use_default_ports_in_cmd {
+        // the command uses the default ports internally, so map them to the node's assigned host ports
+        [
+            (P2P_PORT, node.p2p_port.0),
+            (RPC_PORT, node.rpc_port.0),
+            (PROMETHEUS_PORT, node.prometheus_port.0),
+        ]
+    } else {
+        [
+            (P2P_PORT, P2P_PORT),
+            (RPC_PORT, RPC_PORT),
+            (PROMETHEUS_PORT, PROMETHEUS_PORT),
+        ]
+    };
+
     let spawn_ops = SpawnNodeOptions::new(node.name.clone(), program)
         .args(args)
         .env(
@@ -158,7 +173,8 @@ where
         )
         .injected_files(files_to_inject)
         .created_paths(created_paths)
-        .db_snapshot(node.db_snapshot.clone());
+        .db_snapshot(node.db_snapshot.clone())
+        .port_mapping(HashMap::from(ports));
 
     let spawn_ops = if let Some(image) = node.image.as_ref() {
         spawn_ops.image(image.as_str())
@@ -196,7 +212,7 @@ where
             ports[1].unwrap_or(node.prometheus_port.0),
         );
     } else {
-        // running in ci requrire to use ip and default port
+        // running in CI requires using the IP and default ports
         (rpc_port_external, prometheus_port_external) = (RPC_PORT, PROMETHEUS_PORT);
         ip_to_use = running_node.ip().await?;
     }
diff --git a/crates/provider/src/docker.rs b/crates/provider/src/docker.rs
new file mode 100644
index 0000000000000000000000000000000000000000..565db02111a2d906d8cd583270f98b8101ae81e1
--- /dev/null
+++ b/crates/provider/src/docker.rs
@@ -0,0 +1,6 @@
+mod client;
+mod namespace;
+mod node;
+mod provider;
+
+pub use provider::DockerProvider;
diff --git a/crates/provider/src/docker/client.rs b/crates/provider/src/docker/client.rs
new file mode 100644
index 0000000000000000000000000000000000000000..2335a37976defed3fe4bece5f3d4f3184ced20a5
--- /dev/null
+++ b/crates/provider/src/docker/client.rs
@@ -0,0 +1,535 @@
+use std::{collections::HashMap, path::Path};
+
+use anyhow::anyhow;
+use serde::{Deserialize, Deserializer};
+use tokio::process::Command;
+use tracing::trace;
+
+use crate::types::{ExecutionResult, Port};
+
+#[derive(thiserror::Error, Debug)]
+#[error(transparent)]
+pub struct Error(#[from] anyhow::Error);
+
+pub type Result<T> = core::result::Result<T, Error>;
+
+#[derive(Clone)]
+pub struct DockerClient {
+    using_podman: bool,
+}
+
+#[derive(Debug)]
+pub struct ContainerRunOptions {
+    image: String,
+    command: Vec<String>,
+    env: Option<Vec<(String, String)>>,
+    volume_mounts: Option<HashMap<String, String>>,
+    name: Option<String>,
+    entrypoint: Option<String>,
+    port_mapping: HashMap<Port, Port>,
+    rm: bool,
+}
+
+enum Container {
+    Docker(DockerContainer),
+    Podman(PodmanContainer),
+}
+
+// TODO: we may not need this
+#[allow(dead_code)]
+#[derive(Deserialize, Debug)]
+struct DockerContainer {
+    #[serde(alias = "Names", deserialize_with = "deserialize_list")]
+    names: Vec<String>,
+    #[serde(alias = "Ports", deserialize_with = "deserialize_list")]
+    ports: Vec<String>,
+    #[serde(alias = "State")]
+    state: String,
+}
+
+// TODO: we may not need this
+#[allow(dead_code)]
+#[derive(Deserialize, Debug)]
+struct PodmanPort {
+    host_ip: String,
+    container_port: u16,
+    host_port: u16,
+    range: u16,
+    protocol: String,
+}
+
+// TODO: we may not need this
+#[allow(dead_code)]
+#[derive(Deserialize, Debug)]
+struct PodmanContainer {
+    #[serde(alias = "Id")]
+    id: String,
+    #[serde(alias = "Image")]
+    image: String,
+    #[serde(alias = "Mounts")]
+    mounts: Vec<String>,
+    #[serde(alias = "Names")]
+    names: Vec<String>,
+    #[serde(alias = "Ports", deserialize_with = "deserialize_null_as_default")]
+    ports: Vec<PodmanPort>,
+    #[serde(alias = "State")]
+    state: String,
+}
+
+fn deserialize_list<'de, D>(deserializer: D) -> std::result::Result<Vec<String>, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let str_sequence = String::deserialize(deserializer)?;
+    Ok(str_sequence
+        .split(',')
+        .filter(|item| !item.is_empty())
+        .map(|item| item.to_owned())
+        .collect())
+}
+
+fn deserialize_null_as_default<'de, D, T>(deserializer: D) -> std::result::Result<T, D::Error>
+where
+    T: Default + Deserialize<'de>,
+    D: Deserializer<'de>,
+{
+    let opt = Option::deserialize(deserializer)?;
+    Ok(opt.unwrap_or_default())
+}
+
+impl ContainerRunOptions {
+    pub fn new<S>(image: &str, command: Vec<S>) -> Self
+    where
+        S: Into<String> + std::fmt::Debug + Send + Clone,
+    {
+        ContainerRunOptions {
+            image: image.to_string(),
+            command: command
+                .clone()
+                .into_iter()
+                .map(|s| s.into())
+                .collect::<Vec<_>>(),
+            env: None,
+            volume_mounts: None,
+            name: None,
+            entrypoint: None,
+            port_mapping: HashMap::default(),
+            rm: false,
+        }
+    }
+
+    pub fn env<S>(mut self, env: Vec<(S, S)>) -> Self
+    where
+        S: Into<String> + std::fmt::Debug + Send + Clone,
+    {
+        self.env = Some(
+            env.into_iter()
+                .map(|(name, value)| (name.into(), value.into()))
+                .collect(),
+        );
+        self
+    }
+
+    pub fn volume_mounts<S>(mut self, volume_mounts: HashMap<S, S>) -> Self
+    where
+        S: Into<String> + std::fmt::Debug + Send + Clone,
+    {
+        self.volume_mounts = Some(
+            volume_mounts
+                .into_iter()
+                .map(|(source, target)| (source.into(), target.into()))
+                .collect(),
+        );
+        self
+    }
+
+    pub fn name<S>(mut self, name: S) -> Self
+    where
+        S: Into<String> + std::fmt::Debug + Send + Clone,
+    {
+        self.name = Some(name.into());
+        self
+    }
+
+    pub fn entrypoint<S>(mut self, entrypoint: S) -> Self
+    where
+        S: Into<String> + std::fmt::Debug + Send + Clone,
+    {
+        self.entrypoint = Some(entrypoint.into());
+        self
+    }
+
+    pub fn port_mapping(mut self, port_mapping: &HashMap<Port, Port>) -> Self {
+        self.port_mapping = port_mapping.clone();
+        self
+    }
+
+    pub fn rm(mut self) -> Self {
+        self.rm = true;
+        self
+    }
+}
+
+impl DockerClient {
+    pub async fn new() -> Result<Self> {
+        let using_podman = Self::is_using_podman().await?;
+
+        Ok(DockerClient { using_podman })
+    }
+
+    async fn is_using_podman() -> Result<bool> {
+        let result = tokio::process::Command::new("docker")
+            .arg("--version")
+            .output()
+            .await
+            .map_err(|err| anyhow!("Failed to detect container engine: {err}"))?;
+
+        if !result.status.success() {
+            return Err(anyhow!(
+                "Failed to detect container engine: {}",
+                String::from_utf8_lossy(&result.stderr)
+            )
+            .into());
+        }
+
+        Ok(String::from_utf8_lossy(&result.stdout).contains("podman"))
+    }
+}
+
+impl DockerClient {
+    pub async fn create_volume(&self, name: &str) -> Result<()> {
+        let result = tokio::process::Command::new("docker")
+            .args(["volume", "create", name])
+            .output()
+            .await
+            .map_err(|err| anyhow!("Failed to create volume '{name}': {err}"))?;
+
+        if !result.status.success() {
+            return Err(anyhow!(
+                "Failed to create volume '{name}': {}",
+                String::from_utf8_lossy(&result.stderr)
+            )
+            .into());
+        }
+
+        Ok(())
+    }
+
+    pub async fn container_run(&self, options: ContainerRunOptions) -> Result<String> {
+        let mut cmd = tokio::process::Command::new("docker");
+        cmd.args(["run", "-d", "--platform", "linux/amd64"]);
+
+        Self::apply_cmd_options(&mut cmd, &options);
+
+        trace!("cmd: {:?}", cmd);
+
+        let result = cmd.output().await.map_err(|err| {
+            anyhow!(
+                "Failed to run container with image '{image}' and command '{command}': {err}",
+                image = options.image,
+                command = options.command.join(" "),
+            )
+        })?;
+
+        if !result.status.success() {
+            return Err(anyhow!(
+                "Failed to run container with image '{image}' and command '{command}': {err}",
+                image = options.image,
+                command = options.command.join(" "),
+                err = String::from_utf8_lossy(&result.stderr)
+            )
+            .into());
+        }
+
+        Ok(String::from_utf8_lossy(&result.stdout).to_string())
+    }
+
+    pub async fn container_create(&self, options: ContainerRunOptions) -> Result<String> {
+        let mut cmd = tokio::process::Command::new("docker");
+        cmd.args(["container", "create"]);
+
+        Self::apply_cmd_options(&mut cmd, &options);
+
+        trace!("cmd: {:?}", cmd);
+
+        let result = cmd.output().await.map_err(|err| {
+            anyhow!(
+                "Failed to create container with image '{image}' and command '{command}': {err}",
+                image = options.image,
+                command = options.command.join(" "),
+            )
+        })?;
+
+        if !result.status.success() {
+            return Err(anyhow!(
+                "Failed to create container with image '{image}' and command '{command}': {err}",
+                image = options.image,
+                command = options.command.join(" "),
+                err = String::from_utf8_lossy(&result.stderr)
+            )
+            .into());
+        }
+
+        Ok(String::from_utf8_lossy(&result.stdout).to_string())
+    }
+
+    pub async fn container_exec<S>(
+        &self,
+        name: &str,
+        command: Vec<S>,
+        env: Option<Vec<(S, S)>>,
+        as_user: Option<S>,
+    ) -> Result<ExecutionResult>
+    where
+        S: Into<String> + std::fmt::Debug + Send + Clone,
+    {
+        let mut cmd = tokio::process::Command::new("docker");
+        cmd.arg("exec");
+
+        if let Some(env) = env {
+            for env_var in env {
+                cmd.args(["-e", &format!("{}={}", env_var.0.into(), env_var.1.into())]);
+            }
+        }
+
+        if let Some(user) = as_user {
+            cmd.args(["-u", user.into().as_ref()]);
+        }
+
+        cmd.arg(name);
+
+        cmd.args(
+            command
+                .clone()
+                .into_iter()
+                .map(|s| <S as Into<String>>::into(s)),
+        );
+
+        trace!("cmd is: {:?}", cmd);
+
+        let result = cmd.output().await.map_err(|err| {
+            anyhow!(
+                "Failed to exec '{}' on '{}': {err}",
+                command
+                    .into_iter()
+                    .map(|s| <S as Into<String>>::into(s))
+                    .collect::<Vec<_>>()
+                    .join(" "),
+                name,
+            )
+        })?;
+
+        if !result.status.success() {
+            return Ok(Err((
+                result.status,
+                String::from_utf8_lossy(&result.stderr).to_string(),
+            )));
+        }
+
+        Ok(Ok(String::from_utf8_lossy(&result.stdout).to_string()))
+    }
+
+    pub async fn container_cp(
+        &self,
+        name: &str,
+        local_path: &Path,
+        remote_path: &Path,
+    ) -> Result<()> {
+        let result = tokio::process::Command::new("docker")
+            .args([
+                "cp",
+                local_path.to_string_lossy().as_ref(),
+                &format!("{name}:{}", remote_path.to_string_lossy().as_ref()),
+            ])
+            .output()
+            .await
+            .map_err(|err| {
+                anyhow!(
+                    "Failed to copy file '{file}' to container '{name}': {err}",
+                    file = local_path.to_string_lossy(),
+                )
+            })?;
+
+        if !result.status.success() {
+            return Err(anyhow!(
+                "Failed to copy file '{file}' to container '{name}': {err}",
+                file = local_path.to_string_lossy(),
+                err = String::from_utf8_lossy(&result.stderr)
+            )
+            .into());
+        }
+
+        Ok(())
+    }
+
+    pub async fn container_rm(&self, name: &str) -> Result<()> {
+        let result = tokio::process::Command::new("docker")
+            .args(["rm", "--force", "--volumes", name])
+            .output()
+            .await
+            .map_err(|err| anyhow!("Failed to remove container '{name}': {err}"))?;
+
+        if !result.status.success() {
+            return Err(anyhow!(
+                "Failed to remove container '{name}': {err}",
+                err = String::from_utf8_lossy(&result.stderr)
+            )
+            .into());
+        }
+
+        Ok(())
+    }
+
+    pub async fn namespaced_containers_rm(&self, namespace: &str) -> Result<()> {
+        let container_names: Vec<String> = self
+            .get_containers()
+            .await?
+            .into_iter()
+            .filter_map(|container| match container {
+                Container::Docker(container) => {
+                    if let Some(name) = container.names.first() {
+                        if name.starts_with(namespace) {
+                            return Some(name.to_string());
+                        }
+                    }
+
+                    None
+                },
+                Container::Podman(container) => {
+                    if let Some(name) = container.names.first() {
+                        if name.starts_with(namespace) {
+                            return Some(name.to_string());
+                        }
+                    }
+
+                    None
+                },
+            })
+            .collect();
+
+        for name in container_names {
+            self.container_rm(&name).await?;
+        }
+
+        Ok(())
+    }
+
+    pub async fn container_ip(&self, container_name: &str) -> Result<String> {
+        let ip = if self.using_podman {
+            "127.0.0.1".into()
+        } else {
+            let mut cmd = tokio::process::Command::new("docker");
+            cmd.args(vec![
+                "inspect",
+                "-f",
+                "{{ .NetworkSettings.IPAddress }}",
+                container_name,
+            ]);
+
+            trace!("CMD: {cmd:?}");
+
+            let res = cmd
+                .output()
+                .await
+                .map_err(|err| anyhow!("Failed to get docker container ip, output: {err}"))?;
+
+            String::from_utf8(res.stdout)
+                .map_err(|err| anyhow!("Failed to parse docker container ip output: {err}"))?
+                .trim()
+                .into()
+        };
+
+        trace!("IP: {ip}");
+        Ok(ip)
+    }
+
+    async fn get_containers(&self) -> Result<Vec<Container>> {
+        let containers = if self.using_podman {
+            self.get_podman_containers()
+                .await?
+                .into_iter()
+                .map(Container::Podman)
+                .collect()
+        } else {
+            self.get_docker_containers()
+                .await?
+                .into_iter()
+                .map(Container::Docker)
+                .collect()
+        };
+
+        Ok(containers)
+    }
+
+    async fn get_podman_containers(&self) -> Result<Vec<PodmanContainer>> {
+        let res = tokio::process::Command::new("podman")
+            .args(vec!["ps", "--all", "--no-trunc", "--format", "json"])
+            .output()
+            .await
+            .map_err(|err| anyhow!("Failed to get podman containers output: {err}"))?;
+
+        let stdout = String::from_utf8_lossy(&res.stdout);
+
+        let containers = serde_json::from_str(&stdout)
+            .map_err(|err| anyhow!("Failed to parse podman containers output: {err}"))?;
+
+        Ok(containers)
+    }
+
+    async fn get_docker_containers(&self) -> Result<Vec<DockerContainer>> {
+        let res = tokio::process::Command::new("docker")
+            .args(vec!["ps", "--all", "--no-trunc", "--format", "json"])
+            .output()
+            .await
+            .map_err(|err| anyhow!("Failed to get docker containers output: {err}"))?;
+
+        let stdout = String::from_utf8_lossy(&res.stdout);
+
+        let mut containers = vec![];
+        for line in stdout.lines() {
+            containers.push(
+                serde_json::from_str::<DockerContainer>(line)
+                    .map_err(|err| anyhow!("Failed to parse docker container output: {err}"))?,
+            );
+        }
+
+        Ok(containers)
+    }
+
+    fn apply_cmd_options(cmd: &mut Command, options: &ContainerRunOptions) {
+        if options.rm {
+            cmd.arg("--rm");
+        }
+
+        if let Some(entrypoint) = options.entrypoint.as_ref() {
+            cmd.args(["--entrypoint", entrypoint]);
+        }
+
+        if let Some(volume_mounts) = options.volume_mounts.as_ref() {
+            for (source, target) in volume_mounts {
+                cmd.args(["-v", &format!("{source}:{target}")]);
+            }
+        }
+
+        if let Some(env) = options.env.as_ref() {
+            for env_var in env {
+                cmd.args(["-e", &format!("{}={}", env_var.0, env_var.1)]);
+            }
+        }
+
+        // add published ports
+        for (container_port, host_port) in options.port_mapping.iter() {
+            cmd.args(["-p", &format!("{host_port}:{container_port}")]);
+        }
+
+        if let Some(name) = options.name.as_ref() {
+            cmd.args(["--name", name]);
+        }
+
+        cmd.arg(&options.image);
+
+        for arg in &options.command {
+            cmd.arg(arg);
+        }
+    }
+}
diff --git a/crates/provider/src/docker/namespace.rs b/crates/provider/src/docker/namespace.rs
new file mode 100644
index 0000000000000000000000000000000000000000..29dbeba4121a3897ecb7bf17b8bb61f0275718ed
--- /dev/null
+++ b/crates/provider/src/docker/namespace.rs
@@ -0,0 +1,393 @@
+use std::{
+    collections::HashMap,
+    path::PathBuf,
+    sync::{Arc, Weak},
+};
+
+use anyhow::anyhow;
+use async_trait::async_trait;
+use configuration::shared::constants::THIS_IS_A_BUG;
+use support::fs::FileSystem;
+use tokio::sync::{Mutex, RwLock};
+use tracing::{debug, trace};
+use uuid::Uuid;
+
+use super::{
+    client::{ContainerRunOptions, DockerClient},
+    node::DockerNode,
+    DockerProvider,
+};
+use crate::{
+    constants::NAMESPACE_PREFIX,
+    docker::node::DockerNodeOptions,
+    types::{
+        GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
+        SpawnNodeOptions,
+    },
+    DynNode, ProviderError, ProviderNamespace, ProviderNode,
+};
+
+pub struct DockerNamespace<FS>
+where
+    FS: FileSystem + Send + Sync + Clone,
+{
+    weak: Weak<DockerNamespace<FS>>,
+    #[allow(dead_code)]
+    provider: Weak<DockerProvider<FS>>,
+    name: String,
+    base_dir: PathBuf,
+    capabilities: ProviderCapabilities,
+    docker_client: DockerClient,
+    filesystem: FS,
+    delete_on_drop: Arc<Mutex<bool>>,
+    pub(super) nodes: RwLock<HashMap<String, Arc<DockerNode<FS>>>>,
+}
+
+impl<FS> DockerNamespace<FS>
+where
+    FS: FileSystem + Send + Sync + Clone + 'static,
+{
+    pub(super) async fn new(
+        provider: &Weak<DockerProvider<FS>>,
+        tmp_dir: &PathBuf,
+        capabilities: &ProviderCapabilities,
+        docker_client:
&DockerClient, + filesystem: &FS, + ) -> Result<Arc<Self>, ProviderError> { + let name = format!("{}{}", NAMESPACE_PREFIX, Uuid::new_v4()); + let base_dir = PathBuf::from_iter([tmp_dir, &PathBuf::from(&name)]); + filesystem.create_dir(&base_dir).await?; + + let namespace = Arc::new_cyclic(|weak| DockerNamespace { + weak: weak.clone(), + provider: provider.clone(), + name, + base_dir, + capabilities: capabilities.clone(), + filesystem: filesystem.clone(), + docker_client: docker_client.clone(), + nodes: RwLock::new(HashMap::new()), + delete_on_drop: Arc::new(Mutex::new(true)), + }); + + namespace.initialize().await?; + + Ok(namespace) + } + + async fn initialize(&self) -> Result<(), ProviderError> { + // let ns_scripts_shared = PathBuf::from_iter([&self.base_dir, &PathBuf::from("shared-scripts")]); + // self.filesystem.create_dir(&ns_scripts_shared).await?; + self.initialize_zombie_scripts_volume().await?; + self.initialize_helper_binaries_volume().await?; + + Ok(()) + } + + async fn initialize_zombie_scripts_volume(&self) -> Result<(), ProviderError> { + let local_zombie_wrapper_path = + PathBuf::from_iter([&self.base_dir, &PathBuf::from("zombie-wrapper.sh")]); + + self.filesystem + .write( + &local_zombie_wrapper_path, + include_str!("../shared/scripts/zombie-wrapper.sh"), + ) + .await?; + + let local_helper_binaries_downloader_path = PathBuf::from_iter([ + &self.base_dir, + &PathBuf::from("helper-binaries-downloader.sh"), + ]); + + self.filesystem + .write( + &local_helper_binaries_downloader_path, + include_str!("../shared/scripts/helper-binaries-downloader.sh"), + ) + .await?; + + let zombie_wrapper_volume_name = format!("{}-zombie-wrapper", self.name); + let zombie_wrapper_container_name = format!("{}-scripts", self.name); + + self.docker_client + .create_volume(&zombie_wrapper_volume_name) + .await + .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?; + + self.docker_client + .container_create( + ContainerRunOptions::new("alpine:latest", vec!["tail", "-f", "/dev/null"]) + .volume_mounts(HashMap::from([( + zombie_wrapper_volume_name.as_str(), + "/scripts", + )])) + .name(&zombie_wrapper_container_name) + .rm(), + ) + .await + .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?; + + // copy the scripts + self.docker_client + .container_cp( + &zombie_wrapper_container_name, + &local_zombie_wrapper_path, + &PathBuf::from("/scripts/zombie-wrapper.sh"), + ) + .await + .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?; + + self.docker_client + .container_cp( + &zombie_wrapper_container_name, + &local_helper_binaries_downloader_path, + &PathBuf::from("/scripts/helper-binaries-downloader.sh"), + ) + .await + .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?; + + // set permissions for rwx on whole volume recursively + self.docker_client + .container_run( + ContainerRunOptions::new("alpine:latest", vec!["chmod", "-R", "777", "/scripts"]) + .volume_mounts(HashMap::from([( + zombie_wrapper_volume_name.as_ref(), + "/scripts", + )])) + .rm(), + ) + .await + .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?; + + Ok(()) + } + + async fn initialize_helper_binaries_volume(&self) -> Result<(), ProviderError> { + let helper_binaries_volume_name = format!("{}-helper-binaries", self.name); + + self.docker_client + .create_volume(&helper_binaries_volume_name) + .await + .map_err(|err| 
ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?; + + // download binaries to volume + self.docker_client + .container_create( + ContainerRunOptions::new( + "alpine:latest", + vec!["ash", "/scripts/helper-binaries-downloader.sh"], + ) + .volume_mounts(HashMap::from([( + helper_binaries_volume_name.as_str(), + "/helpers", + )])) + .rm(), + ) + .await + .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?; + + // set permissions for rwx on whole volume recursively + self.docker_client + .container_run( + ContainerRunOptions::new("alpine:latest", vec!["chmod", "-R", "777", "/helpers"]) + .volume_mounts(HashMap::from([( + helper_binaries_volume_name.as_ref(), + "/helpers", + )])) + .rm(), + ) + .await + .map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?; + + Ok(()) + } + + pub async fn delete_on_drop(&self, delete_on_drop: bool) { + *self.delete_on_drop.lock().await = delete_on_drop; + } +} + +#[async_trait] +impl<FS> ProviderNamespace for DockerNamespace<FS> +where + FS: FileSystem + Send + Sync + Clone + 'static, +{ + fn name(&self) -> &str { + &self.name + } + + fn base_dir(&self) -> &PathBuf { + &self.base_dir + } + + fn capabilities(&self) -> &ProviderCapabilities { + &self.capabilities + } + + async fn detach(&self) { + self.delete_on_drop(false).await; + } + + async fn nodes(&self) -> HashMap<String, DynNode> { + self.nodes + .read() + .await + .iter() + .map(|(name, node)| (name.clone(), node.clone() as DynNode)) + .collect() + } + + async fn get_node_available_args( + &self, + (command, image): (String, Option<String>), + ) -> Result<String, ProviderError> { + let node_image = image.expect(&format!("image should be present when getting node available args with docker provider {THIS_IS_A_BUG}")); + + let temp_node = self + .spawn_node( + &SpawnNodeOptions::new(format!("temp-{}", Uuid::new_v4()), "cat".to_string()) + .image(node_image.clone()), + ) + .await?; + + let available_args_output = temp_node + .run_command(RunCommandOptions::new(command.clone()).args(vec!["--help"])) + .await? 
+            .map_err(|(_exit, status)| {
+                ProviderError::NodeAvailableArgsError(node_image, command, status)
+            })?;
+
+        temp_node.destroy().await?;
+
+        Ok(available_args_output)
+    }
+
+    async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError> {
+        debug!("spawn node options {options:?}");
+        if self.nodes.read().await.contains_key(&options.name) {
+            return Err(ProviderError::DuplicatedNodeName(options.name.clone()));
+        }
+
+        let node = DockerNode::new(DockerNodeOptions {
+            namespace: &self.weak,
+            namespace_base_dir: &self.base_dir,
+            name: &options.name,
+            image: options.image.as_ref(),
+            program: &options.program,
+            args: &options.args,
+            env: &options.env,
+            startup_files: &options.injected_files,
+            db_snapshot: options.db_snapshot.as_ref(),
+            docker_client: &self.docker_client,
+            container_name: format!("{}-{}", self.name, options.name),
+            filesystem: &self.filesystem,
+            port_mapping: options.port_mapping.as_ref().unwrap_or(&HashMap::default()),
+        })
+        .await?;
+
+        self.nodes
+            .write()
+            .await
+            .insert(node.name().to_string(), node.clone());
+
+        Ok(node)
+    }
+
+    async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
+        debug!("generate files options {options:#?}");
+
+        let node_name = options
+            .temp_name
+            .unwrap_or_else(|| format!("temp-{}", Uuid::new_v4()));
+        let node_image = options.image.expect(&format!(
+            "image should be present when generating files with docker provider {THIS_IS_A_BUG}"
+        ));
+
+        // run dummy command in a new container
+        let temp_node = self
+            .spawn_node(
+                &SpawnNodeOptions::new(node_name, "cat".to_string())
+                    .injected_files(options.injected_files)
+                    .image(node_image),
+            )
+            .await?;
+
+        for GenerateFileCommand {
+            program,
+            args,
+            env,
+            local_output_path,
+        } in options.commands
+        {
+            let local_output_full_path = format!(
+                "{}{}{}",
+                self.base_dir.to_string_lossy(),
+                if local_output_path.starts_with("/") {
+                    ""
+                } else {
+                    "/"
+                },
+                local_output_path.to_string_lossy()
+            );
+
+            match temp_node
+                .run_command(RunCommandOptions { program, args, env })
+                .await?
+            {
+                Ok(contents) => self
+                    .filesystem
+                    .write(local_output_full_path, contents)
+                    .await
+                    .map_err(|err| ProviderError::FileGenerationFailed(err.into()))?,
+                Err((_, msg)) => Err(ProviderError::FileGenerationFailed(anyhow!("{msg}")))?,
+            };
+        }
+
+        temp_node.destroy().await
+    }
+
+    async fn static_setup(&self) -> Result<(), ProviderError> {
+        todo!()
+    }
+
+    async fn destroy(&self) -> Result<(), ProviderError> {
+        let _ = self
+            .docker_client
+            .namespaced_containers_rm(&self.name)
+            .await
+            .map_err(|err| ProviderError::DeleteNamespaceFailed(self.name.clone(), err.into()))?;
+
+        if let Some(provider) = self.provider.upgrade() {
+            provider.namespaces.write().await.remove(&self.name);
+        }
+
+        Ok(())
+    }
+}
+
+impl<FS> Drop for DockerNamespace<FS>
+where
+    FS: FileSystem + Send + Sync + Clone,
+{
+    fn drop(&mut self) {
+        let ns_name = self.name.clone();
+        if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
+            if *delete_on_drop {
+                let client = self.docker_client.clone();
+                let provider = self.provider.upgrade();
+                futures::executor::block_on(async move {
+                    trace!("🧟 deleting ns {ns_name} from cluster");
+                    let _ = client.namespaced_containers_rm(&ns_name).await;
+                    if let Some(provider) = provider {
+                        provider.namespaces.write().await.remove(&ns_name);
+                    }
+
+                    trace!("✅ deleted");
+                });
+            } else {
+                trace!("⚠️ leaking ns {ns_name} in cluster");
+            }
+        };
+    }
+}
diff --git a/crates/provider/src/docker/node.rs b/crates/provider/src/docker/node.rs
new file mode 100644
index 0000000000000000000000000000000000000000..e85c70db26d128c4449e7971a7d378f85b489d2a
--- /dev/null
+++ b/crates/provider/src/docker/node.rs
@@ -0,0 +1,550 @@
+use std::{
+    collections::HashMap,
+    net::IpAddr,
+    path::{Component, Path, PathBuf},
+    sync::{Arc, Weak},
+    time::Duration,
+};
+
+use anyhow::anyhow;
+use async_trait::async_trait;
+use configuration::{shared::constants::THIS_IS_A_BUG, types::AssetLocation};
+use futures::future::try_join_all;
+use support::fs::FileSystem;
+use tokio::{time::sleep, try_join};
+use tracing::debug;
+
+use super::{
+    client::{ContainerRunOptions, DockerClient},
+    namespace::DockerNamespace,
+};
+use crate::{
+    constants::{NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR, NODE_SCRIPTS_DIR},
+    types::{ExecutionResult, Port, RunCommandOptions, RunScriptOptions, TransferedFile},
+    ProviderError, ProviderNamespace, ProviderNode,
+};
+
+pub(super) struct DockerNodeOptions<'a, FS>
+where
+    FS: FileSystem + Send + Sync + Clone + 'static,
+{
+    pub(super) namespace: &'a Weak<DockerNamespace<FS>>,
+    pub(super) namespace_base_dir: &'a PathBuf,
+    pub(super) name: &'a str,
+    pub(super) image: Option<&'a String>,
+    pub(super) program: &'a str,
+    pub(super) args: &'a [String],
+    pub(super) env: &'a [(String, String)],
+    pub(super) startup_files: &'a [TransferedFile],
+    pub(super) db_snapshot: Option<&'a AssetLocation>,
+    pub(super) docker_client: &'a DockerClient,
+    pub(super) container_name: String,
+    pub(super) filesystem: &'a FS,
+    pub(super) port_mapping: &'a HashMap<Port, Port>,
+}
+
+pub struct DockerNode<FS>
+where
+    FS: FileSystem + Send + Sync + Clone,
+{
+    namespace: Weak<DockerNamespace<FS>>,
+    name: String,
+    image: String,
+    program: String,
+    args: Vec<String>,
+    env: Vec<(String, String)>,
+    base_dir: PathBuf,
+    config_dir: PathBuf,
+    data_dir: PathBuf,
+    relay_data_dir: PathBuf,
+    scripts_dir: PathBuf,
+    log_path: PathBuf,
+    docker_client: DockerClient,
+    container_name: String,
+    port_mapping: HashMap<Port, Port>,
+    #[allow(dead_code)]
+    filesystem: FS,
+}
+
+impl<FS> DockerNode<FS>
+where
+    FS: FileSystem + Send + Sync + Clone + 'static,
+{
+    pub(super) async fn new(
+        options: DockerNodeOptions<'_, FS>,
+    ) -> Result<Arc<Self>, ProviderError> {
+        let image = options.image.ok_or_else(|| {
+            ProviderError::MissingNodeInfo(options.name.to_string(), "missing image".to_string())
+        })?;
+
+        let filesystem = options.filesystem.clone();
+
+        let base_dir =
+            PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]);
+        filesystem.create_dir_all(&base_dir).await?;
+
+        let base_dir_raw = base_dir.to_string_lossy();
+        let config_dir = PathBuf::from(format!("{}{}", base_dir_raw, NODE_CONFIG_DIR));
+        let data_dir = PathBuf::from(format!("{}{}", base_dir_raw, NODE_DATA_DIR));
+        let relay_data_dir = PathBuf::from(format!("{}{}", base_dir_raw, NODE_RELAY_DATA_DIR));
+        let scripts_dir = PathBuf::from(format!("{}{}", base_dir_raw, NODE_SCRIPTS_DIR));
+        let log_path = base_dir.join("node.log");
+
+        try_join!(
+            filesystem.create_dir(&config_dir),
+            filesystem.create_dir(&data_dir),
+            filesystem.create_dir(&relay_data_dir),
+            filesystem.create_dir(&scripts_dir),
+        )?;
+
+        let node = Arc::new(DockerNode {
+            namespace: options.namespace.clone(),
+            name: options.name.to_string(),
+            image: image.to_string(),
+            program: options.program.to_string(),
+            args: options.args.to_vec(),
+            env: options.env.to_vec(),
+            base_dir,
+            config_dir,
+            data_dir,
+            relay_data_dir,
+            scripts_dir,
+            log_path,
+            filesystem: filesystem.clone(),
+            docker_client: options.docker_client.clone(),
+            container_name: options.container_name,
+            port_mapping: options.port_mapping.clone(),
+        });
+
+        node.initialize_docker().await?;
+
+        if let Some(db_snap) = options.db_snapshot {
+            node.initialize_db_snapshot(db_snap).await?;
+        }
+
+        node.initialize_startup_files(options.startup_files).await?;
+
+        node.start().await?;
+
+        Ok(node)
+    }
+
+    async fn initialize_docker(&self) -> Result<(), ProviderError> {
+        let command = [vec![self.program.to_string()], self.args.to_vec()].concat();
+
+        self.docker_client
+            .container_run(
+                ContainerRunOptions::new(&self.image, command)
+                    .name(&self.container_name)
+                    .env(self.env.clone())
+                    .volume_mounts(HashMap::from([
+                        (
+                            format!("{}-zombie-wrapper", self.namespace_name()),
+                            "/scripts".to_string(),
+                        ),
+                        (
+                            format!("{}-helper-binaries", self.namespace_name()),
+                            "/helpers".to_string(),
+                        ),
+                        (
+                            self.config_dir.to_string_lossy().into_owned(),
+                            "/cfg".to_string(),
+                        ),
+                        (
+                            self.data_dir.to_string_lossy().into_owned(),
+                            "/data".to_string(),
+                        ),
+                        (
+                            self.relay_data_dir.to_string_lossy().into_owned(),
+                            "/relay-data".to_string(),
+                        ),
+                    ]))
+                    .entrypoint("/scripts/zombie-wrapper.sh")
+                    .port_mapping(&self.port_mapping),
+            )
+            .await
+            .map_err(|err| ProviderError::NodeSpawningFailed(self.name.clone(), err.into()))?;
+
+        // change dir permissions
+        let _ = self
+            .docker_client
+            .container_exec(
+                &self.container_name,
+                ["chmod", "777", "/cfg", "/data", "/relay-data"].into(),
+                None,
+                Some("root"),
+            )
+            .await
+            .map_err(|err| ProviderError::NodeSpawningFailed(self.name.clone(), err.into()))?;
+
+        Ok(())
+    }
+
+    async fn initialize_db_snapshot(
+        &self,
+        _db_snapshot: &AssetLocation,
+    ) -> Result<(), ProviderError> {
+        todo!()
+        // trace!("snap: {db_snapshot}");
+        // let url_of_snap = match db_snapshot {
+        //     AssetLocation::Url(location) => location.clone(),
+        //     AssetLocation::FilePath(filepath) => self.upload_to_fileserver(filepath).await?,
+        // };
+
+        // // we need to get the snapshot from a public access
+        // // and extract to /data
+        // let opts = RunCommandOptions::new("mkdir").args([
+        //     "-p",
+        //     "/data/",
+        //     "&&",
+        //     "mkdir",
+        //     "-p",
+        //     "/relay-data/",
+        //     "&&",
+        //     // Use our version of curl
+        //     "/cfg/curl",
+        //     url_of_snap.as_ref(),
+        //     "--output",
+        //     "/data/db.tgz",
+        //     "&&",
+        //     "cd",
+        //     "/",
+        //     "&&",
+        //     "tar",
+        //     "--skip-old-files",
+        //     "-xzvf",
+        //     "/data/db.tgz",
+        // ]);
+
+        // trace!("cmd opts: {:#?}", opts);
+        // let _ = self.run_command(opts).await?;
+
+        // Ok(())
+    }
+
+    async fn initialize_startup_files(
+        &self,
+        startup_files: &[TransferedFile],
+    ) -> Result<(), ProviderError> {
+        try_join_all(
+            startup_files
+                .iter()
+                .map(|file| self.send_file(&file.local_path, &file.remote_path, &file.mode)),
+        )
+        .await?;
+
+        Ok(())
+    }
+
+    pub(super) async fn start(&self) -> Result<(), ProviderError> {
+        self.docker_client
+            .container_exec(
+                &self.container_name,
+                vec!["sh", "-c", "echo start > /tmp/zombiepipe"],
+                None,
+                None,
+            )
+            .await
+            .map_err(|err| {
+                ProviderError::NodeSpawningFailed(
+                    format!("failed to start container {} after spawning", self.name),
+                    err.into(),
+                )
+            })?
+            .map_err(|err| {
+                ProviderError::NodeSpawningFailed(
+                    format!("failed to start container {} after spawning", self.name),
+                    anyhow!("command failed in container: status {}: {}", err.0, err.1),
+                )
+            })?;
+
+        Ok(())
+    }
+
+    fn get_remote_parent_dir(&self, remote_file_path: &Path) -> Option<PathBuf> {
+        if let Some(remote_parent_dir) = remote_file_path.parent() {
+            if matches!(
+                remote_parent_dir.components().rev().peekable().peek(),
+                Some(Component::Normal(_))
+            ) {
+                return Some(remote_parent_dir.to_path_buf());
+            }
+        }
+
+        None
+    }
+
+    async fn create_remote_dir(&self, remote_dir: &Path) -> Result<(), ProviderError> {
+        let _ = self
+            .docker_client
+            .container_exec(
+                &self.container_name,
+                vec!["mkdir", "-p", &remote_dir.to_string_lossy()],
+                None,
+                None,
+            )
+            .await
+            .map_err(|err| {
+                ProviderError::NodeSpawningFailed(
+                    format!(
+                        "failed to create dir {} for container {}",
+                        remote_dir.to_string_lossy(),
+                        &self.name
+                    ),
+                    err.into(),
+                )
+            })?;
+
+        Ok(())
+    }
+
+    fn namespace_name(&self) -> String {
+        self.namespace
+            .upgrade()
+            .map(|namespace| namespace.name().to_string())
+            .unwrap_or_else(|| panic!("namespace shouldn't be dropped, {}", THIS_IS_A_BUG))
+    }
+}
+
+#[async_trait]
+impl<FS> ProviderNode for DockerNode<FS>
+where
+    FS: FileSystem + Send + Sync + Clone + 'static,
+{
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn args(&self) -> Vec<&str> {
+        self.args.iter().map(|arg| arg.as_str()).collect()
+    }
+
+    fn base_dir(&self) -> &PathBuf {
+        &self.base_dir
+    }
+
+    fn config_dir(&self) -> &PathBuf {
+        &self.config_dir
+    }
+
+    fn data_dir(&self) -> &PathBuf {
+        &self.data_dir
+    }
+
+    fn relay_data_dir(&self) -> &PathBuf {
+        &self.relay_data_dir
+    }
+
+    fn scripts_dir(&self) -> &PathBuf {
+        &self.scripts_dir
+    }
+
+    fn log_path(&self) -> &PathBuf {
+        &self.log_path
+    }
+
+    fn path_in_node(&self, file: &Path) -> PathBuf {
+        // this is a no-op since we receive the path of the file
+        // inside the container
+        PathBuf::from(file)
+    }
+
+    async fn logs(&self) -> Result<String, ProviderError> {
+        todo!()
+    }
+
+    async fn dump_logs(&self, _local_dest: PathBuf) -> Result<(), ProviderError> {
+        todo!()
+    }
+
+    async fn run_command(
+        &self,
+        options: RunCommandOptions,
+    ) -> Result<ExecutionResult, ProviderError> {
+        debug!(
+            "running command for {} with options {:?}",
+            self.name, options
+        );
+        let command = [vec![options.program], options.args].concat();
+
+        self.docker_client
+            .container_exec(
+                &self.container_name,
+                vec!["sh", "-c", &command.join(" ")],
+                Some(
+                    options
+                        .env
+                        .iter()
+                        .map(|(k, v)| (k.as_ref(), v.as_ref()))
+                        .collect(),
+                ),
+                None,
+            )
+            .await
+            .map_err(|err| {
+                ProviderError::RunCommandError(
+                    format!("sh -c {}", &command.join(" ")),
+                    self.name.to_string(),
+                    err.into(),
+                )
+            })
+    }
+
+    async fn run_script(
+        &self,
+        _options: RunScriptOptions,
+    ) -> Result<ExecutionResult, ProviderError> {
+        todo!()
+    }
+
+    async fn send_file(
+        &self,
+        local_file_path: &Path,
+        remote_file_path: &Path,
+        mode: &str,
+    ) -> Result<(), ProviderError> {
+        if let Some(remote_parent_dir) = self.get_remote_parent_dir(remote_file_path) {
+            self.create_remote_dir(&remote_parent_dir).await?;
+        }
+
+        debug!(
+            "sending file for {}: {} to {} with mode {}",
+            self.name,
+            local_file_path.to_string_lossy(),
+            remote_file_path.to_string_lossy(),
+            mode
+        );
+
+        self.docker_client
+            .container_cp(&self.container_name, local_file_path, remote_file_path)
+            .await
+            .map_err(|err| {
+                ProviderError::SendFile(
+                    self.name.clone(),
+                    local_file_path.to_string_lossy().to_string(),
+                    err.into(),
+                )
+            })?;
+
+        let _ = self
+            .docker_client
+            .container_exec(
+                &self.container_name,
+                vec!["chmod", mode, &remote_file_path.to_string_lossy()],
+                None,
+                None,
+            )
+            .await
+            .map_err(|err| {
+                ProviderError::SendFile(
+                    self.name.clone(),
+                    local_file_path.to_string_lossy().to_string(),
+                    err.into(),
+                )
+            })?;
+
+        Ok(())
+    }
+
+    async fn receive_file(
+        &self,
+        _remote_src: &Path,
+        _local_dest: &Path,
+    ) -> Result<(), ProviderError> {
+        Ok(())
+    }
+
+    async fn ip(&self) -> Result<IpAddr, ProviderError> {
+        let ip = self
+            .docker_client
+            .container_ip(&self.container_name)
+            .await
+            .map_err(|err| {
+                ProviderError::InvalidConfig(format!("Error getting container ip, err: {err}"))
+            })?;
+
+        Ok(ip.parse::<IpAddr>().map_err(|err| {
+            ProviderError::InvalidConfig(format!(
+                "Can not parse the container ip: {ip}, err: {err}"
+            ))
+        })?)
+    }
+
+    async fn pause(&self) -> Result<(), ProviderError> {
+        self.docker_client
+            .container_exec(
+                &self.container_name,
+                vec!["sh", "-c", "echo pause > /tmp/zombiepipe"],
+                None,
+                None,
+            )
+            .await
+            .map_err(|err| ProviderError::PauseNodeFailed(self.name.to_string(), err.into()))?
+            .map_err(|err| {
+                ProviderError::PauseNodeFailed(
+                    self.name.to_string(),
+                    anyhow!("error when pausing node: status {}: {}", err.0, err.1),
+                )
+            })?;
+
+        Ok(())
+    }
+
+    async fn resume(&self) -> Result<(), ProviderError> {
+        self.docker_client
+            .container_exec(
+                &self.container_name,
+                vec!["sh", "-c", "echo resume > /tmp/zombiepipe"],
+                None,
+                None,
+            )
+            .await
+            .map_err(|err| ProviderError::PauseNodeFailed(self.name.to_string(), err.into()))?
+            .map_err(|err| {
+                ProviderError::PauseNodeFailed(
+                    self.name.to_string(),
+                    anyhow!("error when resuming node: status {}: {}", err.0, err.1),
+                )
+            })?;
+
+        Ok(())
+    }
+
+    async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError> {
+        if let Some(duration) = after {
+            sleep(duration).await;
+        }
+
+        self.docker_client
+            .container_exec(
+                &self.container_name,
+                vec!["sh", "-c", "echo restart > /tmp/zombiepipe"],
+                None,
+                None,
+            )
+            .await
+            .map_err(|err| ProviderError::PauseNodeFailed(self.name.to_string(), err.into()))?
+            .map_err(|err| {
+                ProviderError::PauseNodeFailed(
+                    self.name.to_string(),
+                    anyhow!("error when restarting node: status {}: {}", err.0, err.1),
+                )
+            })?;
+
+        Ok(())
+    }
+
+    async fn destroy(&self) -> Result<(), ProviderError> {
+        self.docker_client
+            .container_rm(&self.container_name)
+            .await
+            .map_err(|err| ProviderError::KillNodeFailed(self.name.to_string(), err.into()))?;
+
+        if let Some(namespace) = self.namespace.upgrade() {
+            namespace.nodes.write().await.remove(&self.name);
+        }
+
+        Ok(())
+    }
+}
diff --git a/crates/provider/src/docker/provider.rs b/crates/provider/src/docker/provider.rs
new file mode 100644
index 0000000000000000000000000000000000000000..5acd7c269ef0ebe084c9cca9c5fdb75c0c943351
--- /dev/null
+++ b/crates/provider/src/docker/provider.rs
@@ -0,0 +1,97 @@
+use std::{
+    collections::HashMap,
+    path::PathBuf,
+    sync::{Arc, Weak},
+};
+
+use async_trait::async_trait;
+use support::fs::FileSystem;
+use tokio::sync::RwLock;
+
+use super::{client::DockerClient, namespace::DockerNamespace};
+use crate::{
+    types::ProviderCapabilities, DynNamespace, Provider, ProviderError, ProviderNamespace,
+};
+
+const PROVIDER_NAME: &str = "docker";
+
+pub struct DockerProvider<FS>
+where
+    FS: FileSystem + Send + Sync + Clone,
+{
+    weak: Weak<DockerProvider<FS>>,
+    capabilities: ProviderCapabilities,
+    tmp_dir: PathBuf,
+    docker_client: DockerClient,
+    filesystem: FS,
+    pub(super) namespaces: RwLock<HashMap<String, Arc<DockerNamespace<FS>>>>,
+}
+
+impl<FS> DockerProvider<FS>
+where
+    FS: FileSystem + Send + Sync + Clone,
+{
+    pub async fn new(filesystem: FS) -> Arc<Self> {
+        let docker_client = DockerClient::new().await.unwrap();
+
+        Arc::new_cyclic(|weak| DockerProvider {
+            weak: weak.clone(),
+            capabilities: ProviderCapabilities {
+                requires_image: true,
+                has_resources: false,
+                prefix_with_full_path: false,
+                use_default_ports_in_cmd: true,
+            },
+            tmp_dir: std::env::temp_dir(),
+            docker_client,
+            filesystem,
+            namespaces: RwLock::new(HashMap::new()),
+        })
+    }
+
+    pub fn tmp_dir(mut self, tmp_dir: impl Into<PathBuf>) -> Self {
+        self.tmp_dir = tmp_dir.into();
+        self
+    }
+}
+
+#[async_trait]
+impl<FS> Provider for DockerProvider<FS>
+where
+    FS: FileSystem + Send + Sync + Clone + 'static,
+{
+    fn name(&self) -> &str {
+        PROVIDER_NAME
+    }
+
+    fn capabilities(&self) -> &ProviderCapabilities {
+        &self.capabilities
+    }
+
+    async fn namespaces(&self) -> HashMap<String, DynNamespace> {
+        self.namespaces
+            .read()
+            .await
+            .iter()
+            .map(|(name, namespace)| (name.clone(), namespace.clone() as DynNamespace))
+            .collect()
+    }
+
+    async fn create_namespace(&self) -> Result<DynNamespace, ProviderError> {
+        let namespace = DockerNamespace::new(
+            &self.weak,
+            &self.tmp_dir,
+            &self.capabilities,
+            &self.docker_client,
+            &self.filesystem,
+        )
+        .await?;
+
+        self.namespaces
+            .write()
+            .await
+            .insert(namespace.name().to_string(), namespace.clone());
+
+        Ok(namespace)
+    }
+}
diff --git a/crates/provider/src/kubernetes.rs b/crates/provider/src/kubernetes.rs
index e02d51ee7cdea60c8ccf004893735456c9f74312..7c9e208e3afeb2bea3c76e53aaec4e15ff3d0c0b 100644
--- a/crates/provider/src/kubernetes.rs
+++ b/crates/provider/src/kubernetes.rs
@@ -4,5 +4,4 @@ mod node;
 mod pod_spec_builder;
 mod provider;
 
-pub use client::KubernetesClient;
 pub use provider::KubernetesProvider;
diff --git a/crates/provider/src/kubernetes/namespace.rs b/crates/provider/src/kubernetes/namespace.rs
index 0a5888f5ab1efbf04ba2763bf1f3c6d8321b3bf3..7d4f2ddd274d16aaad217057aba2147d6199cf0e 100644
---
a/crates/provider/src/kubernetes/namespace.rs +++ b/crates/provider/src/kubernetes/namespace.rs @@ -19,7 +19,7 @@ use tokio::sync::{Mutex, RwLock}; use tracing::{debug, trace}; use uuid::Uuid; -use super::node::KubernetesNode; +use super::{client::KubernetesClient, node::KubernetesNode}; use crate::{ constants::NAMESPACE_PREFIX, kubernetes::node::KubernetesNodeOptions, @@ -28,7 +28,7 @@ use crate::{ GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions, SpawnNodeOptions, }, - DynNode, KubernetesClient, KubernetesProvider, ProviderError, ProviderNamespace, ProviderNode, + DynNode, KubernetesProvider, ProviderError, ProviderNamespace, ProviderNode, }; const FILE_SERVER_IMAGE: &str = "europe-west3-docker.pkg.dev/parity-zombienet/zombienet-public-images/zombienet-file-server:latest"; @@ -84,13 +84,13 @@ where Ok(namespace) } - pub(super) async fn initialize(&self) -> Result<(), ProviderError> { + async fn initialize(&self) -> Result<(), ProviderError> { self.initialize_k8s().await?; self.initialize_file_server().await?; self.setup_script_config_map( "zombie-wrapper", - include_str!("./scripts/zombie-wrapper.sh"), + include_str!("../shared/scripts/zombie-wrapper.sh"), "zombie_wrapper_config_map_manifest.yaml", // TODO: add correct labels BTreeMap::new(), @@ -99,7 +99,7 @@ where self.setup_script_config_map( "helper-binaries-downloader", - include_str!("./scripts/helper-binaries-downloader.sh"), + include_str!("../shared/scripts/helper-binaries-downloader.sh"), "helper_binaries_downloader_config_map_manifest.yaml", // TODO: add correct labels BTreeMap::new(), @@ -322,9 +322,14 @@ where if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() { if *delete_on_drop { let client = self.k8s_client.clone(); + let provider = self.provider.upgrade(); futures::executor::block_on(async move { trace!("🧟 deleting ns {ns_name} from cluster"); let _ = client.delete_namespace(&ns_name).await; + if let Some(provider) = provider { + provider.namespaces.write().await.remove(&ns_name); + } + trace!("✅ deleted"); }); } else { @@ -385,11 +390,13 @@ where ProviderError::NodeAvailableArgsError(node_image, command, status) })?; + temp_node.destroy().await?; + Ok(available_args_output) } async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError> { - trace!("spawn option {:?}", options); + trace!("spawn node options {options:?}"); if self.nodes.read().await.contains_key(&options.name) { return Err(ProviderError::DuplicatedNodeName(options.name.clone())); } @@ -419,7 +426,7 @@ where } async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> { - debug!("options {:#?}", options); + debug!("generate files options {options:#?}"); let node_name = options .temp_name diff --git a/crates/provider/src/kubernetes/node.rs b/crates/provider/src/kubernetes/node.rs index 4dcc16a1cea111392fed148f7e19efc59641768e..f8f1f4fd17121fcd22550b91d456b46c7739b2fd 100644 --- a/crates/provider/src/kubernetes/node.rs +++ b/crates/provider/src/kubernetes/node.rs @@ -21,14 +21,16 @@ use tokio::{sync::RwLock, task::JoinHandle, time::sleep, try_join}; use tracing::trace; use url::Url; -use super::{namespace::KubernetesNamespace, pod_spec_builder::PodSpecBuilder}; +use super::{ + client::KubernetesClient, namespace::KubernetesNamespace, pod_spec_builder::PodSpecBuilder, +}; use crate::{ constants::{ NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR, NODE_SCRIPTS_DIR, P2P_PORT, PROMETHEUS_PORT, RPC_HTTP_PORT, RPC_WS_PORT, }, types::{ExecutionResult, 
RunCommandOptions, RunScriptOptions, TransferedFile}, - KubernetesClient, ProviderError, ProviderNamespace, ProviderNode, + ProviderError, ProviderNamespace, ProviderNode, }; pub(super) struct KubernetesNodeOptions<'a, FS> @@ -57,7 +59,11 @@ where { namespace: Weak<KubernetesNamespace<FS>>, name: String, + image: String, + program: String, args: Vec<String>, + env: Vec<(String, String)>, + resources: Option<Resources>, base_dir: PathBuf, config_dir: PathBuf, data_dir: PathBuf, @@ -77,6 +83,10 @@ where pub(super) async fn new( options: KubernetesNodeOptions<'_, FS>, ) -> Result<Arc<Self>, ProviderError> { + let image = options.image.ok_or_else(|| { + ProviderError::MissingNodeInfo(options.name.to_string(), "missing image".to_string()) + })?; + let filesystem = options.filesystem.clone(); let base_dir = @@ -100,7 +110,11 @@ where let node = Arc::new(KubernetesNode { namespace: options.namespace.clone(), name: options.name.to_string(), + image: image.to_string(), + program: options.program.to_string(), args: options.args.to_vec(), + env: options.env.to_vec(), + resources: options.resources.cloned(), base_dir, config_dir, data_dir, @@ -113,14 +127,7 @@ where port_fwds: Default::default(), }); - node.initialize_k8s( - options.image, - options.program, - options.args, - options.env, - options.resources, - ) - .await?; + node.initialize_k8s().await?; if let Some(db_snap) = options.db_snapshot { node.initialize_db_snapshot(db_snap).await?; @@ -133,14 +140,7 @@ where Ok(node) } - async fn initialize_k8s( - &self, - image: Option<&String>, - program: &str, - args: &[String], - env: &[(String, String)], - resources: Option<&Resources>, - ) -> Result<(), ProviderError> { + async fn initialize_k8s(&self) -> Result<(), ProviderError> { let labels = BTreeMap::from([ ( "app.kubernetes.io/name".to_string(), @@ -152,12 +152,15 @@ where ), ]); - let image = image.ok_or_else(|| { - ProviderError::MissingNodeInfo(self.name.to_string(), "missing image".to_string()) - })?; - // Create pod - let pod_spec = PodSpecBuilder::build(&self.name, image, resources, program, args, env); + let pod_spec = PodSpecBuilder::build( + &self.name, + &self.image, + self.resources.as_ref(), + &self.program, + &self.args, + &self.env, + ); let manifest = self .k8s_client @@ -330,7 +333,11 @@ where .await .map_err(|err| { ProviderError::NodeSpawningFailed( - format!("failed to created dirfor pod {}", &self.name), + format!( + "failed to create dir {} for pod {}", + remote_dir.to_string_lossy(), + &self.name + ), err.into(), ) })?; @@ -537,7 +544,13 @@ where remote_file_path: &Path, mode: &str, ) -> Result<(), ProviderError> { - let data = self.filesystem.read(local_file_path).await.unwrap(); + let data = self.filesystem.read(local_file_path).await.map_err(|err| { + ProviderError::SendFile( + self.name.clone(), + local_file_path.to_string_lossy().to_string(), + err.into(), + ) + })?; if let Some(remote_parent_dir) = self.get_remote_parent_dir(remote_file_path) { self.create_remote_dir(&remote_parent_dir).await?; @@ -553,7 +566,11 @@ where .send() .await .map_err(|err| { - ProviderError::SendFile(local_file_path.to_string_lossy().to_string(), err.into()) + ProviderError::SendFile( + self.name.clone(), + local_file_path.to_string_lossy().to_string(), + err.into(), + ) })?; let _ = self @@ -570,7 +587,11 @@ where ) .await .map_err(|err| { - ProviderError::SendFile(local_file_path.to_string_lossy().to_string(), err.into()) + ProviderError::SendFile( + self.name.clone(), + local_file_path.to_string_lossy().to_string(), + err.into(), + ) 
 })?;
 
         let _ = self
@@ -582,7 +603,11 @@ where
             )
             .await
             .map_err(|err| {
-                ProviderError::SendFile(local_file_path.to_string_lossy().to_string(), err.into())
+                ProviderError::SendFile(
+                    self.name.clone(),
+                    local_file_path.to_string_lossy().to_string(),
+                    err.into(),
+                )
             })?;
 
         Ok(())
     }
diff --git a/crates/provider/src/kubernetes/scripts/helper-binaries-downloader.sh b/crates/provider/src/kubernetes/scripts/helper-binaries-downloader.sh
deleted file mode 100644
index 2eb5c521330e33c0622dc8a83a454d8fc39b382e..0000000000000000000000000000000000000000
--- a/crates/provider/src/kubernetes/scripts/helper-binaries-downloader.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/bin/ash
-
-log() {
-    echo "$(date +"%F %T") $1"
-}
-
-wget github.com/moparisthebest/static-curl/releases/download/v7.83.1/curl-amd64 -O /cfg/curl
-log "curl downloaded"
-
-chmod +x /cfg/curl
-log "curl chmoded"
-
-wget -qO- github.com/uutils/coreutils/releases/download/0.0.17/coreutils-0.0.17-x86_64-unknown-linux-musl.tar.gz | tar -xz -C /cfg --strip-components=1 coreutils-0.0.17-x86_64-unknown-linux-musl/coreutils
-log "coreutils downloaded"
-
-chmod +x /cfg/coreutils
-log "coreutils chmoded"
\ No newline at end of file
diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs
index 0e72d2ff693620ba6a046789ed0b168c9115334c..c76df226a25d5212dbd7ab5f32c3ee56b9910633 100644
--- a/crates/provider/src/lib.rs
+++ b/crates/provider/src/lib.rs
@@ -1,4 +1,5 @@
 #![allow(clippy::expect_fun_call)]
+mod docker;
 mod kubernetes;
 mod native;
 pub mod shared;
@@ -102,8 +103,8 @@ pub enum ProviderError {
     #[error("Error downloading file: '{0}': {1}")]
     DownloadFile(String, anyhow::Error),
 
-    #[error("Error sending file: '{0}': {1}")]
-    SendFile(String, anyhow::Error),
+    #[error("Error sending file '{1}' to '{0}': {2}")]
+    SendFile(String, String, anyhow::Error),
 
     #[error("Error creating port-forward '{0}:{1}': {2}")]
     PortForwardError(u16, u16, anyhow::Error),
@@ -187,7 +188,7 @@ pub trait ProviderNode {
         Ok(LOCALHOST)
     }
 
-    // Noop by default (native provider)
+    // Noop by default (native/docker providers)
     async fn create_port_forward(
         &self,
         _local_port: u16,
@@ -229,6 +230,7 @@ pub trait ProviderNode {
 pub type DynNode = Arc<dyn ProviderNode + Send + Sync>;
 
 // re-export
+pub use docker::*;
 pub use kubernetes::*;
 pub use native::*;
 pub use shared::{constants, types};
diff --git a/crates/provider/src/native/namespace.rs b/crates/provider/src/native/namespace.rs
index 33886cbc69c4c46136f7d5e8e5bd824b94408f94..ddefcc16c61ce3127c05d1a77f89f5d2cc20c16f 100644
--- a/crates/provider/src/native/namespace.rs
+++ b/crates/provider/src/native/namespace.rs
@@ -11,7 +11,7 @@ use tokio::sync::RwLock;
 use tracing::trace;
 use uuid::Uuid;
 
-use super::node::NativeNode;
+use super::node::{NativeNode, NativeNodeOptions};
 use crate::{
     constants::NAMESPACE_PREFIX,
     types::{
@@ -104,6 +104,8 @@ where
                 ProviderError::NodeAvailableArgsError("".to_string(), command, status)
             })?;
 
+        temp_node.destroy().await?;
+
         Ok(available_args_output)
     }
 
@@ -112,18 +114,18 @@ where
             return Err(ProviderError::DuplicatedNodeName(options.name.clone()));
         }
 
-        let node = NativeNode::new(
-            &self.weak,
-            &self.base_dir,
-            &options.name,
-            &options.program,
-            &options.args,
-            &options.env,
-            &options.injected_files,
-            &options.created_paths,
-            &options.db_snapshot.as_ref(),
-            &self.filesystem,
-        )
+        let node = NativeNode::new(NativeNodeOptions {
+            namespace: &self.weak,
+            namespace_base_dir: &self.base_dir,
+            name: &options.name,
+            program: &options.program,
+            args: &options.args,
+            env: &options.env,
+
startup_files: &options.injected_files, + created_paths: &options.created_paths, + db_snapshot: options.db_snapshot.as_ref(), + filesystem: &self.filesystem, + }) .await?; self.nodes diff --git a/crates/provider/src/native/node.rs b/crates/provider/src/native/node.rs index 37e73438fd8d3757e75801915da5c691fb0b4c38..4523e818177a420731f6fb8c1ad8986a8932304a 100644 --- a/crates/provider/src/native/node.rs +++ b/crates/provider/src/native/node.rs @@ -37,6 +37,22 @@ use crate::{ ProviderError, ProviderNamespace, ProviderNode, }; +pub(super) struct NativeNodeOptions<'a, FS> +where + FS: FileSystem + Send + Sync + Clone + 'static, +{ + pub(super) namespace: &'a Weak<NativeNamespace<FS>>, + pub(super) namespace_base_dir: &'a PathBuf, + pub(super) name: &'a str, + pub(super) program: &'a str, + pub(super) args: &'a [String], + pub(super) env: &'a [(String, String)], + pub(super) startup_files: &'a [TransferedFile], + pub(super) created_paths: &'a [PathBuf], + pub(super) db_snapshot: Option<&'a AssetLocation>, + pub(super) filesystem: &'a FS, +} + pub(super) struct NativeNode<FS> where FS: FileSystem + Send + Sync + Clone, @@ -63,22 +79,15 @@ impl<FS> NativeNode<FS> where FS: FileSystem + Send + Sync + Clone + 'static, { - #[allow(clippy::too_many_arguments)] pub(super) async fn new( - namespace: &Weak<NativeNamespace<FS>>, - namespace_base_dir: &PathBuf, - name: &str, - program: &str, - args: &[String], - env: &[(String, String)], - startup_files: &[TransferedFile], - created_paths: &[PathBuf], - db_snapshot: &Option<&AssetLocation>, - filesystem: &FS, + options: NativeNodeOptions<'_, FS>, ) -> Result<Arc<Self>, ProviderError> { - let base_dir = PathBuf::from_iter([namespace_base_dir, &PathBuf::from(name)]); + let filesystem = options.filesystem.clone(); + + let base_dir = + PathBuf::from_iter([options.namespace_base_dir, &PathBuf::from(options.name)]); trace!("creating base_dir {:?}", base_dir); - filesystem.create_dir_all(&base_dir).await?; + options.filesystem.create_dir_all(&base_dir).await?; trace!("created base_dir {:?}", base_dir); let base_dir_raw = base_dir.to_string_lossy(); @@ -86,7 +95,7 @@ where let data_dir = PathBuf::from(format!("{}{}", base_dir_raw, NODE_DATA_DIR)); let relay_data_dir = PathBuf::from(format!("{}{}", base_dir_raw, NODE_RELAY_DATA_DIR)); let scripts_dir = PathBuf::from(format!("{}{}", base_dir_raw, NODE_SCRIPTS_DIR)); - let log_path = base_dir.join(format!("{name}.log")); + let log_path = base_dir.join(format!("{}.log", options.name)); trace!("creating dirs {:?}", config_dir); try_join!( @@ -98,11 +107,11 @@ where trace!("created!"); let node = Arc::new(NativeNode { - namespace: namespace.clone(), - name: name.to_string(), - program: program.to_string(), - args: args.to_vec(), - env: env.to_vec(), + namespace: options.namespace.clone(), + name: options.name.to_string(), + program: options.program.to_string(), + args: options.args.to_vec(), + env: options.env.to_vec(), base_dir, config_dir, data_dir, @@ -116,10 +125,10 @@ where filesystem: filesystem.clone(), }); - node.initialize_startup_paths(created_paths).await?; - node.initialize_startup_files(startup_files).await?; + node.initialize_startup_paths(options.created_paths).await?; + node.initialize_startup_files(options.startup_files).await?; - if let Some(db_snap) = db_snapshot { + if let Some(db_snap) = options.db_snapshot { node.initialize_db_snapshot(db_snap).await?; } @@ -492,7 +501,13 @@ where .args(vec![mode, &namespaced_remote_file_path.to_string_lossy()]), ) .await? 
-            .map_err(|(_, err)| ProviderError::SendFile(self.name.clone(), anyhow!("{err}")))?;
+            .map_err(|(_, err)| {
+                ProviderError::SendFile(
+                    self.name.clone(),
+                    local_file_path.to_string_lossy().to_string(),
+                    anyhow!("{err}"),
+                )
+            })?;
 
         Ok(())
     }
diff --git a/crates/provider/src/shared/scripts/helper-binaries-downloader.sh b/crates/provider/src/shared/scripts/helper-binaries-downloader.sh
new file mode 100644
index 0000000000000000000000000000000000000000..c2fe89df9ff796ea2805bcfd9c8f88719e20dcbe
--- /dev/null
+++ b/crates/provider/src/shared/scripts/helper-binaries-downloader.sh
@@ -0,0 +1,22 @@
+#!/bin/ash
+
+log() {
+    echo "$(date +"%F %T") $1"
+}
+
+# handle the k8s/docker distinction: on k8s the binaries go into /cfg, while
+# docker/podman mount a shared /helpers volume across nodes, kept separate
+# from /cfg where the node stores some of its own files
+OUTDIR=$([ -d /helpers ] && echo "/helpers" || echo "/cfg")
+
+wget github.com/moparisthebest/static-curl/releases/download/v7.83.1/curl-amd64 -O "$OUTDIR/curl"
+log "curl downloaded"
+
+chmod +x "$OUTDIR/curl"
+log "curl chmoded"
+
+wget -qO- github.com/uutils/coreutils/releases/download/0.0.17/coreutils-0.0.17-x86_64-unknown-linux-musl.tar.gz | tar -xz -C "$OUTDIR" --strip-components=1 coreutils-0.0.17-x86_64-unknown-linux-musl/coreutils
+log "coreutils downloaded"
+
+chmod +x "$OUTDIR/coreutils"
+log "coreutils chmoded"
\ No newline at end of file
diff --git a/crates/provider/src/kubernetes/scripts/zombie-wrapper.sh b/crates/provider/src/shared/scripts/zombie-wrapper.sh
similarity index 79%
rename from crates/provider/src/kubernetes/scripts/zombie-wrapper.sh
rename to crates/provider/src/shared/scripts/zombie-wrapper.sh
index 413a740147d490b72d3c92cffd7b65cbf7fef5fe..56d945e41510174bf7100465a3022839525267ed 100755
--- a/crates/provider/src/kubernetes/scripts/zombie-wrapper.sh
+++ b/crates/provider/src/shared/scripts/zombie-wrapper.sh
@@ -9,6 +9,17 @@ if [ -f /cfg/coreutils ]; then
     KILL="/cfg/coreutils kill"
     SLEEP="/cfg/coreutils sleep"
     ECHO="/cfg/coreutils echo"
+elif [ -f /helpers/coreutils ]; then
+# used by docker/podman, where a single shared volume provides the helper
+# binaries across nodes, independent from /cfg where the node itself
+# stores some of its files
+    RM="/helpers/coreutils rm"
+    MKFIFO="/helpers/coreutils mkfifo"
+    MKNOD="/helpers/coreutils mknod"
+    LS="/helpers/coreutils ls"
+    KILL="/helpers/coreutils kill"
+    SLEEP="/helpers/coreutils sleep"
+    ECHO="/helpers/coreutils echo"
 else
     RM="rm"
     MKFIFO="mkfifo"
@@ -52,7 +63,8 @@ child_pid=""
 CMD=($@)
 
 # File to store CMD (and update from there)
-ZOMBIE_CMD_FILE=/cfg/zombie.cmd
+ZOMBIE_CMD_FILE=/tmp/zombie.cmd
+ZOMBIE_CMD_PID=/tmp/zombie.pid
 
 # Store the cmd and make it available to later usage
 # NOTE: echo without new line to allow to customize the cmd later
@@ -68,7 +80,7 @@ start() {
         $ECHO $(cat $ZOMBIE_CMD_FILE)
 
         # store pid
-        $ECHO ${child_pid} > /cfg/zombie.pid
+        $ECHO ${child_pid} > $ZOMBIE_CMD_PID
 
         # check if the process is running
         if ! $LS /proc/$child_pid > /dev/null 2>&1 ; then
@@ -78,7 +90,7 @@ start() {
             echo "PID: $child_pid alive";
         fi;
     else
-        echo "PID not stored, since was 'cat'";
+        echo "Process not started and PID not stored, since the command was 'cat'";
     fi;
 }
@@ -111,6 +123,7 @@ resume() {
 # keep listening from the pipe
 while read line <$pipe
 do
+echo "read line: ${line}"
     if [[ "$line" == "start" ]]; then
         start
diff --git a/crates/provider/src/shared/types.rs b/crates/provider/src/shared/types.rs
index 4526ba08f82db240d7a298c449bdccf7a18103ca..e4e58edc1f911df705b22917ad0df8c1750740d5 100644
--- a/crates/provider/src/shared/types.rs
+++ b/crates/provider/src/shared/types.rs
@@ -1,4 +1,5 @@
 use std::{
+    collections::HashMap,
     path::{Path, PathBuf},
     process::ExitStatus,
 };
@@ -47,6 +48,7 @@ pub struct SpawnNodeOptions {
     /// Database snapshot to be injected (should be a tgz file)
     /// Could be a local or remote asset
     pub db_snapshot: Option<AssetLocation>,
+    pub port_mapping: Option<HashMap<Port, Port>>,
 }
 
 impl SpawnNodeOptions {
@@ -64,6 +66,7 @@ impl SpawnNodeOptions {
             injected_files: vec![],
             created_paths: vec![],
             db_snapshot: None,
+            port_mapping: None,
         }
     }
 
@@ -125,6 +128,11 @@ impl SpawnNodeOptions {
             .collect();
         self
     }
+
+    pub fn port_mapping(mut self, ports: HashMap<Port, Port>) -> Self {
+        self.port_mapping = Some(ports);
+        self
+    }
 }
 
 #[derive(Debug)]
diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs
index 8875e88cc78a3fde82756b43056d93efec27aec0..0e4afe92f988d6750daaaab8d46a8e5f19580743 100644
--- a/crates/sdk/src/lib.rs
+++ b/crates/sdk/src/lib.rs
@@ -5,7 +5,7 @@ pub use orchestrator::{
     network::{node::NetworkNode, Network},
     AddCollatorOptions, AddNodeOptions, Orchestrator, PjsResult,
 };
-use provider::{KubernetesProvider, NativeProvider};
+use provider::{DockerProvider, KubernetesProvider, NativeProvider};
 pub use support::fs::local::LocalFileSystem;
 
 pub const PROVIDERS: [&str; 2] = ["k8s", "native"];
@@ -26,19 +26,29 @@ pub trait NetworkConfigExt {
     /// ```
     async fn spawn_native(self) -> Result<Network<LocalFileSystem>, OrchestratorError>;
     async fn spawn_k8s(self) -> Result<Network<LocalFileSystem>, OrchestratorError>;
+    async fn spawn_docker(self) -> Result<Network<LocalFileSystem>, OrchestratorError>;
 }
 
 #[async_trait]
 impl NetworkConfigExt for NetworkConfig {
     async fn spawn_native(self) -> Result<Network<LocalFileSystem>, OrchestratorError> {
-        let provider = NativeProvider::new(LocalFileSystem {});
-        let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
+        let filesystem = LocalFileSystem;
+        let provider = NativeProvider::new(filesystem.clone());
+        let orchestrator = Orchestrator::new(filesystem, provider);
         orchestrator.spawn(self).await
     }
 
     async fn spawn_k8s(self) -> Result<Network<LocalFileSystem>, OrchestratorError> {
-        let provider = KubernetesProvider::new(LocalFileSystem {}).await;
-        let orchestrator = Orchestrator::new(LocalFileSystem {}, provider);
+        let filesystem = LocalFileSystem;
+        let provider = KubernetesProvider::new(filesystem.clone()).await;
+        let orchestrator = Orchestrator::new(filesystem, provider);
+        orchestrator.spawn(self).await
+    }
+
+    async fn spawn_docker(self) -> Result<Network<LocalFileSystem>, OrchestratorError> {
+        let filesystem = LocalFileSystem;
+        let provider = DockerProvider::new(filesystem.clone()).await;
+        let orchestrator = Orchestrator::new(filesystem, provider);
         orchestrator.spawn(self).await
     }
 }