Skip to content
Snippets Groups Projects
Unverified Commit eec89c2e authored by Javier Viola's avatar Javier Viola Committed by GitHub
Browse files

feat: add verifier to check readiness (#118)

parent 2a221b48
Branches
No related merge requests found
use std::time::Duration;
use configuration::NetworkConfigBuilder; use configuration::NetworkConfigBuilder;
use orchestrator::{AddNodeOpts, Orchestrator}; use orchestrator::{AddNodeOpts, Orchestrator};
use provider::NativeProvider; use provider::NativeProvider;
...@@ -37,13 +35,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { ...@@ -37,13 +35,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: add check to ensure if unique // TODO: add check to ensure if unique
network.add_node("new1", opts, None).await?; network.add_node("new1", opts, None).await?;
tokio::time::sleep(Duration::from_secs(2)).await;
// Example of some opertions that you can do // Example of some opertions that you can do
// with `nodes` (e.g pause, resume, restart) // with `nodes` (e.g pause, resume, restart)
tokio::time::sleep(Duration::from_secs(10)).await;
// Get a ref to the node // Get a ref to the node
let node = network.get_node("alice")?; let node = network.get_node("alice")?;
...@@ -57,8 +51,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { ...@@ -57,8 +51,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// node.pause().await?; // node.pause().await?;
// println!("node new1 paused!"); // println!("node new1 paused!");
tokio::time::sleep(Duration::from_secs(2)).await;
// node.resume().await?; // node.resume().await?;
// println!("node new1 resumed!"); // println!("node new1 resumed!");
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
mod errors; mod errors;
mod generators; mod generators;
mod network; mod network;
mod network_helper;
mod network_spec; mod network_spec;
mod shared; mod shared;
mod spawner; mod spawner;
...@@ -73,8 +74,9 @@ where ...@@ -73,8 +74,9 @@ where
// create namespace // create namespace
let ns = self.provider.create_namespace().await?; let ns = self.provider.create_namespace().await?;
println!("ns: {:#?}", ns.id()); println!("\n\n");
println!("base_dir: {:#?}", ns.base_dir()); println!("🧰 ns: {:#?}", ns.id());
println!("🧰 base_dir: {:#?}", ns.base_dir());
// TODO: noop for native // TODO: noop for native
// Static setup // Static setup
...@@ -211,6 +213,7 @@ where ...@@ -211,6 +213,7 @@ where
scoped_fs: &scoped_fs, scoped_fs: &scoped_fs,
parachain: None, parachain: None,
bootnodes_addr: &vec![], bootnodes_addr: &vec![],
wait_ready: false,
}; };
let global_files_to_inject = vec![TransferedFile { let global_files_to_inject = vec![TransferedFile {
...@@ -372,7 +375,8 @@ where ...@@ -372,7 +375,8 @@ where
// - add-ons (introspector/tracing/etc) // - add-ons (introspector/tracing/etc)
// - verify nodes (clean metrics cache?) // verify nodes
network_helper::verifier::verify_nodes(&network.nodes()).await?;
// - write zombie.json state file (we should defined in a way we can load later) // - write zombie.json state file (we should defined in a way we can load later)
......
...@@ -142,6 +142,7 @@ impl<T: FileSystem> Network<T> { ...@@ -142,6 +142,7 @@ impl<T: FileSystem> Network<T> {
scoped_fs: &scoped_fs, scoped_fs: &scoped_fs,
parachain: para_spec, parachain: para_spec,
bootnodes_addr: &vec![], bootnodes_addr: &vec![],
wait_ready: true,
}; };
let mut global_files_to_inject = vec![TransferedFile { let mut global_files_to_inject = vec![TransferedFile {
...@@ -190,6 +191,10 @@ impl<T: FileSystem> Network<T> { ...@@ -190,6 +191,10 @@ impl<T: FileSystem> Network<T> {
} }
} }
pub fn nodes(&self) -> Vec<&NetworkNode> {
self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
}
// Internal API // Internal API
pub(crate) fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) { pub(crate) fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
if let Some(para_id) = para_id { if let Some(para_id) = para_id {
......
pub mod verifier;
use std::time::Duration;
use tokio::time::timeout;
use crate::network::node::NetworkNode;
pub async fn verify_nodes(nodes: &[&NetworkNode]) -> Result<(), anyhow::Error> {
timeout(Duration::from_secs(90), check_nodes(nodes))
.await
.map_err(|_| anyhow::anyhow!("one or more nodes are not ready!"))
}
// TODO: we should inject in someway the logic to make the request
// in order to allow us to `mock` and easily test this.
// maybe moved to the provider with a NodeStatus, and some helpers like wait_running, wait_ready, etc... ? to be discussed
async fn check_nodes(nodes: &[&NetworkNode]) {
loop {
let tasks: Vec<_> = nodes
.iter()
.map(|node| {
// TODO: move to logger
// println!("getting from {}", node.name);
reqwest::get(node.prometheus_uri.clone())
})
.collect();
let all_ready = futures::future::try_join_all(tasks).await;
if all_ready.is_ok() {
return;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
...@@ -32,6 +32,9 @@ pub struct SpawnNodeCtx<'a, T: FileSystem> { ...@@ -32,6 +32,9 @@ pub struct SpawnNodeCtx<'a, T: FileSystem> {
pub(crate) parachain: Option<&'a ParachainSpec>, pub(crate) parachain: Option<&'a ParachainSpec>,
/// The string represenation of the bootnode addres to pass to nodes /// The string represenation of the bootnode addres to pass to nodes
pub(crate) bootnodes_addr: &'a Vec<String>, pub(crate) bootnodes_addr: &'a Vec<String>,
/// Flag to wait node is ready or not
/// Ready state means we can query prometheus internal server
pub(crate) wait_ready: bool,
} }
pub async fn spawn_node<'a, T>( pub async fn spawn_node<'a, T>(
...@@ -153,6 +156,7 @@ where ...@@ -153,6 +156,7 @@ where
node.name node.name
); );
println!("🚀 {} : metrics link {prometheus_uri}", node.name); println!("🚀 {} : metrics link {prometheus_uri}", node.name);
println!("📓 logs cmd: tail -f {}/{}.log", base_dir, node.name);
println!("\n"); println!("\n");
Ok(NetworkNode::new( Ok(NetworkNode::new(
node.name.clone(), node.name.clone(),
......
pub mod register_para;
pub mod validator_actions;
\ No newline at end of file
use std::str::FromStr;
use subxt::{dynamic::Value, OnlineClient, SubstrateConfig};
use subxt_signer::{sr25519::Keypair, SecretUri};
use support::fs::FileSystem;
use crate::{shared::types::RegisterParachainOptions, ScopedFilesystem};
pub async fn register(
options: RegisterParachainOptions,
scoped_fs: &ScopedFilesystem<'_, impl FileSystem>,
) -> Result<(), anyhow::Error> {
println!("Registering parachain: {:?}", options);
// get the seed
let sudo: Keypair;
if let Some(possible_seed) = options.seed {
sudo = Keypair::from_seed(possible_seed).expect("seed should return a Keypair.");
} else {
let uri = SecretUri::from_str("//Alice")?;
sudo = Keypair::from_uri(&uri)?;
}
let genesis_state = scoped_fs
.read_to_string(options.state_path)
.await
.expect("State Path should be ok by this point.");
let wasm_data = scoped_fs
.read_to_string(options.wasm_path)
.await
.expect("Wasm Path should be ok by this point.");
let api = OnlineClient::<SubstrateConfig>::from_url(options.node_ws_url).await?;
let schedule_para = subxt::dynamic::tx(
"ParasSudoWrapper",
"sudo_schedule_para_initialize",
vec![
Value::primitive(options.id.into()),
Value::named_composite([
(
"genesis_head",
Value::from_bytes(hex::decode(&genesis_state[2..])?),
),
(
"validation_code",
Value::from_bytes(hex::decode(&wasm_data[2..])?),
),
("para_kind", Value::bool(options.onboard_as_para)),
]),
],
);
let sudo_call = subxt::dynamic::tx("Sudo", "sudo", vec![schedule_para.into_value()]);
// TODO: uncomment below and fix the sign and submit (and follow afterwards until
// finalized block) to register the parachain
let result = api
.tx()
.sign_and_submit_then_watch_default(&sudo_call, &sudo)
.await?;
let result = result.wait_for_in_block().await?;
println!("In block: {:#?}", result.block_hash());
Ok(())
}
\ No newline at end of file
use std::str::FromStr;
use subxt::{dynamic::Value, OnlineClient, SubstrateConfig};
use subxt_signer::{sr25519::Keypair, SecretUri};
pub async fn register(
validator_ids: Vec<String>,
node_ws_url: &str,
) -> Result<(), anyhow::Error> {
println!("Registering validators: {:?}", validator_ids);
// get the seed
// let sudo: Keypair;
// if let Some(possible_seed) = options.seed {
// sudo = Keypair::from_seed(possible_seed).expect("seed should return a Keypair.");
// } else {
let uri = SecretUri::from_str("//Alice")?;
let sudo = Keypair::from_uri(&uri)?;
// }
println!("pse");
let api = OnlineClient::<SubstrateConfig>::from_url(node_ws_url).await?;
println!("pse connected");
// let bytes: Vec<Value> = validator_ids.iter().map(|id| Value::from_bytes(id)).collect();
// println!("{:?}", bytes);
let register_call = subxt::dynamic::tx(
"ValidatorManager",
"register_validators",
vec![Value::unnamed_composite(vec![Value::from_bytes(validator_ids.first().unwrap().as_bytes())])],
);
let sudo_call = subxt::dynamic::tx("Sudo", "sudo", vec![register_call.into_value()]);
println!("pse1");
// TODO: uncomment below and fix the sign and submit (and follow afterwards until
// finalized block) to register the parachain
let result = api
.tx()
.sign_and_submit_then_watch_default(&sudo_call, &sudo)
.await?;
println!("result: {:#?}", result);
let result = result.wait_for_in_block().await?;
println!("In block: {:#?}", result.block_hash());
Ok(())
}
\ No newline at end of file
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment