Unverified Commit 004dc50d authored by Peter Goodspeed-Niklaus's avatar Peter Goodspeed-Niklaus Committed by GitHub
Browse files

Add metrics timing message passing from OverseerSubsystemContext to Overseer::route_message (#2201)

* add timing setup to OverseerSubsystemContext

* figure out how to initialize the rng

* attach a timer to a portion of the messages traveling to the Overseer

This timer only exists / logs a fraction of the time (configurable
by `MESSAGE_TIMER_METRIC_CAPTURE_RATE`). When it exists, it tracks
the span between the `OverseerSubsystemContext` receiving the message
and its receipt in `Overseer::run`.

* propagate message timing to the start of route_message

This should be more accurate; it ensures that the timer runs
at least as long as that function. As `route_message` is async,
it may not actually run for some time after it is called (or ever).

* fix failing test

* rand_chacha apparently implicitly has getrandom feature

* change rng initialization

The previous impl using `from_entropy` depends on the `getrandom`
crate, which uses the system entropy source, and which does not
work on `wasm32-unknown-unknown` because it wants to fall back to
a JS implementation which we can't assume exists.

This impl depends only on `rand::thread_rng`, which has no documentation
stating that it's similarly limited.

* remove randomness in favor of a simpler 1 of N procedure

This deserves a bit of explanation, as the motivating issue explicitly
requested randomness. In short, it's hard to get randomness to compile
for `wasm32-unknown-unknown` because that is explicitly intended to be
as deterministic as practical. Additionally, even though it would never
be used for consensus purposes, it still felt offputting to intentionally
introduce randomness into a node's operations. Except, it wasn't really
random, either: it was a deterministic PRNG varying only in its state,
and getting the state to work right for that target would have required
initializing from a constant.

Given that it was a deterministic sequence anyway, it seemed much simpler
and more explicit to simply select one of each N messages instead of
attempting any kind of realistic randomness.

* reinstate randomness for better statistical properties

This partially reverts commit 0ab8594c.

`oorandom` is much lighter than the previous `rand`-based implementation,
which makes this easier to work with.

This implementation gives each subsystem and each child RNG a distinct
increment, which should ensure they produce distinct streams of values.
parent 0253be89
Pipeline #118919 canceled with stages
in 5 minutes and 11 seconds
......@@ -3841,6 +3841,12 @@ dependencies = [
"parking_lot 0.11.1",
]
[[package]]
name = "oorandom"
version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "opaque-debug"
version = "0.2.3"
......@@ -5337,6 +5343,7 @@ dependencies = [
"futures 0.3.8",
"futures-timer 3.0.2",
"kv-log-macro",
"oorandom",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
......
......@@ -5,17 +5,18 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
async-trait = "0.1.42"
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
futures = "0.3.8"
tracing = "0.1.22"
tracing-futures = "0.2.4"
futures-timer = "3.0.2"
streamunordered = "0.5.1"
oorandom = "11.1.3"
polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" }
polkadot-node-subsystem-util = { path = "../subsystem-util" }
polkadot-primitives = { path = "../../primitives" }
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
polkadot-node-subsystem-util = { path = "../subsystem-util" }
polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" }
async-trait = "0.1.42"
streamunordered = "0.5.1"
tracing = "0.1.22"
tracing-futures = "0.2.4"
[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
......
......@@ -59,7 +59,7 @@
// yielding false positives
#![warn(missing_docs)]
use std::fmt::Debug;
use std::fmt::{self, Debug};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
......@@ -74,6 +74,7 @@ use futures::{
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use oorandom::Rand32;
use streamunordered::{StreamYield, StreamUnordered};
use polkadot_primitives::v1::{Block, BlockNumber, Hash};
......@@ -99,6 +100,8 @@ const CHANNEL_CAPACITY: usize = 1024;
const STOP_DELAY: u64 = 1;
// Target for logs.
const LOG_TARGET: &'static str = "overseer";
// Rate at which messages are timed.
const MESSAGE_TIMER_METRIC_CAPTURE_RATE: f64 = 0.005;
/// A type of messages that are sent from [`Subsystem`] to [`Overseer`].
///
......@@ -291,6 +294,26 @@ struct SubsystemInstance<M> {
name: &'static str,
}
/// An optional Prometheus histogram timer; `None` means nothing is being timed.
type MaybeTimer = Option<metrics::prometheus::prometheus::HistogramTimer>;
/// A value of type `T` bundled with an optional timer.
///
/// The timer travels alongside the payload; it is released when the wrapper is
/// taken apart (see `into_inner`) or dropped.
#[derive(Debug)]
struct MaybeTimed<T> {
// Timer attached to this value, or `None` when the value is not being timed.
timer: MaybeTimer,
// The wrapped payload.
t: T,
}
impl<T> MaybeTimed<T> {
    /// Consume the wrapper and hand back the payload.
    ///
    /// Whatever timer was attached is discarded along with the wrapper.
    fn into_inner(self) -> T {
        let Self { t: inner, .. } = self;
        inner
    }
}
impl<T> From<T> for MaybeTimed<T> {
fn from(t: T) -> Self {
Self { timer: None, t }
}
}
/// A context type that is given to the [`Subsystem`] upon spawning.
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
/// or to spawn its [`SubsystemJob`]s.
......@@ -301,7 +324,82 @@ struct SubsystemInstance<M> {
#[derive(Debug)]
pub struct OverseerSubsystemContext<M>{
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::Sender<ToOverseer>,
tx: mpsc::Sender<MaybeTimed<ToOverseer>>,
metrics: Metrics,
rng: Rand32,
threshold: u32,
}
impl<M> OverseerSubsystemContext<M> {
/// Create a new `OverseerSubsystemContext`.
///
/// `increment` determines the initial increment of the internal RNG.
/// The internal RNG is used to determine which messages are timed.
///
/// `capture_rate` determines what fraction of messages are timed. Its value is clamped
/// to the range `0.0..=1.0`.
fn new(
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::Sender<MaybeTimed<ToOverseer>>,
metrics: Metrics,
increment: u64,
mut capture_rate: f64,
) -> Self {
let rng = Rand32::new_inc(0, increment);
if capture_rate < 0.0 {
capture_rate = 0.0;
} else if capture_rate > 1.0 {
capture_rate = 1.0;
}
let threshold = (capture_rate * u32::MAX as f64) as u32;
OverseerSubsystemContext { rx, tx, metrics, rng, threshold }
}
/// Create a new `OverseserSubsystemContext` with no metering.
///
/// Intended for tests.
#[allow(unused)]
fn new_unmetered(rx: mpsc::Receiver<FromOverseer<M>>, tx: mpsc::Sender<MaybeTimed<ToOverseer>>) -> Self {
let metrics = Metrics::default();
OverseerSubsystemContext::new(rx, tx, metrics, 0, 0.0)
}
fn maybe_timed<T>(&mut self, t: T) -> MaybeTimed<T> {
let timer = if self.rng.rand_u32() <= self.threshold {
self.metrics.time_message_hold()
} else {
None
};
MaybeTimed { timer, t }
}
/// Make a standalone function which can construct a `MaybeTimed` wrapper around some `T`
/// without borrowing `self`.
///
/// This is somewhat more expensive than `self.maybe_timed` because it must clone some stuff.
fn make_maybe_timed<T>(&mut self) -> impl FnMut(T) -> MaybeTimed<T> {
// We don't want to simply clone this RNG because we don't want to duplicate its state.
// It's not ever going to be used for cryptographic purposes, but it's still better to
// keep good habits.
let (seed, increment) = self.rng.state();
let mut rng = Rand32::new_inc(seed, increment + 1);
let metrics = self.metrics.clone();
let threshold = self.threshold;
move |t| {
let timer = if rng.rand_u32() <= threshold {
metrics.time_message_hold()
} else {
None
};
MaybeTimed { timer, t }
}
}
}
#[async_trait::async_trait]
......@@ -327,7 +425,7 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-> SubsystemResult<()>
{
self.tx.send(ToOverseer::SpawnJob {
self.send_timed(ToOverseer::SpawnJob {
name,
s,
}).await.map_err(Into::into)
......@@ -336,7 +434,7 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-> SubsystemResult<()>
{
self.tx.send(ToOverseer::SpawnBlockingJob {
self.send_timed(ToOverseer::SpawnBlockingJob {
name,
s,
}).await.map_err(Into::into)
......@@ -349,25 +447,46 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
async fn send_messages<T>(&mut self, msgs: T)
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{
let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok));
if self.tx.send_all(&mut msgs).await.is_err() {
self.send_all_timed_or_log(msgs).await
}
}
impl<M> OverseerSubsystemContext<M> {
async fn send_and_log_error(&mut self, msg: ToOverseer) {
if self.send_timed(msg).await.is_err() {
tracing::debug!(
target: LOG_TARGET,
msg_type = std::any::type_name::<M>(),
"Failed to send messages to Overseer",
"Failed to send a message to Overseer",
);
}
}
}
impl<M> OverseerSubsystemContext<M> {
async fn send_and_log_error(&mut self, msg: ToOverseer) {
if self.tx.send(msg).await.is_err() {
async fn send_timed(&mut self, msg: ToOverseer) -> Result<
(),
<mpsc::Sender<MaybeTimed<ToOverseer>> as futures::Sink<MaybeTimed<ToOverseer>>>::Error
>
{
let msg = self.maybe_timed(msg);
self.tx.send(msg).await
}
async fn send_all_timed_or_log<Msg, Msgs>(&mut self, msgs: Msgs)
where
Msgs: IntoIterator<Item = Msg> + Send,
Msgs::IntoIter: Send,
Msg: Into<AllMessages> + Send,
{
let mut maybe_timed = self.make_maybe_timed();
let mut msgs = stream::iter(
msgs.into_iter()
.map(move |msg| Ok(maybe_timed(ToOverseer::SubsystemMessage(msg.into()))))
);
if self.tx.send_all(&mut msgs).await.is_err() {
tracing::debug!(
target: LOG_TARGET,
msg_type = std::any::type_name::<M>(),
"Failed to send a message to Overseer",
"Failed to send messages to Overseer",
);
}
}
......@@ -480,8 +599,8 @@ pub struct Overseer<S> {
/// Here we keep handles to spawned subsystems to be notified when they terminate.
running_subsystems: FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>,
/// Gather running subsystms' outbound streams into one.
running_subsystems_rx: StreamUnordered<mpsc::Receiver<ToOverseer>>,
/// Gather running subsystems' outbound streams into one.
running_subsystems_rx: StreamUnordered<mpsc::Receiver<MaybeTimed<ToOverseer>>>,
/// Events that are sent to the overseer from the outside world
events_rx: mpsc::Receiver<Event>,
......@@ -966,6 +1085,7 @@ struct MetricsInner {
activated_heads_total: prometheus::Counter<prometheus::U64>,
deactivated_heads_total: prometheus::Counter<prometheus::U64>,
messages_relayed_total: prometheus::Counter<prometheus::U64>,
message_relay_timing: prometheus::Histogram,
}
#[derive(Default, Clone)]
......@@ -989,6 +1109,11 @@ impl Metrics {
metrics.messages_relayed_total.inc();
}
}
/// Start a timer covering the interval between receiving a message and handing it
/// to `route_message`.
///
/// Yields `None` when this `Metrics` carries no inner metrics.
fn time_message_hold(&self) -> MaybeTimer {
    let inner = self.0.as_ref()?;
    Some(inner.message_relay_timing.start_timer())
}
}
impl metrics::Metrics for Metrics {
......@@ -1015,11 +1140,39 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
message_relay_timing: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts {
common_opts: prometheus::Opts::new(
"overseer_messages_relay_timing",
"Time spent holding a message in the overseer before passing it to `route_message`",
),
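// guessing at the desired resolution, but we know that messages will time
// out after 0.5 seconds, so the bucket set below seems plausible: the 18 bucket
// bounds are `0.0001 * (1.6 ^ i)` for `i` in `0..18`, so the largest finite bound
// is `0.0001 * (1.6 ^ 17) ~= 0.295` (one more step, `0.0001 * (1.6 ^ 18)`, would be
// `~= 0.472`). Prometheus auto-generates a final bucket for all values between
// the final value and `+Inf`, so this should work.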
//
// The documented legal ranges for the inputs (`start`, `factor`, `count`) are:
//
// - `start > 0.0`
// - `factor > 1.0`
// - `count != 0`
buckets: prometheus::exponential_buckets(0.0001, 1.6, 18).expect("inputs are within documented range; qed"),
}
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
/// Opaque `Debug` output for `Metrics`: the contents are deliberately elided.
impl fmt::Debug for Metrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `write_str` emits the string verbatim; brace escaping (`{{` / `}}`) only applies
        // to format strings, so single braces are what we want here.
        f.write_str("Metrics {...}")
    }
}
impl<S> Overseer<S>
where
S: SpawnNamed,
......@@ -1136,14 +1289,20 @@ where
events_tx: events_tx.clone(),
};
let metrics = <Metrics as metrics::Metrics>::register(prometheus_registry)?;
let mut running_subsystems_rx = StreamUnordered::new();
let mut running_subsystems = FuturesUnordered::new();
let mut seed = 0x533d; // arbitrary
let candidate_validation_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.candidate_validation,
&metrics,
&mut seed,
)?;
let candidate_backing_subsystem = spawn(
......@@ -1151,6 +1310,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.candidate_backing,
&metrics,
&mut seed,
)?;
let candidate_selection_subsystem = spawn(
......@@ -1158,6 +1319,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.candidate_selection,
&metrics,
&mut seed,
)?;
let statement_distribution_subsystem = spawn(
......@@ -1165,6 +1328,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.statement_distribution,
&metrics,
&mut seed,
)?;
let availability_distribution_subsystem = spawn(
......@@ -1172,6 +1337,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.availability_distribution,
&metrics,
&mut seed,
)?;
let bitfield_signing_subsystem = spawn(
......@@ -1179,6 +1346,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.bitfield_signing,
&metrics,
&mut seed,
)?;
let bitfield_distribution_subsystem = spawn(
......@@ -1186,6 +1355,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.bitfield_distribution,
&metrics,
&mut seed,
)?;
let provisioner_subsystem = spawn(
......@@ -1193,6 +1364,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.provisioner,
&metrics,
&mut seed,
)?;
let pov_distribution_subsystem = spawn(
......@@ -1200,6 +1373,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.pov_distribution,
&metrics,
&mut seed,
)?;
let runtime_api_subsystem = spawn(
......@@ -1207,6 +1382,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.runtime_api,
&metrics,
&mut seed,
)?;
let availability_store_subsystem = spawn(
......@@ -1214,6 +1391,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.availability_store,
&metrics,
&mut seed,
)?;
let network_bridge_subsystem = spawn(
......@@ -1221,6 +1400,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.network_bridge,
&metrics,
&mut seed,
)?;
let chain_api_subsystem = spawn(
......@@ -1228,6 +1409,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.chain_api,
&metrics,
&mut seed,
)?;
let collation_generation_subsystem = spawn(
......@@ -1235,6 +1418,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.collation_generation,
&metrics,
&mut seed,
)?;
......@@ -1243,6 +1428,8 @@ where
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.collator_protocol,
&metrics,
&mut seed,
)?;
let leaves = leaves
......@@ -1251,8 +1438,6 @@ where
.collect();
let active_leaves = HashMap::new();
let metrics = <Metrics as metrics::Metrics>::register(prometheus_registry)?;
let activation_external_listeners = HashMap::new();
let this = Self {
......@@ -1342,7 +1527,7 @@ where
match msg {
Event::MsgToSubsystem(msg) => {
self.route_message(msg).await?;
self.route_message(msg.into()).await?;
}
Event::Stop => {
self.stop().await;
......@@ -1360,14 +1545,17 @@ where
}
},
msg = self.running_subsystems_rx.next().fuse() => {
let msg = if let Some((StreamYield::Item(msg), _)) = msg {
let MaybeTimed { timer, t: msg } = if let Some((StreamYield::Item(msg), _)) = msg {
msg
} else {
continue
};
match msg {
ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await?,
ToOverseer::SubsystemMessage(msg) => {
let msg = MaybeTimed { timer, t: msg };
self.route_message(msg).await?
},
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
}
......@@ -1462,7 +1650,8 @@ where
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
async fn route_message(&mut self, msg: MaybeTimed<AllMessages>) -> SubsystemResult<()> {
let msg = msg.into_inner();
self.metrics.on_message_relayed();
match msg {
AllMessages::CandidateValidation(msg) => {
......@@ -1572,14 +1761,19 @@ where
fn spawn<S: SpawnNamed, M: Send + 'static>(
spawner: &mut S,
futures: &mut FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>,
streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
streams: &mut StreamUnordered<mpsc::Receiver<MaybeTimed<ToOverseer>>>,
s: impl Subsystem<OverseerSubsystemContext<M>>,
metrics: &Metrics,
seed: &mut u64,
) -> SubsystemResult<OverseenSubsystem<M>> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx };
let ctx = OverseerSubsystemContext::new(to_rx, from_tx, metrics.clone(), *seed, MESSAGE_TIMER_METRIC_CAPTURE_RATE);
let SpawnedSubsystem { future, name } = s.start(ctx);
// increment the seed now that it's been used, so the next context will have its own distinct RNG
*seed += 1;
let (tx, rx) = oneshot::channel();
let fut = Box::pin(async move {
......@@ -1827,12 +2021,13 @@ mod tests {
fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> {
let gather = registry.gather();
assert_eq!(gather[0].get_name(), "parachain_activated_heads_total");
assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total");
assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total");
let activated = gather[0].get_metric()[0].get_counter().get_value() as u64;
let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64;
let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64;
assert_eq!(gather[0].get_name(), "overseer_messages_relay_timing");
assert_eq!(gather[1].get_name(), "parachain_activated_heads_total");
assert_eq!(gather[2].get_name(), "parachain_deactivated_heads_total");
assert_eq!(gather[3].get_name(), "parachain_messages_relayed_total");
let activated = gather[1].get_metric()[0].get_counter().get_value() as u64;
let deactivated = gather[2].get_metric()[0].get_counter().get_value() as u64;
let relayed = gather[3].get_metric()[0].get_counter().get_value() as u64;
let mut result = HashMap::new();
result.insert("activated", activated);
result.insert("deactivated", deactivated);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment