Unverified Commit 9a32ab1d authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Some code cleanup in overseer (#2008)

* Some code cleanup in overseer

- Switches to select! in the overseer run loop to be more fair about
message processing between the different sources.
- Added a check to only send `ActiveLeaves` if the update actually
contains any data.

* Move the check

* Restore old behavior

* Simplify message sending and signal sending to subsystems

* Update node/subsystem/src/lib.rs
parent a0541ce7
Pipeline #115110 passed with stages
in 24 minutes and 50 seconds
...@@ -68,7 +68,7 @@ use std::collections::{hash_map, HashMap}; ...@@ -68,7 +68,7 @@ use std::collections::{hash_map, HashMap};
use futures::channel::{mpsc, oneshot}; use futures::channel::{mpsc, oneshot};
use futures::{ use futures::{
pending, poll, select, poll, select,
future::BoxFuture, future::BoxFuture,
stream::{self, FuturesUnordered}, stream::{self, FuturesUnordered},
Future, FutureExt, SinkExt, StreamExt, Future, FutureExt, SinkExt, StreamExt,
...@@ -384,6 +384,30 @@ struct OverseenSubsystem<M> { ...@@ -384,6 +384,30 @@ struct OverseenSubsystem<M> {
instance: Option<SubsystemInstance<M>>, instance: Option<SubsystemInstance<M>>,
} }
impl<M> OverseenSubsystem<M> {
/// Send a message to the wrapped subsystem.
///
/// If the inner `instance` is `None`, nothing is happening.
async fn send_message(&mut self, msg: M) -> SubsystemResult<()> {
if let Some(ref mut instance) = self.instance {
instance.tx.send(FromOverseer::Communication { msg }).await?;
}
Ok(())
}
/// Send a signal to the wrapped subsystem.
///
/// If the inner `instance` is `None`, nothing is happening.
async fn send_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
if let Some(ref mut instance) = self.instance {
instance.tx.send(FromOverseer::Signal(signal)).await?;
}
Ok(())
}
}
/// The `Overseer` itself. /// The `Overseer` itself.
pub struct Overseer<S> { pub struct Overseer<S> {
/// A candidate validation subsystem. /// A candidate validation subsystem.
...@@ -1240,65 +1264,21 @@ where ...@@ -1240,65 +1264,21 @@ where
// Stop the overseer. // Stop the overseer.
async fn stop(mut self) { async fn stop(mut self) {
if let Some(ref mut s) = self.candidate_validation_subsystem.instance { let _ = self.candidate_validation_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; let _ = self.candidate_backing_subsystem.send_signal(OverseerSignal::Conclude).await;
} let _ = self.candidate_selection_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = self.statement_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
if let Some(ref mut s) = self.candidate_backing_subsystem.instance { let _ = self.availability_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; let _ = self.bitfield_signing_subsystem.send_signal(OverseerSignal::Conclude).await;
} let _ = self.bitfield_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = self.provisioner_subsystem.send_signal(OverseerSignal::Conclude).await;
if let Some(ref mut s) = self.candidate_selection_subsystem.instance { let _ = self.pov_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; let _ = self.runtime_api_subsystem.send_signal(OverseerSignal::Conclude).await;
} let _ = self.availability_store_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = self.network_bridge_subsystem.send_signal(OverseerSignal::Conclude).await;
if let Some(ref mut s) = self.statement_distribution_subsystem.instance { let _ = self.chain_api_subsystem.send_signal(OverseerSignal::Conclude).await;
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; let _ = self.collator_protocol_subsystem.send_signal(OverseerSignal::Conclude).await;
} let _ = self.collation_generation_subsystem.send_signal(OverseerSignal::Conclude).await;
if let Some(ref mut s) = self.availability_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.bitfield_signing_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.provisioner_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.pov_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.runtime_api_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.availability_store_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.network_bridge_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.chain_api_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.collation_generation_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse(); let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
...@@ -1318,10 +1298,9 @@ where ...@@ -1318,10 +1298,9 @@ where
/// Run the `Overseer`. /// Run the `Overseer`.
#[tracing::instrument(skip(self), fields(subsystem = LOG_TARGET))] #[tracing::instrument(skip(self), fields(subsystem = LOG_TARGET))]
pub async fn run(mut self) -> SubsystemResult<()> { pub async fn run(mut self) -> SubsystemResult<()> {
let leaves = std::mem::take(&mut self.leaves);
let mut update = ActiveLeavesUpdate::default(); let mut update = ActiveLeavesUpdate::default();
for (hash, number) in leaves.into_iter() { for (hash, number) in std::mem::take(&mut self.leaves) {
update.activated.push(hash); update.activated.push(hash);
let _ = self.active_leaves.insert(hash, number); let _ = self.active_leaves.insert(hash, number);
self.on_head_activated(&hash); self.on_head_activated(&hash);
...@@ -1330,50 +1309,62 @@ where ...@@ -1330,50 +1309,62 @@ where
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
loop { loop {
while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) { select! {
match msg { msg = self.events_rx.next().fuse() => {
Event::MsgToSubsystem(msg) => { let msg = if let Some(msg) = msg {
self.route_message(msg).await; msg
} } else {
Event::Stop => { continue
self.stop().await; };
return Ok(());
} match msg {
Event::BlockImported(block) => { Event::MsgToSubsystem(msg) => {
self.block_imported(block).await?; self.route_message(msg).await;
} }
Event::BlockFinalized(block) => { Event::Stop => {
self.block_finalized(block).await?; self.stop().await;
} return Ok(());
Event::ExternalRequest(request) => { }
self.handle_external_request(request); Event::BlockImported(block) => {
} self.block_imported(block).await?;
} }
} Event::BlockFinalized(block) => {
self.block_finalized(block).await?;
while let Poll::Ready(Some((StreamYield::Item(msg), _))) = poll!( }
&mut self.running_subsystems_rx.next() Event::ExternalRequest(request) => {
) { self.handle_external_request(request);
match msg { }
ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await,
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
} }
ToOverseer::SpawnBlockingJob { name, s } => { },
self.spawn_blocking_job(name, s); msg = self.running_subsystems_rx.next().fuse() => {
let msg = if let Some((StreamYield::Item(msg), _)) = msg {
msg
} else {
continue
};
match msg {
ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await,
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
}
ToOverseer::SpawnBlockingJob { name, s } => {
self.spawn_blocking_job(name, s);
}
} }
} },
} res = self.running_subsystems.next().fuse() => {
let finished = if let Some(finished) = res {
// Some subsystem exited? It's time to panic. finished
if let Poll::Ready(Some(finished)) = poll!(self.running_subsystems.next()) { } else {
tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly"); continue
self.stop().await; };
return finished;
tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly");
self.stop().await;
return finished;
},
} }
// Looks like nothing is left to be polled, let's take a break.
pending!();
} }
} }
...@@ -1424,7 +1415,11 @@ where ...@@ -1424,7 +1415,11 @@ where
self.on_head_deactivated(deactivated) self.on_head_deactivated(deactivated)
} }
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; // Most of the time we have a leave already closed when it is finalized, so we check here if there are actually
// any updates before sending it to the subsystems.
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash)).await?; self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash)).await?;
...@@ -1433,65 +1428,21 @@ where ...@@ -1433,65 +1428,21 @@ where
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
if let Some(ref mut s) = self.candidate_validation_subsystem.instance { self.candidate_validation_subsystem.send_signal(signal.clone()).await?;
s.tx.send(FromOverseer::Signal(signal.clone())).await?; self.candidate_backing_subsystem.send_signal(signal.clone()).await?;
} self.candidate_selection_subsystem.send_signal(signal.clone()).await?;
self.statement_distribution_subsystem.send_signal(signal.clone()).await?;
if let Some(ref mut s) = self.candidate_backing_subsystem.instance { self.availability_distribution_subsystem.send_signal(signal.clone()).await?;
s.tx.send(FromOverseer::Signal(signal.clone())).await?; self.bitfield_signing_subsystem.send_signal(signal.clone()).await?;
} self.bitfield_distribution_subsystem.send_signal(signal.clone()).await?;
self.provisioner_subsystem.send_signal(signal.clone()).await?;
if let Some(ref mut s) = self.candidate_selection_subsystem.instance { self.pov_distribution_subsystem.send_signal(signal.clone()).await?;
s.tx.send(FromOverseer::Signal(signal.clone())).await?; self.runtime_api_subsystem.send_signal(signal.clone()).await?;
} self.availability_store_subsystem.send_signal(signal.clone()).await?;
self.network_bridge_subsystem.send_signal(signal.clone()).await?;
if let Some(ref mut s) = self.statement_distribution_subsystem.instance { self.chain_api_subsystem.send_signal(signal.clone()).await?;
s.tx.send(FromOverseer::Signal(signal.clone())).await?; self.collator_protocol_subsystem.send_signal(signal.clone()).await?;
} self.collation_generation_subsystem.send_signal(signal).await?;
if let Some(ref mut s) = self.availability_distribution_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.bitfield_signing_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.provisioner_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.pov_distribution_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.runtime_api_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.availability_store_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.network_bridge_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.chain_api_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.collation_generation_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
Ok(()) Ok(())
} }
...@@ -1501,80 +1452,50 @@ where ...@@ -1501,80 +1452,50 @@ where
self.metrics.on_message_relayed(); self.metrics.on_message_relayed();
match msg { match msg {
AllMessages::CandidateValidation(msg) => { AllMessages::CandidateValidation(msg) => {
if let Some(ref mut s) = self.candidate_validation_subsystem.instance { let _ = self.candidate_validation_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::CandidateBacking(msg) => { AllMessages::CandidateBacking(msg) => {
if let Some(ref mut s) = self.candidate_backing_subsystem.instance { let _ = self.candidate_backing_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::CandidateSelection(msg) => { AllMessages::CandidateSelection(msg) => {
if let Some(ref mut s) = self.candidate_selection_subsystem.instance { let _ = self.candidate_selection_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::StatementDistribution(msg) => { AllMessages::StatementDistribution(msg) => {
if let Some(ref mut s) = self.statement_distribution_subsystem.instance { let _ = self.statement_distribution_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::AvailabilityDistribution(msg) => { AllMessages::AvailabilityDistribution(msg) => {
if let Some(ref mut s) = self.availability_distribution_subsystem.instance { let _ = self.availability_distribution_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::BitfieldDistribution(msg) => { AllMessages::BitfieldDistribution(msg) => {
if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { let _ = self.bitfield_distribution_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::BitfieldSigning(msg) => { AllMessages::BitfieldSigning(msg) => {
if let Some(ref mut s) = self.bitfield_signing_subsystem.instance { let _ = self.bitfield_signing_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication{ msg }).await; },
}
}
AllMessages::Provisioner(msg) => { AllMessages::Provisioner(msg) => {
if let Some(ref mut s) = self.provisioner_subsystem.instance { let _ = self.provisioner_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::PoVDistribution(msg) => { AllMessages::PoVDistribution(msg) => {
if let Some(ref mut s) = self.pov_distribution_subsystem.instance { let _ = self.pov_distribution_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::RuntimeApi(msg) => { AllMessages::RuntimeApi(msg) => {
if let Some(ref mut s) = self.runtime_api_subsystem.instance { let _ = self.runtime_api_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::AvailabilityStore(msg) => { AllMessages::AvailabilityStore(msg) => {
if let Some(ref mut s) = self.availability_store_subsystem.instance { let _ = self.availability_store_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::NetworkBridge(msg) => { AllMessages::NetworkBridge(msg) => {
if let Some(ref mut s) = self.network_bridge_subsystem.instance { let _ = self.network_bridge_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::ChainApi(msg) => { AllMessages::ChainApi(msg) => {
if let Some(ref mut s) = self.chain_api_subsystem.instance { let _ = self.chain_api_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::CollationGeneration(msg) => { AllMessages::CollationGeneration(msg) => {
if let Some(ref mut s) = self.collation_generation_subsystem.instance { let _ = self.collation_generation_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
AllMessages::CollatorProtocol(msg) => { AllMessages::CollatorProtocol(msg) => {
if let Some(ref mut s) = self.collator_protocol_subsystem.instance { let _ = self.collator_protocol_subsystem.send_message(msg).await;
let _ = s.tx.send(FromOverseer::Communication { msg }).await; },
}
}
} }
} }
...@@ -1671,7 +1592,7 @@ fn spawn<S: SpawnNamed, M: Send + 'static>( ...@@ -1671,7 +1592,7 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
mod tests { mod tests {
use std::sync::atomic; use std::sync::atomic;
use std::collections::HashMap; use std::collections::HashMap;
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; use futures::{executor, pin_mut, select, channel::mpsc, FutureExt, pending};
use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash}; use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash};
use polkadot_subsystem::messages::RuntimeApiRequest; use polkadot_subsystem::messages::RuntimeApiRequest;
......
...@@ -58,12 +58,17 @@ pub struct ActiveLeavesUpdate { ...@@ -58,12 +58,17 @@ pub struct ActiveLeavesUpdate {
impl ActiveLeavesUpdate { impl ActiveLeavesUpdate {
/// Create a ActiveLeavesUpdate with a single activated hash /// Create a ActiveLeavesUpdate with a single activated hash
pub fn start_work(hash: Hash) -> Self { pub fn start_work(hash: Hash) -> Self {
Self { activated: [hash].as_ref().into(), ..Default::default() } Self { activated: [hash][..].into(), ..Default::default() }
} }
/// Create a ActiveLeavesUpdate with a single deactivated hash /// Create a ActiveLeavesUpdate with a single deactivated hash
pub fn stop_work(hash: Hash) -> Self { pub fn stop_work(hash: Hash) -> Self {
Self { deactivated: [hash].as_ref().into(), ..Default::default() } Self { deactivated: [hash][..].into(), ..Default::default() }
}
/// Is this update empty and doesn't contain any information?
pub fn is_empty(&self) -> bool {
self.activated.is_empty() && self.deactivated.is_empty()
} }
} }
...@@ -72,9 +77,9 @@ impl PartialEq for ActiveLeavesUpdate { ...@@ -72,9 +77,9 @@ impl PartialEq for ActiveLeavesUpdate {
/// ///
/// Instead, it means equality when `activated` and `deactivated` are considered as sets. /// Instead, it means equality when `activated` and `deactivated` are considered as sets.
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
use std::collections::HashSet; self.activated.len() == other.activated.len() && self.deactivated.len() == other.deactivated.len()
self.activated.iter().collect::<HashSet<_>>() == other.activated.iter().collect::<HashSet<_>>() && && self.activated.iter().all(|a| other.activated.contains(a))
self.deactivated.iter().collect::<HashSet<_>>() == other.deactivated.iter().collect::<HashSet<_>>() && self.deactivated.iter().all(|a| other.deactivated.contains(a))
} }
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment