lib.rs 51.7 KiB
Newer Older
	s: impl Subsystem<OverseerSubsystemContext<M>>,
Fedor Sakharov's avatar
Fedor Sakharov committed
) -> SubsystemResult<OverseenSubsystem<M>> {
	let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
	let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
	let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx };
	let SpawnedSubsystem { future, name } = s.start(ctx);
Fedor Sakharov's avatar
Fedor Sakharov committed

	let (tx, rx) = oneshot::channel();

	let fut = Box::pin(async move {
		future.await;
		let _ = tx.send(());
	});

	spawner.spawn(name, fut);
Fedor Sakharov's avatar
Fedor Sakharov committed

	streams.push(from_rx);
	futures.push(Box::pin(rx.map(|_| ())));
Fedor Sakharov's avatar
Fedor Sakharov committed

	let instance = Some(SubsystemInstance {
		tx: to_tx,
	});

	Ok(OverseenSubsystem {
		instance,
	})
}

Fedor Sakharov's avatar
Fedor Sakharov committed
#[cfg(test)]
mod tests {
Fedor Sakharov's avatar
Fedor Sakharov committed
	use futures::{executor, pin_mut, select, channel::mpsc, FutureExt};
	use polkadot_primitives::v1::{BlockData, PoV};
	use polkadot_subsystem::DummySubsystem;
	use polkadot_subsystem::messages::RuntimeApiRequest;

	use polkadot_node_network_protocol::{PeerId, ReputationChange, NetworkBridgeEvent};

Fedor Sakharov's avatar
Fedor Sakharov committed
	use super::*;

Fedor Sakharov's avatar
Fedor Sakharov committed
	// Test subsystem driven by `CandidateValidationMessage`s.
	//
	// For each `Communication` message it receives, it sends the current value
	// of a counter (starting at 0) into the wrapped sender and then increments
	// the counter. It returns on a `Conclude` signal or on a receive error.
	struct TestSubsystem1(mpsc::Sender<usize>);

	impl<C> Subsystem<C> for TestSubsystem1
		where C: SubsystemContext<Message=CandidateValidationMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let mut sender = self.0;
			SpawnedSubsystem {
				name: "test-subsystem-1",
				future: Box::pin(async move {
					let mut i = 0;
					loop {
						match ctx.recv().await {
							Ok(FromOverseer::Communication { .. }) => {
								let _ = sender.send(i).await;
								i += 1;
								continue;
							}
							Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
							Err(_) => return,
							_ => (),
Fedor Sakharov's avatar
Fedor Sakharov committed
						}
					}
Fedor Sakharov's avatar
Fedor Sakharov committed
		}
	}

	// Test subsystem driven by `CandidateBackingMessage`s.
	//
	// It sends `CandidateValidationMessage::ValidateFromChainState` messages
	// through its context. The wrapped sender is moved into the subsystem
	// future and held there, but nothing is sent on it in the code visible here.
	struct TestSubsystem2(mpsc::Sender<usize>);

	impl<C> Subsystem<C> for TestSubsystem2
		where C: SubsystemContext<Message=CandidateBackingMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let sender = self.0.clone();
			SpawnedSubsystem {
				name: "test-subsystem-2",
				future: Box::pin(async move {
					let _sender = sender;
					let mut c: usize = 0;
					loop {
						if c < 10 {
							let (tx, _) = oneshot::channel();
							ctx.send_message(
								AllMessages::CandidateValidation(
									CandidateValidationMessage::ValidateFromChainState(
										Default::default(),
										PoV {
											block_data: BlockData(Vec::new()),
										}.into(),
										tx,
									)
Fedor Sakharov's avatar
Fedor Sakharov committed
							continue;
						}
						match ctx.try_recv().await {
							Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
								break;
							}
							Ok(Some(_)) => {
								continue;
							}
							Err(_) => return,
							_ => (),
						}
						pending!();
Fedor Sakharov's avatar
Fedor Sakharov committed
					}
Fedor Sakharov's avatar
Fedor Sakharov committed
		}
	}

	// Test subsystem whose future does nothing and completes immediately.
	struct TestSubsystem4;

	impl<C> Subsystem<C> for TestSubsystem4
		where C: SubsystemContext<Message=CandidateBackingMessage>
	{
		fn start(self, mut _ctx: C) -> SpawnedSubsystem {
			SpawnedSubsystem {
				name: "test-subsystem-4",
				future: Box::pin(async move {
					// Do nothing and exit.
				}),
			}
Fedor Sakharov's avatar
Fedor Sakharov committed
		}
	}

	// Checks that a minimal configuration of two jobs can run and exchange messages.
	#[test]
	fn overseer_works() {
		let spawner = sp_core::testing::TaskExecutor::new();
Fedor Sakharov's avatar
Fedor Sakharov committed

		executor::block_on(async move {
			let (s1_tx, mut s1_rx) = mpsc::channel(64);
			let (s2_tx, mut s2_rx) = mpsc::channel(64);

			let all_subsystems = AllSubsystems {
				candidate_validation: TestSubsystem1(s1_tx),
				candidate_backing: TestSubsystem2(s2_tx),
				candidate_selection: DummySubsystem,
				collator_protocol: DummySubsystem,
				statement_distribution: DummySubsystem,
				availability_distribution: DummySubsystem,
				bitfield_signing: DummySubsystem,
				bitfield_distribution: DummySubsystem,
				provisioner: DummySubsystem,
				pov_distribution: DummySubsystem,
				runtime_api: DummySubsystem,
				availability_store: DummySubsystem,
				network_bridge: DummySubsystem,
				chain_api: DummySubsystem,
Fedor Sakharov's avatar
Fedor Sakharov committed
			let (overseer, mut handler) = Overseer::new(
Fedor Sakharov's avatar
Fedor Sakharov committed
				spawner,
			).unwrap();
			let overseer_fut = overseer.run().fuse();

			pin_mut!(overseer_fut);

			let mut s1_results = Vec::new();
			let mut s2_results = Vec::new();

			loop {
				select! {
					a = overseer_fut => break,
					s1_next = s1_rx.next() => {
						match s1_next {
							Some(msg) => {
								s1_results.push(msg);
								if s1_results.len() == 10 {
									handler.stop().await.unwrap();
								}
							}
							None => break,
						}
					},
					s2_next = s2_rx.next() => {
						match s2_next {
							Some(msg) => s2_results.push(s2_next),
							None => break,
						}
					},
					complete => break,
				}
			}

			assert_eq!(s1_results, (0..10).collect::<Vec<_>>());
		});
	}

	// Spawn a subsystem that immediately exits.
	//
	// Should immediately conclude the overseer itself with an error.
	#[test]
	fn overseer_panics_on_subsystem_exit() {
		let spawner = sp_core::testing::TaskExecutor::new();
Fedor Sakharov's avatar
Fedor Sakharov committed

		executor::block_on(async move {
			let (s1_tx, _) = mpsc::channel(64);
			let all_subsystems = AllSubsystems {
				candidate_validation: TestSubsystem1(s1_tx),
				candidate_backing: TestSubsystem4,
				candidate_selection: DummySubsystem,
				collator_protocol: DummySubsystem,
				statement_distribution: DummySubsystem,
				availability_distribution: DummySubsystem,
				bitfield_signing: DummySubsystem,
				bitfield_distribution: DummySubsystem,
				provisioner: DummySubsystem,
				pov_distribution: DummySubsystem,
				runtime_api: DummySubsystem,
				availability_store: DummySubsystem,
				network_bridge: DummySubsystem,
				chain_api: DummySubsystem,
Fedor Sakharov's avatar
Fedor Sakharov committed
			let (overseer, _handle) = Overseer::new(
Fedor Sakharov's avatar
Fedor Sakharov committed
				spawner,
			).unwrap();
			let overseer_fut = overseer.run().fuse();
			pin_mut!(overseer_fut);

			select! {
				res = overseer_fut => assert!(res.is_err()),
				complete => (),
			}
		})
	}

	// Test subsystem driven by `CandidateValidationMessage`s.
	//
	// It forwards every overseer signal other than `Conclude` into the wrapped
	// sender; `Conclude` ends its loop.
	struct TestSubsystem5(mpsc::Sender<OverseerSignal>);

	impl<C> Subsystem<C> for TestSubsystem5
		where C: SubsystemContext<Message=CandidateValidationMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let mut sender = self.0.clone();

			SpawnedSubsystem {
				name: "test-subsystem-5",
				future: Box::pin(async move {
					loop {
						match ctx.try_recv().await {
							Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
							Ok(Some(FromOverseer::Signal(s))) => {
								sender.send(s).await.unwrap();
								continue;
							},
							Ok(Some(_)) => continue,
							Err(_) => return,
							_ => (),
						}
						pending!();
		}
	}

	// Test subsystem driven by `CandidateBackingMessage`s.
	//
	// It forwards every overseer signal other than `Conclude` into the wrapped
	// sender; `Conclude` ends its loop.
	struct TestSubsystem6(mpsc::Sender<OverseerSignal>);

	impl<C> Subsystem<C> for TestSubsystem6
		where C: SubsystemContext<Message=CandidateBackingMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let mut sender = self.0.clone();

			SpawnedSubsystem {
				name: "test-subsystem-6",
				future: Box::pin(async move {
					loop {
						match ctx.try_recv().await {
							Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
							Ok(Some(FromOverseer::Signal(s))) => {
								sender.send(s).await.unwrap();
								continue;
							},
							Ok(Some(_)) => continue,
							Err(_) => return,
							_ => (),
						}
						pending!();
		}
	}

	// Tests that starting with a defined set of leaves and receiving
	// notifications on imported blocks triggers the expected `ActiveLeaves`
	// activation and deactivation signals.
	#[test]
	fn overseer_start_stop_works() {
		let spawner = sp_core::testing::TaskExecutor::new();

		executor::block_on(async move {
			let first_block_hash = [1; 32].into();
			let second_block_hash = [2; 32].into();
			let third_block_hash = [3; 32].into();

			let first_block = BlockInfo {
				hash: first_block_hash,
				parent_hash: [0; 32].into(),
				number: 1,
			};
			let second_block = BlockInfo {
				hash: second_block_hash,
				parent_hash: first_block_hash,
				number: 2,
			};
			let third_block = BlockInfo {
				hash: third_block_hash,
				parent_hash: second_block_hash,
				number: 3,
			};

			let (tx_5, mut rx_5) = mpsc::channel(64);
			let (tx_6, mut rx_6) = mpsc::channel(64);
			let all_subsystems = AllSubsystems {
				candidate_validation: TestSubsystem5(tx_5),
				candidate_backing: TestSubsystem6(tx_6),
				candidate_selection: DummySubsystem,
				collator_protocol: DummySubsystem,
				statement_distribution: DummySubsystem,
				availability_distribution: DummySubsystem,
				bitfield_signing: DummySubsystem,
				bitfield_distribution: DummySubsystem,
				provisioner: DummySubsystem,
				pov_distribution: DummySubsystem,
				runtime_api: DummySubsystem,
				availability_store: DummySubsystem,
				network_bridge: DummySubsystem,
				chain_api: DummySubsystem,
			let (overseer, mut handler) = Overseer::new(
				vec![first_block],
				spawner,
			).unwrap();

			let overseer_fut = overseer.run().fuse();
			pin_mut!(overseer_fut);

			let mut ss5_results = Vec::new();
			let mut ss6_results = Vec::new();

			handler.block_imported(second_block).await.unwrap();
			handler.block_imported(third_block).await.unwrap();

			let expected_heartbeats = vec![
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(first_block_hash)),
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated: [second_block_hash].as_ref().into(),
					deactivated: [first_block_hash].as_ref().into(),
				}),
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated: [third_block_hash].as_ref().into(),
					deactivated: [second_block_hash].as_ref().into(),
				}),
			];

			loop {
				select! {
					res = overseer_fut => {
						assert!(res.is_ok());
						break;
					},
					res = rx_5.next() => {
						if let Some(res) = res {
							ss5_results.push(res);
						}
					}
					res = rx_6.next() => {
						if let Some(res) = res {
							ss6_results.push(res);
						}
					}
					complete => break,
				}

				if ss5_results.len() == expected_heartbeats.len() &&
					ss6_results.len() == expected_heartbeats.len() {
						handler.stop().await.unwrap();
				}
			}

			assert_eq!(ss5_results, expected_heartbeats);
			assert_eq!(ss6_results, expected_heartbeats);
		});
	}

	// Tests that starting with a defined set of leaves and then receiving a
	// finality notification triggers the expected `ActiveLeaves` updates and a
	// `BlockFinalized` signal.
	#[test]
	fn overseer_finalize_works() {
		let spawner = sp_core::testing::TaskExecutor::new();

		executor::block_on(async move {
			let first_block_hash = [1; 32].into();
			let second_block_hash = [2; 32].into();
			let third_block_hash = [3; 32].into();

			let first_block = BlockInfo {
				hash: first_block_hash,
				parent_hash: [0; 32].into(),
				number: 1,
			};
			let second_block = BlockInfo {
				hash: second_block_hash,
				parent_hash: [42; 32].into(),
				number: 2,
			};
			let third_block = BlockInfo {
				hash: third_block_hash,
				parent_hash: second_block_hash,
				number: 3,
			};

			let (tx_5, mut rx_5) = mpsc::channel(64);
			let (tx_6, mut rx_6) = mpsc::channel(64);

			let all_subsystems = AllSubsystems {
				candidate_validation: TestSubsystem5(tx_5),
				candidate_backing: TestSubsystem6(tx_6),
				candidate_selection: DummySubsystem,
				collator_protocol: DummySubsystem,
				statement_distribution: DummySubsystem,
				availability_distribution: DummySubsystem,
				bitfield_signing: DummySubsystem,
				bitfield_distribution: DummySubsystem,
				provisioner: DummySubsystem,
				pov_distribution: DummySubsystem,
				runtime_api: DummySubsystem,
				availability_store: DummySubsystem,
				network_bridge: DummySubsystem,
				chain_api: DummySubsystem,
			// start with two forks of different height.
			let (overseer, mut handler) = Overseer::new(
				vec![first_block, second_block],
				spawner,
			).unwrap();

			let overseer_fut = overseer.run().fuse();
			pin_mut!(overseer_fut);

			let mut ss5_results = Vec::new();
			let mut ss6_results = Vec::new();

			// this should stop work on both forks we started with earlier.
			handler.block_finalized(third_block).await.unwrap();

			let expected_heartbeats = vec![
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated: [first_block_hash, second_block_hash].as_ref().into(),
					..Default::default()
				}),
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					deactivated: [first_block_hash, second_block_hash].as_ref().into(),
					..Default::default()
				}),
				OverseerSignal::BlockFinalized(third_block_hash),
			];

			loop {
				select! {
					res = overseer_fut => {
						assert!(res.is_ok());
						break;
					},
					res = rx_5.next() => {
						if let Some(res) = res {
							ss5_results.push(res);
						}
					}
					res = rx_6.next() => {
						if let Some(res) = res {
							ss6_results.push(res);
						}
					}
					complete => break,
				}

				if ss5_results.len() == expected_heartbeats.len() &&
					ss6_results.len() == expected_heartbeats.len() {
						handler.stop().await.unwrap();
				}
			}

			assert_eq!(ss5_results.len(), expected_heartbeats.len());
			assert_eq!(ss6_results.len(), expected_heartbeats.len());

			// Notifications on finality for multiple blocks at once
			// may be received in different orders.
			for expected in expected_heartbeats {
				assert!(ss5_results.contains(&expected));
				assert!(ss6_results.contains(&expected));
			}
		});
	}

	// Test subsystem that only counts what it receives from the overseer.
	//
	// The counters are shared handles, so every clone of the subsystem adds to
	// the same totals.
	#[derive(Clone)]
	struct CounterSubsystem {
		// Number of `Conclude` signals seen.
		stop_signals_received: Arc<atomic::AtomicUsize>,
		// Number of signals other than `Conclude` seen.
		signals_received: Arc<atomic::AtomicUsize>,
		// Number of `Communication` messages seen.
		msgs_received: Arc<atomic::AtomicUsize>,
	}

	impl CounterSubsystem {
		// Builds a counter subsystem that reports into the given shared counters.
		fn new(
			stop_signals_received: Arc<atomic::AtomicUsize>,
			signals_received: Arc<atomic::AtomicUsize>,
			msgs_received: Arc<atomic::AtomicUsize>,
		) -> Self {
			CounterSubsystem { msgs_received, signals_received, stop_signals_received }
		}
	}

	impl<C, M> Subsystem<C> for CounterSubsystem
		where
			C: SubsystemContext<Message=M>,
			M: Send,
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			SpawnedSubsystem {
				name: "counter-subsystem",
				future: Box::pin(async move {
					loop {
						match ctx.try_recv().await {
							Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
								self.stop_signals_received.fetch_add(1, atomic::Ordering::SeqCst);
								break;
							},
							Ok(Some(FromOverseer::Signal(_))) => {
								self.signals_received.fetch_add(1, atomic::Ordering::SeqCst);
								continue;
							},
							Ok(Some(FromOverseer::Communication { .. })) => {
								self.msgs_received.fetch_add(1, atomic::Ordering::SeqCst);
								continue;
							},
							Err(_) => (),
							_ => (),
						}
						pending!();
					}
				}),
			}
		}
	}

	// Builds a `ValidateFromChainState` request over an empty PoV. The receiving
	// half of the response channel is dropped right away.
	fn test_candidate_validation_msg() -> CandidateValidationMessage {
		let empty_pov = PoV { block_data: BlockData(Vec::new()) };
		let (response_tx, _) = oneshot::channel();
		CandidateValidationMessage::ValidateFromChainState(
			Default::default(),
			Arc::new(empty_pov),
			response_tx,
		)
	}

	// Builds a `GetBackedCandidates` request with a default first argument. The
	// receiving half of the response channel is dropped right away.
	fn test_candidate_backing_msg() -> CandidateBackingMessage {
		let (response_tx, _) = oneshot::channel();
		let request = CandidateBackingMessage::GetBackedCandidates(Default::default(), response_tx);
		request
	}

	fn test_candidate_selection_msg() -> CandidateSelectionMessage {
		CandidateSelectionMessage::default()
	}

	// Builds a `FinalizedBlockNumber` request. The receiving half of the
	// response channel is dropped right away.
	fn test_chain_api_msg() -> ChainApiMessage {
		let (response_tx, _) = oneshot::channel();
		let request = ChainApiMessage::FinalizedBlockNumber(response_tx);
		request
	}

	// Builds a `CollateOn` message carrying a default payload.
	fn test_collator_protocol_msg() -> CollatorProtocolMessage {
		let target = Default::default();
		CollatorProtocolMessage::CollateOn(target)
	}

	// Builds a `PeerDisconnected` event for a randomly generated peer id,
	// generic over the event's message type.
	fn test_network_bridge_event<M>() -> NetworkBridgeEvent<M> {
		let peer = PeerId::random();
		NetworkBridgeEvent::PeerDisconnected(peer)
	}

	// Wraps a test network-bridge event in a statement-distribution message.
	fn test_statement_distribution_msg() -> StatementDistributionMessage {
		let event = test_network_bridge_event();
		StatementDistributionMessage::NetworkBridgeUpdateV1(event)
	}

	// Wraps a test network-bridge event in an availability-distribution message.
	fn test_availability_distribution_msg() -> AvailabilityDistributionMessage {
		let event = test_network_bridge_event();
		AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event)
	}

	// Wraps a test network-bridge event in a bitfield-distribution message.
	fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage {
		let event = test_network_bridge_event();
		BitfieldDistributionMessage::NetworkBridgeUpdateV1(event)
	}

	// Builds a `RequestInherentData` request with a default first argument. The
	// receiving half of the response channel is dropped right away.
	fn test_provisioner_msg() -> ProvisionerMessage {
		let (response_tx, _) = oneshot::channel();
		let request = ProvisionerMessage::RequestInherentData(Default::default(), response_tx);
		request
	}

	// Wraps a test network-bridge event in a PoV-distribution message.
	fn test_pov_distribution_msg() -> PoVDistributionMessage {
		let event = test_network_bridge_event();
		PoVDistributionMessage::NetworkBridgeUpdateV1(event)
	}

	// Builds a runtime API `Validators` request with a default first argument.
	// The receiving half of the response channel is dropped right away.
	fn test_runtime_api_msg() -> RuntimeApiMessage {
		let (response_tx, _) = oneshot::channel();
		let validators_request = RuntimeApiRequest::Validators(response_tx);
		RuntimeApiMessage::Request(Default::default(), validators_request)
	}

	// Builds a `QueryAvailableData` request with a default first argument. The
	// receiving half of the response channel is dropped right away.
	fn test_availability_store_msg() -> AvailabilityStoreMessage {
		let (response_tx, _) = oneshot::channel();
		let query = AvailabilityStoreMessage::QueryAvailableData(Default::default(), response_tx);
		query
	}

	// Builds a `ReportPeer` message for a randomly generated peer id with a
	// reputation change of 42 and an empty reason string.
	fn test_network_bridge_msg() -> NetworkBridgeMessage {
		let peer = PeerId::random();
		let change = ReputationChange::new(42, "");
		NetworkBridgeMessage::ReportPeer(peer, change)
	}

	// Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly.
	//
	// Every overseer slot is filled with a clone of one `CounterSubsystem`, so
	// the shared counters add up what all subsystems received.
	#[test]
	fn overseer_all_subsystems_receive_signals_and_messages() {
		const NUM_SUBSYSTEMS: usize = 14;

		let spawner = sp_core::testing::TaskExecutor::new();

		executor::block_on(async move {
			let new_counter = || Arc::new(atomic::AtomicUsize::new(0));
			let stop_signals_received = new_counter();
			let signals_received = new_counter();
			let msgs_received = new_counter();

			let subsystem = CounterSubsystem::new(
				stop_signals_received.clone(),
				signals_received.clone(),
				msgs_received.clone(),
			);

			let all_subsystems = AllSubsystems {
				candidate_validation: subsystem.clone(),
				candidate_backing: subsystem.clone(),
				candidate_selection: subsystem.clone(),
				collator_protocol: subsystem.clone(),
				statement_distribution: subsystem.clone(),
				availability_distribution: subsystem.clone(),
				bitfield_signing: subsystem.clone(),
				bitfield_distribution: subsystem.clone(),
				provisioner: subsystem.clone(),
				pov_distribution: subsystem.clone(),
				runtime_api: subsystem.clone(),
				availability_store: subsystem.clone(),
				network_bridge: subsystem.clone(),
				chain_api: subsystem.clone(),
			};

			let (overseer, mut handler) = Overseer::new(
				vec![],
				all_subsystems,
				spawner,
			).unwrap();
			let overseer_fut = overseer.run().fuse();

			pin_mut!(overseer_fut);

			// send a signal to each subsystem
			let imported = BlockInfo {
				hash: Default::default(),
				parent_hash: Default::default(),
				number: Default::default(),
			};
			handler.block_imported(imported).await.unwrap();

			// send a msg to each subsystem
			// except for BitfieldSigning as the message is not instantiable
			let messages = vec![
				AllMessages::CandidateValidation(test_candidate_validation_msg()),
				AllMessages::CandidateBacking(test_candidate_backing_msg()),
				AllMessages::CandidateSelection(test_candidate_selection_msg()),
				AllMessages::CollatorProtocol(test_collator_protocol_msg()),
				AllMessages::StatementDistribution(test_statement_distribution_msg()),
				AllMessages::AvailabilityDistribution(test_availability_distribution_msg()),
				// (no `AllMessages::BitfieldSigning` entry, see above)
				AllMessages::BitfieldDistribution(test_bitfield_distribution_msg()),
				AllMessages::Provisioner(test_provisioner_msg()),
				AllMessages::PoVDistribution(test_pov_distribution_msg()),
				AllMessages::RuntimeApi(test_runtime_api_msg()),
				AllMessages::AvailabilityStore(test_availability_store_msg()),
				AllMessages::NetworkBridge(test_network_bridge_msg()),
				AllMessages::ChainApi(test_chain_api_msg()),
			];
			for msg in messages {
				handler.send_msg(msg).await.unwrap();
			}

			// send a stop signal to each subsystems
			handler.stop().await.unwrap();

			select! {
				outcome = overseer_fut => {
					let total = |counter: &Arc<atomic::AtomicUsize>| {
						counter.load(atomic::Ordering::SeqCst)
					};

					assert_eq!(total(&stop_signals_received), NUM_SUBSYSTEMS);
					// x2 because of broadcast_signal on startup
					assert_eq!(total(&signals_received), 2 * NUM_SUBSYSTEMS);
					// -1 for BitfieldSigning
					assert_eq!(total(&msgs_received), NUM_SUBSYSTEMS - 1);

					assert!(outcome.is_ok());
				},
				complete => (),
			}
		});
	}
Fedor Sakharov's avatar
Fedor Sakharov committed
}