Commit f621ea2c (unverified), authored by asynchronous rob, committed by GitHub
Browse files

subsystems have an unbounded channel to the overseer (#2236)



* subsystems have an unbounded channel to the overseer

* Update node/overseer/src/lib.rs

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* bump Cargo.lock

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
parent 23f6d27f
Pipeline #119533 passed with stages in 28 minutes and 55 seconds
......@@ -5348,7 +5348,6 @@ dependencies = [
"polkadot-primitives",
"sc-client-api",
"sp-core",
"streamunordered",
"tracing",
"tracing-futures",
]
......
......@@ -14,7 +14,6 @@ polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../pr
polkadot-node-subsystem-util = { path = "../subsystem-util" }
polkadot-primitives = { path = "../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
streamunordered = "0.5.1"
tracing = "0.1.22"
tracing-futures = "0.2.4"
......
......@@ -70,12 +70,11 @@ use futures::channel::{mpsc, oneshot};
use futures::{
poll, select,
future::BoxFuture,
stream::{self, FuturesUnordered},
stream::{FuturesUnordered, Fuse},
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use oorandom::Rand32;
use streamunordered::{StreamYield, StreamUnordered};
use polkadot_primitives::v1::{Block, BlockNumber, Hash};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
......@@ -324,7 +323,7 @@ impl<T> From<T> for MaybeTimed<T> {
#[derive(Debug)]
pub struct OverseerSubsystemContext<M>{
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::Sender<MaybeTimed<ToOverseer>>,
tx: mpsc::UnboundedSender<MaybeTimed<ToOverseer>>,
metrics: Metrics,
rng: Rand32,
threshold: u32,
......@@ -340,7 +339,7 @@ impl<M> OverseerSubsystemContext<M> {
/// to the range `0.0..=1.0`.
fn new(
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::Sender<MaybeTimed<ToOverseer>>,
tx: mpsc::UnboundedSender<MaybeTimed<ToOverseer>>,
metrics: Metrics,
increment: u64,
mut capture_rate: f64,
......@@ -361,7 +360,10 @@ impl<M> OverseerSubsystemContext<M> {
///
/// Intended for tests.
#[allow(unused)]
fn new_unmetered(rx: mpsc::Receiver<FromOverseer<M>>, tx: mpsc::Sender<MaybeTimed<ToOverseer>>) -> Self {
fn new_unmetered(
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::UnboundedSender<MaybeTimed<ToOverseer>>,
) -> Self {
let metrics = Metrics::default();
OverseerSubsystemContext::new(rx, tx, metrics, 0, 0.0)
}
......@@ -375,31 +377,6 @@ impl<M> OverseerSubsystemContext<M> {
MaybeTimed { timer, t }
}
/// Make a standalone function which can construct a `MaybeTimed` wrapper around some `T`
/// without borrowing `self`.
///
/// This is somewhat more expensive than `self.maybe_timed` because it must clone some stuff.
fn make_maybe_timed<T>(&mut self) -> impl FnMut(T) -> MaybeTimed<T> {
// We don't want to simply clone this RNG because we don't want to duplicate its state.
// It's not ever going to be used for cryptographic purposes, but it's still better to
// keep good habits.
let (seed, increment) = self.rng.state();
let mut rng = Rand32::new_inc(seed, increment + 1);
let metrics = self.metrics.clone();
let threshold = self.threshold;
move |t| {
let timer = if rng.rand_u32() <= threshold {
metrics.time_message_hold()
} else {
None
};
MaybeTimed { timer, t }
}
}
}
#[async_trait::async_trait]
......@@ -428,7 +405,7 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
self.send_timed(ToOverseer::SpawnJob {
name,
s,
}).await.map_err(Into::into)
}).map_err(|s| s.into_send_error().into())
}
async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
......@@ -437,23 +414,25 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
self.send_timed(ToOverseer::SpawnBlockingJob {
name,
s,
}).await.map_err(Into::into)
}).map_err(|s| s.into_send_error().into())
}
async fn send_message(&mut self, msg: AllMessages) {
self.send_and_log_error(ToOverseer::SubsystemMessage(msg)).await
self.send_and_log_error(ToOverseer::SubsystemMessage(msg))
}
async fn send_messages<T>(&mut self, msgs: T)
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{
self.send_all_timed_or_log(msgs).await
for msg in msgs {
self.send_and_log_error(ToOverseer::SubsystemMessage(msg));
}
}
}
impl<M> OverseerSubsystemContext<M> {
async fn send_and_log_error(&mut self, msg: ToOverseer) {
if self.send_timed(msg).await.is_err() {
fn send_and_log_error(&mut self, msg: ToOverseer) {
if self.send_timed(msg).is_err() {
tracing::debug!(
target: LOG_TARGET,
msg_type = std::any::type_name::<M>(),
......@@ -462,33 +441,13 @@ impl<M> OverseerSubsystemContext<M> {
}
}
async fn send_timed(&mut self, msg: ToOverseer) -> Result<
fn send_timed(&mut self, msg: ToOverseer) -> Result<
(),
<mpsc::Sender<MaybeTimed<ToOverseer>> as futures::Sink<MaybeTimed<ToOverseer>>>::Error
mpsc::TrySendError<MaybeTimed<ToOverseer>>,
>
{
let msg = self.maybe_timed(msg);
self.tx.send(msg).await
}
async fn send_all_timed_or_log<Msg, Msgs>(&mut self, msgs: Msgs)
where
Msgs: IntoIterator<Item = Msg> + Send,
Msgs::IntoIter: Send,
Msg: Into<AllMessages> + Send,
{
let mut maybe_timed = self.make_maybe_timed();
let mut msgs = stream::iter(
msgs.into_iter()
.map(move |msg| Ok(maybe_timed(ToOverseer::SubsystemMessage(msg.into()))))
);
if self.tx.send_all(&mut msgs).await.is_err() {
tracing::debug!(
target: LOG_TARGET,
msg_type = std::any::type_name::<M>(),
"Failed to send messages to Overseer",
);
}
self.tx.unbounded_send(msg)
}
}
......@@ -600,7 +559,7 @@ pub struct Overseer<S> {
running_subsystems: FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>,
/// Gather running subsystems' outbound streams into one.
running_subsystems_rx: StreamUnordered<mpsc::Receiver<MaybeTimed<ToOverseer>>>,
to_overseer_rx: Fuse<mpsc::UnboundedReceiver<MaybeTimed<ToOverseer>>>,
/// Events that are sent to the overseer from the outside world
events_rx: mpsc::Receiver<Event>,
......@@ -1291,7 +1250,7 @@ where
let metrics = <Metrics as metrics::Metrics>::register(prometheus_registry)?;
let mut running_subsystems_rx = StreamUnordered::new();
let (to_overseer_tx, to_overseer_rx) = mpsc::unbounded();
let mut running_subsystems = FuturesUnordered::new();
let mut seed = 0x533d; // arbitrary
......@@ -1299,7 +1258,7 @@ where
let candidate_validation_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.candidate_validation,
&metrics,
&mut seed,
......@@ -1308,7 +1267,7 @@ where
let candidate_backing_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.candidate_backing,
&metrics,
&mut seed,
......@@ -1317,7 +1276,7 @@ where
let candidate_selection_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.candidate_selection,
&metrics,
&mut seed,
......@@ -1326,7 +1285,7 @@ where
let statement_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.statement_distribution,
&metrics,
&mut seed,
......@@ -1335,7 +1294,7 @@ where
let availability_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.availability_distribution,
&metrics,
&mut seed,
......@@ -1344,7 +1303,7 @@ where
let bitfield_signing_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.bitfield_signing,
&metrics,
&mut seed,
......@@ -1353,7 +1312,7 @@ where
let bitfield_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.bitfield_distribution,
&metrics,
&mut seed,
......@@ -1362,7 +1321,7 @@ where
let provisioner_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.provisioner,
&metrics,
&mut seed,
......@@ -1371,7 +1330,7 @@ where
let pov_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.pov_distribution,
&metrics,
&mut seed,
......@@ -1380,7 +1339,7 @@ where
let runtime_api_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.runtime_api,
&metrics,
&mut seed,
......@@ -1389,7 +1348,7 @@ where
let availability_store_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.availability_store,
&metrics,
&mut seed,
......@@ -1398,7 +1357,7 @@ where
let network_bridge_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.network_bridge,
&metrics,
&mut seed,
......@@ -1407,7 +1366,7 @@ where
let chain_api_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.chain_api,
&metrics,
&mut seed,
......@@ -1416,7 +1375,7 @@ where
let collation_generation_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.collation_generation,
&metrics,
&mut seed,
......@@ -1426,7 +1385,7 @@ where
let collator_protocol_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
to_overseer_tx.clone(),
all_subsystems.collator_protocol,
&metrics,
&mut seed,
......@@ -1458,7 +1417,7 @@ where
collator_protocol_subsystem,
s,
running_subsystems,
running_subsystems_rx,
to_overseer_rx: to_overseer_rx.fuse(),
events_rx,
activation_external_listeners,
leaves,
......@@ -1546,11 +1505,14 @@ where
}
}
},
msg = self.running_subsystems_rx.next().fuse() => {
let MaybeTimed { timer, t: msg } = if let Some((StreamYield::Item(msg), _)) = msg {
msg
} else {
continue
msg = self.to_overseer_rx.next() => {
let MaybeTimed { timer, t: msg } = match msg {
Some(m) => m,
None => {
// This is a fused stream so we will shut down after receiving all
// shutdown notifications.
continue
}
};
match msg {
......@@ -1773,14 +1735,19 @@ where
fn spawn<S: SpawnNamed, M: Send + 'static>(
spawner: &mut S,
futures: &mut FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>,
streams: &mut StreamUnordered<mpsc::Receiver<MaybeTimed<ToOverseer>>>,
to_overseer: mpsc::UnboundedSender<MaybeTimed<ToOverseer>>,
s: impl Subsystem<OverseerSubsystemContext<M>>,
metrics: &Metrics,
seed: &mut u64,
) -> SubsystemResult<OverseenSubsystem<M>> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
let ctx = OverseerSubsystemContext::new(to_rx, from_tx, metrics.clone(), *seed, MESSAGE_TIMER_METRIC_CAPTURE_RATE);
let ctx = OverseerSubsystemContext::new(
to_rx,
to_overseer,
metrics.clone(),
*seed,
MESSAGE_TIMER_METRIC_CAPTURE_RATE,
);
let SpawnedSubsystem { future, name } = s.start(ctx);
// increment the seed now that it's been used, so the next context will have its own distinct RNG
......@@ -1799,7 +1766,6 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
spawner.spawn(name, fut);
let _ = streams.push(from_rx);
futures.push(Box::pin(rx.map(|e| { tracing::warn!(err = ?e, "dropping error"); Ok(()) })));
let instance = Some(SubsystemInstance {
......
Supports Markdown
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment