Skip to content
Snippets Groups Projects
Verified Commit 30feb4b1 authored by Loris Moulin's avatar Loris Moulin
Browse files

feat: added modified implementation of run_script/run_command/copy_from_node...

feat: added modified implementation of run_script/run_command/copy_from_node in NativeProvider, removed unused comments
parent 563dd2c3
No related branches found
No related tags found
No related merge requests found
......@@ -9,6 +9,7 @@ use std::{
sync::{Arc, Weak},
};
use anyhow::anyhow;
use async_trait::async_trait;
use configuration::types::Port;
use nix::{
......@@ -29,10 +30,10 @@ use tokio::{
use uuid::Uuid;
use crate::{
errors::ProviderError,
shared::constants::{DEFAULT_TMP_DIR, NODE_CONFIG_DIR, NODE_DATA_DIR},
DynNamespace, DynNode, ExecutionResult, Provider, ProviderCapabilities, ProviderNamespace,
ProviderNode, RunCommandOptions, RunScriptOptions, SpawnNodeOptions, SpawnTempOptions,
shared::constants::{DEFAULT_TMP_DIR, NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_SCRIPTS_DIR},
DynNamespace, DynNode, ExecutionResult, Provider, ProviderCapabilities, ProviderError,
ProviderNamespace, ProviderNode, RunCommandOptions, RunScriptOptions, SpawnNodeOptions,
SpawnTempOptions,
};
pub struct NativeProviderOptions<FS>
......@@ -140,6 +141,7 @@ impl<FS: FileSystem + Send + Sync + Clone + 'static> ProviderNamespace for Nativ
let log_path = format!("{}/{}.log", &base_dir, &options.name);
let config_dir = format!("{}{}", &base_dir, NODE_CONFIG_DIR);
let data_dir = format!("{}{}", &base_dir, NODE_DATA_DIR);
let scripts_dir = format!("{}{}", &base_dir, NODE_SCRIPTS_DIR);
inner.filesystem.create_dir(&base_dir).await.unwrap();
inner.filesystem.create_dir(&config_dir).await.unwrap();
inner.filesystem.create_dir(&data_dir).await.unwrap();
......@@ -161,6 +163,8 @@ impl<FS: FileSystem + Send + Sync + Clone + 'static> ProviderNamespace for Nativ
command: options.command,
args: options.args,
env: options.env,
base_dir,
scripts_dir,
log_path,
process,
stdout_reading_handle,
......@@ -219,6 +223,8 @@ struct NativeNodeInner<FS: FileSystem + Send + Sync + Clone> {
command: String,
args: Vec<String>,
env: Vec<(String, String)>,
base_dir: String,
scripts_dir: String,
log_path: String,
process: Child,
stdout_reading_handle: JoinHandle<()>,
......@@ -228,8 +234,6 @@ struct NativeNodeInner<FS: FileSystem + Send + Sync + Clone> {
namespace: WeakNativeNamespace<FS>,
}
impl<FS: FileSystem + Send + Sync + Clone> NativeNodeInner<FS> {}
#[derive(Debug, Clone)]
struct NativeNode<FS: FileSystem + Send + Sync + Clone> {
inner: Arc<RwLock<NativeNodeInner<FS>>>,
......@@ -254,37 +258,81 @@ impl<FS: FileSystem + Send + Sync + Clone + 'static> ProviderNode for NativeNode
Ok(inner.filesystem.read_to_string(&inner.log_path).await?)
}
async fn dump_logs(&self, dest: PathBuf) -> Result<(), ProviderError> {
async fn dump_logs(&self, local_dest: PathBuf) -> Result<(), ProviderError> {
let logs = self.logs().await?;
Ok(self
.inner
.write()
.await
.filesystem
.write(dest, logs.as_bytes())
.write(local_dest, logs.as_bytes())
.await?)
}
async fn run_command(
&self,
_options: RunCommandOptions,
options: RunCommandOptions,
) -> Result<ExecutionResult, ProviderError> {
todo!()
let result = Command::new(options.command)
.args(options.args)
.output()
.await
.map_err(|err| ProviderError::RunCommandError(err.into()))?;
if result.status.success() {
Ok(Ok(String::from_utf8_lossy(&result.stdout).to_string()))
} else {
Ok(Err((
result.status,
String::from_utf8_lossy(&result.stderr).to_string(),
)))
}
}
async fn run_script(
&self,
_options: RunScriptOptions,
options: RunScriptOptions,
) -> Result<ExecutionResult, ProviderError> {
todo!()
let inner = self.inner.read().await;
let local_script_path = PathBuf::from(&options.local_script_path);
if !local_script_path.try_exists().unwrap() {
return Err(ProviderError::RunCommandError(anyhow!("Test")));
}
// extract file name and build remote file path
let script_file_name = local_script_path
.file_name()
.map(|file_name| file_name.to_string_lossy().to_string())
.ok_or(ProviderError::InvalidScriptPath(options.local_script_path))?;
let remote_script_path = format!("{}/{}", inner.scripts_dir, script_file_name);
// copy and set script's execute permission
inner
.filesystem
.copy(local_script_path, &remote_script_path)
.await?;
inner
.filesystem
.set_mode(&remote_script_path, 0o744)
.await?;
// execute script
self.run_command(RunCommandOptions::new(remote_script_path).args(options.args))
.await
}
async fn copy_file_from_node(
&self,
_remote_src: PathBuf,
_local_dest: PathBuf,
remote_src: PathBuf,
local_dest: PathBuf,
) -> Result<(), ProviderError> {
todo!()
let inner = self.inner.read().await;
let remote_file_path = format!("{}{}", inner.base_dir, remote_src.to_str().unwrap());
inner.filesystem.copy(remote_file_path, local_dest).await?;
Ok(())
}
async fn pause(&self) -> Result<(), ProviderError> {
......@@ -307,7 +355,7 @@ impl<FS: FileSystem + Send + Sync + Clone + 'static> ProviderNode for NativeNode
Ok(())
}
async fn restart(&mut self, after: Option<Duration>) -> Result<(), ProviderError> {
async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError> {
if let Some(duration) = after {
sleep(duration).await;
}
......@@ -418,7 +466,7 @@ fn create_process_with_log_tasks(
let stdout = process.stdout.take().expect("infaillible, stdout is piped");
let stderr = process.stderr.take().expect("Infaillible, stderr is piped");
// create additonnal long-running tasks for logs
// create additionnal long-running tasks for logs
let (stdout_tx, rx) = mpsc::channel(10);
let stderr_tx = stdout_tx.clone();
let stdout_reading_handle = create_stream_polling_task(stdout, stdout_tx);
......@@ -435,372 +483,33 @@ fn create_process_with_log_tasks(
#[cfg(test)]
mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn it_should_works() {
todo!();
}
}
// #[derive(Debug, Clone, PartialEq)]
// pub struct NativeProvider<T: FileSystem + Send + Sync> {
// // Namespace of the client (isolation directory)
// namespace: String,
// // TODO: re-iterate, since we are creating the config with the sdk
// // Path where configuration relies, all the `files` are accessed relative to this.
// // config_path: String,
// // Command to use, e.g "bash"
// command: String,
// // Temporary directory, root directory for the network
// tmp_dir: String,
// remote_dir: String,
// data_dir: String,
// process_map: HashMap<String, Process>,
// filesystem: T,
// }
// impl<T: FileSystem + Send + Sync> NativeProvider<T> {
// /// Zombienet `native` provider allows to run the nodes as a local process in the local environment
// /// params:
// /// namespace: Namespace of the client
// /// config_path: Path where configuration relies
// /// tmp_dir: Temporary directory where files will be placed
// /// filesystem: Filesystem to use (std::fs::FileSystem, mock etc.)
// pub fn new(
// namespace: impl Into<String>,
// // config_path: impl Into<String>,
// tmp_dir: impl Into<String>,
// filesystem: T,
// ) -> Self {
// let tmp_dir = tmp_dir.into();
// let process_map: HashMap<String, Process> = HashMap::new();
// Self {
// namespace: namespace.into(),
// // config_path: config_path.into(),
// remote_dir: format!("{}{}", &tmp_dir, DEFAULT_REMOTE_DIR),
// data_dir: format!("{}{}", &tmp_dir, DEFAULT_DATA_DIR),
// command: "bash".into(),
// tmp_dir,
// process_map,
// filesystem,
// }
// }
// fn get_process_by_node_name(&self, node_name: &str) -> Result<&Process, ProviderError> {
// self.process_map
// .get(node_name)
// .ok_or(ProviderError::MissingNodeInfo(
// node_name.to_owned(),
// "process".into(),
// ))
// }
// }
// pub struct Node {}
// #[async_trait]
// impl<T> Provider for NativeProvider<T>
// where
// T: FileSystem + Send + Sync,
// {
// type Node = Node;
// fn require_image() -> bool {
// false
// }
// async fn create_namespace(&mut self) -> Result<(), ProviderError> {
// // Native provider don't have the `namespace` isolation.
// // but we create the `remoteDir` to place files
// self.filesystem
// .create_dir(&self.remote_dir)
// .await
// .map_err(|e| ProviderError::FSError(Box::new(e)))?;
// Ok(())
// }
// async fn destroy_namespace(&self) -> Result<(), ProviderError> {
// // get pids to kill all related process
// let pids: Vec<String> = self
// .process_map
// .iter()
// .filter(|(_, process)| process.pid != 0)
// .map(|(_, process)| process.pid.to_string())
// .collect();
// // TODO: use a crate (or even std) to get this info instead of relying on bash
// let result = self
// .run_command(
// [format!(
// "ps ax| awk '{{print $1}}'| grep -E '{}'",
// pids.join("|")
// )]
// .to_vec(),
// NativeRunCommandOptions {
// is_failure_allowed: true,
// },
// )
// .await
// .unwrap();
// if result.exit_code.code().unwrap() == 0 {
// let pids_to_kill: Vec<String> = result
// .std_out
// .split(|c| c == '\n')
// .map(|s| s.into())
// .collect();
// let _ = self
// .run_command(
// [format!("kill -9 {}", pids_to_kill.join(" "))].to_vec(),
// NativeRunCommandOptions {
// is_failure_allowed: true,
// },
// )
// .await?;
// }
// Ok(())
// }
// async fn static_setup(&mut self) -> Result<(), ProviderError> {
// Ok(())
// }
// async fn spawn_node(
// &self,
// _node: Node,
// _files_inject: Vec<FileMap>,
// _keystore: &str,
// _db_snapshot: &str,
// ) -> Result<(), ProviderError> {
// // TODO: We should implement the logic to go from the `Node` (nodeSpec)
// // to the running node, since we will no expose anymore the underline `Def`.
// // We can follow the logic of the spawn_from_def later.
// Ok(())
// }
// async fn spawn_temp(
// &self,
// _node: Node,
// _files_inject: Vec<FileMap>,
// _files_get: Vec<FileMap>,
// ) -> Result<(), ProviderError> {
// // TODO: We should implement the logic to go from the `Node` (nodeSpec)
// // to the running node, since we will no expose anymore the underline `Def`.
// // We can follow the logic of the spawn_from_def later.
// Ok(())
// }
// async fn copy_file_from_node(
// &mut self,
// pod_file_path: PathBuf,
// local_file_path: PathBuf,
// ) -> Result<(), ProviderError> {
// // log::debug!("cp {} {}", pod_file_path.to_string_lossy(), local_file_path.to_string_lossy());
use std::os::unix::prelude::PermissionsExt;
// self.filesystem
// .copy(&pod_file_path, &local_file_path)
// .await
// .map_err(|e| ProviderError::FSError(Box::new(e)))?;
// Ok(())
// }
// async fn run_command(
// &self,
// mut args: Vec<String>,
// opts: NativeRunCommandOptions,
// ) -> Result<RunCommandResponse, ProviderError> {
// if let Some(arg) = args.get(0) {
// if arg == "bash" {
// args.remove(0);
// }
// }
use super::*;
// // -c is already used in the process::Command to execute the command thus
// // needs to be removed in case provided
// if let Some(arg) = args.get(0) {
// if arg == "-c" {
// args.remove(0);
// }
// }
// let result = Command::new(&self.command)
// .arg("-c")
// .arg(args.join(" "))
// .output()
// .await?;
// if !result.status.success() && !opts.is_failure_allowed {
// return Err(ProviderError::RunCommandError(args.join(" ")));
// } else {
// // cmd success or we allow to fail
// // in either case we return Ok
// Ok(RunCommandResponse {
// exit_code: result.status,
// std_out: String::from_utf8_lossy(&result.stdout).into(),
// std_err: if result.stderr.is_empty() {
// None
// } else {
// Some(String::from_utf8_lossy(&result.stderr).into())
// },
// })
// }
// }
// // TODO: Add test
// async fn run_script(
// &mut self,
// identifier: String,
// script_path: String,
// args: Vec<String>,
// ) -> Result<RunCommandResponse, ProviderError> {
// let script_filename = Path::new(&script_path)
// .file_name()
// .ok_or(ProviderError::InvalidScriptPath(script_path.clone()))?
// .to_str()
// .ok_or(ProviderError::InvalidScriptPath(script_path.clone()))?;
// let script_path_in_pod = format!("{}/{}/{}", self.tmp_dir, identifier, script_filename);
// // upload the script
// self.filesystem
// .copy(&script_path, &script_path_in_pod)
// .await
// .map_err(|e| ProviderError::FSError(Box::new(e)))?;
// // set as executable
// self.run_command(
// vec![
// "chmod".to_owned(),
// "+x".to_owned(),
// script_path_in_pod.clone(),
// ],
// NativeRunCommandOptions::default(),
// )
// .await?;
// let command = format!(
// "cd {}/{} && {} {}",
// self.tmp_dir,
// identifier,
// script_path_in_pod,
// args.join(" ")
// );
// let result = self
// .run_command(vec![command], NativeRunCommandOptions::default())
// .await?;
// Ok(RunCommandResponse {
// exit_code: result.exit_code,
// std_out: result.std_out,
// std_err: result.std_err,
// })
// }
// // TODO: Add test
// async fn get_node_logs(&mut self, name: &str) -> Result<String, ProviderError> {
// // For now in native let's just return all the logs
// let result = self
// .filesystem
// .read_file(&format!("{}/{}.log", self.tmp_dir, name))
// .await
// .map_err(|e| ProviderError::FSError(Box::new(e)))?;
// return Ok(result);
// }
// async fn dump_logs(&mut self, path: String, pod_name: String) -> Result<(), ProviderError> {
// let dst_file_name: String = format!("{}/logs/{}.log", path, pod_name);
// let _ = self
// .filesystem
// .copy(
// &format!("{}/{}.log", self.tmp_dir, pod_name),
// &dst_file_name,
// )
// .await;
// Ok(())
// }
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn it_should_works() {
let file = std::fs::File::create(format!(
"{}/{}",
std::env::temp_dir().to_string_lossy(),
Uuid::new_v4()
))
.unwrap();
// async fn get_logs_command(&self, name: &str) -> Result<String, ProviderError> {
// Ok(format!("tail -f {}/{}.log", self.tmp_dir, name))
// }
let metadata = file.metadata().unwrap();
// // TODO: Add test
// async fn restart(
// &mut self,
// node_name: &str,
// after_secs: Option<u16>,
// ) -> Result<bool, ProviderError> {
// let process = self.get_process_by_node_name(node_name)?;
let mut permissions = metadata.permissions();
permissions.set_mode(0o744);
// let _resp = self
// .run_command(
// vec![format!("kill -9 {:?}", process.pid)],
// NativeRunCommandOptions {
// is_failure_allowed: true,
// },
// )
// .await?;
// // log::debug!("{:?}", &resp);
// if let Some(secs) = after_secs {
// sleep(Duration::from_secs(secs.into())).await;
// }
// let process: &mut Process =
// self.process_map
// .get_mut(node_name)
// .ok_or(ProviderError::MissingNodeInfo(
// node_name.to_owned(),
// "process".into(),
// ))?;
// let mapped_env: HashMap<&str, &str> = process
// .env
// .iter()
// .map(|env_var| (env_var.name.as_str(), env_var.value.as_str()))
// .collect();
// let child_process: Child = Command::new(self.command.clone())
// .arg("-c")
// .arg(process.command.clone())
// .envs(&mapped_env)
// .spawn()
// .map_err(|e| ProviderError::ErrorSpawningNode(e.to_string()))?;
// process.pid = child_process.id().ok_or(ProviderError::ErrorSpawningNode(
// "Failed to get pid".to_string(),
// ))?;
// Ok(true)
// }
// async fn get_node_info(&self, node_name: &str) -> Result<(IpAddr, Port), ProviderError> {
// let host_port = self.get_port_mapping(P2P_PORT, node_name).await?;
// Ok((LOCALHOST, host_port))
// }
tokio::fs::set_permissions("/tmp/myscript.sh", permissions)
.await
.unwrap();
// async fn get_node_ip(&self, _node_name: &str) -> Result<IpAddr, ProviderError> {
// Ok(LOCALHOST)
// }
// let result = Command::new("/tmp/myscript.sh").output().await.unwrap();
// async fn get_port_mapping(&self, port: Port, node_name: &str) -> Result<Port, ProviderError> {
// match self.process_map.get(node_name) {
// Some(process) => match process.port_mapping.get(&port) {
// Some(port) => Ok(*port),
// None => Err(ProviderError::MissingNodeInfo(
// node_name.to_owned(),
// "port".into(),
// )),
// },
// None => Err(ProviderError::MissingNodeInfo(
// node_name.to_owned(),
// "process".into(),
// )),
// }
// }
// }
// println!("{:?}", result);
}
}
// #[cfg(test)]
// mod tests {
......
......@@ -6,6 +6,8 @@ pub const DEFAULT_TMP_DIR: &str = "/tmp";
pub const NODE_CONFIG_DIR: &str = "/cfg";
/// Directory for node configuration
pub const NODE_DATA_DIR: &str = "/data";
/// Directory for node scripts
pub const NODE_SCRIPTS_DIR: &str = "/scripts";
/// Localhost ip
pub const _LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
/// The port substrate listens for p2p connections on
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment