Skip to content
Snippets Groups Projects
Unverified Commit 814d84ec authored by Javier Viola's avatar Javier Viola Committed by GitHub
Browse files

feat: add node methods for actions and metrics (#114)

parent daba04c0
Branches
No related merge requests found
......@@ -37,17 +37,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: add check to ensure if unique
network.add_node("new1", opts, None).await?;
tokio::time::sleep(Duration::from_secs(5)).await;
tokio::time::sleep(Duration::from_secs(2)).await;
// Example of some operations that you can do
// with `nodes` (e.g pause, resume, restart)
tokio::time::sleep(Duration::from_secs(10)).await;
// Get a ref to the node
let node = network.get_node("alice")?;
let is_10 = node.assert("block_height{status=\"best\"}", 10).await?;
println!("is_10: {is_10}");
let role = node.reports("node_roles").await?;
println!("Role is {role}");
// pause the node
// network.pause_node("new1").await?;
// node.pause().await?;
// println!("node new1 paused!");
// tokio::time::sleep(Duration::from_secs(5)).await;
tokio::time::sleep(Duration::from_secs(2)).await;
// network.resume_node("new1").await?;
// node.resume().await?;
// println!("node new1 resumed!");
let col_opts = AddNodeOpts {
......
......@@ -9,6 +9,7 @@ edition = "2021"
configuration = { path = "../configuration" }
support = { path = "../support" }
provider = { path = "../provider" }
prom-metrics-parser = { path = "../prom-metrics-parser" }
tokio = { workspace = true, features = ["time"] }
thiserror = { workspace = true }
# TODO: add logger in a new pr.
......@@ -21,4 +22,5 @@ rand = { workspace = true }
sha2 = { workspace = true, default-features = false }
hex = { workspace = true }
sp-core = { workspace = true }
libp2p = { workspace = true }
\ No newline at end of file
libp2p = { workspace = true }
reqwest = { workspace = true }
......@@ -2,7 +2,7 @@ pub mod node;
pub mod parachain;
pub mod relaychain;
use std::{collections::HashMap, path::PathBuf, time::Duration};
use std::{collections::HashMap, path::PathBuf};
use configuration::{
shared::node::EnvVar,
......@@ -181,36 +181,10 @@ impl<T: FileSystem> Network<T> {
// deregister and stop the collator?
// remove_parachain()
// Node actions
pub async fn pause_node(&self, node_name: impl Into<String>) -> Result<(), anyhow::Error> {
pub fn get_node(&self, node_name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
let node_name = node_name.into();
if let Some(node) = self.nodes_by_name.get(&node_name) {
node.inner.pause().await?;
Ok(())
} else {
Err(anyhow::anyhow!("can't find the node!"))
}
}
pub async fn resume_node(&self, node_name: impl Into<String>) -> Result<(), anyhow::Error> {
let node_name = node_name.into();
if let Some(node) = self.nodes_by_name.get(&node_name) {
node.inner.resume().await?;
Ok(())
} else {
Err(anyhow::anyhow!("can't find the node!"))
}
}
pub async fn restart_node(
&self,
node_name: impl Into<String>,
after: Option<Duration>,
) -> Result<(), anyhow::Error> {
let node_name = node_name.into();
if let Some(node) = self.nodes_by_name.get(&node_name) {
node.inner.restart(after).await?;
Ok(())
Ok(node)
} else {
Err(anyhow::anyhow!("can't find the node!"))
}
......
use std::{sync::Arc, time::Duration};
use anyhow::anyhow;
use prom_metrics_parser::MetricMap;
use provider::DynNode;
use tokio::sync::RwLock;
use crate::network_spec::node::NodeSpec;
......@@ -11,22 +16,108 @@ pub struct NetworkNode {
pub(crate) name: String,
pub(crate) ws_uri: String,
pub(crate) prometheus_uri: String,
metrics_cache: Arc<RwLock<MetricMap>>,
}
impl NetworkNode {
fn new(inner: DynNode, spec: NodeSpec, _ip: String) -> Self {
let name = spec.name.clone();
let ws_uri = "".into();
let prometheus_uri = "".into();
/// Create a new NetworkNode
pub(crate) fn new<T: Into<String>>(
name: T,
ws_uri: T,
prometheus_uri: T,
spec: NodeSpec,
inner: DynNode,
) -> Self {
Self {
name: name.into(),
ws_uri: ws_uri.into(),
prometheus_uri: prometheus_uri.into(),
inner,
spec,
name,
ws_uri,
prometheus_uri,
metrics_cache: Arc::new(Default::default()),
}
}
/// Pause the node.
///
/// Delegates to the underlying provider node; per the provider contract this
/// pauses the actual process (e.g. polkadot) by sending it a `SIGSTOP` signal.
///
/// # Errors
///
/// Returns the provider error, converted into [`anyhow::Error`], if the
/// underlying node fails to pause.
pub async fn pause(&self) -> Result<(), anyhow::Error> {
    match self.inner.pause().await {
        Ok(_) => Ok(()),
        Err(err) => Err(err.into()),
    }
}
/// Resume the node.
///
/// Delegates to the underlying provider node; per the provider contract this
/// resumes the actual process (e.g. polkadot) by sending it a `SIGCONT` signal.
///
/// # Errors
///
/// Returns the provider error, converted into [`anyhow::Error`], if the
/// underlying node fails to resume.
pub async fn resume(&self) -> Result<(), anyhow::Error> {
    match self.inner.resume().await {
        Ok(_) => Ok(()),
        Err(err) => Err(err.into()),
    }
}
/// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir).
///
/// `after` is forwarded unchanged to the underlying provider node.
/// NOTE(review): presumably an optional delay before the node is started
/// again — confirm against the provider implementation.
///
/// # Errors
///
/// Returns the provider error, converted into [`anyhow::Error`], if the
/// underlying node fails to restart.
pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
    match self.inner.restart(after).await {
        Ok(_) => Ok(()),
        Err(err) => Err(err.into()),
    }
}
/// Get a metric value 'by name' from prometheus (exposed by the node).
///
/// The metric name is intended to be accepted:
///   - with prefix (e.g: 'polkadot_')
///   - with chain attribute (e.g: 'chain=rococo-local')
///   - without prefix and/or without chain attribute
///
/// NOTE(review): the lookup done here is a plain key lookup in the parsed
/// metrics map, so which of the spellings above resolve depends on how
/// `prom_metrics_parser` builds its keys — confirm.
///
/// Unlike [`Self::assert`], this always re-fetches the metrics from the node
/// before reading the value, so the result is never served from a stale cache.
///
/// # Errors
///
/// Fails if the metrics cannot be fetched or parsed, or if the metric is not
/// present in the freshly fetched set.
pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
    // Convert up front so only an owned `String` is held across the awaits.
    let wanted: String = metric_name.into();

    // Unconditionally refresh the cache before reading from it.
    self.fetch_metrics().await?;

    self.metric(wanted.as_str()).await
}
/// Assert on a metric value 'by name' from prometheus (exposed by the node).
///
/// The metric name is intended to be accepted:
///   - with prefix (e.g: 'polkadot_')
///   - with chain attribute (e.g: 'chain=rococo-local')
///   - without prefix and/or without chain attribute
///
/// NOTE(review): the lookup itself is a plain key lookup in the parsed metrics
/// map, so which spellings resolve depends on how `prom_metrics_parser` builds
/// its keys — confirm.
///
/// The cached metrics are checked first. If the cached value does not meet
/// the criteria — or the metric cannot be read from the cache at all — the
/// cache is reloaded from the node and the check is performed once more.
///
/// Returns `Ok(true)` when the metric equals `value`, `Ok(false)` when it is
/// still different after the reload.
///
/// # Errors
///
/// Fails if the reload cannot fetch or parse the metrics, or if the metric is
/// still missing after the reload.
pub async fn assert(
    &self,
    metric_name: impl Into<String>,
    value: impl Into<f64>,
) -> Result<bool, anyhow::Error> {
    let metric_name = metric_name.into();
    let value = value.into();

    // Fast path: the cached value already matches.
    // A failed cached lookup (e.g. the metric was not exposed yet when the
    // cache was filled) is deliberately NOT propagated here: a stale cache
    // must lead to a reload, not to an early error.
    // NOTE(review): exact `f64` equality — presumably fine for the
    // integer-like values being asserted on; confirm.
    if let Ok(cached) = self.metric(&metric_name).await {
        if cached == value {
            return Ok(true);
        }
    }

    // reload metrics and check again against fresh data
    self.fetch_metrics().await?;
    let fresh = self.metric(&metric_name).await?;
    Ok(fresh == value)
}
/// Reload the metrics cache from the node.
///
/// Requests `prometheus_uri`, parses the response body with
/// `prom_metrics_parser::parse` and replaces the whole cached map with the
/// parsed result.
///
/// # Errors
///
/// Fails if the HTTP request fails, the body cannot be read, or the body
/// cannot be parsed; in those cases the existing cache is left untouched.
async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
    let body = reqwest::get(&self.prometheus_uri).await?.text().await?;
    let parsed = prom_metrics_parser::parse(&body)?;

    // The write guard is only taken once the new map is ready and is released
    // at the end of this statement.
    *self.metrics_cache.write().await = parsed;
    Ok(())
}
/// Read a single metric value from the cache.
///
/// If the cache is empty it is populated first by fetching the metrics from
/// the node; otherwise the cached (possibly stale) values are used as-is.
///
/// # Errors
///
/// Fails if the cache had to be populated and the fetch failed, or if
/// `metric_name` is not a key of the metrics map.
async fn metric(&self, metric_name: &str) -> Result<f64, anyhow::Error> {
    let mut metrics_map = self.metrics_cache.read().await;
    if metrics_map.is_empty() {
        // reload metrics
        // The read guard must be released first: `fetch_metrics` takes the
        // write lock on the same `RwLock`.
        drop(metrics_map);
        self.fetch_metrics().await?;
        metrics_map = self.metrics_cache.read().await;
    }

    // Build the error lazily so the successful path does not pay for
    // formatting the message.
    metrics_map
        .get(metric_name)
        .copied()
        .ok_or_else(|| anyhow!("metric '{}'not found!", &metric_name))
}
}
impl std::fmt::Debug for NetworkNode {
......
......@@ -154,11 +154,11 @@ where
);
println!("🚀 {} : metrics link {prometheus_uri}", node.name);
println!("\n");
Ok(NetworkNode {
inner: running_node,
spec: node.clone(),
name: node.name.clone(),
Ok(NetworkNode::new(
node.name.clone(),
ws_uri,
prometheus_uri,
})
node.clone(),
running_node,
))
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment