lib.rs 44.2 KiB
Newer Older

	let fut = Box::pin(async move {
		future.await;
		let _ = tx.send(());
	});

	spawner.spawn(name, fut);
Fedor Sakharov's avatar
Fedor Sakharov committed

	streams.push(from_rx);
	futures.push(Box::pin(rx.map(|_| ())));
Fedor Sakharov's avatar
Fedor Sakharov committed

	let instance = Some(SubsystemInstance {
		tx: to_tx,
	});

	Ok(OverseenSubsystem {
		instance,
	})
}

Fedor Sakharov's avatar
Fedor Sakharov committed
#[cfg(test)]
mod tests {
	use futures::{executor, pin_mut, select, channel::mpsc, FutureExt};
	use polkadot_primitives::v1::{BlockData, PoV};
	use polkadot_subsystem::DummySubsystem;
Fedor Sakharov's avatar
Fedor Sakharov committed
	use super::*;

Fedor Sakharov's avatar
Fedor Sakharov committed
	struct TestSubsystem1(mpsc::Sender<usize>);

	impl<C> Subsystem<C> for TestSubsystem1
		where C: SubsystemContext<Message=CandidateValidationMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let mut sender = self.0;
			SpawnedSubsystem {
				name: "test-subsystem-1",
				future: Box::pin(async move {
					let mut i = 0;
					loop {
						match ctx.recv().await {
							Ok(FromOverseer::Communication { .. }) => {
								let _ = sender.send(i).await;
								i += 1;
								continue;
							}
							Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
							Err(_) => return,
							_ => (),
Fedor Sakharov's avatar
Fedor Sakharov committed
						}
					}
Fedor Sakharov's avatar
Fedor Sakharov committed
		}
	}

	// Test subsystem that, while its counter `c` is below 10, builds a
	// `CandidateValidationMessage::ValidateFromChainState` wrapped in
	// `AllMessages::CandidateValidation` and hands it to `ctx.send_message`;
	// otherwise it polls `ctx.try_recv()` and leaves the loop on `Conclude`.
	//
	// NOTE(review): this block is damaged in this copy of the file. The
	// `Fedor Sakharov…` lines are not Rust, and the delimiters opened at
	// `ctx.send_message(`, `Box::pin(async move {` and `SpawnedSubsystem {`
	// are never closed, so original lines appear to be missing. Restore them
	// from the upstream file rather than by guesswork.
	struct TestSubsystem2(mpsc::Sender<usize>);

	impl<C> Subsystem<C> for TestSubsystem2
		where C: SubsystemContext<Message=CandidateBackingMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let sender = self.0.clone();
			SpawnedSubsystem {
				name: "test-subsystem-2",
				future: Box::pin(async move {
					// The sender is moved into the future but never used to send
					// anything in the visible code.
					let _sender = sender;
					let mut c: usize = 0;
					loop {
						if c < 10 {
							// The receiving half of the oneshot is dropped right away
							// (bound to `_`), so no reply is ever read here.
							let (tx, _) = oneshot::channel();
							ctx.send_message(
								AllMessages::CandidateValidation(
									CandidateValidationMessage::ValidateFromChainState(
										Default::default(),
										PoV {
											block_data: BlockData(Vec::new()),
										}.into(),
										tx,
									)
							// NOTE(review): the call above is cut off here. No visible line
							// increments `c`, so as shown `c < 10` would hold forever;
							// presumably the increment is among the missing lines — confirm.
Fedor Sakharov's avatar
Fedor Sakharov committed
							continue;
						}
						match ctx.try_recv().await {
							Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
								break;
							}
							Ok(Some(_)) => {
								continue;
							}
							Err(_) => return,
							// Falls through to `pending!()` below.
							_ => (),
						}
						pending!();
Fedor Sakharov's avatar
Fedor Sakharov committed
					}
Fedor Sakharov's avatar
Fedor Sakharov committed
		}
	}

	struct TestSubsystem4;

	impl<C> Subsystem<C> for TestSubsystem4
		where C: SubsystemContext<Message=CandidateBackingMessage>
	{
		fn start(self, mut _ctx: C) -> SpawnedSubsystem {
			SpawnedSubsystem {
				name: "test-subsystem-4",
				future: Box::pin(async move {
					// Do nothing and exit.
				}),
			}
Fedor Sakharov's avatar
Fedor Sakharov committed
		}
	}

	// Checks that a minimal configuration of two test subsystems can run and
	// exchange messages: the test collects what `TestSubsystem1` reports on
	// `s1_rx`, asks the overseer to stop after ten values, and asserts that
	// those values are exactly `0..10`.
	//
	// NOTE(review): this block is damaged in this copy of the file. The
	// `Fedor Sakharov…` lines are not Rust, the `AllSubsystems { … }` literal
	// is never closed, and `all_subsystems` is never passed anywhere, so part
	// of the `Overseer::new(…)` call appears to be missing. Restore it from
	// the upstream file.
	#[test]
	fn overseer_works() {
		let spawner = sp_core::testing::TaskExecutor::new();
Fedor Sakharov's avatar
Fedor Sakharov committed

		executor::block_on(async move {
			let (s1_tx, mut s1_rx) = mpsc::channel(64);
			let (s2_tx, mut s2_rx) = mpsc::channel(64);

			let all_subsystems = AllSubsystems {
				candidate_validation: TestSubsystem1(s1_tx),
				candidate_backing: TestSubsystem2(s2_tx),
				candidate_selection: DummySubsystem,
				collator_protocol: DummySubsystem,
				statement_distribution: DummySubsystem,
				availability_distribution: DummySubsystem,
				bitfield_signing: DummySubsystem,
				bitfield_distribution: DummySubsystem,
				provisioner: DummySubsystem,
				pov_distribution: DummySubsystem,
				runtime_api: DummySubsystem,
				availability_store: DummySubsystem,
				network_bridge: DummySubsystem,
				chain_api: DummySubsystem,
Fedor Sakharov's avatar
Fedor Sakharov committed
			let (overseer, mut handler) = Overseer::new(
Fedor Sakharov's avatar
Fedor Sakharov committed
				spawner,
			).unwrap();
			let overseer_fut = overseer.run().fuse();

			pin_mut!(overseer_fut);

			let mut s1_results = Vec::new();
			let mut s2_results = Vec::new();

			// Drive the overseer and both receivers until the overseer finishes,
			// a channel closes, or every branch is complete.
			loop {
				select! {
					a = overseer_fut => break,
					s1_next = s1_rx.next() => {
						match s1_next {
							Some(msg) => {
								s1_results.push(msg);
								// Ten values received: request shutdown.
								if s1_results.len() == 10 {
									handler.stop().await.unwrap();
								}
							}
							None => break,
						}
					},
					s2_next = s2_rx.next() => {
						match s2_next {
							// NOTE(review): this pushes the whole `Option` (`s2_next`)
							// and leaves `msg` unused; `s2_results` is never asserted
							// on below. Looks unintentional — confirm.
							Some(msg) => s2_results.push(s2_next),
							None => break,
						}
					},
					complete => break,
				}
			}

			assert_eq!(s1_results, (0..10).collect::<Vec<_>>());
		});
	}

	// Spawn a subsystem that immediately exits.
	//
	// Should immediately conclude the overseer itself with an error.
	// Despite the test's name, what is asserted is that `overseer.run()`
	// resolves to `Err`, not that anything panics.
	//
	// NOTE(review): this block is damaged in this copy of the file. The
	// `Fedor Sakharov…` lines are not Rust, the `AllSubsystems { … }` literal
	// is never closed, and `all_subsystems` is never passed anywhere, so part
	// of the `Overseer::new(…)` call appears to be missing. Restore it from
	// the upstream file.
	#[test]
	fn overseer_panics_on_subsystem_exit() {
		let spawner = sp_core::testing::TaskExecutor::new();
Fedor Sakharov's avatar
Fedor Sakharov committed

		executor::block_on(async move {
			// The receiving half is dropped right away (bound to `_`).
			let (s1_tx, _) = mpsc::channel(64);
			let all_subsystems = AllSubsystems {
				candidate_validation: TestSubsystem1(s1_tx),
				candidate_backing: TestSubsystem4,
				candidate_selection: DummySubsystem,
				collator_protocol: DummySubsystem,
				statement_distribution: DummySubsystem,
				availability_distribution: DummySubsystem,
				bitfield_signing: DummySubsystem,
				bitfield_distribution: DummySubsystem,
				provisioner: DummySubsystem,
				pov_distribution: DummySubsystem,
				runtime_api: DummySubsystem,
				availability_store: DummySubsystem,
				network_bridge: DummySubsystem,
				chain_api: DummySubsystem,
Fedor Sakharov's avatar
Fedor Sakharov committed
			let (overseer, _handle) = Overseer::new(
Fedor Sakharov's avatar
Fedor Sakharov committed
				spawner,
			).unwrap();
			let overseer_fut = overseer.run().fuse();
			pin_mut!(overseer_fut);

			select! {
				res = overseer_fut => assert!(res.is_err()),
				complete => (),
			}
		})
	}

	struct TestSubsystem5(mpsc::Sender<OverseerSignal>);

	impl<C> Subsystem<C> for TestSubsystem5
		where C: SubsystemContext<Message=CandidateValidationMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let mut sender = self.0.clone();

			SpawnedSubsystem {
				name: "test-subsystem-5",
				future: Box::pin(async move {
					loop {
						match ctx.try_recv().await {
							Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
							Ok(Some(FromOverseer::Signal(s))) => {
								sender.send(s).await.unwrap();
								continue;
							},
							Ok(Some(_)) => continue,
							Err(_) => return,
							_ => (),
						}
						pending!();
		}
	}

	struct TestSubsystem6(mpsc::Sender<OverseerSignal>);

	impl<C> Subsystem<C> for TestSubsystem6
		where C: SubsystemContext<Message=CandidateBackingMessage>
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			let mut sender = self.0.clone();

			SpawnedSubsystem {
				name: "test-subsystem-6",
				future: Box::pin(async move {
					loop {
						match ctx.try_recv().await {
							Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
							Ok(Some(FromOverseer::Signal(s))) => {
								sender.send(s).await.unwrap();
								continue;
							},
							Ok(Some(_)) => continue,
							Err(_) => return,
							_ => (),
						}
						pending!();
		}
	}

	// Tests that starting with a defined set of leaves (`first_block`) and then
	// reporting `second_block` and `third_block` as imported makes both test
	// subsystems receive the three `ActiveLeaves` signals listed in
	// `expected_heartbeats`, in that order.
	//
	// NOTE(review): this block is damaged in this copy of the file. The
	// `AllSubsystems { … }` literal is never closed and `all_subsystems` is
	// never passed anywhere, so part of the `Overseer::new(…)` call appears to
	// be missing. Restore it from the upstream file.
	#[test]
	fn overseer_start_stop_works() {
		let spawner = sp_core::testing::TaskExecutor::new();

		executor::block_on(async move {
			let first_block_hash = [1; 32].into();
			let second_block_hash = [2; 32].into();
			let third_block_hash = [3; 32].into();

			// A chain of three blocks: each one names its predecessor as parent.
			let first_block = BlockInfo {
				hash: first_block_hash,
				parent_hash: [0; 32].into(),
				number: 1,
			};
			let second_block = BlockInfo {
				hash: second_block_hash,
				parent_hash: first_block_hash,
				number: 2,
			};
			let third_block = BlockInfo {
				hash: third_block_hash,
				parent_hash: second_block_hash,
				number: 3,
			};

			let (tx_5, mut rx_5) = mpsc::channel(64);
			let (tx_6, mut rx_6) = mpsc::channel(64);
			let all_subsystems = AllSubsystems {
				candidate_validation: TestSubsystem5(tx_5),
				candidate_backing: TestSubsystem6(tx_6),
				candidate_selection: DummySubsystem,
				collator_protocol: DummySubsystem,
				statement_distribution: DummySubsystem,
				availability_distribution: DummySubsystem,
				bitfield_signing: DummySubsystem,
				bitfield_distribution: DummySubsystem,
				provisioner: DummySubsystem,
				pov_distribution: DummySubsystem,
				runtime_api: DummySubsystem,
				availability_store: DummySubsystem,
				network_bridge: DummySubsystem,
				chain_api: DummySubsystem,
			let (overseer, mut handler) = Overseer::new(
				vec![first_block],
				spawner,
			).unwrap();

			let overseer_fut = overseer.run().fuse();
			pin_mut!(overseer_fut);

			let mut ss5_results = Vec::new();
			let mut ss6_results = Vec::new();

			handler.block_imported(second_block).await.unwrap();
			handler.block_imported(third_block).await.unwrap();

			// One signal for the initial leaf, then one per imported block that
			// activates the new block and deactivates its parent.
			let expected_heartbeats = vec![
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(first_block_hash)),
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated: [second_block_hash].as_ref().into(),
					deactivated: [first_block_hash].as_ref().into(),
				}),
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated: [third_block_hash].as_ref().into(),
					deactivated: [second_block_hash].as_ref().into(),
				}),
			];

			// Collect forwarded signals from both subsystems while driving the
			// overseer; the overseer is expected to finish with `Ok`.
			loop {
				select! {
					res = overseer_fut => {
						assert!(res.is_ok());
						break;
					},
					res = rx_5.next() => {
						if let Some(res) = res {
							ss5_results.push(res);
						}
					}
					res = rx_6.next() => {
						if let Some(res) = res {
							ss6_results.push(res);
						}
					}
					complete => break,
				}

				// Both subsystems have seen everything expected: request shutdown.
				if ss5_results.len() == expected_heartbeats.len() &&
					ss6_results.len() == expected_heartbeats.len() {
						handler.stop().await.unwrap();
				}
			}

			assert_eq!(ss5_results, expected_heartbeats);
			assert_eq!(ss6_results, expected_heartbeats);
		});
	}

	// Tests that starting with two leaves (`first_block` and `second_block`) and
	// then reporting `third_block` as finalized makes both test subsystems
	// receive an `ActiveLeaves` activation of both leaves, an `ActiveLeaves`
	// deactivation of both leaves, and a `BlockFinalized` signal for the third
	// block. Only presence is checked, not order.
	//
	// NOTE(review): this block is damaged in this copy of the file. The
	// `AllSubsystems { … }` literal is never closed and `all_subsystems` is
	// never passed anywhere, so part of the `Overseer::new(…)` call appears to
	// be missing. Restore it from the upstream file.
	#[test]
	fn overseer_finalize_works() {
		let spawner = sp_core::testing::TaskExecutor::new();

		executor::block_on(async move {
			let first_block_hash = [1; 32].into();
			let second_block_hash = [2; 32].into();
			let third_block_hash = [3; 32].into();

			let first_block = BlockInfo {
				hash: first_block_hash,
				parent_hash: [0; 32].into(),
				number: 1,
			};
			// The parent hash here is not `first_block_hash`, so the first two
			// blocks sit on different forks.
			let second_block = BlockInfo {
				hash: second_block_hash,
				parent_hash: [42; 32].into(),
				number: 2,
			};
			let third_block = BlockInfo {
				hash: third_block_hash,
				parent_hash: second_block_hash,
				number: 3,
			};

			let (tx_5, mut rx_5) = mpsc::channel(64);
			let (tx_6, mut rx_6) = mpsc::channel(64);

			let all_subsystems = AllSubsystems {
				candidate_validation: TestSubsystem5(tx_5),
				candidate_backing: TestSubsystem6(tx_6),
				candidate_selection: DummySubsystem,
				collator_protocol: DummySubsystem,
				statement_distribution: DummySubsystem,
				availability_distribution: DummySubsystem,
				bitfield_signing: DummySubsystem,
				bitfield_distribution: DummySubsystem,
				provisioner: DummySubsystem,
				pov_distribution: DummySubsystem,
				runtime_api: DummySubsystem,
				availability_store: DummySubsystem,
				network_bridge: DummySubsystem,
				chain_api: DummySubsystem,
			// start with two forks of different height.
			let (overseer, mut handler) = Overseer::new(
				vec![first_block, second_block],
				spawner,
			).unwrap();

			let overseer_fut = overseer.run().fuse();
			pin_mut!(overseer_fut);

			let mut ss5_results = Vec::new();
			let mut ss6_results = Vec::new();

			// this should stop work on both forks we started with earlier.
			handler.block_finalized(third_block).await.unwrap();

			let expected_heartbeats = vec![
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated: [first_block_hash, second_block_hash].as_ref().into(),
					..Default::default()
				}),
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					deactivated: [first_block_hash, second_block_hash].as_ref().into(),
					..Default::default()
				}),
				OverseerSignal::BlockFinalized(third_block_hash),
			];

			// Collect forwarded signals from both subsystems while driving the
			// overseer; the overseer is expected to finish with `Ok`.
			loop {
				select! {
					res = overseer_fut => {
						assert!(res.is_ok());
						break;
					},
					res = rx_5.next() => {
						if let Some(res) = res {
							ss5_results.push(res);
						}
					}
					res = rx_6.next() => {
						if let Some(res) = res {
							ss6_results.push(res);
						}
					}
					complete => break,
				}

				// Both subsystems have seen everything expected: request shutdown.
				if ss5_results.len() == expected_heartbeats.len() &&
					ss6_results.len() == expected_heartbeats.len() {
						handler.stop().await.unwrap();
				}
			}

			assert_eq!(ss5_results.len(), expected_heartbeats.len());
			assert_eq!(ss6_results.len(), expected_heartbeats.len());

			// Notifications on finality for multiple blocks at once
			// may be received in different orders.
			for expected in expected_heartbeats {
				assert!(ss5_results.contains(&expected));
				assert!(ss6_results.contains(&expected));
			}
		});
	}
Fedor Sakharov's avatar
Fedor Sakharov committed
}