>(
client: Arc,
mut handler: OverseerHandler,
) {
let mut finality = client.finality_notification_stream();
let mut imports = client.import_notification_stream();
loop {
select! {
f = finality.next() => {
match f {
Some(block) => {
handler.block_finalized(block.into()).await;
}
None => break,
}
},
i = imports.next() => {
match i {
Some(block) => {
handler.block_imported(block.into()).await;
}
None => break,
}
},
complete => break,
}
}
}
// Manual `Debug` impl: the spawn variants carry boxed futures, which are not
// `Debug`, so they are rendered with their payload elided.
impl Debug for ToOverseer {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		match self {
			// Only the inner message is debug-printed.
			ToOverseer::SubsystemMessage(msg) => {
				write!(f, "OverseerMessage::SubsystemMessage({:?})", msg)
			}
			// NOTE(review): the strings say `OverseerMessage::...` while the
			// enum is named `ToOverseer` — possibly stale after a rename;
			// confirm before changing, as log scrapers may match on it.
			ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)"),
			ToOverseer::SpawnBlockingJob { .. } => write!(f, "OverseerMessage::SpawnBlocking(..)")
		}
	}
}
/// A running instance of some [`Subsystem`].
///
/// Holds the sending half of the channel through which the overseer
/// delivers `FromOverseer` messages/signals to the spawned subsystem task.
///
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance {
	// Overseer -> subsystem channel; the receiving half lives in the
	// subsystem's `OverseerSubsystemContext`.
	tx: mpsc::Sender>,
}
/// A context type that is given to the [`Subsystem`] upon spawning.
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
/// or to spawn it's [`SubsystemJob`]s.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`SubsystemJob`]: trait.SubsystemJob.html
#[derive(Debug)]
pub struct OverseerSubsystemContext{
	// Inbound: messages and signals arriving from the overseer.
	rx: mpsc::Receiver>,
	// Outbound: requests back to the overseer (routing messages, spawning jobs).
	tx: mpsc::Sender,
}
#[async_trait::async_trait]
impl SubsystemContext for OverseerSubsystemContext {
	type Message = M;

	/// Non-blocking receive: `Ok(Some(_))` when a message is queued,
	/// `Ok(None)` when nothing is ready yet, `Err(())` when the overseer
	/// side of the channel has been closed.
	async fn try_recv(&mut self) -> Result>, ()> {
		match poll!(self.rx.next()) {
			Poll::Ready(Some(msg)) => Ok(Some(msg)),
			Poll::Ready(None) => Err(()),
			Poll::Pending => Ok(None),
		}
	}

	/// Await the next message from the overseer; errors when the channel
	/// is exhausted (overseer dropped its sender).
	async fn recv(&mut self) -> SubsystemResult> {
		self.rx.next().await
			// `ok_or_else` defers constructing the error `String` to the
			// failure path; the original `ok_or` allocated it eagerly on
			// every call, even when a message was present.
			.ok_or_else(|| SubsystemError::Context(
				"No more messages in rx queue to process"
					.to_owned()
			))
	}

	/// Ask the overseer to spawn `s` as a named async task.
	async fn spawn(&mut self, name: &'static str, s: Pin + Send>>)
		-> SubsystemResult<()>
	{
		self.tx.send(ToOverseer::SpawnJob {
			name,
			s,
		}).await.map_err(Into::into)
	}

	/// Ask the overseer to spawn `s` on the blocking task pool.
	async fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>)
		-> SubsystemResult<()>
	{
		self.tx.send(ToOverseer::SpawnBlockingJob {
			name,
			s,
		}).await.map_err(Into::into)
	}

	/// Route a single message to another subsystem via the overseer.
	/// Failures are logged, not propagated.
	async fn send_message(&mut self, msg: AllMessages) {
		self.send_and_log_error(ToOverseer::SubsystemMessage(msg)).await
	}

	/// Route a batch of messages via `send_all`; a failed send is only
	/// logged (best-effort delivery, mirrors `send_message`).
	async fn send_messages(&mut self, msgs: T)
		where T: IntoIterator + Send, T::IntoIter: Send
	{
		let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok));
		if self.tx.send_all(&mut msgs).await.is_err() {
			tracing::debug!(
				target: LOG_TARGET,
				msg_type = std::any::type_name::
				(),
				"Failed to send messages to Overseer",
			);
		}
	}
}
impl OverseerSubsystemContext {
	/// Best-effort send to the overseer: a failed send (overseer gone or
	/// channel full-and-closed) is logged at debug level and otherwise
	/// swallowed — callers deliberately cannot observe the failure.
	async fn send_and_log_error(&mut self, msg: ToOverseer) {
		if self.tx.send(msg).await.is_err() {
			tracing::debug!(
				target: LOG_TARGET,
				msg_type = std::any::type_name::(),
				"Failed to send a message to Overseer",
			);
		}
	}
}
/// A subsystem that we oversee.
///
/// Ties together the [`Subsystem`] itself and it's running instance
/// (which may be missing if the [`Subsystem`] is not running at the moment
/// for whatever reason).
///
/// [`Subsystem`]: trait.Subsystem.html
struct OverseenSubsystem {
	// `None` means the subsystem is not currently running; sends to it
	// become no-ops (see `send_message`/`send_signal`).
	instance: Option>,
}
impl OverseenSubsystem {
	/// Forward a message to the wrapped subsystem.
	///
	/// When no instance is running (`self.instance` is `None`), this is a
	/// successful no-op.
	async fn send_message(&mut self, msg: M) -> SubsystemResult<()> {
		match self.instance.as_mut() {
			Some(instance) => {
				instance.tx.send(FromOverseer::Communication { msg }).await?;
				Ok(())
			}
			None => Ok(()),
		}
	}

	/// Deliver a signal to the wrapped subsystem.
	///
	/// When no instance is running, the signal is silently dropped and
	/// `Ok(())` is returned.
	async fn send_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
		match self.instance.as_mut() {
			Some(instance) => {
				instance.tx.send(FromOverseer::Signal(signal)).await?;
				Ok(())
			}
			None => Ok(()),
		}
	}
}
/// The `Overseer` itself.
pub struct Overseer {
	/// A candidate validation subsystem.
	candidate_validation_subsystem: OverseenSubsystem,
	/// A candidate backing subsystem.
	candidate_backing_subsystem: OverseenSubsystem,
	/// A candidate selection subsystem.
	candidate_selection_subsystem: OverseenSubsystem,
	/// A statement distribution subsystem.
	statement_distribution_subsystem: OverseenSubsystem,
	/// An availability distribution subsystem.
	availability_distribution_subsystem: OverseenSubsystem,
	/// A bitfield signing subsystem.
	bitfield_signing_subsystem: OverseenSubsystem,
	/// A bitfield distribution subsystem.
	bitfield_distribution_subsystem: OverseenSubsystem,
	/// A provisioner subsystem.
	provisioner_subsystem: OverseenSubsystem,
	/// A PoV distribution subsystem.
	pov_distribution_subsystem: OverseenSubsystem,
	/// A runtime API subsystem.
	runtime_api_subsystem: OverseenSubsystem,
	/// An availability store subsystem.
	availability_store_subsystem: OverseenSubsystem,
	/// A network bridge subsystem.
	network_bridge_subsystem: OverseenSubsystem,
	/// A Chain API subsystem.
	chain_api_subsystem: OverseenSubsystem,
	/// A Collation Generation subsystem.
	collation_generation_subsystem: OverseenSubsystem,
	/// A Collator Protocol subsystem.
	collator_protocol_subsystem: OverseenSubsystem,
	/// Spawner to spawn tasks to.
	s: S,
	/// Here we keep handles to spawned subsystems to be notified when they terminate.
	running_subsystems: FuturesUnordered>>,
	/// Gather running subsystems' outbound streams into one.
	running_subsystems_rx: StreamUnordered>,
	/// Events that are sent to the overseer from the outside world
	events_rx: mpsc::Receiver,
	/// External listeners waiting for a hash to be in the active-leave set.
	activation_external_listeners: HashMap>>>,
	/// A set of leaves that `Overseer` starts working with.
	///
	/// Drained at the beginning of `run` and never used again.
	leaves: Vec<(Hash, BlockNumber)>,
	/// The set of the "active leaves": leaf hash mapped to its block number.
	active_leaves: HashMap,
	/// Various Prometheus metrics.
	metrics: Metrics,
}
/// This struct is passed as an argument to create a new instance of an [`Overseer`].
///
/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows
/// mocking in the test code:
///
/// Each [`Subsystem`] is supposed to implement some interface that is generic over
/// message type that is specific to this [`Subsystem`]. At the moment not all
/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`].
pub struct AllSubsystems<
	CV = (), CB = (), CS = (), SD = (), AD = (), BS = (), BD = (), P = (),
	PoVD = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = ()
> {
	/// A candidate validation subsystem.
	pub candidate_validation: CV,
	/// A candidate backing subsystem.
	pub candidate_backing: CB,
	/// A candidate selection subsystem.
	pub candidate_selection: CS,
	/// A statement distribution subsystem.
	pub statement_distribution: SD,
	/// An availability distribution subsystem.
	pub availability_distribution: AD,
	/// A bitfield signing subsystem.
	pub bitfield_signing: BS,
	/// A bitfield distribution subsystem.
	pub bitfield_distribution: BD,
	/// A provisioner subsystem.
	pub provisioner: P,
	/// A PoV distribution subsystem.
	pub pov_distribution: PoVD,
	/// A runtime API subsystem.
	pub runtime_api: RA,
	/// An availability store subsystem.
	pub availability_store: AS,
	/// A network bridge subsystem.
	pub network_bridge: NB,
	/// A Chain API subsystem.
	pub chain_api: CA,
	/// A Collation Generation subsystem.
	pub collation_generation: CG,
	/// A Collator Protocol subsystem.
	pub collator_protocol: CP,
}
// Builder-style API: `dummy()` produces an all-`DummySubsystem` set and each
// `replace_*` method consumes `self` and swaps exactly one field, changing
// only that field's generic parameter. The struct literal is spelled out in
// full in every method because struct-update syntax (`..self`) cannot change
// the struct's generic parameters.
impl
	AllSubsystems
{
	/// Create a new instance of [`AllSubsystems`].
	///
	/// Each subsystem is set to [`DummySubsystem`].
	///
	/// # Note
	///
	/// Because of a bug in rustc it is required that when calling this function,
	/// you provide a "random" type for the first generic parameter:
	///
	/// ```
	/// polkadot_overseer::AllSubsystems::<()>::dummy();
	/// ```
	pub fn dummy() -> AllSubsystems<
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem
	> {
		AllSubsystems {
			candidate_validation: DummySubsystem,
			candidate_backing: DummySubsystem,
			candidate_selection: DummySubsystem,
			statement_distribution: DummySubsystem,
			availability_distribution: DummySubsystem,
			bitfield_signing: DummySubsystem,
			bitfield_distribution: DummySubsystem,
			provisioner: DummySubsystem,
			pov_distribution: DummySubsystem,
			runtime_api: DummySubsystem,
			availability_store: DummySubsystem,
			network_bridge: DummySubsystem,
			chain_api: DummySubsystem,
			collation_generation: DummySubsystem,
			collator_protocol: DummySubsystem,
		}
	}
	/// Replace the `candidate_validation` instance in `self`.
	pub fn replace_candidate_validation(
		self,
		candidate_validation: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `candidate_backing` instance in `self`.
	pub fn replace_candidate_backing(
		self,
		candidate_backing: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `candidate_selection` instance in `self`.
	pub fn replace_candidate_selection(
		self,
		candidate_selection: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `statement_distribution` instance in `self`.
	pub fn replace_statement_distribution(
		self,
		statement_distribution: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `availability_distribution` instance in `self`.
	pub fn replace_availability_distribution(
		self,
		availability_distribution: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `bitfield_signing` instance in `self`.
	pub fn replace_bitfield_signing(
		self,
		bitfield_signing: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `bitfield_distribution` instance in `self`.
	pub fn replace_bitfield_distribution(
		self,
		bitfield_distribution: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `provisioner` instance in `self`.
	pub fn replace_provisioner(
		self,
		provisioner: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `pov_distribution` instance in `self`.
	pub fn replace_pov_distribution(
		self,
		pov_distribution: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `runtime_api` instance in `self`.
	pub fn replace_runtime_api(
		self,
		runtime_api: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `availability_store` instance in `self`.
	pub fn replace_availability_store(
		self,
		availability_store: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `network_bridge` instance in `self`.
	pub fn replace_network_bridge(
		self,
		network_bridge: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `chain_api` instance in `self`.
	pub fn replace_chain_api(
		self,
		chain_api: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `collation_generation` instance in `self`.
	pub fn replace_collation_generation(
		self,
		collation_generation: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}
	/// Replace the `collator_protocol` instance in `self`.
	pub fn replace_collator_protocol(
		self,
		collator_protocol: NEW,
	) -> AllSubsystems {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol,
		}
	}
}
/// Overseer Prometheus metrics.
#[derive(Clone)]
struct MetricsInner {
	// Total heads that entered the active-leaves set.
	activated_heads_total: prometheus::Counter,
	// Total heads removed from the active-leaves set.
	deactivated_heads_total: prometheus::Counter,
	// Total messages routed between subsystems by the overseer.
	messages_relayed_total: prometheus::Counter,
}
// Newtype wrapper: `None` (the `Default`) means metrics are disabled and all
// recording methods are no-ops.
#[derive(Default, Clone)]
struct Metrics(Option);
impl Metrics {
fn on_head_activated(&self) {
if let Some(metrics) = &self.0 {
metrics.activated_heads_total.inc();
}
}
fn on_head_deactivated(&self) {
if let Some(metrics) = &self.0 {
metrics.deactivated_heads_total.inc();
}
}
fn on_message_relayed(&self) {
if let Some(metrics) = &self.0 {
metrics.messages_relayed_total.inc();
}
}
}
impl metrics::Metrics for Metrics {
	/// Build and register the three overseer counters with `registry`.
	/// Fails if construction or registration of any counter fails
	/// (e.g. duplicate registration).
	fn try_register(registry: &prometheus::Registry) -> Result {
		let metrics = MetricsInner {
			activated_heads_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_activated_heads_total",
					"Number of activated heads."
				)?,
				registry,
			)?,
			deactivated_heads_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_deactivated_heads_total",
					"Number of deactivated heads."
				)?,
				registry,
			)?,
			messages_relayed_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_messages_relayed_total",
					"Number of messages relayed by Overseer."
				)?,
				registry,
			)?,
		};
		Ok(Metrics(Some(metrics)))
	}
}
impl Overseer
where
S: SpawnNamed,
{
	/// Create a new instance of the `Overseer` with a fixed set of [`Subsystem`]s.
	///
	/// ```text
	///                  +------------------------------------+
	///                  |            Overseer                |
	///                  +------------------------------------+
	///                    /            |             |      \
	///      ................. subsystems...................................
	///      . +-----------+    +-----------+   +----------+   +---------+ .
	///      . |           |    |           |   |          |   |         | .
	///      . +-----------+    +-----------+   +----------+   +---------+ .
	///      ...............................................................
	///                              |
	///                        probably `spawn`
	///                            a `job`
	///                              |
	///                              V
	///                         +-----------+
	///                         |           |
	///                         +-----------+
	///
	/// ```
	///
	/// [`Subsystem`]: trait.Subsystem.html
	///
	/// # Example
	///
	/// The [`Subsystems`] may be any type as long as they implement an expected interface.
	/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with them.
	/// For the sake of simplicity the termination of the example is done with a timeout.
	/// ```
	/// # use std::time::Duration;
	/// # use futures::{executor, pin_mut, select, FutureExt};
	/// # use futures_timer::Delay;
	/// # use polkadot_overseer::{Overseer, AllSubsystems};
	/// # use polkadot_subsystem::{
	/// #     Subsystem, DummySubsystem, SpawnedSubsystem, SubsystemContext,
	/// #     messages::CandidateValidationMessage,
	/// # };
	///
	/// struct ValidationSubsystem;
	///
	/// impl Subsystem for ValidationSubsystem
	///     where C: SubsystemContext
	/// {
	///     fn start(
	///         self,
	///         mut ctx: C,
	///     ) -> SpawnedSubsystem {
	///         SpawnedSubsystem {
	///             name: "validation-subsystem",
	///             future: Box::pin(async move {
	///                 loop {
	///                     Delay::new(Duration::from_secs(1)).await;
	///                 }
	///             }),
	///         }
	///     }
	/// }
	///
	/// # fn main() { executor::block_on(async move {
	/// let spawner = sp_core::testing::TaskExecutor::new();
	/// let all_subsystems = AllSubsystems::<()>::dummy().replace_candidate_validation(ValidationSubsystem);
	/// let (overseer, _handler) = Overseer::new(
	///     vec![],
	///     all_subsystems,
	///     None,
	///     spawner,
	/// ).unwrap();
	///
	/// let timer = Delay::new(Duration::from_millis(50)).fuse();
	///
	/// let overseer_fut = overseer.run().fuse();
	/// pin_mut!(timer);
	/// pin_mut!(overseer_fut);
	///
	/// select! {
	///     _ = overseer_fut => (),
	///     _ = timer => (),
	/// }
	/// #
	/// # }); }
	/// ```
	pub fn new(
		leaves: impl IntoIterator,
		all_subsystems: AllSubsystems
		,
		prometheus_registry: Option<&prometheus::Registry>,
		mut s: S,
	) -> SubsystemResult<(Self, OverseerHandler)>
	where
		CV: Subsystem> + Send,
		CB: Subsystem> + Send,
		CS: Subsystem> + Send,
		SD: Subsystem> + Send,
		AD: Subsystem> + Send,
		BS: Subsystem> + Send,
		BD: Subsystem> + Send,
		P: Subsystem> + Send,
		PoVD: Subsystem> + Send,
		RA: Subsystem> + Send,
		AS: Subsystem> + Send,
		NB: Subsystem> + Send,
		CA: Subsystem> + Send,
		CG: Subsystem> + Send,
		CP: Subsystem> + Send,
	{
		// Channel over which `OverseerHandler`s feed events into `run`'s
		// event loop.
		let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
		let handler = OverseerHandler {
			events_tx: events_tx.clone(),
		};
		// Termination futures of all spawned subsystems, and their
		// outbound `ToOverseer` streams merged into one.
		let mut running_subsystems_rx = StreamUnordered::new();
		let mut running_subsystems = FuturesUnordered::new();
		// Spawn every subsystem; each `spawn` call wires the subsystem's
		// channels into the two collections above.
		let candidate_validation_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.candidate_validation,
		)?;
		let candidate_backing_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.candidate_backing,
		)?;
		let candidate_selection_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.candidate_selection,
		)?;
		let statement_distribution_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.statement_distribution,
		)?;
		let availability_distribution_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.availability_distribution,
		)?;
		let bitfield_signing_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.bitfield_signing,
		)?;
		let bitfield_distribution_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.bitfield_distribution,
		)?;
		let provisioner_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.provisioner,
		)?;
		let pov_distribution_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.pov_distribution,
		)?;
		let runtime_api_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.runtime_api,
		)?;
		let availability_store_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.availability_store,
		)?;
		let network_bridge_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.network_bridge,
		)?;
		let chain_api_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.chain_api,
		)?;
		let collation_generation_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.collation_generation,
		)?;
		let collator_protocol_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.collator_protocol,
		)?;
		// Starting leaves: parent hashes are irrelevant here, only
		// (hash, number) pairs are kept; drained by `run`.
		let leaves = leaves
			.into_iter()
			.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number))
			.collect();
		let active_leaves = HashMap::new();
		let metrics = ::register(prometheus_registry)?;
		let activation_external_listeners = HashMap::new();
		let this = Self {
			candidate_validation_subsystem,
			candidate_backing_subsystem,
			candidate_selection_subsystem,
			statement_distribution_subsystem,
			availability_distribution_subsystem,
			bitfield_signing_subsystem,
			bitfield_distribution_subsystem,
			provisioner_subsystem,
			pov_distribution_subsystem,
			runtime_api_subsystem,
			availability_store_subsystem,
			network_bridge_subsystem,
			chain_api_subsystem,
			collation_generation_subsystem,
			collator_protocol_subsystem,
			s,
			running_subsystems,
			running_subsystems_rx,
			events_rx,
			activation_external_listeners,
			leaves,
			active_leaves,
			metrics,
		};
		Ok((this, handler))
	}
	// Stop the overseer: send `Conclude` to every subsystem (errors
	// ignored — a dead subsystem cannot be concluded), then wait up to
	// `STOP_DELAY` seconds for all subsystem tasks to finish.
	async fn stop(mut self) {
		let _ = self.candidate_validation_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.candidate_backing_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.candidate_selection_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.statement_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.availability_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.bitfield_signing_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.bitfield_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.provisioner_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.pov_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.runtime_api_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.availability_store_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.network_bridge_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.chain_api_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.collator_protocol_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.collation_generation_subsystem.send_signal(OverseerSignal::Conclude).await;
		let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
		loop {
			select! {
				// A subsystem task finished; stop once all have.
				_ = self.running_subsystems.next() => {
					if self.running_subsystems.is_empty() {
						break;
					}
				},
				// Grace period expired: give up waiting.
				_ = stop_delay => break,
				complete => break,
			}
		}
	}
	/// Run the `Overseer`.
	///
	/// First activates all starting `leaves` (broadcasting one
	/// `ActiveLeavesUpdate`), then loops over three event sources:
	/// external events, subsystem-originated messages, and subsystem
	/// terminations. Returns after an `Event::Stop` or when any
	/// subsystem finishes (the latter is treated as an error and
	/// triggers a full shutdown).
	#[tracing::instrument(skip(self), fields(subsystem = LOG_TARGET))]
	pub async fn run(mut self) -> SubsystemResult<()> {
		// Seed the active-leaves set from the startup leaves; `leaves`
		// is drained here and never used again.
		let mut update = ActiveLeavesUpdate::default();
		for (hash, number) in std::mem::take(&mut self.leaves) {
			update.activated.push(hash);
			let _ = self.active_leaves.insert(hash, number);
			self.on_head_activated(&hash);
		}
		self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
		loop {
			select! {
				// Events from the outside world (via `OverseerHandler`s).
				msg = self.events_rx.next().fuse() => {
					let msg = if let Some(msg) = msg {
						msg
					} else {
						continue
					};
					match msg {
						Event::MsgToSubsystem(msg) => {
							self.route_message(msg).await;
						}
						Event::Stop => {
							self.stop().await;
							return Ok(());
						}
						Event::BlockImported(block) => {
							self.block_imported(block).await?;
						}
						Event::BlockFinalized(block) => {
							self.block_finalized(block).await?;
						}
						Event::ExternalRequest(request) => {
							self.handle_external_request(request);
						}
					}
				},
				// Requests from running subsystems (routing / spawning).
				msg = self.running_subsystems_rx.next().fuse() => {
					let msg = if let Some((StreamYield::Item(msg), _)) = msg {
						msg
					} else {
						continue
					};
					match msg {
						ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await,
						ToOverseer::SpawnJob { name, s } => {
							self.spawn_job(name, s);
						}
						ToOverseer::SpawnBlockingJob { name, s } => {
							self.spawn_blocking_job(name, s);
						}
					}
				},
				// A subsystem terminated on its own — unexpected while
				// running, so shut everything down and propagate its result.
				res = self.running_subsystems.next().fuse() => {
					let finished = if let Some(finished) = res {
						finished
					} else {
						continue
					};
					tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly");
					self.stop().await;
					return finished;
				},
			}
		}
	}
	// Handle a newly imported block: the parent (if it was an active
	// leaf) is deactivated, the new block becomes an active leaf, and an
	// `ActiveLeavesUpdate` is broadcast to all subsystems.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
		let mut update = ActiveLeavesUpdate::default();
		if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
			// Sanity check (debug builds only): the stored parent number
			// should be exactly one less than the imported block's number.
			if let Some(expected_parent_number) = block.number.checked_sub(1) {
				debug_assert_eq!(expected_parent_number, number);
			}
			update.deactivated.push(block.parent_hash);
			self.on_head_deactivated(&block.parent_hash);
		}
		match self.active_leaves.entry(block.hash) {
			hash_map::Entry::Vacant(entry) => {
				update.activated.push(block.hash);
				let _ = entry.insert(block.number);
				self.on_head_activated(&block.hash);
			},
			// Already active (e.g. duplicate import notification): no-op,
			// but the recorded number must match.
			hash_map::Entry::Occupied(entry) => {
				debug_assert_eq!(*entry.get(), block.number);
			}
		}
		self.clean_up_external_listeners();
		self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
		Ok(())
	}
	// Handle a block finalization: deactivate every active leaf at or
	// below the finalized height, then broadcast `BlockFinalized` followed
	// by the (possibly empty) `ActiveLeavesUpdate`.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
		let mut update = ActiveLeavesUpdate::default();
		// Drop all leaves with number <= finalized number; they can no
		// longer be built upon.
		self.active_leaves.retain(|h, n| {
			if *n <= block.number {
				update.deactivated.push(*h);
				false
			} else {
				true
			}
		});
		for deactivated in &update.deactivated {
			self.on_head_deactivated(deactivated)
		}
		self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
		// broadcast `ActiveLeavesUpdate` even if empty to issue view updates
		self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
		Ok(())
	}
/// Broadcast `signal` to every subsystem, cloning it for each.
///
/// The sends run sequentially and the first failure aborts the broadcast
/// via `?`. The final subsystem receives the original value, saving one
/// clone.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
	self.candidate_validation_subsystem.send_signal(signal.clone()).await?;
	self.candidate_backing_subsystem.send_signal(signal.clone()).await?;
	self.candidate_selection_subsystem.send_signal(signal.clone()).await?;
	self.statement_distribution_subsystem.send_signal(signal.clone()).await?;
	self.availability_distribution_subsystem.send_signal(signal.clone()).await?;
	self.bitfield_signing_subsystem.send_signal(signal.clone()).await?;
	self.bitfield_distribution_subsystem.send_signal(signal.clone()).await?;
	self.provisioner_subsystem.send_signal(signal.clone()).await?;
	self.pov_distribution_subsystem.send_signal(signal.clone()).await?;
	self.runtime_api_subsystem.send_signal(signal.clone()).await?;
	self.availability_store_subsystem.send_signal(signal.clone()).await?;
	self.network_bridge_subsystem.send_signal(signal.clone()).await?;
	self.chain_api_subsystem.send_signal(signal.clone()).await?;
	self.collator_protocol_subsystem.send_signal(signal.clone()).await?;
	// Last subsystem consumes the signal by value — no clone needed.
	self.collation_generation_subsystem.send_signal(signal).await?;
	Ok(())
}
/// Route a message to the subsystem its `AllMessages` variant addresses,
/// counting it in the relay metrics.
///
/// Send errors are deliberately discarded (`let _`): a subsystem that has
/// gone away is detected elsewhere, via its completion future in the main
/// overseer loop.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn route_message(&mut self, msg: AllMessages) {
	self.metrics.on_message_relayed();
	// Exhaustive match: adding an `AllMessages` variant forces an arm here.
	match msg {
		AllMessages::CandidateValidation(msg) => {
			let _ = self.candidate_validation_subsystem.send_message(msg).await;
		},
		AllMessages::CandidateBacking(msg) => {
			let _ = self.candidate_backing_subsystem.send_message(msg).await;
		},
		AllMessages::CandidateSelection(msg) => {
			let _ = self.candidate_selection_subsystem.send_message(msg).await;
		},
		AllMessages::StatementDistribution(msg) => {
			let _ = self.statement_distribution_subsystem.send_message(msg).await;
		},
		AllMessages::AvailabilityDistribution(msg) => {
			let _ = self.availability_distribution_subsystem.send_message(msg).await;
		},
		AllMessages::BitfieldDistribution(msg) => {
			let _ = self.bitfield_distribution_subsystem.send_message(msg).await;
		},
		AllMessages::BitfieldSigning(msg) => {
			let _ = self.bitfield_signing_subsystem.send_message(msg).await;
		},
		AllMessages::Provisioner(msg) => {
			let _ = self.provisioner_subsystem.send_message(msg).await;
		},
		AllMessages::PoVDistribution(msg) => {
			let _ = self.pov_distribution_subsystem.send_message(msg).await;
		},
		AllMessages::RuntimeApi(msg) => {
			let _ = self.runtime_api_subsystem.send_message(msg).await;
		},
		AllMessages::AvailabilityStore(msg) => {
			let _ = self.availability_store_subsystem.send_message(msg).await;
		},
		AllMessages::NetworkBridge(msg) => {
			let _ = self.network_bridge_subsystem.send_message(msg).await;
		},
		AllMessages::ChainApi(msg) => {
			let _ = self.chain_api_subsystem.send_message(msg).await;
		},
		AllMessages::CollationGeneration(msg) => {
			let _ = self.collation_generation_subsystem.send_message(msg).await;
		},
		AllMessages::CollatorProtocol(msg) => {
			let _ = self.collator_protocol_subsystem.send_message(msg).await;
		},
	}
}
/// Record a head activation and resolve any external `WaitForActivation`
/// requests registered for this hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn on_head_activated(&mut self, hash: &Hash) {
	self.metrics.on_head_activated();

	let listeners = match self.activation_external_listeners.remove(hash) {
		Some(listeners) => listeners,
		None => return,
	};
	for listener in listeners {
		// it's fine if the listener is no longer interested
		let _ = listener.send(Ok(()));
	}
}
/// Record a head deactivation and drop any pending activation listeners
/// for this hash — the closed channels signal deactivation to waiters.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn on_head_deactivated(&mut self, hash: &Hash) {
	self.metrics.on_head_deactivated();
	// Removing the entry drops the senders, closing their channels.
	let _ = self.activation_external_listeners.remove(hash);
}
/// Prune activation listeners whose receiving end has been dropped, and
/// remove map entries that end up with no listeners at all.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn clean_up_external_listeners(&mut self) {
	self.activation_external_listeners.retain(|_, senders| {
		// Drop listeners nobody is waiting on anymore.
		senders.retain(|sender| !sender.is_canceled());
		!senders.is_empty()
	})
}
/// Handle a request arriving from outside the overseer.
///
/// `WaitForActivation`: if the head is already an active leaf, answer
/// immediately; otherwise park the response channel until the head is
/// activated (or dropped on deactivation).
#[tracing::instrument(level = "trace", skip(self, request), fields(subsystem = LOG_TARGET))]
fn handle_external_request(&mut self, request: ExternalRequest) {
	match request {
		ExternalRequest::WaitForActivation { hash, response_channel } => {
			// `contains_key` instead of `get(..).is_some()`: same check,
			// idiomatic and without constructing an unused reference.
			if self.active_leaves.contains_key(&hash) {
				// it's fine if the listener is no longer interested
				let _ = response_channel.send(Ok(()));
			} else {
				self.activation_external_listeners
					.entry(hash)
					.or_default()
					.push(response_channel);
			}
		}
	}
}
/// Spawn a non-blocking background task on the overseer's spawner.
fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
	self.s.spawn(name, j);
}
/// Spawn a task that may block; routed to the spawner's blocking pool.
fn spawn_blocking_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
	self.s.spawn_blocking(name, j);
}
}
/// Start a single subsystem: wire up its bidirectional channels, spawn its
/// future on `spawner`, and register it for message routing and liveness
/// tracking.
///
/// NOTE(review): generic parameters appear to have been stripped from this
/// signature during extraction (e.g. `FuturesUnordered>>`); the tokens are
/// preserved as found.
fn spawn(
	spawner: &mut S,
	futures: &mut FuturesUnordered>>,
	streams: &mut StreamUnordered>,
	s: impl Subsystem>,
) -> SubsystemResult> {
	// Overseer -> subsystem channel ...
	let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
	// ... and subsystem -> overseer channel.
	let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
	let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx };
	let SpawnedSubsystem { future, name } = s.start(ctx);
	// One-shot fired when the subsystem's future completes, letting the
	// overseer detect an unexpected exit.
	let (tx, rx) = oneshot::channel();
	let fut = Box::pin(async move {
		if let Err(e) = future.await {
			tracing::error!(subsystem=name, err = ?e, "subsystem exited with error");
		} else {
			tracing::debug!(subsystem=name, "subsystem exited without an error");
		}
		let _ = tx.send(());
	});
	spawner.spawn(name, fut);
	// Register the outbound stream and the completion future with the
	// overseer's aggregate collections.
	let _ = streams.push(from_rx);
	futures.push(Box::pin(rx.map(|e| { tracing::warn!(err = ?e, "dropping error"); Ok(()) })));
	let instance = Some(SubsystemInstance {
		tx: to_tx,
	});
	Ok(OverseenSubsystem {
		instance,
	})
}
#[cfg(test)]
mod tests {
use std::sync::atomic;
use std::collections::HashMap;
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt, pending};
use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash};
use polkadot_subsystem::messages::RuntimeApiRequest;
use polkadot_node_primitives::{Collation, CollationGenerationConfig};
use polkadot_node_network_protocol::{PeerId, ReputationChange, NetworkBridgeEvent};
use sp_core::crypto::Pair as _;
use super::*;
// Test subsystem that counts the `Communication` messages it receives,
// reporting each running count over the provided channel.
struct TestSubsystem1(mpsc::Sender);
impl Subsystem for TestSubsystem1
	where C: SubsystemContext
{
	fn start(self, mut ctx: C) -> SpawnedSubsystem {
		let mut sender = self.0;
		SpawnedSubsystem {
			name: "test-subsystem-1",
			future: Box::pin(async move {
				let mut i = 0;
				loop {
					match ctx.recv().await {
						Ok(FromOverseer::Communication { .. }) => {
							let _ = sender.send(i).await;
							i += 1;
							continue;
						}
						// `Conclude` (or a closed channel) ends the job cleanly.
						Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
						Err(_) => return Ok(()),
						_ => (),
					}
				}
			}),
		}
	}
}
// Test subsystem that fires 10 `CandidateValidation` messages into the
// overseer, then idles until told to conclude.
struct TestSubsystem2(mpsc::Sender);
impl Subsystem for TestSubsystem2
	where C: SubsystemContext
{
	fn start(self, mut ctx: C) -> SpawnedSubsystem {
		let sender = self.0.clone();
		SpawnedSubsystem {
			name: "test-subsystem-2",
			future: Box::pin(async move {
				// Keep the sender alive so the peer's channel stays open.
				let _sender = sender;
				let mut c: usize = 0;
				loop {
					if c < 10 {
						// Reply channel is dropped immediately; the answer
						// is never awaited in this test.
						let (tx, _) = oneshot::channel();
						ctx.send_message(
							AllMessages::CandidateValidation(
								CandidateValidationMessage::ValidateFromChainState(
									Default::default(),
									PoV {
										block_data: BlockData(Vec::new()),
									}.into(),
									tx,
								)
							)
						).await;
						c += 1;
						continue;
					}
					match ctx.try_recv().await {
						Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
							break;
						}
						Ok(Some(_)) => {
							continue;
						}
						Err(_) => return Ok(()),
						_ => (),
					}
					// Nothing to do right now; park until polled again.
					pending!();
				}
				Ok(())
			}),
		}
	}
}
// Test subsystem whose future completes immediately — used to verify the
// overseer errors out when a subsystem exits unexpectedly.
struct TestSubsystem4;
impl Subsystem for TestSubsystem4
	where C: SubsystemContext
{
	fn start(self, mut _ctx: C) -> SpawnedSubsystem {
		SpawnedSubsystem {
			name: "test-subsystem-4",
			future: Box::pin(async move {
				// Do nothing and exit.
				Ok(())
			}),
		}
	}
}
// Checks that a minimal configuration of two jobs can run and exchange messages.
#[test]
fn overseer_works() {
	let spawner = sp_core::testing::TaskExecutor::new();
	executor::block_on(async move {
		// Channels on which the two test subsystems report back.
		let (s1_tx, mut s1_rx) = mpsc::channel::(64);
		let (s2_tx, mut s2_rx) = mpsc::channel::(64);
		let all_subsystems = AllSubsystems::<()>::dummy()
			.replace_candidate_validation(TestSubsystem1(s1_tx))
			.replace_candidate_backing(TestSubsystem2(s2_tx));
		let (overseer, mut handler) = Overseer::new(
			vec![],
			all_subsystems,
			None,
			spawner,
		).unwrap();
		let overseer_fut = overseer.run().fuse();
		pin_mut!(overseer_fut);
		let mut s1_results = Vec::new();
		let mut s2_results = Vec::new();
		loop {
			select! {
				_ = overseer_fut => break,
				s1_next = s1_rx.next() => {
					match s1_next {
						Some(msg) => {
							s1_results.push(msg);
							// Subsystem 2 sends 10 messages; once subsystem 1
							// has counted all of them, shut the overseer down.
							if s1_results.len() == 10 {
								handler.stop().await;
							}
						}
						None => break,
					}
				},
				s2_next = s2_rx.next() => {
					match s2_next {
						Some(_) => s2_results.push(s2_next),
						None => break,
					}
				},
				complete => break,
			}
		}
		// Subsystem 1 must have observed counts 0..10 in order.
		assert_eq!(s1_results, (0..10).collect::>());
	});
}
// Checks activated/deactivated metrics are updated properly.
#[test]
fn overseer_metrics_work() {
	let spawner = sp_core::testing::TaskExecutor::new();
	executor::block_on(async move {
		let first_block_hash = [1; 32].into();
		let second_block_hash = [2; 32].into();
		let third_block_hash = [3; 32].into();
		// A linear chain 1 <- 2 <- 3; each import supersedes its parent.
		let first_block = BlockInfo {
			hash: first_block_hash,
			parent_hash: [0; 32].into(),
			number: 1,
		};
		let second_block = BlockInfo {
			hash: second_block_hash,
			parent_hash: first_block_hash,
			number: 2,
		};
		let third_block = BlockInfo {
			hash: third_block_hash,
			parent_hash: second_block_hash,
			number: 3,
		};
		let all_subsystems = AllSubsystems::<()>::dummy();
		let registry = prometheus::Registry::new();
		let (overseer, mut handler) = Overseer::new(
			vec![first_block],
			all_subsystems,
			// Fixed mis-encoded `®istry`: this must borrow `registry`.
			Some(&registry),
			spawner,
		).unwrap();
		let overseer_fut = overseer.run().fuse();
		pin_mut!(overseer_fut);
		handler.block_imported(second_block).await;
		handler.block_imported(third_block).await;
		handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
		handler.stop().await;
		select! {
			res = overseer_fut => {
				assert!(res.is_ok());
				let metrics = extract_metrics(&registry);
				// Three heads activated (1, 2, 3), two deactivated as their
				// children were imported, one message relayed.
				assert_eq!(metrics["activated"], 3);
				assert_eq!(metrics["deactivated"], 2);
				assert_eq!(metrics["relayed"], 1);
			},
			complete => (),
		}
	});
}
/// Gather the overseer's Prometheus counters into a name -> value map,
/// asserting the three expected metric families are present in order.
fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> {
	let gather = registry.gather();
	assert_eq!(gather[0].get_name(), "parachain_activated_heads_total");
	assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total");
	assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total");

	// The families arrive in the fixed order asserted above; pair each
	// with a short key for convenient lookup in tests.
	["activated", "deactivated", "relayed"]
		.iter()
		.zip(gather.iter())
		.map(|(key, family)| (*key, family.get_metric()[0].get_counter().get_value() as u64))
		.collect()
}
// Spawn a subsystem that immediately exits.
//
// Should immediately conclude the overseer itself with an error.
#[test]
fn overseer_panics_on_subsystem_exit() {
	let spawner = sp_core::testing::TaskExecutor::new();
	executor::block_on(async move {
		let (s1_tx, _) = mpsc::channel(64);
		let all_subsystems = AllSubsystems::<()>::dummy()
			.replace_candidate_validation(TestSubsystem1(s1_tx))
			.replace_candidate_backing(TestSubsystem4);
		let (overseer, _handle) = Overseer::new(
			vec![],
			all_subsystems,
			None,
			spawner,
		).unwrap();
		let overseer_fut = overseer.run().fuse();
		pin_mut!(overseer_fut);
		// TestSubsystem4 returns at once; that must surface as an error
		// from the overseer's own future.
		select! {
			res = overseer_fut => assert!(res.is_err()),
			complete => (),
		}
	})
}
// Test subsystem that forwards every non-`Conclude` signal it receives
// out over the provided channel, so tests can assert on heartbeats.
struct TestSubsystem5(mpsc::Sender);
impl Subsystem for TestSubsystem5
	where C: SubsystemContext
{
	fn start(self, mut ctx: C) -> SpawnedSubsystem {
		let mut sender = self.0.clone();
		SpawnedSubsystem {
			name: "test-subsystem-5",
			future: Box::pin(async move {
				loop {
					match ctx.try_recv().await {
						Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
						Ok(Some(FromOverseer::Signal(s))) => {
							// Relay the signal for the test to inspect.
							sender.send(s).await.unwrap();
							continue;
						},
						Ok(Some(_)) => continue,
						Err(_) => break,
						_ => (),
					}
					// No input pending; park until polled again.
					pending!();
				}
				Ok(())
			}),
		}
	}
}
// Identical in behavior to `TestSubsystem5`; a second type is needed so
// two subsystem slots can be observed independently in the same test.
struct TestSubsystem6(mpsc::Sender);
impl Subsystem for TestSubsystem6
	where C: SubsystemContext
{
	fn start(self, mut ctx: C) -> SpawnedSubsystem {
		let mut sender = self.0.clone();
		SpawnedSubsystem {
			name: "test-subsystem-6",
			future: Box::pin(async move {
				loop {
					match ctx.try_recv().await {
						Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
						Ok(Some(FromOverseer::Signal(s))) => {
							// Relay the signal for the test to inspect.
							sender.send(s).await.unwrap();
							continue;
						},
						Ok(Some(_)) => continue,
						Err(_) => break,
						_ => (),
					}
					// No input pending; park until polled again.
					pending!();
				}
				Ok(())
			}),
		}
	}
}
// Tests that starting with a defined set of leaves and receiving
// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
#[test]
fn overseer_start_stop_works() {
	let spawner = sp_core::testing::TaskExecutor::new();
	executor::block_on(async move {
		let first_block_hash = [1; 32].into();
		let second_block_hash = [2; 32].into();
		let third_block_hash = [3; 32].into();
		// A linear chain 1 <- 2 <- 3; each import supersedes its parent.
		let first_block = BlockInfo {
			hash: first_block_hash,
			parent_hash: [0; 32].into(),
			number: 1,
		};
		let second_block = BlockInfo {
			hash: second_block_hash,
			parent_hash: first_block_hash,
			number: 2,
		};
		let third_block = BlockInfo {
			hash: third_block_hash,
			parent_hash: second_block_hash,
			number: 3,
		};
		let (tx_5, mut rx_5) = mpsc::channel(64);
		let (tx_6, mut rx_6) = mpsc::channel(64);
		let all_subsystems = AllSubsystems::<()>::dummy()
			.replace_candidate_validation(TestSubsystem5(tx_5))
			.replace_candidate_backing(TestSubsystem6(tx_6));
		let (overseer, mut handler) = Overseer::new(
			vec![first_block],
			all_subsystems,
			None,
			spawner,
		).unwrap();
		let overseer_fut = overseer.run().fuse();
		pin_mut!(overseer_fut);
		let mut ss5_results = Vec::new();
		let mut ss6_results = Vec::new();
		handler.block_imported(second_block).await;
		handler.block_imported(third_block).await;
		// One heartbeat for the startup leaf, then one per import, each
		// activating the new head and deactivating its parent.
		let expected_heartbeats = vec![
			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(first_block_hash)),
			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
				activated: [second_block_hash].as_ref().into(),
				deactivated: [first_block_hash].as_ref().into(),
			}),
			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
				activated: [third_block_hash].as_ref().into(),
				deactivated: [second_block_hash].as_ref().into(),
			}),
		];
		loop {
			select! {
				res = overseer_fut => {
					assert!(res.is_ok());
					break;
				},
				res = rx_5.next() => {
					if let Some(res) = res {
						ss5_results.push(res);
					}
				}
				res = rx_6.next() => {
					if let Some(res) = res {
						ss6_results.push(res);
					}
				}
				complete => break,
			}
			// Once both subsystems saw every heartbeat, conclude the overseer.
			if ss5_results.len() == expected_heartbeats.len() &&
				ss6_results.len() == expected_heartbeats.len() {
					handler.stop().await;
			}
		}
		assert_eq!(ss5_results, expected_heartbeats);
		assert_eq!(ss6_results, expected_heartbeats);
	});
}
// Tests that starting with a defined set of leaves and receiving
// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
#[test]
fn overseer_finalize_works() {
	let spawner = sp_core::testing::TaskExecutor::new();
	executor::block_on(async move {
		let first_block_hash = [1; 32].into();
		let second_block_hash = [2; 32].into();
		let third_block_hash = [3; 32].into();
		let first_block = BlockInfo {
			hash: first_block_hash,
			parent_hash: [0; 32].into(),
			number: 1,
		};
		// Second block is on an unrelated fork (parent [42; 32]).
		let second_block = BlockInfo {
			hash: second_block_hash,
			parent_hash: [42; 32].into(),
			number: 2,
		};
		let third_block = BlockInfo {
			hash: third_block_hash,
			parent_hash: second_block_hash,
			number: 3,
		};
		let (tx_5, mut rx_5) = mpsc::channel(64);
		let (tx_6, mut rx_6) = mpsc::channel(64);
		let all_subsystems = AllSubsystems::<()>::dummy()
			.replace_candidate_validation(TestSubsystem5(tx_5))
			.replace_candidate_backing(TestSubsystem6(tx_6));
		// start with two forks of different height.
		let (overseer, mut handler) = Overseer::new(
			vec![first_block, second_block],
			all_subsystems,
			None,
			spawner,
		).unwrap();
		let overseer_fut = overseer.run().fuse();
		pin_mut!(overseer_fut);
		let mut ss5_results = Vec::new();
		let mut ss6_results = Vec::new();
		// this should stop work on both forks we started with earlier.
		handler.block_finalized(third_block).await;
		// Startup activates both leaves; finalizing block 3 (height 3)
		// retires both of them, since both are at or below that height.
		let expected_heartbeats = vec![
			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
				activated: [first_block_hash, second_block_hash].as_ref().into(),
				..Default::default()
			}),
			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
				deactivated: [first_block_hash, second_block_hash].as_ref().into(),
				..Default::default()
			}),
			OverseerSignal::BlockFinalized(third_block_hash, 3),
		];
		loop {
			select! {
				res = overseer_fut => {
					assert!(res.is_ok());
					break;
				},
				res = rx_5.next() => {
					if let Some(res) = res {
						ss5_results.push(res);
					}
				}
				res = rx_6.next() => {
					if let Some(res) = res {
						ss6_results.push(res);
					}
				}
				complete => break,
			}
			// Once both subsystems saw every heartbeat, conclude the overseer.
			if ss5_results.len() == expected_heartbeats.len() &&
				ss6_results.len() == expected_heartbeats.len() {
					handler.stop().await;
			}
		}
		assert_eq!(ss5_results.len(), expected_heartbeats.len());
		assert_eq!(ss6_results.len(), expected_heartbeats.len());
		// Notifications on finality for multiple blocks at once
		// may be received in different orders.
		for expected in expected_heartbeats {
			assert!(ss5_results.contains(&expected));
			assert!(ss6_results.contains(&expected));
		}
	});
}
// Subsystem that tallies stop signals, other signals, and messages it
// receives into shared atomic counters, so one test can assert totals
// across all fifteen subsystem slots at once.
#[derive(Clone)]
struct CounterSubsystem {
	stop_signals_received: Arc,
	signals_received: Arc,
	msgs_received: Arc,
}
impl CounterSubsystem {
	// Construct from externally owned counters shared with the test body.
	fn new(
		stop_signals_received: Arc,
		signals_received: Arc,
		msgs_received: Arc,
	) -> Self {
		Self {
			stop_signals_received,
			signals_received,
			msgs_received,
		}
	}
}
impl Subsystem for CounterSubsystem
	where
		C: SubsystemContext,
		M: Send,
{
	fn start(self, mut ctx: C) -> SpawnedSubsystem {
		SpawnedSubsystem {
			name: "counter-subsystem",
			future: Box::pin(async move {
				loop {
					match ctx.try_recv().await {
						Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
							self.stop_signals_received.fetch_add(1, atomic::Ordering::SeqCst);
							break;
						},
						Ok(Some(FromOverseer::Signal(_))) => {
							self.signals_received.fetch_add(1, atomic::Ordering::SeqCst);
							continue;
						},
						Ok(Some(FromOverseer::Communication { .. })) => {
							self.msgs_received.fetch_add(1, atomic::Ordering::SeqCst);
							continue;
						},
						Err(_) => (),
						_ => (),
					}
					// No input pending; park until polled again.
					pending!();
				}
				Ok(())
			}),
		}
	}
}
// Constructors for minimal, throwaway test messages, one per subsystem
// message type. Response senders are dropped immediately — these tests
// never await a reply, they only check delivery.
fn test_candidate_validation_msg() -> CandidateValidationMessage {
	let (sender, _) = oneshot::channel();
	let pov = Arc::new(PoV { block_data: BlockData(Vec::new()) });
	CandidateValidationMessage::ValidateFromChainState(Default::default(), pov, sender)
}
fn test_candidate_backing_msg() -> CandidateBackingMessage {
	let (sender, _) = oneshot::channel();
	CandidateBackingMessage::GetBackedCandidates(Default::default(), Vec::new(), sender)
}
fn test_candidate_selection_msg() -> CandidateSelectionMessage {
	CandidateSelectionMessage::default()
}
fn test_chain_api_msg() -> ChainApiMessage {
	let (sender, _) = oneshot::channel();
	ChainApiMessage::FinalizedBlockNumber(sender)
}
fn test_collator_generation_msg() -> CollationGenerationMessage {
	CollationGenerationMessage::Initialize(CollationGenerationConfig {
		key: CollatorPair::generate().0,
		collator: Box::new(|_, _| TestCollator.boxed()),
		para_id: Default::default(),
	})
}
// A collation future that must never actually be polled in these tests.
struct TestCollator;
impl Future for TestCollator {
	type Output = Option;
	fn poll(self: Pin<&mut Self>, _cx: &mut futures::task::Context) -> Poll {
		panic!("at the Disco")
	}
}
impl Unpin for TestCollator {}
fn test_collator_protocol_msg() -> CollatorProtocolMessage {
	CollatorProtocolMessage::CollateOn(Default::default())
}
// Shared network event used by all `NetworkBridgeUpdateV1`-style messages.
fn test_network_bridge_event() -> NetworkBridgeEvent {
	NetworkBridgeEvent::PeerDisconnected(PeerId::random())
}
fn test_statement_distribution_msg() -> StatementDistributionMessage {
	StatementDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}
fn test_availability_distribution_msg() -> AvailabilityDistributionMessage {
	AvailabilityDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}
fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage {
	BitfieldDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}
fn test_provisioner_msg() -> ProvisionerMessage {
	let (sender, _) = oneshot::channel();
	ProvisionerMessage::RequestInherentData(Default::default(), sender)
}
fn test_pov_distribution_msg() -> PoVDistributionMessage {
	PoVDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}
fn test_runtime_api_msg() -> RuntimeApiMessage {
	let (sender, _) = oneshot::channel();
	RuntimeApiMessage::Request(Default::default(), RuntimeApiRequest::Validators(sender))
}
fn test_availability_store_msg() -> AvailabilityStoreMessage {
	let (sender, _) = oneshot::channel();
	AvailabilityStoreMessage::QueryAvailableData(CandidateHash(Default::default()), sender)
}
fn test_network_bridge_msg() -> NetworkBridgeMessage {
	NetworkBridgeMessage::ReportPeer(PeerId::random(), ReputationChange::new(42, ""))
}
// Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly.
#[test]
fn overseer_all_subsystems_receive_signals_and_messages() {
	let spawner = sp_core::testing::TaskExecutor::new();
	executor::block_on(async move {
		// Counters shared by all fifteen CounterSubsystem clones.
		let stop_signals_received = Arc::new(atomic::AtomicUsize::new(0));
		let signals_received = Arc::new(atomic::AtomicUsize::new(0));
		let msgs_received = Arc::new(atomic::AtomicUsize::new(0));
		let subsystem = CounterSubsystem::new(
			stop_signals_received.clone(),
			signals_received.clone(),
			msgs_received.clone(),
		);
		// Every subsystem slot is filled with the same counting subsystem.
		let all_subsystems = AllSubsystems {
			candidate_validation: subsystem.clone(),
			candidate_backing: subsystem.clone(),
			candidate_selection: subsystem.clone(),
			collation_generation: subsystem.clone(),
			collator_protocol: subsystem.clone(),
			statement_distribution: subsystem.clone(),
			availability_distribution: subsystem.clone(),
			bitfield_signing: subsystem.clone(),
			bitfield_distribution: subsystem.clone(),
			provisioner: subsystem.clone(),
			pov_distribution: subsystem.clone(),
			runtime_api: subsystem.clone(),
			availability_store: subsystem.clone(),
			network_bridge: subsystem.clone(),
			chain_api: subsystem.clone(),
		};
		let (overseer, mut handler) = Overseer::new(
			vec![],
			all_subsystems,
			None,
			spawner,
		).unwrap();
		let overseer_fut = overseer.run().fuse();
		pin_mut!(overseer_fut);
		// send a signal to each subsystem
		handler.block_imported(BlockInfo {
			hash: Default::default(),
			parent_hash: Default::default(),
			number: Default::default(),
		}).await;
		// send a msg to each subsystem
		// except for BitfieldSigning as the message is not instantiable
		handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
		handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
		handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await;
		handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
		handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
		handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
		handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await;
		// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
		handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
		handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await;
		handler.send_msg(AllMessages::PoVDistribution(test_pov_distribution_msg())).await;
		handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
		handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
		handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
		handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await;
		// send a stop signal to each subsystems
		handler.stop().await;
		select! {
			res = overseer_fut => {
				const NUM_SUBSYSTEMS: usize = 15;
				assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
				// x2 because of broadcast_signal on startup
				assert_eq!(signals_received.load(atomic::Ordering::SeqCst), 2 * NUM_SUBSYSTEMS);
				// -1 for BitfieldSigning
				assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 1);
				assert!(res.is_ok());
			},
			complete => (),
		}
	});
}
}