From 814d84ec44aa6a606f7452dc23f2351885ebf9b7 Mon Sep 17 00:00:00 2001
From: Javier Viola <pepoviola@gmail.com>
Date: Tue, 3 Oct 2023 15:40:26 -0300
Subject: [PATCH] feat: add node methods for actions and metrics (#114)

---
 .../examples/small_network_with_default.rs    |  20 +++-
 crates/orchestrator/Cargo.toml                |   4 +-
 crates/orchestrator/src/network.rs            |  32 +-----
 crates/orchestrator/src/network/node.rs       | 107 ++++++++++++++++--
 crates/orchestrator/src/spawner.rs            |  10 +-
 5 files changed, 126 insertions(+), 47 deletions(-)

diff --git a/crates/examples/examples/small_network_with_default.rs b/crates/examples/examples/small_network_with_default.rs
index d003dde..a9d7ab8 100644
--- a/crates/examples/examples/small_network_with_default.rs
+++ b/crates/examples/examples/small_network_with_default.rs
@@ -37,17 +37,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // TODO: add check to ensure if unique
     network.add_node("new1", opts, None).await?;
 
-    tokio::time::sleep(Duration::from_secs(5)).await;
+    tokio::time::sleep(Duration::from_secs(2)).await;
 
     // Example of some opertions that you can do
     // with `nodes` (e.g pause, resume, restart)
+
+    tokio::time::sleep(Duration::from_secs(10)).await;
+
+    // Get a ref to the node
+    let node = network.get_node("alice")?;
+
+    let is_10 = node.assert("block_height{status=\"best\"}", 10).await?;
+    println!("is_10: {is_10}");
+
+    let role = node.reports("node_roles").await?;
+    println!("Role is {role}");
+
     // pause the node
-    // network.pause_node("new1").await?;
+    // node.pause().await?;
     // println!("node new1 paused!");
 
-    // tokio::time::sleep(Duration::from_secs(5)).await;
+    tokio::time::sleep(Duration::from_secs(2)).await;
 
-    // network.resume_node("new1").await?;
+    // node.resume().await?;
     // println!("node new1 resumed!");
 
     let col_opts = AddNodeOpts {
diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml
index 626fe49..1663858 100644
--- a/crates/orchestrator/Cargo.toml
+++ b/crates/orchestrator/Cargo.toml
@@ -9,6 +9,7 @@ edition = "2021"
 configuration = { path = "../configuration" }
 support = { path = "../support" }
 provider = { path = "../provider" }
+prom-metrics-parser = { path = "../prom-metrics-parser" }
 tokio = { workspace = true, features = ["time"] }
 thiserror = { workspace = true }
 # TODO: add logger in a new pr.
@@ -21,4 +22,5 @@ rand = { workspace = true }
 sha2 = { workspace = true, default-features = false }
 hex = { workspace = true }
 sp-core = { workspace = true }
-libp2p = { workspace = true }
\ No newline at end of file
+libp2p = { workspace = true }
+reqwest = { workspace = true }
diff --git a/crates/orchestrator/src/network.rs b/crates/orchestrator/src/network.rs
index 6c72370..3654ab1 100644
--- a/crates/orchestrator/src/network.rs
+++ b/crates/orchestrator/src/network.rs
@@ -2,7 +2,7 @@ pub mod node;
 pub mod parachain;
 pub mod relaychain;
 
-use std::{collections::HashMap, path::PathBuf, time::Duration};
+use std::{collections::HashMap, path::PathBuf};
 
 use configuration::{
     shared::node::EnvVar,
@@ -181,36 +181,10 @@ impl<T: FileSystem> Network<T> {
     // deregister and stop the collator?
     // remove_parachain()
 
-    // Node actions
-    pub async fn pause_node(&self, node_name: impl Into<String>) -> Result<(), anyhow::Error> {
+    pub fn get_node(&self, node_name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
         let node_name = node_name.into();
         if let Some(node) = self.nodes_by_name.get(&node_name) {
-            node.inner.pause().await?;
-            Ok(())
-        } else {
-            Err(anyhow::anyhow!("can't find the node!"))
-        }
-    }
-
-    pub async fn resume_node(&self, node_name: impl Into<String>) -> Result<(), anyhow::Error> {
-        let node_name = node_name.into();
-        if let Some(node) = self.nodes_by_name.get(&node_name) {
-            node.inner.resume().await?;
-            Ok(())
-        } else {
-            Err(anyhow::anyhow!("can't find the node!"))
-        }
-    }
-
-    pub async fn restart_node(
-        &self,
-        node_name: impl Into<String>,
-        after: Option<Duration>,
-    ) -> Result<(), anyhow::Error> {
-        let node_name = node_name.into();
-        if let Some(node) = self.nodes_by_name.get(&node_name) {
-            node.inner.restart(after).await?;
-            Ok(())
+            Ok(node)
         } else {
             Err(anyhow::anyhow!("can't find the node!"))
         }
diff --git a/crates/orchestrator/src/network/node.rs b/crates/orchestrator/src/network/node.rs
index 66956d6..0be0b88 100644
--- a/crates/orchestrator/src/network/node.rs
+++ b/crates/orchestrator/src/network/node.rs
@@ -1,4 +1,9 @@
+use std::{sync::Arc, time::Duration};
+
+use anyhow::anyhow;
+use prom_metrics_parser::MetricMap;
 use provider::DynNode;
+use tokio::sync::RwLock;
 
 use crate::network_spec::node::NodeSpec;
 
@@ -11,22 +16,108 @@ pub struct NetworkNode {
     pub(crate) name: String,
     pub(crate) ws_uri: String,
     pub(crate) prometheus_uri: String,
+    metrics_cache: Arc<RwLock<MetricMap>>,
 }
 
 impl NetworkNode {
-    fn new(inner: DynNode, spec: NodeSpec, _ip: String) -> Self {
-        let name = spec.name.clone();
-        let ws_uri = "".into();
-        let prometheus_uri = "".into();
-
+    /// Create a new NetworkNode
+    pub(crate) fn new<T: Into<String>>(
+        name: T,
+        ws_uri: T,
+        prometheus_uri: T,
+        spec: NodeSpec,
+        inner: DynNode,
+    ) -> Self {
         Self {
+            name: name.into(),
+            ws_uri: ws_uri.into(),
+            prometheus_uri: prometheus_uri.into(),
             inner,
             spec,
-            name,
-            ws_uri,
-            prometheus_uri,
+            metrics_cache: Arc::new(Default::default()),
         }
     }
+
+    /// Pause the node; this is implemented by pausing the
+    /// actual process (e.g. polkadot) by sending it a `SIGSTOP` signal
+    pub async fn pause(&self) -> Result<(), anyhow::Error> {
+        self.inner.pause().await?;
+        Ok(())
+    }
+
+    /// Resume the node; this is implemented by resuming the
+    /// actual process (e.g. polkadot) by sending it a `SIGCONT` signal
+    pub async fn resume(&self) -> Result<(), anyhow::Error> {
+        self.inner.resume().await?;
+        Ok(())
+    }
+
+    /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
+    pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
+        self.inner.restart(after).await?;
+        Ok(())
+    }
+
+    /// Get metric value 'by name' from prometheus (exposed by the node)
+    /// metric name can be:
+    /// with prefix (e.g: 'polkadot_')
+    /// with chain attribute (e.g: 'chain=rococo-local')
+    /// without prefix and/or without chain attribute
+    pub async fn reports(&self, metric_name: impl Into<String>) -> Result<f64, anyhow::Error> {
+        let metric_name = metric_name.into();
+        // force cache reload
+        self.fetch_metrics().await?;
+        self.metric(&metric_name).await
+    }
+
+    /// Assert on a metric value 'by name' from prometheus (exposed by the node)
+    /// metric name can be:
+    /// with prefix (e.g: 'polkadot_')
+    /// with chain attribute (e.g: 'chain=rococo-local')
+    /// without prefix and/or without chain attribute
+    ///
+    /// We first try to assert on the value using the cached metrics and,
+    /// if it does not meet the criteria, we reload the cache and check again
+    pub async fn assert(
+        &self,
+        metric_name: impl Into<String>,
+        value: impl Into<f64>,
+    ) -> Result<bool, anyhow::Error> {
+        let metric_name = metric_name.into();
+        let value = value.into();
+        let val = self.metric(&metric_name).await?;
+        if val == value {
+            Ok(true)
+        } else {
+            // reload metrics
+            self.fetch_metrics().await?;
+            let val = self.metric(&metric_name).await?;
+            Ok(val == value)
+        }
+    }
+
+    /// Scrape `prometheus_uri` (a 4xx/5xx reply is an error) and replace the cached metrics.
+    async fn fetch_metrics(&self) -> Result<(), anyhow::Error> {
+        let response = reqwest::get(&self.prometheus_uri).await?.error_for_status()?;
+        let metrics = prom_metrics_parser::parse(&response.text().await?)?;
+        *self.metrics_cache.write().await = metrics;
+        Ok(())
+    }
+
+    /// Return the cached value of `metric_name`, fetching first if the cache is empty.
+    async fn metric(&self, metric_name: &str) -> Result<f64, anyhow::Error> {
+        let mut metrics_map = self.metrics_cache.read().await;
+        if metrics_map.is_empty() {
+            // release the read guard before `fetch_metrics` takes the write lock
+            drop(metrics_map);
+            self.fetch_metrics().await?;
+            metrics_map = self.metrics_cache.read().await;
+        }
+        let val = metrics_map
+            .get(metric_name)
+            .ok_or_else(|| anyhow!("metric '{}' not found!", metric_name))?;
+        Ok(*val)
+    }
 }
 
 impl std::fmt::Debug for NetworkNode {
diff --git a/crates/orchestrator/src/spawner.rs b/crates/orchestrator/src/spawner.rs
index cd5b31f..61b28b9 100644
--- a/crates/orchestrator/src/spawner.rs
+++ b/crates/orchestrator/src/spawner.rs
@@ -154,11 +154,11 @@ where
     );
     println!("🚀 {} : metrics link {prometheus_uri}", node.name);
     println!("\n");
-    Ok(NetworkNode {
-        inner: running_node,
-        spec: node.clone(),
-        name: node.name.clone(),
+    Ok(NetworkNode::new(
+        node.name.clone(),
         ws_uri,
         prometheus_uri,
-    })
+        node.clone(),
+        running_node,
+    ))
 }
-- 
GitLab