// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! # Overseer
//!
//! `overseer` implements the Overseer architecture described in the
//! [implementers-guide](https://w3f.github.io/parachain-implementers-guide/node/index.html).
//! For the motivations behind implementing the overseer itself you should
//! check out that guide; the documentation in this crate focuses on the
//! technical details.
//!
//! An `Overseer` is something that allows spawning/stopping and overseeing
//! asynchronous tasks, as well as establishing a well-defined and easy to use
//! protocol that the tasks can use to communicate with each other. This protocol
//! is intended to be the only way tasks communicate with each other; however, at
//! the moment there are no foolproof guards against other ways of communication.
//!
//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
//! share the same behavior from `Overseer`'s point of view.
//!
//! ```text
//!                              +-----------------------------+
//!                              |         Overseer            |
//!                              +-----------------------------+
//!
//!             ................|  Overseer "holds" these and uses |..............
//!             .                  them to (re)start things                      .
//!             .                                                                .
//!             .  +-------------------+                +---------------------+  .
//!             .  |   Subsystem1      |                |   Subsystem2        |  .
//!             .  +-------------------+                +---------------------+  .
//!             .           |                                       |            .
//!             ..................................................................
//!                         |                                       |
//!                       start()                                 start()
//!                         V                                       V
//!             ..................| Overseer "runs" these |.......................
//!             .  +--------------------+               +---------------------+  .
//!             .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
//!             .  +--------------------+               +---------------------+  .
//!             ..................................................................
//! ```
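//!
//! A rough sketch of wiring up an overseer from dummy subsystems. The constructor
//! arguments (initial leaves, metrics registry, head-support checker, spawner) and the
//! `run` entry point are sketched here for illustration and may not match the current
//! API exactly; `head_support_checker` and `spawner` are placeholders:
//!
//! ```ignore
//! let all_subsystems = AllSubsystems::<()>::dummy();
//! let (overseer, handler) = Overseer::new(
//! 	vec![],              // initial active leaves
//! 	all_subsystems,
//! 	None,                // prometheus registry
//! 	head_support_checker,
//! 	spawner.clone(),
//! )?;
//! spawner.spawn("overseer", overseer.run().boxed());
//! ```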

// #![deny(unused_results)]
// unused dependencies can not work for test and examples at the same time
// yielding false positives
#![warn(missing_docs)]

use std::fmt::Debug;
use std::pin::Pin;
use std::sync::{atomic::{self, AtomicUsize}, Arc};
use std::task::Poll;
use std::time::Duration;
use std::collections::{hash_map, HashMap};

use futures::channel::oneshot;
use futures::{
	poll, select,
	future::BoxFuture,
	stream::{self, FuturesUnordered, Fuse},
	Future, FutureExt, StreamExt,
};
use futures_timer::Delay;

use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use sp_api::{ApiExt, ProvideRuntimeApi};
use polkadot_subsystem::messages::{
	CandidateValidationMessage, CandidateBackingMessage,
	CandidateSelectionMessage, ChainApiMessage, StatementDistributionMessage,
	AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage,
	ProvisionerMessage, RuntimeApiMessage,
	AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage,
	CollatorProtocolMessage, AvailabilityRecoveryMessage, ApprovalDistributionMessage,
	ApprovalVotingMessage, GossipSupportMessage,
};
pub use polkadot_subsystem::{
	Subsystem, SubsystemContext, SubsystemSender, OverseerSignal, FromOverseer, SubsystemError,
	SubsystemResult, SpawnedSubsystem, ActiveLeavesUpdate, ActivatedLeaf, DummySubsystem, jaeger,
};
use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}, metered, Metronome};
use polkadot_node_primitives::SpawnNamed;
use polkadot_procmacro_overseer_subsystems_gen::AllSubsystemsGen;
// The capacity of bounded channels inside the overseer.
const CHANNEL_CAPACITY: usize = 1024;
// The capacity of signal channels to subsystems.
const SIGNAL_CHANNEL_CAPACITY: usize = 64;

// A graceful `Overseer` teardown time delay.
const STOP_DELAY: u64 = 1;
// Target for logs.
const LOG_TARGET: &'static str = "parachain::overseer";

trait MapSubsystem<T> {
	type Output;
	fn map_subsystem(&self, sub: T) -> Self::Output;
}

impl<F, T, U> MapSubsystem<T> for F where F: Fn(T) -> U {
	type Output = U;
	fn map_subsystem(&self, sub: T) -> U {
		(self)(sub)
	}
}

/// Whether a header supports parachain consensus or not.
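///
/// A minimal sketch of a test-only implementation that treats every head as supporting
/// parachain consensus (`AlwaysSupportsParachains` is a hypothetical name, not part of
/// this crate):
///
/// ```ignore
/// struct AlwaysSupportsParachains;
///
/// impl HeadSupportsParachains for AlwaysSupportsParachains {
/// 	fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// }
/// ```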
pub trait HeadSupportsParachains {
	/// Return true if the given header supports parachain consensus. Otherwise, false.
	fn head_supports_parachains(&self, head: &Hash) -> bool;
}

impl<Client> HeadSupportsParachains for Arc<Client> where
	Client: ProvideRuntimeApi<Block>,
	Client::Api: ParachainHost<Block>,
{
	fn head_supports_parachains(&self, head: &Hash) -> bool {
		let id = BlockId::Hash(*head);
		self.runtime_api().has_api::<dyn ParachainHost<Block>>(&id).unwrap_or(false)
	}
}

/// This struct is passed as an argument to create a new instance of an [`Overseer`].
/// As any entity that satisfies the interface may act as a [`Subsystem`], this allows
/// mocking in the test code:
///
/// Each [`Subsystem`] is supposed to implement some interface that is generic over the
/// message type that is specific to this [`Subsystem`]. At the moment not all
/// subsystems are implemented, and the rest can be mocked with the [`DummySubsystem`].
#[derive(Debug, Clone, AllSubsystemsGen)]
pub struct AllSubsystems<
	CV = (), CB = (), CS = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (),
	RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), ApV = (),
	GS = (),
> {
	/// A candidate validation subsystem.
	pub candidate_validation: CV,
	/// A candidate backing subsystem.
	pub candidate_backing: CB,
	/// A candidate selection subsystem.
	pub candidate_selection: CS,
	/// A statement distribution subsystem.
	pub statement_distribution: SD,
	/// An availability distribution subsystem.
	pub availability_distribution: AD,
	/// An availability recovery subsystem.
	pub availability_recovery: AR,
	/// A bitfield signing subsystem.
	pub bitfield_signing: BS,
	/// A bitfield distribution subsystem.
	pub bitfield_distribution: BD,
	/// A provisioner subsystem.
	pub provisioner: P,
	/// A runtime API subsystem.
	pub runtime_api: RA,
	/// An availability store subsystem.
	pub availability_store: AS,
	/// A network bridge subsystem.
	pub network_bridge: NB,
	/// A Chain API subsystem.
	pub chain_api: CA,
	/// A Collation Generation subsystem.
	pub collation_generation: CG,
	/// A Collator Protocol subsystem.
	pub collator_protocol: CP,
	/// An Approval Distribution subsystem.
	pub approval_distribution: ApD,
	/// An Approval Voting subsystem.
	pub approval_voting: ApV,
	/// A Gossip Support subsystem.
	pub gossip_support: GS,
}

impl<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
	AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
{
	/// Create a new instance of [`AllSubsystems`].
	///
	/// Each subsystem is set to [`DummySubsystem`].
	///
	/// # Note
	///
	/// Because of a bug in rustc it is required that when calling this function,
	/// you provide a "random" type for the first generic parameter:
	///
	/// ```
	/// polkadot_overseer::AllSubsystems::<()>::dummy();
	/// ```
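	///
	/// Individual subsystems can then be swapped in via the `replace_*` helpers generated by
	/// the `AllSubsystemsGen` derive; a sketch, where `my_subsystem` is a placeholder and the
	/// exact helper names depend on the derive:
	///
	/// ```ignore
	/// let all_subsystems = polkadot_overseer::AllSubsystems::<()>::dummy()
	/// 	.replace_candidate_validation(my_subsystem);
	/// ```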
	pub fn dummy() -> AllSubsystems<
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
	> {
		AllSubsystems {
			candidate_validation: DummySubsystem,
			candidate_backing: DummySubsystem,
			candidate_selection: DummySubsystem,
			statement_distribution: DummySubsystem,
			availability_distribution: DummySubsystem,
			availability_recovery: DummySubsystem,
			bitfield_signing: DummySubsystem,
			bitfield_distribution: DummySubsystem,
			provisioner: DummySubsystem,
			runtime_api: DummySubsystem,
			availability_store: DummySubsystem,
			network_bridge: DummySubsystem,
			chain_api: DummySubsystem,
			collation_generation: DummySubsystem,
			collator_protocol: DummySubsystem,
			approval_distribution: DummySubsystem,
			approval_voting: DummySubsystem,
			gossip_support: DummySubsystem,
		}
	}

	fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ CS, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> {
		AllSubsystems {
			candidate_validation: &self.candidate_validation,
			candidate_backing: &self.candidate_backing,
			candidate_selection: &self.candidate_selection,
			statement_distribution: &self.statement_distribution,
			availability_distribution: &self.availability_distribution,
			availability_recovery: &self.availability_recovery,
			bitfield_signing: &self.bitfield_signing,
			bitfield_distribution: &self.bitfield_distribution,
			provisioner: &self.provisioner,
			runtime_api: &self.runtime_api,
			availability_store: &self.availability_store,
			network_bridge: &self.network_bridge,
			chain_api: &self.chain_api,
			collation_generation: &self.collation_generation,
			collator_protocol: &self.collator_protocol,
			approval_distribution: &self.approval_distribution,
			approval_voting: &self.approval_voting,
			gossip_support: &self.gossip_support,
		}
	}

	fn map_subsystems<M>(self, m: M)
		-> AllSubsystems<
			<M as MapSubsystem<CV>>::Output,
			<M as MapSubsystem<CB>>::Output,
			<M as MapSubsystem<CS>>::Output,
			<M as MapSubsystem<SD>>::Output,
			<M as MapSubsystem<AD>>::Output,
			<M as MapSubsystem<AR>>::Output,
			<M as MapSubsystem<BS>>::Output,
			<M as MapSubsystem<BD>>::Output,
			<M as MapSubsystem<P>>::Output,
			<M as MapSubsystem<RA>>::Output,
			<M as MapSubsystem<AS>>::Output,
			<M as MapSubsystem<NB>>::Output,
			<M as MapSubsystem<CA>>::Output,
			<M as MapSubsystem<CG>>::Output,
			<M as MapSubsystem<CP>>::Output,
			<M as MapSubsystem<ApD>>::Output,
			<M as MapSubsystem<ApV>>::Output,
			<M as MapSubsystem<GS>>::Output,
		>
	where
		M: MapSubsystem<CV>,
		M: MapSubsystem<CB>,
		M: MapSubsystem<CS>,
		M: MapSubsystem<SD>,
		M: MapSubsystem<AD>,
		M: MapSubsystem<AR>,
		M: MapSubsystem<BS>,
		M: MapSubsystem<BD>,
		M: MapSubsystem<P>,
		M: MapSubsystem<RA>,
		M: MapSubsystem<AS>,
		M: MapSubsystem<NB>,
		M: MapSubsystem<CA>,
		M: MapSubsystem<CG>,
		M: MapSubsystem<CP>,
		M: MapSubsystem<ApD>,
		M: MapSubsystem<ApV>,
		M: MapSubsystem<GS>,
	{
		AllSubsystems {
			candidate_validation: m.map_subsystem(self.candidate_validation),
			candidate_backing: m.map_subsystem(self.candidate_backing),
			candidate_selection: m.map_subsystem(self.candidate_selection),
			statement_distribution: m.map_subsystem(self.statement_distribution),
			availability_distribution: m.map_subsystem(self.availability_distribution),
			availability_recovery: m.map_subsystem(self.availability_recovery),
			bitfield_signing: m.map_subsystem(self.bitfield_signing),
			bitfield_distribution: m.map_subsystem(self.bitfield_distribution),
			provisioner: m.map_subsystem(self.provisioner),
			runtime_api: m.map_subsystem(self.runtime_api),
			availability_store: m.map_subsystem(self.availability_store),
			network_bridge: m.map_subsystem(self.network_bridge),
			chain_api: m.map_subsystem(self.chain_api),
			collation_generation: m.map_subsystem(self.collation_generation),
			collator_protocol: m.map_subsystem(self.collator_protocol),
			approval_distribution: m.map_subsystem(self.approval_distribution),
			approval_voting: m.map_subsystem(self.approval_voting),
			gossip_support: m.map_subsystem(self.gossip_support),
		}
	}
}

type AllSubsystemsSame<T> = AllSubsystems<
	T, T, T, T, T,
	T, T, T, T, T,
	T, T, T, T, T,
	T, T, T,
>;

/// A type of messages that are sent from [`Subsystem`] to [`Overseer`].
///
/// It wraps a system-wide [`AllMessages`] type that represents all possible
/// messages in the system.
///
/// [`AllMessages`]: enum.AllMessages.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`Overseer`]: struct.Overseer.html
enum ToOverseer {
	/// A message that wraps something the `Subsystem` wants to spawn on the overseer.
	SpawnJob {
		name: &'static str,
		s: BoxFuture<'static, ()>,
	},

	/// Same as `SpawnJob` but for blocking tasks to be executed on a
	/// dedicated thread pool.
	SpawnBlockingJob {
		name: &'static str,
		s: BoxFuture<'static, ()>,
	},
}

/// An event telling the `Overseer` about a particular block
/// that has been imported or finalized.
///
/// This structure exists solely for the purposes of decoupling
/// `Overseer` code from the client code and the necessity to call
/// `HeaderBackend::block_number_from_id()`.
#[derive(Debug, Clone)]
pub struct BlockInfo {
	/// Hash of the block.
	pub hash: Hash,
	/// Hash of the parent block.
	pub parent_hash: Hash,
	/// The block's number.
	pub number: BlockNumber,
}

impl From<BlockImportNotification<Block>> for BlockInfo {
	fn from(n: BlockImportNotification<Block>) -> Self {
		BlockInfo {
			hash: n.hash,
			parent_hash: n.header.parent_hash,
			number: n.header.number,
		}
	}
}

impl From<FinalityNotification<Block>> for BlockInfo {
	fn from(n: FinalityNotification<Block>) -> Self {
		BlockInfo {
			hash: n.hash,
			parent_hash: n.header.parent_hash,
			number: n.header.number,
		}
	}
}

/// Some event from the outer world.
enum Event {
	BlockImported(BlockInfo),
	BlockFinalized(BlockInfo),
	MsgToSubsystem(AllMessages),
	ExternalRequest(ExternalRequest),
	Stop,
}

/// Some request from outer world.
enum ExternalRequest {
	WaitForActivation {
		hash: Hash,
		response_channel: oneshot::Sender<SubsystemResult<()>>,
	},
}

/// A handler used to communicate with the [`Overseer`].
///
/// [`Overseer`]: struct.Overseer.html
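///
/// A usage sketch (the `handler` value is assumed to have been returned when the
/// overseer was constructed, and `msg` is a placeholder message):
///
/// ```ignore
/// handler.send_msg(AllMessages::CollationGeneration(msg)).await;
/// handler.stop().await;
/// ```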
#[derive(Clone)]
pub struct OverseerHandler {
	events_tx: metered::MeteredSender<Event>,
}

impl OverseerHandler {
	/// Inform the `Overseer` that some block was imported.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	pub async fn block_imported(&mut self, block: BlockInfo) {
		self.send_and_log_error(Event::BlockImported(block)).await
	}

	/// Send some message to one of the `Subsystem`s.
	#[tracing::instrument(level = "trace", skip(self, msg), fields(subsystem = LOG_TARGET))]
	pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) {
		self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await
	}

	/// Inform the `Overseer` that some block was finalized.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	pub async fn block_finalized(&mut self, block: BlockInfo) {
		self.send_and_log_error(Event::BlockFinalized(block)).await
	}

	/// Wait for a block with the given hash to be in the active-leaves set.
	///
	/// The response channel responds if the hash was activated and is closed if the hash was deactivated.
	/// Note that, because the overseer doesn't store the whole active-leaves set, only deltas,
	/// the response channel may never return if the hash was deactivated before this call.
	/// In this case, it's the caller's responsibility to ensure a timeout is set.
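	///
	/// A sketch of pairing the wait with a timeout on the caller side (the timeout length is
	/// illustrative):
	///
	/// ```ignore
	/// let (tx, rx) = oneshot::channel();
	/// handler.wait_for_activation(hash, tx).await;
	/// let activated = rx.timeout(Duration::from_secs(30)).await;
	/// ```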
	#[tracing::instrument(level = "trace", skip(self, response_channel), fields(subsystem = LOG_TARGET))]
	pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender<SubsystemResult<()>>) {
		self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
			hash,
			response_channel
		})).await
	}

	/// Tell `Overseer` to shutdown.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	pub async fn stop(&mut self) {
		self.send_and_log_error(Event::Stop).await
	}

	async fn send_and_log_error(&mut self, event: Event) {
		if self.events_tx.send(event).await.is_err() {
			tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
		}
	}
}

/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
/// import and finality notifications into the [`OverseerHandler`].
///
/// [`Overseer`]: struct.Overseer.html
/// [`OverseerHandler`]: struct.OverseerHandler.html
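///
/// A usage sketch (`client`, `handler` and `spawner` are assumed to come from the
/// surrounding service setup):
///
/// ```ignore
/// spawner.spawn(
/// 	"overseer-notification-forwarding",
/// 	forward_events(client.clone(), handler.clone()).boxed(),
/// );
/// ```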
pub async fn forward_events<P: BlockchainEvents<Block>>(
	client: Arc<P>,
	mut handler: OverseerHandler,
) {
	let mut finality = client.finality_notification_stream();
	let mut imports = client.import_notification_stream();

	loop {
		select! {
			f = finality.next() => {
				match f {
					Some(block) => {
						handler.block_finalized(block.into()).await;
					}
					None => break,
				}
			},
			i = imports.next() => {
				match i {
					Some(block) => {
						handler.block_imported(block.into()).await;
					}
					None => break,
				}
			},
			complete => break,
		}
	}
}

impl Debug for ToOverseer {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		match self {
			ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)"),
			ToOverseer::SpawnBlockingJob { .. } => write!(f, "OverseerMessage::SpawnBlocking(..)")
		}
	}
}

/// A running instance of some [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance<M> {
	tx_signal: metered::MeteredSender<OverseerSignal>,
	tx_bounded: metered::MeteredSender<MessagePacket<M>>,
	meters: SubsystemMeters,
	signals_received: usize,
	name: &'static str,
}

#[derive(Debug)]
struct MessagePacket<T> {
	signals_received: usize,
	message: T,
}

fn make_packet<T>(signals_received: usize, message: T) -> MessagePacket<T> {
	MessagePacket {
		signals_received,
		message,
	}
}

// The bounded and unbounded channels held by every subsystem to communicate with every other subsystem.
#[derive(Debug, Clone)]
struct ChannelsOut {
	candidate_validation: metered::MeteredSender<MessagePacket<CandidateValidationMessage>>,
	candidate_backing: metered::MeteredSender<MessagePacket<CandidateBackingMessage>>,
	candidate_selection: metered::MeteredSender<MessagePacket<CandidateSelectionMessage>>,
	statement_distribution: metered::MeteredSender<MessagePacket<StatementDistributionMessage>>,
	availability_distribution: metered::MeteredSender<MessagePacket<AvailabilityDistributionMessage>>,
	availability_recovery: metered::MeteredSender<MessagePacket<AvailabilityRecoveryMessage>>,
	bitfield_signing: metered::MeteredSender<MessagePacket<BitfieldSigningMessage>>,
	bitfield_distribution: metered::MeteredSender<MessagePacket<BitfieldDistributionMessage>>,
	provisioner: metered::MeteredSender<MessagePacket<ProvisionerMessage>>,
	runtime_api: metered::MeteredSender<MessagePacket<RuntimeApiMessage>>,
	availability_store: metered::MeteredSender<MessagePacket<AvailabilityStoreMessage>>,
	network_bridge: metered::MeteredSender<MessagePacket<NetworkBridgeMessage>>,
	chain_api: metered::MeteredSender<MessagePacket<ChainApiMessage>>,
	collation_generation: metered::MeteredSender<MessagePacket<CollationGenerationMessage>>,
	collator_protocol: metered::MeteredSender<MessagePacket<CollatorProtocolMessage>>,
	approval_distribution: metered::MeteredSender<MessagePacket<ApprovalDistributionMessage>>,
	approval_voting: metered::MeteredSender<MessagePacket<ApprovalVotingMessage>>,
	gossip_support: metered::MeteredSender<MessagePacket<GossipSupportMessage>>,

	candidate_validation_unbounded: metered::UnboundedMeteredSender<MessagePacket<CandidateValidationMessage>>,
	candidate_backing_unbounded: metered::UnboundedMeteredSender<MessagePacket<CandidateBackingMessage>>,
	candidate_selection_unbounded: metered::UnboundedMeteredSender<MessagePacket<CandidateSelectionMessage>>,
	statement_distribution_unbounded: metered::UnboundedMeteredSender<MessagePacket<StatementDistributionMessage>>,
	availability_distribution_unbounded: metered::UnboundedMeteredSender<MessagePacket<AvailabilityDistributionMessage>>,
	availability_recovery_unbounded: metered::UnboundedMeteredSender<MessagePacket<AvailabilityRecoveryMessage>>,
	bitfield_signing_unbounded: metered::UnboundedMeteredSender<MessagePacket<BitfieldSigningMessage>>,
	bitfield_distribution_unbounded: metered::UnboundedMeteredSender<MessagePacket<BitfieldDistributionMessage>>,
	provisioner_unbounded: metered::UnboundedMeteredSender<MessagePacket<ProvisionerMessage>>,
	runtime_api_unbounded: metered::UnboundedMeteredSender<MessagePacket<RuntimeApiMessage>>,
	availability_store_unbounded: metered::UnboundedMeteredSender<MessagePacket<AvailabilityStoreMessage>>,
	network_bridge_unbounded: metered::UnboundedMeteredSender<MessagePacket<NetworkBridgeMessage>>,
	chain_api_unbounded: metered::UnboundedMeteredSender<MessagePacket<ChainApiMessage>>,
	collation_generation_unbounded: metered::UnboundedMeteredSender<MessagePacket<CollationGenerationMessage>>,
	collator_protocol_unbounded: metered::UnboundedMeteredSender<MessagePacket<CollatorProtocolMessage>>,
	approval_distribution_unbounded: metered::UnboundedMeteredSender<MessagePacket<ApprovalDistributionMessage>>,
	approval_voting_unbounded: metered::UnboundedMeteredSender<MessagePacket<ApprovalVotingMessage>>,
	gossip_support_unbounded: metered::UnboundedMeteredSender<MessagePacket<GossipSupportMessage>>,
}

impl ChannelsOut {
	async fn send_and_log_error(
		&mut self,
		signals_received: usize,
		message: AllMessages,
	) {
		let res = match message {
			AllMessages::CandidateValidation(msg) => {
				self.candidate_validation.send(make_packet(signals_received, msg)).await
			},
			AllMessages::CandidateBacking(msg) => {
				self.candidate_backing.send(make_packet(signals_received, msg)).await
			},
			AllMessages::CandidateSelection(msg) => {
				self.candidate_selection.send(make_packet(signals_received, msg)).await
			},
			AllMessages::StatementDistribution(msg) => {
				self.statement_distribution.send(make_packet(signals_received, msg)).await
			},
			AllMessages::AvailabilityDistribution(msg) => {
				self.availability_distribution.send(make_packet(signals_received, msg)).await
			},
			AllMessages::AvailabilityRecovery(msg) => {
				self.availability_recovery.send(make_packet(signals_received, msg)).await
			},
			AllMessages::BitfieldDistribution(msg) => {
				self.bitfield_distribution.send(make_packet(signals_received, msg)).await
			},
			AllMessages::BitfieldSigning(msg) => {
				self.bitfield_signing.send(make_packet(signals_received, msg)).await
			},
			AllMessages::Provisioner(msg) => {
				self.provisioner.send(make_packet(signals_received, msg)).await
			},
			AllMessages::RuntimeApi(msg) => {
				self.runtime_api.send(make_packet(signals_received, msg)).await
			},
			AllMessages::AvailabilityStore(msg) => {
				self.availability_store.send(make_packet(signals_received, msg)).await
			},
			AllMessages::NetworkBridge(msg) => {
				self.network_bridge.send(make_packet(signals_received, msg)).await
			},
			AllMessages::ChainApi(msg) => {
				self.chain_api.send(make_packet(signals_received, msg)).await
			},
			AllMessages::CollationGeneration(msg) => {
				self.collation_generation.send(make_packet(signals_received, msg)).await
			},
			AllMessages::CollatorProtocol(msg) => {
				self.collator_protocol.send(make_packet(signals_received, msg)).await
			},
			AllMessages::ApprovalDistribution(msg) => {
				self.approval_distribution.send(make_packet(signals_received, msg)).await
			},
			AllMessages::ApprovalVoting(msg) => {
				self.approval_voting.send(make_packet(signals_received, msg)).await
			},
			AllMessages::GossipSupport(msg) => {
				self.gossip_support.send(make_packet(signals_received, msg)).await
			},
		};

		if res.is_err() {
			tracing::debug!(
				target: LOG_TARGET,
				"Failed to send a message to another subsystem",
			);
		}
	}

	fn send_unbounded_and_log_error(
		&self,
		signals_received: usize,
		message: AllMessages,
	) {
		let res = match message {
			AllMessages::CandidateValidation(msg) => {
				self.candidate_validation_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::CandidateBacking(msg) => {
				self.candidate_backing_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::CandidateSelection(msg) => {
				self.candidate_selection_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::StatementDistribution(msg) => {
				self.statement_distribution_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::AvailabilityDistribution(msg) => {
				self.availability_distribution_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::AvailabilityRecovery(msg) => {
				self.availability_recovery_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::BitfieldDistribution(msg) => {
				self.bitfield_distribution_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::BitfieldSigning(msg) => {
				self.bitfield_signing_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::Provisioner(msg) => {
				self.provisioner_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::RuntimeApi(msg) => {
				self.runtime_api_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::AvailabilityStore(msg) => {
				self.availability_store_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::NetworkBridge(msg) => {
				self.network_bridge_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::ChainApi(msg) => {
				self.chain_api_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::CollationGeneration(msg) => {
				self.collation_generation_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::CollatorProtocol(msg) => {
				self.collator_protocol_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::ApprovalDistribution(msg) => {
				self.approval_distribution_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::ApprovalVoting(msg) => {
				self.approval_voting_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
			AllMessages::GossipSupport(msg) => {
				self.gossip_support_unbounded
					.unbounded_send(make_packet(signals_received, msg))
					.map_err(|e| e.into_send_error())
			},
		};

		if res.is_err() {
			tracing::debug!(
				target: LOG_TARGET,
				"Failed to send a message to another subsystem",
			);
		}
	}
}

type SubsystemIncomingMessages<M> = stream::Select<
	metered::MeteredReceiver<MessagePacket<M>>,
	metered::UnboundedMeteredReceiver<MessagePacket<M>>,
>;

#[derive(Debug, Default, Clone)]
struct SignalsReceived(Arc<AtomicUsize>);

impl SignalsReceived {
	fn load(&self) -> usize {
		self.0.load(atomic::Ordering::SeqCst)
	}

	fn inc(&self) {
		self.0.fetch_add(1, atomic::Ordering::SeqCst);
	}
}

/// A sender from subsystems to other subsystems.
#[derive(Debug, Clone)]
pub struct OverseerSubsystemSender {
	channels: ChannelsOut,
	signals_received: SignalsReceived,
}

#[async_trait::async_trait]
impl SubsystemSender for OverseerSubsystemSender {
	async fn send_message(&mut self, msg: AllMessages) {
		self.channels.send_and_log_error(self.signals_received.load(), msg).await;
	}

	async fn send_messages<T>(&mut self, msgs: T)
		where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
	{
		// This can definitely be optimized if necessary.
		for msg in msgs {
			self.send_message(msg).await;
		}
	}

	fn send_unbounded_message(&mut self, msg: AllMessages) {
		self.channels.send_unbounded_and_log_error(self.signals_received.load(), msg);
	}
}

/// A context type that is given to the [`Subsystem`] upon spawning.
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
/// or to spawn its [`SubsystemJob`]s.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`SubsystemJob`]: trait.SubsystemJob.html
#[derive(Debug)]
pub struct OverseerSubsystemContext<M>{
	signals: metered::MeteredReceiver<OverseerSignal>,
	messages: SubsystemIncomingMessages<M>,
	to_subsystems: OverseerSubsystemSender,
	to_overseer: metered::UnboundedMeteredSender<ToOverseer>,
	signals_received: SignalsReceived,
	pending_incoming: Option<(usize, M)>,
	metrics: Metrics,
}

impl<M> OverseerSubsystemContext<M> {
	/// Create a new `OverseerSubsystemContext`.
	fn new(
		signals: metered::MeteredReceiver<OverseerSignal>,
		messages: SubsystemIncomingMessages<M>,
		to_subsystems: ChannelsOut,
		to_overseer: metered::UnboundedMeteredSender<ToOverseer>,
		metrics: Metrics,
	) -> Self {
		let signals_received = SignalsReceived::default();
		OverseerSubsystemContext {
			signals,
			messages,
			to_subsystems: OverseerSubsystemSender {
				channels: to_subsystems,
				signals_received: signals_received.clone(),
			},
			to_overseer,
			signals_received,
			pending_incoming: None,
			metrics,
		}
	}

	/// Create a new `OverseerSubsystemContext` with no metering.
	///
	/// Intended for tests.
	#[allow(unused)]
	fn new_unmetered(
		signals: metered::MeteredReceiver<OverseerSignal>,
		messages: SubsystemIncomingMessages<M>,
		to_subsystems: ChannelsOut,
		to_overseer: metered::UnboundedMeteredSender<ToOverseer>,
	) -> Self {
		let metrics = Metrics::default();
		OverseerSubsystemContext::new(signals, messages, to_subsystems, to_overseer, metrics)
	}
}

#[async_trait::async_trait]
impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
	type Message = M;
	type Sender = OverseerSubsystemSender;

	async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
		match poll!(self.recv()) {
			Poll::Ready(msg) => Ok(Some(msg.map_err(|_| ())?)),
			Poll::Pending => Ok(None),
		}
	}

	async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
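		// Invariant: every incoming message records how many signals had been sent when it was
		// enqueued; it is only delivered once this context has observed at least that many
		// signals, so a subsystem never sees a message "ahead of" the signals that preceded it.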
		loop {
			// If we have a message pending an overseer signal, we only poll for signals
			// in the meantime.
			if let Some((needs_signals_received, msg)) = self.pending_incoming.take() {
				if needs_signals_received <= self.signals_received.load() {
					return Ok(FromOverseer::Communication { msg });
				} else {
					self.pending_incoming = Some((needs_signals_received, msg));

					// wait for next signal.
					let signal = self.signals.next().await
						.ok_or(SubsystemError::Context(
							"Signal channel is terminated and empty."
							.to_owned()
						))?;

					self.signals_received.inc();
					return Ok(FromOverseer::Signal(signal))
				}
			}

			let mut await_message = self.messages.next().fuse();
			let mut await_signal = self.signals.next().fuse();
			let signals_received = self.signals_received.load();
			let pending_incoming = &mut self.pending_incoming;

			// Otherwise, wait for the next signal or incoming message.
			let from_overseer = futures::select_biased! {
				signal = await_signal => {
					let signal = signal
						.ok_or(SubsystemError::Context(
							"Signal channel is terminated and empty."
							.to_owned()
						))?;

					FromOverseer::Signal(signal)
				}
				msg = await_message => {
					let packet = msg
						.ok_or(SubsystemError::Context(
							"Message channel is terminated and empty."
							.to_owned()
						))?;

					if packet.signals_received > signals_received {
						// wait until we've received enough signals to return this message.
						*pending_incoming = Some((packet.signals_received, packet.message));
						continue;
					} else {
						// we know enough to return this message.
						FromOverseer::Communication { msg: packet.message}
					}
				}
			};

			if let FromOverseer::Signal(_) = from_overseer {
				self.signals_received.inc();
			}

			return Ok(from_overseer);
		}
	}

	async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
		-> SubsystemResult<()>
	{
		self.to_overseer.send(ToOverseer::SpawnJob {
			name,
			s,
		}).await.map_err(Into::into)
	}

	async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
		-> SubsystemResult<()>
	{
		self.to_overseer.send(ToOverseer::SpawnBlockingJob {
			name,
			s,
		}).await.map_err(Into::into)
	}

	fn sender(&mut self) -> &mut OverseerSubsystemSender {
		&mut self.to_subsystems
	}
}

/// A subsystem that we oversee.
///
/// Ties together the [`Subsystem`] itself and its running instance
/// (which may be missing if the [`Subsystem`] is not running at the moment
/// for whatever reason).
///
/// [`Subsystem`]: trait.Subsystem.html
struct OverseenSubsystem<M> {
	instance: Option<SubsystemInstance<M>>,
}

impl<M> OverseenSubsystem<M> {
	/// Send a message to the wrapped subsystem.
	///
	/// If the inner `instance` is `None`, nothing happens.
	async fn send_message(&mut self, msg: M) -> SubsystemResult<()> {
		const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);

		if let Some(ref mut instance) = self.instance {
			match instance.tx_bounded.send(MessagePacket {
				signals_received: instance.signals_received,
				message: msg.into()
			}).timeout(MESSAGE_TIMEOUT).await
			{
				None => {
					tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
					Err(SubsystemError::SubsystemStalled(instance.name))
				}
				Some(res) => res.map_err(Into::into),
			}
		} else {
			Ok(())
		}
	}

	/// Send a signal to the wrapped subsystem.
	///