Skip to content
Snippets Groups Projects
Unverified Commit 85993c8f authored by Javier Viola's avatar Javier Viola Committed by GitHub
Browse files

feat(orchestrator): Add wait methods for log line count (#235)

parent 4bf3fcf7
Branches
No related merge requests found
Pipeline #481235 failed with stage
in 3 minutes and 16 seconds
......@@ -56,6 +56,7 @@ axum-extra = { version = "0.9" }
tower = { version = "0.4" }
tower-http = { version = "0.5" }
tracing-subscriber = { version = "0.3" }
glob-match = "0.2.1"
# Zombienet workspace crates:
support = { package = "zombienet-support", version = "0.2.5", path = "crates/support" }
......
......@@ -30,6 +30,8 @@ reqwest = { workspace = true }
tracing = { workspace = true }
pjs-rs = { version = "0.1.2", optional = true }
uuid = { workspace = true }
regex = { workspace = true }
glob-match = { workspace = true }
# Zombienet deps
configuration = { workspace = true }
......
use std::{sync::Arc, time::Duration};
use anyhow::anyhow;
use glob_match::glob_match;
use prom_metrics_parser::MetricMap;
use provider::DynNode;
use regex::Regex;
use subxt::{backend::rpc::RpcClient, OnlineClient};
use support::{constants::THIS_IS_A_BUG, net::wait_ws_ready};
use support::net::wait_ws_ready;
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::{debug, trace, warn};
use tracing::{debug, trace};
use crate::network_spec::node::NodeSpec;
#[cfg(feature = "pjs")]
......@@ -66,12 +68,6 @@ impl NetworkNode {
&self.ws_uri
}
/// Get the logs of the node
/// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
pub async fn logs(&self) -> Result<String, anyhow::Error> {
Ok(self.inner.logs().await?)
}
// Subxt
/// Get the rpc client for the node
......@@ -138,7 +134,7 @@ impl NetworkNode {
Ok(())
}
// Assertions
// Metrics assertions
/// Get metric value 'by name' from prometheus (exposed by the node)
/// metric name can be:
......@@ -189,12 +185,14 @@ impl NetworkNode {
}
}
// Wait methods
// Wait methods for metrics
/// Wait until a metric value pass the `predicate`
pub async fn wait_metric(
&self,
metric_name: impl Into<String>,
predicate: impl Fn(f64) -> bool,
) -> Result<bool, anyhow::Error> {
) -> Result<(), anyhow::Error> {
let metric_name = metric_name.into();
debug!("waiting until metric {metric_name} pass the predicate");
loop {
......@@ -202,7 +200,7 @@ impl NetworkNode {
match res {
Ok(res) => {
if res {
return Ok(true);
return Ok(());
}
},
Err(e) => {
......@@ -234,12 +232,14 @@ impl NetworkNode {
}
}
/// Wait until a metric value pass the `predicate`
/// with a timeout (secs)
pub async fn wait_metric_with_timeout(
&self,
metric_name: impl Into<String>,
predicate: impl Fn(f64) -> bool,
timeout_secs: impl Into<u64>,
) -> Result<bool, anyhow::Error> {
) -> Result<(), anyhow::Error> {
let metric_name = metric_name.into();
let secs = timeout_secs.into();
debug!("waiting until metric {metric_name} pass the predicate");
......@@ -251,12 +251,7 @@ impl NetworkNode {
if let Ok(inner_res) = res {
match inner_res {
Ok(true) => Ok(true),
Ok(false) => {
// should not happens
warn!("wait_metric return false");
Err(anyhow!("wait_metric return false, {THIS_IS_A_BUG}"))
},
Ok(_) => Ok(()),
Err(e) => Err(anyhow!("Error waiting for metric: {}", e)),
}
} else {
......@@ -267,11 +262,67 @@ impl NetworkNode {
}
}
// Logs
/// Get the logs of the node
/// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
pub async fn logs(&self) -> Result<String, anyhow::Error> {
Ok(self.inner.logs().await?)
}
/// Wait until a the number of matching log lines is reach
pub async fn wait_log_line_count<'a>(
&self,
pattern: impl Into<String>,
is_glob: bool,
count: usize,
) -> Result<(), anyhow::Error> {
let pattern: String = pattern.into();
debug!("waiting until we find pattern {pattern} {count} times");
let match_fn: Box<dyn Fn(&str) -> bool> = if is_glob {
Box::new(|line: &str| -> bool { glob_match(&pattern, line) })
} else {
let re = Regex::new(&pattern)?;
Box::new(move |line: &str| -> bool { re.is_match(line) })
};
loop {
let mut q = 0_usize;
let logs = self.logs().await?;
for line in logs.lines() {
println!("line is {line}");
if match_fn(line) {
println!("pattern {pattern} match in line {line}");
q += 1;
if q >= count {
return Ok(());
}
}
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
/// Wait until a the number of matching log lines is reach
/// with timeout (secs)
pub async fn wait_log_line_count_with_timeout(
&self,
substring: impl Into<String>,
is_glob: bool,
count: usize,
timeout_secs: impl Into<u64>,
) -> Result<(), anyhow::Error> {
let secs = timeout_secs.into();
debug!("waiting until match {count} lines");
tokio::time::timeout(
Duration::from_secs(secs),
self.wait_log_line_count(substring, is_glob, count),
)
.await?
}
// TODO: impl
// wait_log_line_count
// wait_log_line_count_with_timeout
// wait_subxt_client
// wait_subxt_client_with_timeout
// wait_event_count
// wait_event_count_with_timeout
......
......@@ -52,19 +52,22 @@ async fn ci_k8s_basic_functionalities_should_works() {
.await;
assert!(r.is_err());
let (best_block_pass, client) = try_join!(
let (_best_block_pass, client) = try_join!(
alice.wait_metric(BEST_BLOCK_METRIC, |x| x > 5_f64),
alice.wait_client::<subxt::PolkadotConfig>()
)
.unwrap();
// check best block through metrics without timeout
assert!(best_block_pass);
alice
.wait_log_line_count("*rted #1*", true, 10)
.await
.unwrap();
// check best block through metrics with timeout
assert!(alice
.wait_metric_with_timeout(BEST_BLOCK_METRIC, |x| x > 10_f64, 45_u32)
.await
.unwrap());
.is_ok());
// ensure timeout error
let best_block = alice.reports(BEST_BLOCK_METRIC).await.unwrap();
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment