lib.rs 74.4 KiB
Newer Older
			handler.block_imported(second_block).await;
			handler.block_imported(third_block).await;

			let expected_heartbeats = vec![
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(first_block_hash)),
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated: [second_block_hash].as_ref().into(),
					deactivated: [first_block_hash].as_ref().into(),
				}),
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated: [third_block_hash].as_ref().into(),
					deactivated: [second_block_hash].as_ref().into(),
				}),
			];

			loop {
				select! {
					res = overseer_fut => {
						assert!(res.is_ok());
						break;
					},
					res = rx_5.next() => {
						if let Some(res) = res {
							ss5_results.push(res);
						}
					}
					res = rx_6.next() => {
						if let Some(res) = res {
							ss6_results.push(res);
						}
					}
					complete => break,
				}

				if ss5_results.len() == expected_heartbeats.len() &&
					ss6_results.len() == expected_heartbeats.len() {
						handler.stop().await;
				}
			}

			assert_eq!(ss5_results, expected_heartbeats);
			assert_eq!(ss6_results, expected_heartbeats);
		});
	}

	// Tests that, starting with a defined set of leaves, a finality notification
	// for a later block deactivates those leaves and delivers a `BlockFinalized`
	// signal to the subsystems.
	#[test]
	fn overseer_finalize_works() {
		let spawner = sp_core::testing::TaskExecutor::new();

		executor::block_on(async move {
			let first_block_hash = [1; 32].into();
			let second_block_hash = [2; 32].into();
			let third_block_hash = [3; 32].into();

			let first_block = BlockInfo {
				hash: first_block_hash,
				parent_hash: [0; 32].into(),
				number: 1,
			};
			// Deliberately not a child of `first_block`, so the two start as separate forks.
			let second_block = BlockInfo {
				hash: second_block_hash,
				parent_hash: [42; 32].into(),
				number: 2,
			};
			let third_block = BlockInfo {
				hash: third_block_hash,
				parent_hash: second_block_hash,
				number: 3,
			};

			// Each test subsystem is handed a sender; the signals it forwards are
			// collected from the matching receiver below.
			let (tx_5, mut rx_5) = mpsc::channel(64);
			let (tx_6, mut rx_6) = mpsc::channel(64);

			let all_subsystems = AllSubsystems::<()>::dummy()
				.replace_candidate_validation(TestSubsystem5(tx_5))
				.replace_candidate_backing(TestSubsystem6(tx_6));

			// start with two forks of different height.
			// The subsystem set built above must be passed in here, matching the
			// four-argument `Overseer::new` call used elsewhere in this module.
			let (overseer, mut handler) = Overseer::new(
				vec![first_block, second_block],
				all_subsystems,
				None,
				spawner,
			).unwrap();

			let overseer_fut = overseer.run().fuse();
			pin_mut!(overseer_fut);

			let mut ss5_results = Vec::new();
			let mut ss6_results = Vec::new();

			// this should stop work on both forks we started with earlier.
			handler.block_finalized(third_block).await;

			let expected_heartbeats = vec![
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated: [first_block_hash, second_block_hash].as_ref().into(),
					..Default::default()
				}),
				OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					deactivated: [first_block_hash, second_block_hash].as_ref().into(),
					..Default::default()
				}),
				OverseerSignal::BlockFinalized(third_block_hash, 3),
			];

			loop {
				select! {
					res = overseer_fut => {
						assert!(res.is_ok());
						break;
					},
					res = rx_5.next() => {
						if let Some(res) = res {
							ss5_results.push(res);
						}
					}
					res = rx_6.next() => {
						if let Some(res) = res {
							ss6_results.push(res);
						}
					}
					complete => break,
				}

				// Once both subsystems have reported every expected signal, ask the
				// overseer to stop so that `overseer_fut` resolves and ends the loop.
				if ss5_results.len() == expected_heartbeats.len() &&
					ss6_results.len() == expected_heartbeats.len() {
						handler.stop().await;
				}
			}

			assert_eq!(ss5_results.len(), expected_heartbeats.len());
			assert_eq!(ss6_results.len(), expected_heartbeats.len());

			// Notifications on finality for multiple blocks at once
			// may be received in different orders.
			for expected in expected_heartbeats {
				assert!(ss5_results.contains(&expected));
				assert!(ss6_results.contains(&expected));
			}
		});
	}

	/// Test subsystem that only counts what it receives.
	///
	/// The counters are shared through `Arc`s, so every clone of this subsystem
	/// (and the test that created them) observes the same totals.
	#[derive(Clone)]
	struct CounterSubsystem {
		/// Bumped when an `OverseerSignal::Conclude` is received.
		stop_signals_received: Arc<atomic::AtomicUsize>,
		/// Bumped for every other overseer signal received.
		signals_received: Arc<atomic::AtomicUsize>,
		/// Bumped for every `FromOverseer::Communication` received.
		msgs_received: Arc<atomic::AtomicUsize>,
	}

	impl CounterSubsystem {
		/// Creates a subsystem that reports into the three given shared counters.
		fn new(
			stop_signals_received: Arc<atomic::AtomicUsize>,
			signals_received: Arc<atomic::AtomicUsize>,
			msgs_received: Arc<atomic::AtomicUsize>,
		) -> CounterSubsystem {
			CounterSubsystem { stop_signals_received, signals_received, msgs_received }
		}
	}

	/// Runs the counting loop: every item pulled from the context bumps one of
	/// the shared counters, and a `Conclude` signal ends the subsystem.
	impl<C, M> Subsystem<C> for CounterSubsystem
		where
			C: SubsystemContext<Message=M>,
			M: Send,
	{
		fn start(self, mut ctx: C) -> SpawnedSubsystem {
			SpawnedSubsystem {
				name: "counter-subsystem",
				future: Box::pin(async move {
					loop {
						// `true` when something was received and counted, in which case
						// we poll the context again right away.
						let counted = match ctx.try_recv().await {
							Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
								self.stop_signals_received.fetch_add(1, atomic::Ordering::SeqCst);
								break;
							},
							Ok(Some(FromOverseer::Signal(_))) => {
								self.signals_received.fetch_add(1, atomic::Ordering::SeqCst);
								true
							},
							Ok(Some(FromOverseer::Communication { .. })) => {
								self.msgs_received.fetch_add(1, atomic::Ordering::SeqCst);
								true
							},
							// Receive error or nothing to count.
							_ => false,
						};

						// Nothing was counted this round: yield once before retrying.
						if !counted {
							pending!();
						}
					}
				}),
			}
		}
	}

	/// Builds a `ValidateFromChainState` message carrying a default first argument,
	/// a PoV with empty block data, and a oneshot sender whose receiver is discarded.
	fn test_candidate_validation_msg() -> CandidateValidationMessage {
		let (response_tx, _) = oneshot::channel();
		let empty_pov = PoV { block_data: BlockData(Vec::new()) };
		CandidateValidationMessage::ValidateFromChainState(
			Default::default(),
			Arc::new(empty_pov),
			response_tx,
		)
	}

	/// Builds a `GetBackedCandidates` message with a default first argument, an
	/// empty list, and a oneshot sender whose receiver is discarded.
	fn test_candidate_backing_msg() -> CandidateBackingMessage {
		let (response_tx, _) = oneshot::channel();
		let nothing_requested = Vec::new();
		CandidateBackingMessage::GetBackedCandidates(
			Default::default(),
			nothing_requested,
			response_tx,
		)
	}

	fn test_candidate_selection_msg() -> CandidateSelectionMessage {
		CandidateSelectionMessage::default()
	}

	/// Builds a `FinalizedBlockNumber` request whose response receiver is discarded.
	fn test_chain_api_msg() -> ChainApiMessage {
		let (response_tx, _) = oneshot::channel();
		let request = ChainApiMessage::FinalizedBlockNumber(response_tx);
		request
	}

ordian's avatar
ordian committed
	/// Builds an `Initialize` message with a freshly generated collator key, a
	/// collator callback that hands out a boxed `TestCollator`, and a default `para_id`.
	fn test_collator_generation_msg() -> CollationGenerationMessage {
		CollationGenerationMessage::Initialize(CollationGenerationConfig {
			key: CollatorPair::generate().0,
			collator: Box::new(|_, _| TestCollator.boxed()),
			para_id: Default::default(),
		})
	}
	struct TestCollator;

	impl Future for TestCollator {
		type Output = Option<Collation>;
ordian's avatar
ordian committed

		fn poll(self: Pin<&mut Self>, _cx: &mut futures::task::Context) -> Poll<Self::Output> {
			panic!("at the Disco")
		}
	}

	impl Unpin for TestCollator {}

	/// Builds a `CollateOn` message with a default-valued argument.
	fn test_collator_protocol_msg() -> CollatorProtocolMessage {
		let message = CollatorProtocolMessage::CollateOn(Default::default());
		message
	}

	/// Returns a `PeerDisconnected` event for a randomly generated peer id.
	fn test_network_bridge_event<M>() -> NetworkBridgeEvent<M> {
		let peer = PeerId::random();
		NetworkBridgeEvent::PeerDisconnected(peer)
	}

	/// Wraps the shared test network-bridge event in a `NetworkBridgeUpdateV1`
	/// statement-distribution message.
	fn test_statement_distribution_msg() -> StatementDistributionMessage {
		let event = test_network_bridge_event();
		StatementDistributionMessage::NetworkBridgeUpdateV1(event)
	}

	/// Wraps the shared test network-bridge event in a `NetworkBridgeUpdateV1`
	/// availability-distribution message.
	fn test_availability_distribution_msg() -> AvailabilityDistributionMessage {
		let event = test_network_bridge_event();
		AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event)
	}

	/// Wraps the shared test network-bridge event in a `NetworkBridgeUpdateV1`
	/// bitfield-distribution message.
	fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage {
		let event = test_network_bridge_event();
		BitfieldDistributionMessage::NetworkBridgeUpdateV1(event)
	}

	/// Builds a `RequestInherentData` message with a default first argument and a
	/// oneshot sender whose receiver is discarded.
	fn test_provisioner_msg() -> ProvisionerMessage {
		let (response_tx, _) = oneshot::channel();
		ProvisionerMessage::RequestInherentData(
			Default::default(),
			response_tx,
		)
	}

	/// Wraps the shared test network-bridge event in a `NetworkBridgeUpdateV1`
	/// PoV-distribution message.
	fn test_pov_distribution_msg() -> PoVDistributionMessage {
		let event = test_network_bridge_event();
		PoVDistributionMessage::NetworkBridgeUpdateV1(event)
	}

	/// Builds a runtime API `Request` with a default first argument that asks for
	/// `Validators`; the response receiver is discarded.
	fn test_runtime_api_msg() -> RuntimeApiMessage {
		let (response_tx, _) = oneshot::channel();
		let request = RuntimeApiRequest::Validators(response_tx);
		RuntimeApiMessage::Request(Default::default(), request)
	}

	/// Builds a `QueryAvailableData` message for a default-valued candidate hash;
	/// the response receiver is discarded.
	fn test_availability_store_msg() -> AvailabilityStoreMessage {
		let (response_tx, _) = oneshot::channel();
		let candidate = CandidateHash(Default::default());
		AvailabilityStoreMessage::QueryAvailableData(candidate, response_tx)
	}

	/// Builds a `ReportPeer` message for a random peer with a reputation change
	/// of value 42 and an empty reason string.
	fn test_network_bridge_msg() -> NetworkBridgeMessage {
		let peer = PeerId::random();
		let change = ReputationChange::new(42, "");
		NetworkBridgeMessage::ReportPeer(peer, change)
	}

	// Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly.
	//
	// Every subsystem slot is filled with a clone of the same `CounterSubsystem`, so the
	// three shared counters add up what all subsystems received.
	#[test]
	fn overseer_all_subsystems_receive_signals_and_messages() {
		let spawner = sp_core::testing::TaskExecutor::new();

		executor::block_on(async move {
			let stop_signals_received = Arc::new(atomic::AtomicUsize::new(0));
			let signals_received = Arc::new(atomic::AtomicUsize::new(0));
			let msgs_received = Arc::new(atomic::AtomicUsize::new(0));

			let subsystem = CounterSubsystem::new(
				stop_signals_received.clone(),
				signals_received.clone(),
				msgs_received.clone(),
			);

			let all_subsystems = AllSubsystems {
				candidate_validation: subsystem.clone(),
				candidate_backing: subsystem.clone(),
				candidate_selection: subsystem.clone(),
				collation_generation: subsystem.clone(),
				collator_protocol: subsystem.clone(),
				statement_distribution: subsystem.clone(),
				availability_distribution: subsystem.clone(),
				bitfield_signing: subsystem.clone(),
				bitfield_distribution: subsystem.clone(),
				provisioner: subsystem.clone(),
				pov_distribution: subsystem.clone(),
				runtime_api: subsystem.clone(),
				availability_store: subsystem.clone(),
				network_bridge: subsystem.clone(),
				chain_api: subsystem.clone(),
			};
			let (overseer, mut handler) = Overseer::new(
				vec![],
				all_subsystems,
				None,
				spawner,
			).unwrap();
			let overseer_fut = overseer.run().fuse();

			pin_mut!(overseer_fut);

			// send a signal to each subsystem
			handler.block_imported(BlockInfo {
				hash: Default::default(),
				parent_hash: Default::default(),
				number: Default::default(),
			}).await;

			// send a msg to each subsystem
			// except for BitfieldSigning as the message is not instantiable
			handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
			handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
			handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await;
			handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
			handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
			handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
			handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await;
			// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
			handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
			handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await;
			handler.send_msg(AllMessages::PoVDistribution(test_pov_distribution_msg())).await;
			handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
			handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
			handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
			handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await;

			// send a stop signal to each subsystems
			handler.stop().await;

			// Drive the overseer to completion, then check the totals the
			// counting subsystems accumulated.
			select! {
				res = overseer_fut => {
					// One per field of `AllSubsystems` above.
					const NUM_SUBSYSTEMS: usize = 15;

					assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
					// x2 because of broadcast_signal on startup
					assert_eq!(signals_received.load(atomic::Ordering::SeqCst), 2 * NUM_SUBSYSTEMS);
					// -1 for BitfieldSigning
					assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 1);

					assert!(res.is_ok());
				},
				complete => (),
			}
		});
	}
Fedor Sakharov's avatar
Fedor Sakharov committed
}