diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2a7cf88ff3335ff84d7e5c00753ecc33f2e4c513..558d24949a7fb410952797fe6a877c0e051a16b0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,6 +21,12 @@ jobs: # TODO 24-02-08: Disable nightly due to tkaitchuck/aHash#200. #- nightly steps: + # https://github.com/jlumbroso/free-disk-space + - name: Free Disk Space (Ubuntu) + uses: jlumbroso/free-disk-space@main + with: + tool-cache: false + - uses: actions/checkout@v3 - name: Init nigthly install for fmt diff --git a/crates/examples/examples/db_snapshot.rs b/crates/examples/examples/db_snapshot.rs new file mode 100644 index 0000000000000000000000000000000000000000..68aa61123bf76b844f3a0b37d4dad999291e0abe --- /dev/null +++ b/crates/examples/examples/db_snapshot.rs @@ -0,0 +1,28 @@ +use zombienet_sdk::{NetworkConfigBuilder, NetworkConfigExt}; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + tracing_subscriber::fmt::init(); + let mut _network = NetworkConfigBuilder::new() + .with_relaychain(|r| { + r.with_chain("rococo-local") + .with_default_command("substrate-node") + .with_default_image("docker.io/paritypr/substrate:3428-e5be9c93") + .with_default_db_snapshot("https://storage.googleapis.com/zombienet-db-snaps/substrate/0001-basic-warp-sync/chains-9677807d738b951e9f6c82e5fd15518eb0ae0419.tgz") + .with_chain_spec_path("/Users/pepo/parity/polkadot-sdk/substrate/zombienet/0001-basic-warp-sync/chain-spec.json") + .with_node(|node| node.with_name("alice")) + .with_node(|node| node.with_name("bob")) + .with_node(|node| node.with_name("charlie")) + }) + .build() + .unwrap() + // .spawn_native() + .spawn_k8s() + .await?; + + println!("🚀🚀🚀🚀 network deployed"); + + // For now let just loop.... + #[allow(clippy::empty_loop)] + loop {} +} diff --git a/crates/orchestrator/src/generators/chain_spec.rs b/crates/orchestrator/src/generators/chain_spec.rs index eb042f212565beb3ca21d2379c66700c90fbff51..faf9d223527fbcf84170883c0b2ef3243bd0ab2e 100644 --- a/crates/orchestrator/src/generators/chain_spec.rs +++ b/crates/orchestrator/src/generators/chain_spec.rs @@ -162,7 +162,19 @@ impl ChainSpec { } if is_raw(maybe_plain_spec_path.clone(), scoped_fs).await? { - self.raw_path = Some(maybe_plain_spec_path); + let spec_path = PathBuf::from(format!("{}.json", self.chain_spec_name)); + let tf_file = TransferedFile::new( + &PathBuf::from_iter([ns.base_dir(), &maybe_plain_spec_path]), + &spec_path, + ); + scoped_fs.copy_files(vec![&tf_file]).await.map_err(|e| { + GeneratorError::ChainSpecGeneration(format!( + "Error copying file: {}, err: {}", + tf_file, e + )) + })?; + + self.raw_path = Some(spec_path); } else { self.maybe_plain_path = Some(maybe_plain_spec_path); } diff --git a/crates/orchestrator/src/generators/command.rs b/crates/orchestrator/src/generators/command.rs index 9cee572ab2805bcd70772bd746c2529051fe2d98..61695472090bcf02ed1257054a90efd986ca0497 100644 --- a/crates/orchestrator/src/generators/command.rs +++ b/crates/orchestrator/src/generators/command.rs @@ -232,6 +232,7 @@ pub fn generate_for_node( if *is_validator && !args.contains(&Arg::Flag("--validator".into())) { tmp_args.push("--validator".into()); + // TODO: we need to impl cli args checking tmp_args.push("--insecure-validator-i-know-what-i-do".into()); } diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs index 9da760f85da85e4fd1e93a43bf0ab1a77befb23d..d823261ab1ec433547de8f3bf1c2080f50d35bf5 100644 --- a/crates/orchestrator/src/lib.rs +++ b/crates/orchestrator/src/lib.rs @@ -24,7 +24,7 @@ use provider::{ }; use support::fs::{FileSystem, FileSystemError}; use tokio::time::timeout; -use tracing::{debug, info}; +use tracing::{debug, info, trace}; use crate::{ generators::chain_spec::ParaGenesisConfig, @@ -425,6 +425,7 @@ impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> { self.base_dir, file.remote_path.to_string_lossy() )); + trace!("coping file: {file}"); self.fs .copy(file.local_path.as_path(), full_remote_path) .await?; diff --git a/crates/orchestrator/src/network_spec/node.rs b/crates/orchestrator/src/network_spec/node.rs index 0e3cba823b750089384fa094807cc85afaf9fc28..5a3dda3bb78fef983d194756021b110ecdbd4c08 100644 --- a/crates/orchestrator/src/network_spec/node.rs +++ b/crates/orchestrator/src/network_spec/node.rs @@ -150,6 +150,12 @@ impl NodeSpec { let accounts = generators::generate_node_keys(&seed)?; let accounts = NodeAccounts { seed, accounts }; + let db_snapshot = match (node_config.db_snapshot(), chain_context.default_db_snapshot) { + (Some(db_snapshot), _) => Some(db_snapshot), + (None, Some(db_snapshot)) => Some(db_snapshot), + _ => None, + }; + Ok(Self { name: node_config.name().to_string(), key, @@ -170,7 +176,7 @@ impl NodeSpec { .collect(), resources: node_config.resources().cloned(), p2p_cert_hash: node_config.p2p_cert_hash().map(str::to_string), - db_snapshot: node_config.db_snapshot().cloned(), + db_snapshot: db_snapshot.cloned(), accounts, ws_port: generators::generate_node_port(node_config.ws_port())?, rpc_port: generators::generate_node_port(node_config.rpc_port())?, diff --git a/crates/orchestrator/src/spawner.rs b/crates/orchestrator/src/spawner.rs index 290c13affd67c63a18cd551dab4b4318ba5b7a3f..97a959445c3ef9ca12526a41d25048a26d21d27a 100644 --- a/crates/orchestrator/src/spawner.rs +++ b/crates/orchestrator/src/spawner.rs @@ -156,7 +156,8 @@ where .map(|var| (var.name.clone(), var.value.clone())), ) .injected_files(files_to_inject) - .created_paths(created_paths); + .created_paths(created_paths) + .db_snapshot(node.db_snapshot.clone()); let spawn_ops = if let Some(image) = node.image.as_ref() { spawn_ops.image(image.as_str()) diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index ae4c4e1eb045c9588f9dfd9a77e808dbfea2a4e7..4967526ce0ec23c050f05e177f26d26f714b7ad4 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -38,6 +38,8 @@ hex = { workspace = true } tracing = { workspace = true } reqwest = { workspace = true } regex = { workspace = true } +url = { workspace = true } +flate2 = "1.0" # Zomebienet deps support = { workspace = true } diff --git a/crates/provider/src/kubernetes/namespace.rs b/crates/provider/src/kubernetes/namespace.rs index 88a4820222dbfe29bb5d90c1161ce0a9983455c5..81f6f31ff3886a2cfde2fc95807aba9a7204462c 100644 --- a/crates/provider/src/kubernetes/namespace.rs +++ b/crates/provider/src/kubernetes/namespace.rs @@ -348,6 +348,7 @@ where env: &options.env, startup_files: &options.injected_files, resources: options.resources.as_ref(), + db_snapshot: options.db_snapshot.as_ref(), k8s_client: &self.k8s_client, filesystem: &self.filesystem, }) diff --git a/crates/provider/src/kubernetes/node.rs b/crates/provider/src/kubernetes/node.rs index 03cf50a3e92496a0273f91dcf1f1df57200da453..75c21dff23134e142d90280cb4d62e181567973f 100644 --- a/crates/provider/src/kubernetes/node.rs +++ b/crates/provider/src/kubernetes/node.rs @@ -9,11 +9,17 @@ use std::{ use anyhow::anyhow; use async_trait::async_trait; -use configuration::shared::{constants::THIS_IS_A_BUG, resources::Resources}; +use configuration::{ + shared::{constants::THIS_IS_A_BUG, resources::Resources}, + types::AssetLocation, +}; use futures::future::try_join_all; use k8s_openapi::api::core::v1::{ServicePort, ServiceSpec}; +use sha2::Digest; use support::fs::FileSystem; use tokio::{sync::RwLock, task::JoinHandle, time::sleep, try_join}; +use tracing::trace; +use url::Url; use super::{namespace::KubernetesNamespace, pod_spec_builder::PodSpecBuilder}; use crate::{ @@ -38,6 +44,7 @@ where pub(super) env: &'a [(String, String)], pub(super) startup_files: &'a [TransferedFile], pub(super) resources: Option<&'a Resources>, + pub(super) db_snapshot: Option<&'a AssetLocation>, pub(super) k8s_client: &'a KubernetesClient, pub(super) filesystem: &'a FS, } @@ -115,6 +122,10 @@ where ) .await?; + if let Some(db_snap) = options.db_snapshot { + node.initialize_db_snapshot(db_snap).await?; + } + node.initialize_startup_files(options.startup_files).await?; node.start().await?; @@ -216,6 +227,47 @@ where Ok(()) } + async fn initialize_db_snapshot( + &self, + db_snapshot: &AssetLocation, + ) -> Result<(), ProviderError> { + trace!("snap: {db_snapshot}"); + let url_of_snap = match db_snapshot { + AssetLocation::Url(location) => location.clone(), + AssetLocation::FilePath(filepath) => self.upload_to_fileserver(filepath).await?, + }; + + // we need to get the snapshot from a public access + // and extract to /data + let opts = RunCommandOptions::new("mkdir").args([ + "-p", + "/data/", + "&&", + "mkdir", + "-p", + "/relay-data/", + "&&", + // Use our version of curl + "/cfg/curl", + url_of_snap.as_ref(), + "--output", + "/data/db.tgz", + "&&", + "cd", + "/", + "&&", + "tar", + "--skip-old-files", + "-xzvf", + "/data/db.tgz", + ]); + + trace!("cmd opts: {:#?}", opts); + let _ = self.run_command(opts).await?; + + Ok(()) + } + async fn initialize_startup_files( &self, startup_files: &[TransferedFile], @@ -293,6 +345,40 @@ where .unwrap_or_else(|| panic!("namespace shouldn't be dropped, {}", THIS_IS_A_BUG)) } + async fn upload_to_fileserver(&self, location: &Path) -> Result<Url, ProviderError> { + let data = self.filesystem.read(location).await?; + let hashed_path = hex::encode(sha2::Sha256::digest(&data)); + let req = self + .http_client + .head(format!( + "http://{}/{hashed_path}", + self.file_server_local_host().await? + )) + .build() + .map_err(|err| { + ProviderError::UploadFile(location.to_string_lossy().to_string(), err.into()) + })?; + + let url = req.url().clone(); + let res = self.http_client.execute(req).await.map_err(|err| { + ProviderError::UploadFile(location.to_string_lossy().to_string(), err.into()) + })?; + + if res.status() != reqwest::StatusCode::OK { + // we need to upload the file + self.http_client + .post(url.as_ref()) + .body(data) + .send() + .await + .map_err(|err| { + ProviderError::UploadFile(location.to_string_lossy().to_string(), err.into()) + })?; + } + + Ok(url) + } + async fn file_server_local_host(&self) -> Result<String, ProviderError> { if let Some(namespace) = self.namespace.upgrade() { if let Some(port) = *namespace.file_server_port.read().await { diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index f7139c386ae10497b83eab5168f9503057313990..0c4d026be76f1536e497039d63cde7de6d80d32a 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -92,6 +92,12 @@ pub enum ProviderError { #[error("Failed to setup fileserver: {0}")] FileServerSetupError(anyhow::Error), + #[error("Error uploading file: '{0}': {1}")] + UploadFile(String, anyhow::Error), + + #[error("Error downloading file: '{0}': {1}")] + DownloadFile(String, anyhow::Error), + #[error("Error sending file: '{0}': {1}")] SendFile(String, anyhow::Error), diff --git a/crates/provider/src/native/namespace.rs b/crates/provider/src/native/namespace.rs index 7ba225fc58ceb0bf50dc92058d93c45574687fb7..83ac4af40d356b625e75712be6fdcc683204a73b 100644 --- a/crates/provider/src/native/namespace.rs +++ b/crates/provider/src/native/namespace.rs @@ -100,6 +100,7 @@ where &options.env, &options.injected_files, &options.created_paths, + &options.db_snapshot.as_ref(), &self.filesystem, ) .await?; diff --git a/crates/provider/src/native/node.rs b/crates/provider/src/native/node.rs index 84b4925a3c8ba005aba82d45c99de3479d4eb87e..f199f98e099054f6fa9931e8022eb09472345018 100644 --- a/crates/provider/src/native/node.rs +++ b/crates/provider/src/native/node.rs @@ -7,12 +7,16 @@ use std::{ use anyhow::anyhow; use async_trait::async_trait; +use configuration::{shared::constants::THIS_IS_A_BUG, types::AssetLocation}; +use flate2::read::GzDecoder; use futures::future::try_join_all; use nix::{ sys::signal::{kill, Signal}, unistd::Pid, }; +use sha2::Digest; use support::fs::FileSystem; +use tar::Archive; use tokio::{ io::{AsyncRead, AsyncReadExt, BufReader}, process::{Child, ChildStderr, ChildStdout, Command}, @@ -30,7 +34,7 @@ use super::namespace::NativeNamespace; use crate::{ constants::{NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_RELAY_DATA_DIR, NODE_SCRIPTS_DIR}, types::{ExecutionResult, RunCommandOptions, RunScriptOptions, TransferedFile}, - ProviderError, ProviderNode, + ProviderError, ProviderNamespace, ProviderNode, }; pub(super) struct NativeNode<FS> @@ -69,6 +73,7 @@ where env: &[(String, String)], startup_files: &[TransferedFile], created_paths: &[PathBuf], + db_snapshot: &Option<&AssetLocation>, filesystem: &FS, ) -> Result<Arc<Self>, ProviderError> { let base_dir = PathBuf::from_iter([namespace_base_dir, &PathBuf::from(name)]); @@ -114,6 +119,10 @@ where node.initialize_startup_paths(created_paths).await?; node.initialize_startup_files(startup_files).await?; + if let Some(db_snap) = db_snapshot { + node.initialize_db_snapshot(db_snap).await?; + } + let (stdout, stderr) = node.initialize_process().await?; node.initialize_log_writing(stdout, stderr).await; @@ -150,6 +159,62 @@ where Ok(()) } + async fn initialize_db_snapshot( + &self, + db_snapshot: &AssetLocation, + ) -> Result<(), ProviderError> { + trace!("snap: {db_snapshot}"); + + // check if we need to get the db or is already in the ns + let ns_base_dir = self.namespace_base_dir(); + let hashed_location = match db_snapshot { + AssetLocation::Url(location) => hex::encode(sha2::Sha256::digest(location.to_string())), + AssetLocation::FilePath(filepath) => { + hex::encode(sha2::Sha256::digest(filepath.to_string_lossy().to_string())) + }, + }; + + let full_path = format!("{}/{}.tgz", ns_base_dir, hashed_location); + trace!("db_snap fullpath in ns: {full_path}"); + if !self.filesystem.exists(&full_path).await { + // needs to download/copy + self.get_db_snapshot(db_snapshot, &full_path).await?; + } + + let contents = self.filesystem.read(full_path).await.unwrap(); + let gz = GzDecoder::new(&contents[..]); + let mut archive = Archive::new(gz); + archive + .unpack(self.base_dir.to_string_lossy().as_ref()) + .unwrap(); + + Ok(()) + } + + async fn get_db_snapshot( + &self, + location: &AssetLocation, + full_path: &str, + ) -> Result<(), ProviderError> { + trace!("getting db_snapshot from: {:?} to: {full_path}", location); + match location { + AssetLocation::Url(location) => { + let res = reqwest::get(location.as_ref()) + .await + .map_err(|err| ProviderError::DownloadFile(location.to_string(), err.into()))?; + + let contents: &[u8] = &res.bytes().await.unwrap(); + trace!("writing: {full_path}"); + self.filesystem.write(full_path, contents).await?; + }, + AssetLocation::FilePath(filepath) => { + self.filesystem.copy(filepath, full_path).await?; + }, + }; + + Ok(()) + } + async fn initialize_process(&self) -> Result<(ChildStdout, ChildStderr), ProviderError> { let mut process = Command::new(&self.program) .args(&self.args) @@ -271,6 +336,13 @@ where Ok(()) } + + fn namespace_base_dir(&self) -> String { + self.namespace + .upgrade() + .map(|namespace| namespace.base_dir().to_string_lossy().to_string()) + .unwrap_or_else(|| panic!("namespace shouldn't be dropped, {}", THIS_IS_A_BUG)) + } } #[async_trait] diff --git a/crates/provider/src/shared/types.rs b/crates/provider/src/shared/types.rs index 0c0eaadc1117ccb109377e62545ddccfabac0e7e..4526ba08f82db240d7a298c449bdccf7a18103ca 100644 --- a/crates/provider/src/shared/types.rs +++ b/crates/provider/src/shared/types.rs @@ -3,7 +3,7 @@ use std::{ process::ExitStatus, }; -use configuration::shared::resources::Resources; +use configuration::{shared::resources::Resources, types::AssetLocation}; pub type Port = u16; @@ -25,18 +25,28 @@ pub struct ProviderCapabilities { #[derive(Debug, Clone)] pub struct SpawnNodeOptions { + /// Name of the node pub name: String, + /// Image of the node (IFF is supported by the provider) pub image: Option<String>, + /// Resources to apply to the node (IFF is supported by the provider) pub resources: Option<Resources>, + /// Main command to execute pub program: String, + /// Arguments to pass to the main command pub args: Vec<String>, + /// Environment to set when runnning the `program` pub env: Vec<(String, String)>, // TODO: rename startup_files + /// Files to inject at startup pub injected_files: Vec<TransferedFile>, /// Paths to create before start the node (e.g keystore) /// should be created with `create_dir_all` in order /// to create the full path even when we have missing parts pub created_paths: Vec<PathBuf>, + /// Database snapshot to be injected (should be a tgz file) + /// Could be a local or remote asset + pub db_snapshot: Option<AssetLocation>, } impl SpawnNodeOptions { @@ -53,6 +63,7 @@ impl SpawnNodeOptions { env: vec![], injected_files: vec![], created_paths: vec![], + db_snapshot: None, } } @@ -69,6 +80,11 @@ impl SpawnNodeOptions { self } + pub fn db_snapshot(mut self, db_snap: Option<AssetLocation>) -> Self { + self.db_snapshot = db_snap; + self + } + pub fn args<S, I>(mut self, args: I) -> Self where S: AsRef<str>, @@ -215,6 +231,7 @@ impl GenerateFilesOptions { } } +#[derive(Debug)] pub struct RunCommandOptions { pub program: String, pub args: Vec<String>,