lib.rs 74.4 KiB
Newer Older
Fedor Sakharov's avatar
Fedor Sakharov committed
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! # Overseer
//!
//! `overseer` implements the Overseer architecture described in the
//! [implementers-guide](https://w3f.github.io/parachain-implementers-guide/node/index.html).
Fedor Sakharov's avatar
Fedor Sakharov committed
//! For the motivations behind implementing the overseer itself you should
//! check out that guide, documentation in this crate will be mostly discussing
//! technical stuff.
//!
//! An `Overseer` is something that allows spawning/stopping and overseing
//! asynchronous tasks as well as establishing a well-defined and easy to use
//! protocol that the tasks can use to communicate with each other. It is desired
//! that this protocol is the only way tasks communicate with each other, however
//! at this moment there are no foolproof guards against other ways of communication.
//!
//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
//! share the same behavior from `Overseer`'s point of view.
//!
//! ```text
//!                              +-----------------------------+
//!                              |         Overseer            |
//!                              +-----------------------------+
//!
//!             ................|  Overseer "holds" these and uses |..............
//!             .                  them to (re)start things                      .
//!             .                                                                .
//!             .  +-------------------+                +---------------------+  .
//!             .  |   Subsystem1      |                |   Subsystem2        |  .
//!             .  +-------------------+                +---------------------+  .
//!             .           |                                       |            .
//!             ..................................................................
//!                         |                                       |
//!                       start()                                 start()
//!                         V                                       V
//!             ..................| Overseer "runs" these |.......................
//!             .  +--------------------+               +---------------------+  .
//!             .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
//!             .  +--------------------+               +---------------------+  .
//!             ..................................................................
//! ```

// #![deny(unused_results)]
// unused dependencies can not work for test and examples at the same time
// yielding false positives
#![warn(missing_docs)]

Fedor Sakharov's avatar
Fedor Sakharov committed
use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Arc;
Fedor Sakharov's avatar
Fedor Sakharov committed
use std::task::Poll;
use std::time::Duration;
use std::collections::{hash_map, HashMap};
Fedor Sakharov's avatar
Fedor Sakharov committed

use futures::channel::{mpsc, oneshot};
use futures::{
	poll, select,
	stream::{self, FuturesUnordered},
Fedor Sakharov's avatar
Fedor Sakharov committed
	Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use streamunordered::{StreamYield, StreamUnordered};

use polkadot_primitives::v1::{Block, BlockNumber, Hash};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_subsystem::messages::{
	CandidateValidationMessage, CandidateBackingMessage,
	CandidateSelectionMessage, ChainApiMessage, StatementDistributionMessage,
	AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage,
	ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage,
	AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, CollatorProtocolMessage,
};
pub use polkadot_subsystem::{
	Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
	SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem,
use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}};
use polkadot_node_primitives::SpawnNamed;
Fedor Sakharov's avatar
Fedor Sakharov committed

// A capacity of bounded channels inside the overseer.
const CHANNEL_CAPACITY: usize = 1024;
// A graceful `Overseer` teardown time delay.
const STOP_DELAY: u64 = 1;
// Target for logs.
const LOG_TARGET: &'static str = "overseer";

Fedor Sakharov's avatar
Fedor Sakharov committed
/// A type of messages that are sent from [`Subsystem`] to [`Overseer`].
///
/// It wraps a system-wide [`AllMessages`] type that represents all possible
/// messages in the system.
///
/// [`AllMessages`]: enum.AllMessages.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`Overseer`]: struct.Overseer.html
enum ToOverseer {
	/// This is a message sent by a `Subsystem`.
	SubsystemMessage(AllMessages),

	/// A message that wraps something the `Subsystem` is desiring to
	/// spawn on the overseer and a `oneshot::Sender` to signal the result
	/// of the spawn.
	SpawnJob {
Fedor Sakharov's avatar
Fedor Sakharov committed
		s: BoxFuture<'static, ()>,
	},

	/// Same as `SpawnJob` but for blocking tasks to be executed on a
	/// dedicated thread pool.
	SpawnBlockingJob {
		name: &'static str,
		s: BoxFuture<'static, ()>,
	},
/// An event telling the `Overseer` on the particular block
/// that has been imported or finalized.
///
/// This structure exists solely for the purposes of decoupling
/// `Overseer` code from the client code and the necessity to call
/// `HeaderBackend::block_number_from_id()`.
pub struct BlockInfo {
	/// hash of the block.
	pub hash: Hash,
	/// hash of the parent block.
	pub parent_hash: Hash,
	/// block's number.
	pub number: BlockNumber,
}

impl From<BlockImportNotification<Block>> for BlockInfo {
	fn from(n: BlockImportNotification<Block>) -> Self {
		BlockInfo {
			hash: n.hash,
			parent_hash: n.header.parent_hash,
			number: n.header.number,
		}
	}
}

impl From<FinalityNotification<Block>> for BlockInfo {
	fn from(n: FinalityNotification<Block>) -> Self {
		BlockInfo {
			hash: n.hash,
			parent_hash: n.header.parent_hash,
			number: n.header.number,
		}
	}
}

/// Some event from the outer world.
Fedor Sakharov's avatar
Fedor Sakharov committed
enum Event {
	BlockImported(BlockInfo),
	BlockFinalized(BlockInfo),
Fedor Sakharov's avatar
Fedor Sakharov committed
	MsgToSubsystem(AllMessages),
	ExternalRequest(ExternalRequest),
Fedor Sakharov's avatar
Fedor Sakharov committed
	Stop,
}

/// Some request from outer world.
enum ExternalRequest {
	WaitForActivation {
		hash: Hash,
		response_channel: oneshot::Sender<SubsystemResult<()>>,
Fedor Sakharov's avatar
Fedor Sakharov committed
/// A handler used to communicate with the [`Overseer`].
///
/// [`Overseer`]: struct.Overseer.html
Fedor Sakharov's avatar
Fedor Sakharov committed
pub struct OverseerHandler {
	events_tx: mpsc::Sender<Event>,
}

impl OverseerHandler {
	/// Inform the `Overseer` that that some block was imported.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	pub async fn block_imported(&mut self, block: BlockInfo) {
		self.send_and_log_error(Event::BlockImported(block)).await
Fedor Sakharov's avatar
Fedor Sakharov committed
	}

	/// Send some message to one of the `Subsystem`s.
	#[tracing::instrument(level = "trace", skip(self, msg), fields(subsystem = LOG_TARGET))]
	pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) {
		self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await
	/// Inform the `Overseer` that some block was finalized.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	pub async fn block_finalized(&mut self, block: BlockInfo) {
		self.send_and_log_error(Event::BlockFinalized(block)).await
Fedor Sakharov's avatar
Fedor Sakharov committed

	/// Wait for a block with the given hash to be in the active-leaves set.
	/// This method is used for external code like `Proposer` that doesn't subscribe to Overseer's signals.
	///
	/// The response channel responds if the hash was activated and is closed if the hash was deactivated.
	/// Note that due the fact the overseer doesn't store the whole active-leaves set, only deltas,
	/// the response channel may never return if the hash was deactivated before this call.
	/// In this case, it's the caller's responsibility to ensure a timeout is set.
	#[tracing::instrument(level = "trace", skip(self, response_channel), fields(subsystem = LOG_TARGET))]
	pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender<SubsystemResult<()>>) {
		self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
Fedor Sakharov's avatar
Fedor Sakharov committed
	}

	/// Tell `Overseer` to shutdown.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	pub async fn stop(&mut self) {
		self.send_and_log_error(Event::Stop).await
	}

	async fn send_and_log_error(&mut self, event: Event) {
		if self.events_tx.send(event).await.is_err() {
			tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
		}
/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
/// import and finality notifications into the [`OverseerHandler`].
///
/// [`Overseer`]: struct.Overseer.html
/// [`OverseerHandler`]: struct.OverseerHandler.html
pub async fn forward_events<P: BlockchainEvents<Block>>(
	client: Arc<P>,
	mut handler: OverseerHandler,
	let mut finality = client.finality_notification_stream();
	let mut imports = client.import_notification_stream();

	loop {
		select! {
			f = finality.next() => {
				match f {
					Some(block) => {
						handler.block_finalized(block.into()).await;
					}
					None => break,
				}
			},
			i = imports.next() => {
				match i {
					Some(block) => {
						handler.block_imported(block.into()).await;
Fedor Sakharov's avatar
Fedor Sakharov committed
impl Debug for ToOverseer {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		match self {
			ToOverseer::SubsystemMessage(msg) => {
				write!(f, "OverseerMessage::SubsystemMessage({:?})", msg)
			}
			ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)"),
			ToOverseer::SpawnBlockingJob { .. } => write!(f, "OverseerMessage::SpawnBlocking(..)")
Fedor Sakharov's avatar
Fedor Sakharov committed
		}
	}
}

/// A running instance of some [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance<M> {
Fedor Sakharov's avatar
Fedor Sakharov committed
	tx: mpsc::Sender<FromOverseer<M>>,
Fedor Sakharov's avatar
Fedor Sakharov committed
}

/// A context type that is given to the [`Subsystem`] upon spawning.
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
/// or to spawn it's [`SubsystemJob`]s.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`SubsystemJob`]: trait.SubsystemJob.html
#[derive(Debug)]
pub struct OverseerSubsystemContext<M>{
Fedor Sakharov's avatar
Fedor Sakharov committed
	rx: mpsc::Receiver<FromOverseer<M>>,
	tx: mpsc::Sender<ToOverseer>,
}

#[async_trait::async_trait]
impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
	type Message = M;

	async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
Fedor Sakharov's avatar
Fedor Sakharov committed
		match poll!(self.rx.next()) {
			Poll::Ready(Some(msg)) => Ok(Some(msg)),
			Poll::Ready(None) => Err(()),
			Poll::Pending => Ok(None),
		}
	}

	async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
		self.rx.next().await
			.ok_or(SubsystemError::Context(
				"No more messages in rx queue to process"
				.to_owned()
			))
	async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
		-> SubsystemResult<()>
	{
Fedor Sakharov's avatar
Fedor Sakharov committed
		self.tx.send(ToOverseer::SpawnJob {
Fedor Sakharov's avatar
Fedor Sakharov committed
			s,
		}).await.map_err(Into::into)
	async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
		-> SubsystemResult<()>
	{
		self.tx.send(ToOverseer::SpawnBlockingJob {
			name,
			s,
		}).await.map_err(Into::into)
	async fn send_message(&mut self, msg: AllMessages) -> bool {
		self.send_and_log_error(ToOverseer::SubsystemMessage(msg)).await
	async fn send_messages<T>(&mut self, msgs: T) -> bool
		where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
	{
		const SEND_TIMEOUT: Duration = Duration::from_millis(1000);
		let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok));
		match self.tx.send_all(&mut msgs).timeout(SEND_TIMEOUT).await {
			None => {
				tracing::error!(target: LOG_TARGET, "Outgoing messages timeout reached");
				false
			}
			Some(Err(_)) => {
				tracing::debug!(
					target: LOG_TARGET,
					msg_type = std::any::type_name::<M>(),
					"Failed to send a messages to Overseer",
				);

				// We return `true` here because errors indicate no change of re-try.
				true
			}
			Some(Ok(_)) => true,
		}
	}
}

impl<M> OverseerSubsystemContext<M> {
	async fn send_and_log_error(&mut self, msg: ToOverseer) -> bool {
		const SEND_TIMEOUT: Duration = Duration::from_millis(500);

		match self.tx.send(msg).timeout(SEND_TIMEOUT).await {
			None => {
				tracing::error!(target: LOG_TARGET, "Outgoing message timeout reached");
				false
			}
			Some(Err(_)) => {
				tracing::debug!(
					target: LOG_TARGET,
					msg_type = std::any::type_name::<M>(),
					"Failed to send a message to Overseer",
				);

				// We return `true` here because errors indicate no chance of re-try.
				true
			}
			Some(Ok(_)) => true,
Fedor Sakharov's avatar
Fedor Sakharov committed
	}
}

/// A subsystem that we oversee.
///
/// Ties together the [`Subsystem`] itself and it's running instance
/// (which may be missing if the [`Subsystem`] is not running at the moment
/// for whatever reason).
///
/// [`Subsystem`]: trait.Subsystem.html
struct OverseenSubsystem<M> {
Fedor Sakharov's avatar
Fedor Sakharov committed
	instance: Option<SubsystemInstance<M>>,
}

impl<M> OverseenSubsystem<M> {
	/// Send a message to the wrapped subsystem.
	///
	/// If the inner `instance` is `None`, nothing is happening.
	async fn send_message(&mut self, msg: M) -> SubsystemResult<()> {
		const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);

		if let Some(ref mut instance) = self.instance {
			match instance.tx.send(
				FromOverseer::Communication { msg }
			).timeout(MESSAGE_TIMEOUT).await
			{
				None => {
					tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
					Err(SubsystemError::SubsystemStalled(instance.name))
				}
				Some(res) => res.map_err(Into::into),
			}
		} else {
			Ok(())
		}
	}

	/// Send a signal to the wrapped subsystem.
	///
	/// If the inner `instance` is `None`, nothing is happening.
	async fn send_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
		const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10);

		if let Some(ref mut instance) = self.instance {
			match instance.tx.send(FromOverseer::Signal(signal)).timeout(SIGNAL_TIMEOUT).await {
				None => {
					tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
					Err(SubsystemError::SubsystemStalled(instance.name))
				}
				Some(res) => res.map_err(Into::into),
			}
		} else {
			Ok(())
Fedor Sakharov's avatar
Fedor Sakharov committed
/// The `Overseer` itself.
pub struct Overseer<S> {
	/// A candidate validation subsystem.
	candidate_validation_subsystem: OverseenSubsystem<CandidateValidationMessage>,
Fedor Sakharov's avatar
Fedor Sakharov committed

	/// A candidate backing subsystem.
	candidate_backing_subsystem: OverseenSubsystem<CandidateBackingMessage>,
Fedor Sakharov's avatar
Fedor Sakharov committed

	/// A candidate selection subsystem.
	candidate_selection_subsystem: OverseenSubsystem<CandidateSelectionMessage>,

	/// A statement distribution subsystem.
	statement_distribution_subsystem: OverseenSubsystem<StatementDistributionMessage>,

	/// An availability distribution subsystem.
	availability_distribution_subsystem: OverseenSubsystem<AvailabilityDistributionMessage>,

	/// A bitfield signing subsystem.
	bitfield_signing_subsystem: OverseenSubsystem<BitfieldSigningMessage>,

	/// A bitfield distribution subsystem.
	bitfield_distribution_subsystem: OverseenSubsystem<BitfieldDistributionMessage>,

	/// A provisioner subsystem.
	provisioner_subsystem: OverseenSubsystem<ProvisionerMessage>,

	/// A PoV distribution subsystem.
	pov_distribution_subsystem: OverseenSubsystem<PoVDistributionMessage>,

	/// A runtime API subsystem.
	runtime_api_subsystem: OverseenSubsystem<RuntimeApiMessage>,

	/// An availability store subsystem.
	availability_store_subsystem: OverseenSubsystem<AvailabilityStoreMessage>,

	/// A network bridge subsystem.
	network_bridge_subsystem: OverseenSubsystem<NetworkBridgeMessage>,

	/// A Chain API subsystem.
	chain_api_subsystem: OverseenSubsystem<ChainApiMessage>,
	/// A Collation Generation subsystem.
	collation_generation_subsystem: OverseenSubsystem<CollationGenerationMessage>,

	/// A Collator Protocol subsystem.
	collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>,

Fedor Sakharov's avatar
Fedor Sakharov committed
	/// Spawner to spawn tasks to.
	s: S,

	/// Here we keep handles to spawned subsystems to be notified when they terminate.
	running_subsystems: FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>,
Fedor Sakharov's avatar
Fedor Sakharov committed

	/// Gather running subsystms' outbound streams into one.
	running_subsystems_rx: StreamUnordered<mpsc::Receiver<ToOverseer>>,

	/// Events that are sent to the overseer from the outside world
	events_rx: mpsc::Receiver<Event>,
	/// External listeners waiting for a hash to be in the active-leave set.
	activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,
	/// A set of leaves that `Overseer` starts working with.
	///
	/// Drained at the beginning of `run` and never used again.
	leaves: Vec<(Hash, BlockNumber)>,

	/// The set of the "active leaves".
	active_leaves: HashMap<Hash, BlockNumber>,

	/// Various Prometheus metrics.
	metrics: Metrics,
/// This struct is passed as an argument to create a new instance of an [`Overseer`].
///
/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows
/// mocking in the test code:
///
/// Each [`Subsystem`] is supposed to implement some interface that is generic over
/// message type that is specific to this [`Subsystem`]. At the moment not all
/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`].
pub struct AllSubsystems<
	CV = (), CB = (), CS = (), SD = (), AD = (), BS = (), BD = (), P = (),
	PoVD = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = ()
> {
	/// A candidate validation subsystem.
	pub candidate_validation: CV,
	/// A candidate backing subsystem.
	pub candidate_backing: CB,
	/// A candidate selection subsystem.
	pub candidate_selection: CS,
	/// A statement distribution subsystem.
	pub statement_distribution: SD,
	/// An availability distribution subsystem.
	pub availability_distribution: AD,
	/// A bitfield signing subsystem.
	pub bitfield_signing: BS,
	/// A bitfield distribution subsystem.
	pub bitfield_distribution: BD,
	/// A provisioner subsystem.
	pub provisioner: P,
	/// A PoV distribution subsystem.
	pub pov_distribution: PoVD,
	/// A runtime API subsystem.
	pub runtime_api: RA,
	/// An availability store subsystem.
	pub availability_store: AS,
	/// A network bridge subsystem.
	pub network_bridge: NB,
	/// A Chain API subsystem.
	pub chain_api: CA,
	/// A Collation Generation subsystem.
	pub collation_generation: CG,
	/// A Collator Protocol subsystem.
	pub collator_protocol: CP,
572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983
impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
	AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
{
	/// Create a new instance of [`AllSubsystems`].
	///
	/// Each subsystem is set to [`DummySystem`].
	///
	///# Note
	///
	/// Because of a bug in rustc it is required that when calling this function,
	/// you provide a "random" type for the first generic parameter:
	///
	/// ```
	/// polkadot_overseer::AllSubsystems::<()>::dummy();
	/// ```
	pub fn dummy() -> AllSubsystems<
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem,
		DummySubsystem
	> {
		AllSubsystems {
			candidate_validation: DummySubsystem,
			candidate_backing: DummySubsystem,
			candidate_selection: DummySubsystem,
			statement_distribution: DummySubsystem,
			availability_distribution: DummySubsystem,
			bitfield_signing: DummySubsystem,
			bitfield_distribution: DummySubsystem,
			provisioner: DummySubsystem,
			pov_distribution: DummySubsystem,
			runtime_api: DummySubsystem,
			availability_store: DummySubsystem,
			network_bridge: DummySubsystem,
			chain_api: DummySubsystem,
			collation_generation: DummySubsystem,
			collator_protocol: DummySubsystem,
		}
	}

	/// Replace the `candidate_validation` instance in `self`.
	pub fn replace_candidate_validation<NEW>(
		self,
		candidate_validation: NEW,
	) -> AllSubsystems<NEW, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
		AllSubsystems {
			candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `candidate_backing` instance in `self`.
	pub fn replace_candidate_backing<NEW>(
		self,
		candidate_backing: NEW,
	) -> AllSubsystems<CV, NEW, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `candidate_selection` instance in `self`.
	pub fn replace_candidate_selection<NEW>(
		self,
		candidate_selection: NEW,
	) -> AllSubsystems<CV, CB, NEW, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `statement_distribution` instance in `self`.
	pub fn replace_statement_distribution<NEW>(
		self,
		statement_distribution: NEW,
	) -> AllSubsystems<CV, CB, CS, NEW, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `availability_distribution` instance in `self`.
	pub fn replace_availability_distribution<NEW>(
		self,
		availability_distribution: NEW,
	) -> AllSubsystems<CV, CB, CS, SD, NEW, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `bitfield_signing` instance in `self`.
	pub fn replace_bitfield_signing<NEW>(
		self,
		bitfield_signing: NEW,
	) -> AllSubsystems<CV, CB, CS, SD, AD, NEW, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `bitfield_distribution` instance in `self`.
	pub fn replace_bitfield_distribution<NEW>(
		self,
		bitfield_distribution: NEW,
	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, NEW, P, PoVD, RA, AS, NB, CA, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `provisioner` instance in `self`.
	pub fn replace_provisioner<NEW>(
		self,
		provisioner: NEW,
	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, NEW, PoVD, RA, AS, NB, CA, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `pov_distribution` instance in `self`.
	pub fn replace_pov_distribution<NEW>(
		self,
		pov_distribution: NEW,
	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, NEW, RA, AS, NB, CA, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `runtime_api` instance in `self`.
	pub fn replace_runtime_api<NEW>(
		self,
		runtime_api: NEW,
	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, NEW, AS, NB, CA, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `availability_store` instance in `self`.
	pub fn replace_availability_store<NEW>(
		self,
		availability_store: NEW,
	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, NEW, NB, CA, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `network_bridge` instance in `self`.
	pub fn replace_network_bridge<NEW>(
		self,
		network_bridge: NEW,
	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NEW, CA, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `chain_api` instance in `self`.
	pub fn replace_chain_api<NEW>(
		self,
		chain_api: NEW,
	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, NEW, CG, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api,
			collation_generation: self.collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `collation_generation` instance in `self`.
	pub fn replace_collation_generation<NEW>(
		self,
		collation_generation: NEW,
	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, NEW, CP> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation,
			collator_protocol: self.collator_protocol,
		}
	}

	/// Replace the `collator_protocol` instance in `self`.
	pub fn replace_collator_protocol<NEW>(
		self,
		collator_protocol: NEW,
	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, NEW> {
		AllSubsystems {
			candidate_validation: self.candidate_validation,
			candidate_backing: self.candidate_backing,
			candidate_selection: self.candidate_selection,
			statement_distribution: self.statement_distribution,
			availability_distribution: self.availability_distribution,
			bitfield_signing: self.bitfield_signing,
			bitfield_distribution: self.bitfield_distribution,
			provisioner: self.provisioner,
			pov_distribution: self.pov_distribution,
			runtime_api: self.runtime_api,
			availability_store: self.availability_store,
			network_bridge: self.network_bridge,
			chain_api: self.chain_api,
			collation_generation: self.collation_generation,
			collator_protocol,
		}
	}
}

/// Overseer Prometheus metrics.
#[derive(Clone)]
struct MetricsInner {
	activated_heads_total: prometheus::Counter<prometheus::U64>,
	deactivated_heads_total: prometheus::Counter<prometheus::U64>,
	messages_relayed_total: prometheus::Counter<prometheus::U64>,
}

#[derive(Default, Clone)]
struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_head_activated(&self) {
		if let Some(metrics) = &self.0 {
			metrics.activated_heads_total.inc();
		}
	}