
	/// Increments the deactivated-heads counter; does nothing when no inner metrics are set.
	fn on_head_deactivated(&self) {
		match self.0.as_ref() {
			Some(inner) => {
				inner.deactivated_heads_total.inc();
			}
			None => {}
		}
	}

	/// Increments the relayed-messages counter; does nothing when no inner metrics are set.
	fn on_message_relayed(&self) {
		match self.0.as_ref() {
			Some(inner) => {
				inner.messages_relayed_total.inc();
			}
			None => {}
		}
	}
}

impl metrics::Metrics for Metrics {
	/// Builds the three overseer counters, registers each one with `registry`
	/// and wraps them in a `Metrics` value.
	///
	/// Counters are created and registered in the order: activated heads,
	/// deactivated heads, relayed messages. The first error raised while
	/// constructing or registering a counter is returned to the caller.
	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
		let activated_heads_total = prometheus::register(
			prometheus::Counter::new(
				"parachain_activated_heads_total",
				"Number of activated heads."
			)?,
			registry,
		)?;

		let deactivated_heads_total = prometheus::register(
			prometheus::Counter::new(
				"parachain_deactivated_heads_total",
				"Number of deactivated heads."
			)?,
			registry,
		)?;

		let messages_relayed_total = prometheus::register(
			prometheus::Counter::new(
				"parachain_messages_relayed_total",
				"Number of messages relayed by Overseer."
			)?,
			registry,
		)?;

		let inner = MetricsInner {
			activated_heads_total,
			deactivated_heads_total,
			messages_relayed_total,
		};

		Ok(Metrics(Some(inner)))
	}
}

impl<S> Overseer<S>
where
	S: SpawnNamed,
{
	/// Create a new instance of the `Overseer` with a fixed set of [`Subsystem`]s.
Fedor Sakharov's avatar
Fedor Sakharov committed
	///
	/// ```text
	///                  +------------------------------------+
	///                  |            Overseer                |
	///                  +------------------------------------+
	///                    /            |             |      \
	///      ................. subsystems...................................
	///      . +-----------+    +-----------+   +----------+   +---------+ .
	///      . |           |    |           |   |          |   |         | .
	///      . +-----------+    +-----------+   +----------+   +---------+ .
	///      ...............................................................
	///                              |
	///                        probably `spawn`
	///                            a `job`
	///                              |
	///                              V
	///                         +-----------+
	///                         |           |
	///                         +-----------+
	///
	/// ```
	///
	/// [`Subsystem`]: trait.Subsystem.html
	///
	/// # Example
	///
	/// The [`Subsystems`] may be any type as long as they implement an expected interface.
	/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with them.
	/// For the sake of simplicity the termination of the example is done with a timeout.
Fedor Sakharov's avatar
Fedor Sakharov committed
	/// ```
	/// # use std::time::Duration;
	/// # use futures::{executor, pin_mut, select, FutureExt};
	/// # use futures_timer::Delay;
	/// # use polkadot_overseer::{Overseer, AllSubsystems};
	/// # use polkadot_subsystem::{
	/// #     Subsystem, DummySubsystem, SpawnedSubsystem, SubsystemContext,
	/// #     messages::CandidateValidationMessage,
Fedor Sakharov's avatar
Fedor Sakharov committed
	/// # };
	///
	/// struct ValidationSubsystem;
	///
	/// impl<C> Subsystem<C> for ValidationSubsystem
	///     where C: SubsystemContext<Message=CandidateValidationMessage>
Fedor Sakharov's avatar
Fedor Sakharov committed
	///     fn start(
	///         mut ctx: C,
Fedor Sakharov's avatar
Fedor Sakharov committed
	///     ) -> SpawnedSubsystem {
	///         SpawnedSubsystem {
	///             name: "validation-subsystem",
	///             future: Box::pin(async move {
	///                 loop {
	///                     Delay::new(Duration::from_secs(1)).await;
	///                 }
	///             }),
	///         }
Fedor Sakharov's avatar
Fedor Sakharov committed
	///     }
	/// }
	///
	/// # fn main() { executor::block_on(async move {
	/// let spawner = sp_core::testing::TaskExecutor::new();
	/// let all_subsystems = AllSubsystems::<()>::dummy().replace_candidate_validation(ValidationSubsystem);
Fedor Sakharov's avatar
Fedor Sakharov committed
	/// let (overseer, _handler) = Overseer::new(
	///     None,
Fedor Sakharov's avatar
Fedor Sakharov committed
	///     spawner,
	/// ).unwrap();
	///
	/// let timer = Delay::new(Duration::from_millis(50)).fuse();
	///
	/// let overseer_fut = overseer.run().fuse();
	/// pin_mut!(timer);
	/// pin_mut!(overseer_fut);
	///
	/// select! {
	///     _ = overseer_fut => (),
	///     _ = timer => (),
	/// }
	/// #
	/// # }); }
	/// ```
	pub fn new<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>(
		leaves: impl IntoIterator<Item = BlockInfo>,
		all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>,
		prometheus_registry: Option<&prometheus::Registry>,
Fedor Sakharov's avatar
Fedor Sakharov committed
		mut s: S,
	) -> SubsystemResult<(Self, OverseerHandler)>
	where
		CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send,
		CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
		CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send,
		SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send,
		AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send,
		BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>> + Send,
		BD: Subsystem<OverseerSubsystemContext<BitfieldDistributionMessage>> + Send,
		P: Subsystem<OverseerSubsystemContext<ProvisionerMessage>> + Send,
		PoVD: Subsystem<OverseerSubsystemContext<PoVDistributionMessage>> + Send,
		RA: Subsystem<OverseerSubsystemContext<RuntimeApiMessage>> + Send,
		AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>> + Send,
		NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>> + Send,
		CA: Subsystem<OverseerSubsystemContext<ChainApiMessage>> + Send,
		CG: Subsystem<OverseerSubsystemContext<CollationGenerationMessage>> + Send,
		CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send,
Fedor Sakharov's avatar
Fedor Sakharov committed
		let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);

		let handler = OverseerHandler {
			events_tx: events_tx.clone(),
		};

		let mut running_subsystems_rx = StreamUnordered::new();
		let mut running_subsystems = FuturesUnordered::new();

		let candidate_validation_subsystem = spawn(
Fedor Sakharov's avatar
Fedor Sakharov committed
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.candidate_validation,
Fedor Sakharov's avatar
Fedor Sakharov committed
		)?;

		let candidate_backing_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.candidate_backing,
		)?;

		let candidate_selection_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.candidate_selection,
		)?;

		let statement_distribution_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.statement_distribution,
		)?;

		let availability_distribution_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.availability_distribution,
		)?;

		let bitfield_signing_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.bitfield_signing,
		)?;

		let bitfield_distribution_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.bitfield_distribution,
		)?;

		let provisioner_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.provisioner,
		)?;

		let pov_distribution_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.pov_distribution,
		)?;

		let runtime_api_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.runtime_api,
		)?;

		let availability_store_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.availability_store,
		)?;

		let network_bridge_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.network_bridge,
Fedor Sakharov's avatar
Fedor Sakharov committed
		)?;

		let chain_api_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.chain_api,
		)?;

		let collation_generation_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.collation_generation,
		)?;


		let collator_protocol_subsystem = spawn(
			&mut s,
			&mut running_subsystems,
			&mut running_subsystems_rx,
			all_subsystems.collator_protocol,
		)?;

		let leaves = leaves
			.into_iter()
			.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number))
			.collect();

		let active_leaves = HashMap::new();
		let metrics = <Metrics as metrics::Metrics>::register(prometheus_registry)?;
		let activation_external_listeners = HashMap::new();
Fedor Sakharov's avatar
Fedor Sakharov committed
		let this = Self {
			candidate_validation_subsystem,
Fedor Sakharov's avatar
Fedor Sakharov committed
			candidate_backing_subsystem,
			candidate_selection_subsystem,
			statement_distribution_subsystem,
			availability_distribution_subsystem,
			bitfield_signing_subsystem,
			bitfield_distribution_subsystem,
			provisioner_subsystem,
			pov_distribution_subsystem,
			runtime_api_subsystem,
			availability_store_subsystem,
			network_bridge_subsystem,
			chain_api_subsystem,
			collation_generation_subsystem,
			collator_protocol_subsystem,
Fedor Sakharov's avatar
Fedor Sakharov committed
			s,
			running_subsystems,
			running_subsystems_rx,
			events_rx,
			activation_external_listeners,
			leaves,
			active_leaves,
			metrics,
Fedor Sakharov's avatar
Fedor Sakharov committed
		};

		Ok((this, handler))
	}

	// Stop the overseer.
	async fn stop(mut self) {
		let _ = self.candidate_validation_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.candidate_backing_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.candidate_selection_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.statement_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.availability_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.bitfield_signing_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.bitfield_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.provisioner_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.pov_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.runtime_api_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.availability_store_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.network_bridge_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.chain_api_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.collator_protocol_subsystem.send_signal(OverseerSignal::Conclude).await;
		let _ = self.collation_generation_subsystem.send_signal(OverseerSignal::Conclude).await;
Fedor Sakharov's avatar
Fedor Sakharov committed
		let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();

		loop {
			select! {
				_ = self.running_subsystems.next() => {
Fedor Sakharov's avatar
Fedor Sakharov committed
					if self.running_subsystems.is_empty() {
						break;
					}
				},
				_ = stop_delay => break,
				complete => break,
			}
		}
	}

	/// Run the `Overseer`.
	#[tracing::instrument(skip(self), fields(subsystem = LOG_TARGET))]
Fedor Sakharov's avatar
Fedor Sakharov committed
	pub async fn run(mut self) -> SubsystemResult<()> {
		let mut update = ActiveLeavesUpdate::default();
		for (hash, number) in std::mem::take(&mut self.leaves) {
			update.activated.push(hash);
			let _ = self.active_leaves.insert(hash, number);
			self.on_head_activated(&hash);
		self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;

Fedor Sakharov's avatar
Fedor Sakharov committed
		loop {
			select! {
				msg = self.events_rx.next().fuse() => {
					let msg = if let Some(msg) = msg {
						msg
					} else {
						continue
					};

					match msg {
						Event::MsgToSubsystem(msg) => {
							self.route_message(msg).await?;
						}
						Event::Stop => {
							self.stop().await;
							return Ok(());
						}
						Event::BlockImported(block) => {
							self.block_imported(block).await?;
						}
						Event::BlockFinalized(block) => {
							self.block_finalized(block).await?;
						}
						Event::ExternalRequest(request) => {
							self.handle_external_request(request);
						}
Fedor Sakharov's avatar
Fedor Sakharov committed
					}
				},
				msg = self.running_subsystems_rx.next().fuse() => {
					let msg = if let Some((StreamYield::Item(msg), _)) = msg {
						msg
					} else {
						continue
					};

					match msg {
						ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await?,
						ToOverseer::SpawnJob { name, s } => {
							self.spawn_job(name, s);
						}
						ToOverseer::SpawnBlockingJob { name, s } => {
							self.spawn_blocking_job(name, s);
						}
				},
				res = self.running_subsystems.next().fuse() => {
					let finished = if let Some(finished) = res {
						finished
					} else {
						continue
					};

					tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly");
					self.stop().await;
					return finished;
				},
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
		let mut update = ActiveLeavesUpdate::default();

		if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
			if let Some(expected_parent_number) = block.number.checked_sub(1) {
				debug_assert_eq!(expected_parent_number, number);
			}
			update.deactivated.push(block.parent_hash);
			self.on_head_deactivated(&block.parent_hash);
		match self.active_leaves.entry(block.hash) {
			hash_map::Entry::Vacant(entry) => {
				update.activated.push(block.hash);
				let _ = entry.insert(block.number);
				self.on_head_activated(&block.hash);
			},
			hash_map::Entry::Occupied(entry) => {
				debug_assert_eq!(*entry.get(), block.number);
			}
		self.clean_up_external_listeners();

		self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;

	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
		let mut update = ActiveLeavesUpdate::default();
		self.active_leaves.retain(|h, n| {
			if *n <= block.number {
		for deactivated in &update.deactivated {
			self.on_head_deactivated(deactivated)
		}

		self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
		// broadcast `ActiveLeavesUpdate` even if empty to issue view updates
		self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
		self.candidate_validation_subsystem.send_signal(signal.clone()).await?;
		self.candidate_backing_subsystem.send_signal(signal.clone()).await?;
		self.candidate_selection_subsystem.send_signal(signal.clone()).await?;
		self.statement_distribution_subsystem.send_signal(signal.clone()).await?;
		self.availability_distribution_subsystem.send_signal(signal.clone()).await?;
		self.bitfield_signing_subsystem.send_signal(signal.clone()).await?;
		self.bitfield_distribution_subsystem.send_signal(signal.clone()).await?;
		self.provisioner_subsystem.send_signal(signal.clone()).await?;
		self.pov_distribution_subsystem.send_signal(signal.clone()).await?;
		self.runtime_api_subsystem.send_signal(signal.clone()).await?;
		self.availability_store_subsystem.send_signal(signal.clone()).await?;
		self.network_bridge_subsystem.send_signal(signal.clone()).await?;
		self.chain_api_subsystem.send_signal(signal.clone()).await?;
		self.collator_protocol_subsystem.send_signal(signal.clone()).await?;
		self.collation_generation_subsystem.send_signal(signal).await?;
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
		self.metrics.on_message_relayed();
Fedor Sakharov's avatar
Fedor Sakharov committed
		match msg {
			AllMessages::CandidateValidation(msg) => {
				self.candidate_validation_subsystem.send_message(msg).await?;
Fedor Sakharov's avatar
Fedor Sakharov committed
			AllMessages::CandidateBacking(msg) => {
				self.candidate_backing_subsystem.send_message(msg).await?;
			AllMessages::CandidateSelection(msg) => {
				self.candidate_selection_subsystem.send_message(msg).await?;
			AllMessages::StatementDistribution(msg) => {
				self.statement_distribution_subsystem.send_message(msg).await?;
			AllMessages::AvailabilityDistribution(msg) => {
				self.availability_distribution_subsystem.send_message(msg).await?;
			AllMessages::BitfieldDistribution(msg) => {
				self.bitfield_distribution_subsystem.send_message(msg).await?;
			AllMessages::BitfieldSigning(msg) => {
				self.bitfield_signing_subsystem.send_message(msg).await?;
			AllMessages::Provisioner(msg) => {
				self.provisioner_subsystem.send_message(msg).await?;
			AllMessages::PoVDistribution(msg) => {
				self.pov_distribution_subsystem.send_message(msg).await?;
			AllMessages::RuntimeApi(msg) => {
				self.runtime_api_subsystem.send_message(msg).await?;
			AllMessages::AvailabilityStore(msg) => {
				self.availability_store_subsystem.send_message(msg).await?;
			AllMessages::NetworkBridge(msg) => {
				self.network_bridge_subsystem.send_message(msg).await?;
			AllMessages::ChainApi(msg) => {
				self.chain_api_subsystem.send_message(msg).await?;
			AllMessages::CollationGeneration(msg) => {
				self.collation_generation_subsystem.send_message(msg).await?;
			AllMessages::CollatorProtocol(msg) => {
				self.collator_protocol_subsystem.send_message(msg).await?;
Fedor Sakharov's avatar
Fedor Sakharov committed
		}
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	fn on_head_activated(&mut self, hash: &Hash) {
		self.metrics.on_head_activated();
		if let Some(listeners) = self.activation_external_listeners.remove(hash) {
			for listener in listeners {
				// it's fine if the listener is no longer interested
				let _ = listener.send(Ok(()));
	/// Records the deactivation of `hash` in the metrics and discards any
	/// external listeners still registered for it.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	fn on_head_deactivated(&mut self, hash: &Hash) {
		self.metrics.on_head_deactivated();
		// Removing the entry and dropping its listeners is how waiters learn
		// that the block has been deactivated.
		let stale_listeners = self.activation_external_listeners.remove(hash);
		drop(stale_listeners);
	}

	/// Prunes canceled listeners from the external-listener map and removes
	/// hashes that have no listeners left.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	fn clean_up_external_listeners(&mut self) {
		let registry = &mut self.activation_external_listeners;
		registry.retain(|_hash, waiting| {
			// Keep only the channels whose receiving side is still alive.
			waiting.retain(|channel| !channel.is_canceled());
			// An entry survives only while somebody is still waiting on it.
			!waiting.is_empty()
		})
	}

	/// Serves a request that originates outside the subsystems.
	///
	/// For `WaitForActivation`: replies `Ok(())` immediately when `hash` is
	/// already an active leaf, otherwise parks the response channel until the
	/// hash is activated or deactivated.
	#[tracing::instrument(level = "trace", skip(self, request), fields(subsystem = LOG_TARGET))]
	fn handle_external_request(&mut self, request: ExternalRequest) {
		match request {
			ExternalRequest::WaitForActivation { hash, response_channel } => {
				let already_active = self.active_leaves.contains_key(&hash);
				if !already_active {
					let waiting = self.activation_external_listeners.entry(hash).or_default();
					waiting.push(response_channel);
					return;
				}
				// The requester may have gone away in the meantime; that is fine.
				let _ = response_channel.send(Ok(()));
			}
		}
	}

	/// Spawns the future `j` under the task name `name` using the overseer's spawner.
	fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
		self.s.spawn(name, j);
	}

	/// Spawns the future `j` under the task name `name` as a blocking task
	/// using the overseer's spawner.
	fn spawn_blocking_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
		let spawner = &mut self.s;
		spawner.spawn_blocking(name, j);
	}
fn spawn<S: SpawnNamed, M: Send + 'static>(
Fedor Sakharov's avatar
Fedor Sakharov committed
	spawner: &mut S,
	futures: &mut FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>,
Fedor Sakharov's avatar
Fedor Sakharov committed
	streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
	s: impl Subsystem<OverseerSubsystemContext<M>>,
Fedor Sakharov's avatar
Fedor Sakharov committed
) -> SubsystemResult<OverseenSubsystem<M>> {
	let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
	let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
	let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx };
	let SpawnedSubsystem { future, name } = s.start(ctx);
Fedor Sakharov's avatar
Fedor Sakharov committed

	let (tx, rx) = oneshot::channel();

	let fut = Box::pin(async move {
		if let Err(e) = future.await {
			tracing::error!(subsystem=name, err = ?e, "subsystem exited with error");
			tracing::debug!(subsystem=name, "subsystem exited without an error");
		let _ = tx.send(());
	});

	spawner.spawn(name, fut);
Fedor Sakharov's avatar
Fedor Sakharov committed

	let _ = streams.push(from_rx);
	futures.push(Box::pin(rx.map(|e| { tracing::warn!(err = ?e, "dropping error"); Ok(()) })));
Fedor Sakharov's avatar
Fedor Sakharov committed

	let instance = Some(SubsystemInstance {
		tx: to_tx,
Fedor Sakharov's avatar
Fedor Sakharov committed
	});

	Ok(OverseenSubsystem {
		instance,
	})
}

#[cfg(test)]
mod tests {
	use std::collections::HashMap;
	use futures::{executor, pin_mut, select, channel::mpsc, FutureExt, pending};
	use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash};
	use polkadot_subsystem::messages::RuntimeApiRequest;
	use polkadot_node_primitives::{Collation, CollationGenerationConfig};
	use polkadot_node_network_protocol::{PeerId, ReputationChange, NetworkBridgeEvent};

	use sp_core::crypto::Pair as _;

	use super::*;

	struct TestSubsystem1(mpsc::Sender<usize>);

	impl<C> Subsystem<C> for TestSubsystem1
		where C: SubsystemContext<Message=CandidateValidationMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let mut sender = self.0;
			SpawnedSubsystem {
				name: "test-subsystem-1",
				future: Box::pin(async move {
					let mut i = 0;
					loop {
						match ctx.recv().await {
							Ok(FromOverseer::Communication { .. }) => {
								let _ = sender.send(i).await;
								i += 1;
								continue;
							}
							Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
							Err(_) => return Ok(()),
Fedor Sakharov's avatar
Fedor Sakharov committed
						}
					}
Fedor Sakharov's avatar
Fedor Sakharov committed
		}
	}

	// Test subsystem that sends `CandidateValidation` messages through its
	// context while `c < 10`, then polls the context with `try_recv` until it
	// sees `Conclude`.
	//
	// NOTE(review): this block is damaged in this copy of the file (stray
	// blame-view text, unbalanced delimiters, missing statements). It must be
	// restored from version control before it can compile.
	struct TestSubsystem2(mpsc::Sender<usize>);

	impl<C> Subsystem<C> for TestSubsystem2
		where C: SubsystemContext<Message=CandidateBackingMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let sender = self.0.clone();
			SpawnedSubsystem {
				name: "test-subsystem-2",
				future: Box::pin(async move {
					// Moved into the future so the sender stays alive as long as the future does.
					let _sender = sender;
					let mut c: usize = 0;
					loop {
						if c < 10 {
							// The receiving half is dropped right away; no reply is awaited here.
							let (tx, _) = oneshot::channel();
							ctx.send_message(
								AllMessages::CandidateValidation(
									CandidateValidationMessage::ValidateFromChainState(
										Default::default(),
										PoV {
											block_data: BlockData(Vec::new()),
										}.into(),
										tx,
									)
							// NOTE(review): the rest of the `send_message` call and (presumably) the
							// increment of `c` are missing here — confirm against upstream.
Fedor Sakharov's avatar
Fedor Sakharov committed
							continue;
						}
						match ctx.try_recv().await {
							Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
								break;
							}
							Ok(Some(_)) => {
								continue;
							}
							// NOTE(review): the remaining match arms and the tail of the future are missing here.
Fedor Sakharov's avatar
Fedor Sakharov committed
					}
Fedor Sakharov's avatar
Fedor Sakharov committed
		}
	}

	struct TestSubsystem4;

	impl<C> Subsystem<C> for TestSubsystem4
		where C: SubsystemContext<Message=CandidateBackingMessage>
	{
		fn start(self, mut _ctx: C) -> SpawnedSubsystem {
			SpawnedSubsystem {
				name: "test-subsystem-4",
				future: Box::pin(async move {
					// Do nothing and exit.
Fedor Sakharov's avatar
Fedor Sakharov committed
	// Checks that a minimal configuration of two jobs can run and exchange messages.
	//
	// `TestSubsystem1` reports a running count for every message it receives;
	// once ten counts have arrived the overseer is asked to stop.
	#[test]
	fn overseer_works() {
		let spawner = sp_core::testing::TaskExecutor::new();

		executor::block_on(async move {
			let (s1_tx, mut s1_rx) = mpsc::channel::<usize>(64);
			let (s2_tx, mut s2_rx) = mpsc::channel::<usize>(64);

			let all_subsystems = AllSubsystems::<()>::dummy()
				.replace_candidate_validation(TestSubsystem1(s1_tx))
				.replace_candidate_backing(TestSubsystem2(s2_tx));

			let (overseer, mut handler) = Overseer::new(
				vec![],
				all_subsystems,
				None,
				spawner,
			).unwrap();
			let overseer_fut = overseer.run().fuse();

			pin_mut!(overseer_fut);

			let mut s1_results = Vec::new();
			let mut s2_results = Vec::new();

			loop {
				select! {
					_ = overseer_fut => break,
					s1_next = s1_rx.next() => {
						match s1_next {
							Some(msg) => {
								s1_results.push(msg);
								if s1_results.len() == 10 {
									handler.stop().await;
								}
							}
							None => break,
						}
					},
					s2_next = s2_rx.next() => {
						match s2_next {
							Some(_) => s2_results.push(s2_next),
							None => break,
						}
					},
					complete => break,
				}
			}

			assert_eq!(s1_results, (0..10).collect::<Vec<_>>());
		});
	}

	// Checks activated/deactivated metrics are updated properly.
	//
	// Starts with one leaf, imports two child blocks in sequence and relays a
	// single message: three activations, two deactivations, one relayed message.
	#[test]
	fn overseer_metrics_work() {
		let spawner = sp_core::testing::TaskExecutor::new();

		executor::block_on(async move {
			let first_block_hash = [1; 32].into();
			let second_block_hash = [2; 32].into();
			let third_block_hash = [3; 32].into();

			let first_block = BlockInfo {
				hash: first_block_hash,
				parent_hash: [0; 32].into(),
				number: 1,
			};
			let second_block = BlockInfo {
				hash: second_block_hash,
				parent_hash: first_block_hash,
				number: 2,
			};
			let third_block = BlockInfo {
				hash: third_block_hash,
				parent_hash: second_block_hash,
				number: 3,
			};

			let all_subsystems = AllSubsystems::<()>::dummy();
			let registry = prometheus::Registry::new();
			let (overseer, mut handler) = Overseer::new(
				vec![first_block],
				all_subsystems,
				Some(&registry),
				spawner,
			).unwrap();
			let overseer_fut = overseer.run().fuse();

			pin_mut!(overseer_fut);

			// All events are queued before the overseer future is polled for the first time.
			handler.block_imported(second_block).await;
			handler.block_imported(third_block).await;
			handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
			handler.stop().await;

			select! {
				res = overseer_fut => {
					assert!(res.is_ok());
					let metrics = extract_metrics(&registry);
					assert_eq!(metrics["activated"], 3);
					assert_eq!(metrics["deactivated"], 2);
					assert_eq!(metrics["relayed"], 1);
				},
				complete => (),
			}
		});
	}

	fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> {
		let gather = registry.gather();
		assert_eq!(gather[0].get_name(), "parachain_activated_heads_total");
		assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total");
		assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total");
		let activated = gather[0].get_metric()[0].get_counter().get_value() as u64;
		let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64;
		let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64;
		let mut result = HashMap::new();
		result.insert("activated", activated);
		result.insert("deactivated", deactivated);
		result.insert("relayed", relayed);
		result
Fedor Sakharov's avatar
Fedor Sakharov committed
	// Spawn a subsystem that immediately exits.
	//
	// Should immediately conclude the overseer itself with an error.
	#[test]
	fn overseer_panics_on_subsystem_exit() {
		let spawner = sp_core::testing::TaskExecutor::new();

		executor::block_on(async move {
			let (s1_tx, _) = mpsc::channel(64);
			let all_subsystems = AllSubsystems::<()>::dummy()
				.replace_candidate_validation(TestSubsystem1(s1_tx))
				.replace_candidate_backing(TestSubsystem4);
			let (overseer, _handle) = Overseer::new(
				vec![],
				all_subsystems,
				None,
				spawner,
			).unwrap();
			let overseer_fut = overseer.run().fuse();
			pin_mut!(overseer_fut);

			select! {
				res = overseer_fut => assert!(res.is_err()),
				complete => (),
			}
		})
	}

	// Test subsystem (candidate-validation slot) that forwards every signal
	// other than `Conclude` to the wrapped sender and stops on `Conclude`.
	//
	// NOTE(review): this block is damaged in this copy of the file — the
	// remaining match arms, the end of the loop and the closing delimiters of
	// the future are missing. Restore from version control.
	struct TestSubsystem5(mpsc::Sender<OverseerSignal>);

	impl<C> Subsystem<C> for TestSubsystem5
		where C: SubsystemContext<Message=CandidateValidationMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let mut sender = self.0.clone();

			SpawnedSubsystem {
				name: "test-subsystem-5",
				future: Box::pin(async move {
					loop {
						match ctx.try_recv().await {
							Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
							Ok(Some(FromOverseer::Signal(s))) => {
								// Panics if the test has already dropped the receiving end.
								sender.send(s).await.unwrap();
								continue;
							},
							// Non-signal messages are skipped.
							Ok(Some(_)) => continue,
							// NOTE(review): lines are missing from here on.
		}
	}

	// Test subsystem (candidate-backing slot) that forwards every signal
	// other than `Conclude` to the wrapped sender and stops on `Conclude`.
	//
	// NOTE(review): this block is damaged in this copy of the file — the
	// remaining match arms, the end of the loop and the closing delimiters of
	// the future are missing. Restore from version control.
	struct TestSubsystem6(mpsc::Sender<OverseerSignal>);

	impl<C> Subsystem<C> for TestSubsystem6
		where C: SubsystemContext<Message=CandidateBackingMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let mut sender = self.0.clone();

			SpawnedSubsystem {
				name: "test-subsystem-6",
				future: Box::pin(async move {
					loop {
						match ctx.try_recv().await {
							Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
							Ok(Some(FromOverseer::Signal(s))) => {
								// Panics if the test has already dropped the receiving end.
								sender.send(s).await.unwrap();
								continue;
							},
							// Non-signal messages are skipped.
							Ok(Some(_)) => continue,
							// NOTE(review): lines are missing from here on.
		}
	}

	// Tests that starting with a defined set of leaves and receiving
	// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
	#[test]
	fn overseer_start_stop_works() {
		let spawner = sp_core::testing::TaskExecutor::new();

		executor::block_on(async move {
			let first_block_hash = [1; 32].into();
			let second_block_hash = [2; 32].into();
			let third_block_hash = [3; 32].into();

			let first_block = BlockInfo {
				hash: first_block_hash,
				parent_hash: [0; 32].into(),
				number: 1,
			};
			let second_block = BlockInfo {
				hash: second_block_hash,
				parent_hash: first_block_hash,
				number: 2,
			};
			let third_block = BlockInfo {
				hash: third_block_hash,
				parent_hash: second_block_hash,
				number: 3,
			};

			let (tx_5, mut rx_5) = mpsc::channel(64);
			let (tx_6, mut rx_6) = mpsc::channel(64);
			let all_subsystems = AllSubsystems::<()>::dummy()
				.replace_candidate_validation(TestSubsystem5(tx_5))
				.replace_candidate_backing(TestSubsystem6(tx_6));
			let (overseer, mut handler) = Overseer::new(
				vec![first_block],
				None,
				spawner,
			).unwrap();

			let overseer_fut = overseer.run().fuse();
			pin_mut!(overseer_fut);

			let mut ss5_results = Vec::new();
			let mut ss6_results = Vec::new();