util.rs 35.7 KiB
Newer Older
				};

				// most jobs will have a request-response cycle at the heart of their run loop.
				// however, in this case, we never receive valid messages, so we may as well
				// just send all of our (mock) output messages now
				let mock_output = run_args.remove(&parent).unwrap_or_default();
				let mut stream = stream::iter(mock_output.into_iter().map(Ok));
				sender.send_all(&mut stream).await?;

				// it isn't necessary to break run_loop into its own function,
				// but it's convenient to separate the concerns in this way
				job.run_loop().await
			}.boxed()
		}
	}

	impl FakeCandidateSelectionJob {
		// Drains the job's receiver until it is told to stop or the stream ends.
		//
		// Returns `Ok(())` once a `ToJob::Stop` arrives or the receiver yields `None`.
		// Panics (via `unimplemented!`) if a `ToJob::CandidateSelection` message is
		// ever received, since this fake job has no handling for it.
		async fn run_loop(mut self) -> Result<(), Error> {
			loop {
				match self.receiver.next().await {
					Some(ToJob::CandidateSelection(_csm)) => {
						unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet")
					}
					// an explicit stop request and a closed channel both end the job cleanly
					Some(ToJob::Stop) | None => return Ok(()),
				}
			}
		}
	}

	// With the job defined, getting a subsystem implementation is straightforward:
	// it is `JobManager` specialised to `FakeCandidateSelectionJob`, still generic
	// over the spawner and the subsystem context.
	type FakeCandidateSelectionSubsystem<Spawner, Context> = JobManager<Spawner, Context, FakeCandidateSelectionJob>;

	// This type lets the tests pretend to be the overseer: it is the test-side handle
	// of a subsystem context carrying `CandidateSelectionMessage`s. The tests in this
	// module use it to send overseer signals and to receive the subsystem's messages.
	type OverseerHandle = test_helpers::TestSubsystemContextHandle<CandidateSelectionMessage>;

	fn test_harness<T: Future<Output=()>>(run_args: HashMap<Hash, Vec<FromJob>>, test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T) {
		let pool = sp_core::testing::TaskExecutor::new();
		let (context, overseer_handle) = make_subsystem_context(pool.clone());
		let (err_tx, err_rx) = mpsc::channel(16);

		let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, pool, Some(err_tx));
		let test_future = test(overseer_handle, err_rx);
		let timeout = Delay::new(Duration::from_secs(2));

		futures::pin_mut!(test_future);
		futures::pin_mut!(subsystem);
		futures::pin_mut!(timeout);

		executor::block_on(async move {
			futures::select! {
				_ = test_future.fuse() => (),
				_ = subsystem.fuse() => (),
				_ = timeout.fuse() => panic!("test timed out instead of completing"),
			}
		});
	}

	// Starting work on a relay parent makes the job's configured mock output show up
	// on the overseer side; stopping that work afterwards must not produce any error
	// on the error channel.
	#[test]
	fn starting_and_stopping_job_works() {
		let relay_parent: Hash = [0; 32].into();
		let test_message = format!("greetings from {}", relay_parent);

		// `Hash` is `Copy` (it is passed by value several times below), so it goes
		// into the map as-is; only the `String` needs a clone because it is compared
		// against later.
		let mut run_args = HashMap::new();
		run_args.insert(relay_parent, vec![FromJob::Test(test_message.clone())]);

		test_harness(run_args, |mut overseer_handle, err_rx| async move {
			overseer_handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(relay_parent)))).await;
			// the job sends all of its mock output as soon as it starts
			assert_matches!(
				overseer_handle.recv().await,
				AllMessages::Test(msg) if msg == test_message
			);
			overseer_handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(relay_parent)))).await;

			let errs: Vec<_> = err_rx.collect().await;
			assert_eq!(errs.len(), 0);
		});
	}

	// Asking the subsystem to stop work on a relay parent for which no job was ever
	// started is expected to report exactly one `JobNotFound` error, tagged with
	// that relay parent.
	#[test]
	fn stopping_non_running_job_fails() {
		let relay_parent: Hash = [0; 32].into();

		// no job is started in this test, so no mock output is configured
		test_harness(HashMap::new(), |mut overseer_handle, err_rx| async move {
			let stop_signal = OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(relay_parent));
			overseer_handle.send(FromOverseer::Signal(stop_signal)).await;

			let errs = err_rx.collect::<Vec<_>>().await;
			assert_eq!(errs.len(), 1);
			assert_eq!(errs[0].0, Some(relay_parent));
			assert_matches!(
				errs[0].1,
				JobsError::Utility(util::Error::JobNotFound(match_relay_parent)) if relay_parent == match_relay_parent
			);
		});
	}

	// Checks that the job-manager-backed subsystem can be constructed and started,
	// and that the spawned subsystem reports the name "FakeCandidateSelection".
	#[test]
	fn test_subsystem_impl_and_name_derivation() {
		let executor = sp_core::testing::TaskExecutor::new();
		let (context, _) = make_subsystem_context::<CandidateSelectionMessage, _>(executor.clone());

		// build the subsystem first, then start it; only the reported name matters here
		let subsystem = FakeCandidateSelectionSubsystem::new(executor, HashMap::new());
		let SpawnedSubsystem { name, .. } = subsystem.start(context);

		assert_eq!(name, "FakeCandidateSelection");
	}