lib.rs 40.9 KiB
Newer Older
	// job structs are constructed within JobTrait::run
	// most will want to retain the sender and receiver, as well as whatever other data they like
	struct FakeCandidateSelectionJob {
		receiver: mpsc::Receiver<ToJob>,
	}

	// ToJob implementations require the following properties:
	//
	// - have a Stop variant (to impl ToJobTrait)
	// - impl ToJobTrait
	// - impl TryFrom<AllMessages>
	// - impl From<CandidateSelectionMessage> (from SubsystemContext::Message)
	//
	// Mostly, they are just a type-safe subset of AllMessages that this job is prepared to receive
	enum ToJob {
		CandidateSelection(CandidateSelectionMessage),
		Stop,
	}

	impl ToJobTrait for ToJob {
		const STOP: Self = ToJob::Stop;

		fn relay_parent(&self) -> Option<Hash> {
			match self {
				Self::CandidateSelection(csm) => csm.relay_parent(),
				Self::Stop => None,
			}
		}
	}

	impl TryFrom<AllMessages> for ToJob {
		type Error = ();

		fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
			match msg {
				AllMessages::CandidateSelection(csm) => Ok(ToJob::CandidateSelection(csm)),
				_ => Err(()),
			}
		}
	}

	impl From<CandidateSelectionMessage> for ToJob {
		fn from(csm: CandidateSelectionMessage) -> ToJob {
			ToJob::CandidateSelection(csm)
		}
	}

	// FromJob must be infallibly convertable into AllMessages.
	//
	// It exists to be a type-safe subset of AllMessages that this job is specified to send.
	//
	// Note: the Clone impl here is not generally required; it's just ueful for this test context because
	// we include it in the RunArgs
	#[derive(Clone)]
	enum FromJob {
	}

	impl From<FromJob> for AllMessages {
		fn from(from_job: FromJob) -> AllMessages {
			match from_job {
				FromJob::Test => AllMessages::CandidateSelection(CandidateSelectionMessage::default()),
			}
		}
	}

	// Error will mostly be a wrapper to make the try operator more convenient;
	// deriving From implementations for most variants is recommended.
	// It must implement Debug for logging.
	#[derive(Debug, derive_more::From)]
	enum Error {
		#[from]
		Sending(mpsc::SendError),
	}

	impl JobTrait for FakeCandidateSelectionJob {
		type ToJob = ToJob;
		type FromJob = FromJob;
		type Error = Error;
		// RunArgs can be anything that a particular job needs supplied from its external context
		// in order to create the Job. In this case, they're a hashmap of parents to the mock outputs
		// expected from that job.
		//
		// Note that it's not recommended to use something as heavy as a hashmap in production: the
		// RunArgs get cloned so that each job gets its own owned copy. If you need that, wrap it in
		// an Arc. Within a testing context, that efficiency is less important.
		type RunArgs = HashMap<Hash, Vec<FromJob>>;
		type Metrics = ();

		const NAME: &'static str = "FakeCandidateSelectionJob";

		/// Run a job for the parent block indicated
		//
		// this function is in charge of creating and executing the job's main loop
		fn run(
			parent: Hash,
			mut run_args: Self::RunArgs,
			_metrics: Self::Metrics,
			receiver: mpsc::Receiver<ToJob>,
			mut sender: mpsc::Sender<FromJob>,
		) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
			async move {
				let job = FakeCandidateSelectionJob { receiver };

				// most jobs will have a request-response cycle at the heart of their run loop.
				// however, in this case, we never receive valid messages, so we may as well
				// just send all of our (mock) output messages now
				let mock_output = run_args.remove(&parent).unwrap_or_default();
				let mut stream = stream::iter(mock_output.into_iter().map(Ok));
				sender.send_all(&mut stream).await?;

				// it isn't necessary to break run_loop into its own function,
				// but it's convenient to separate the concerns in this way
				job.run_loop().await
		}
	}

	impl FakeCandidateSelectionJob {
		async fn run_loop(mut self) -> Result<(), Error> {
			while let Some(msg) = self.receiver.next().await {
				match msg {
					ToJob::CandidateSelection(_csm) => {
						unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet");
					}
					ToJob::Stop => break,
				}
			}

			Ok(())
		}
	}

	// with the job defined, it's straightforward to get a subsystem implementation.
	type FakeCandidateSelectionSubsystem<Spawner, Context> =
		JobManager<Spawner, Context, FakeCandidateSelectionJob>;

	// this type lets us pretend to be the overseer
	type OverseerHandle = test_helpers::TestSubsystemContextHandle<CandidateSelectionMessage>;

	fn test_harness<T: Future<Output = ()>>(
		run_args: HashMap<Hash, Vec<FromJob>>,
		test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T,
	) {
ordian's avatar
ordian committed
		let _ = env_logger::builder()
			.is_test(true)
			.filter(
				None,
				log::LevelFilter::Trace,
			)
			.try_init();

		let pool = sp_core::testing::TaskExecutor::new();
		let (context, overseer_handle) = make_subsystem_context(pool.clone());
		let (err_tx, err_rx) = mpsc::channel(16);

		let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, (), pool, Some(err_tx));
		let test_future = test(overseer_handle, err_rx);
		let timeout = Delay::new(Duration::from_secs(2));

ordian's avatar
ordian committed
		futures::pin_mut!(subsystem, test_future, timeout);
		futures::executor::block_on(
			futures::future::select(
				// wait for both to finish
				futures::future::join(subsystem, test_future),
				timeout,
			).then(|either| match either {
				futures::future::Either::Right(_) => panic!("test timed out instead of completing"),
				futures::future::Either::Left(_) => futures::future::ready(()),
			})
		);
	}

	#[test]
	fn starting_and_stopping_job_works() {
		let relay_parent: Hash = [0; 32].into();
		let mut run_args = HashMap::new();
		run_args.insert(
			relay_parent.clone(),

		test_harness(run_args, |mut overseer_handle, err_rx| async move {
			overseer_handle
				.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
					ActiveLeavesUpdate::start_work(relay_parent),
				)))
				.await;
			assert_matches!(
				overseer_handle.recv().await,
			overseer_handle
				.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
					ActiveLeavesUpdate::stop_work(relay_parent),
				)))
				.await;

			let errs: Vec<_> = err_rx.collect().await;
			assert_eq!(errs.len(), 0);
		});
	}

	#[test]
	fn stopping_non_running_job_fails() {
		let relay_parent: Hash = [0; 32].into();
		let run_args = HashMap::new();

		test_harness(run_args, |mut overseer_handle, err_rx| async move {
			overseer_handle
				.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
					ActiveLeavesUpdate::stop_work(relay_parent),
				)))
				.await;

			let errs: Vec<_> = err_rx.collect().await;
			assert_eq!(errs.len(), 1);
			assert_eq!(errs[0].0, Some(relay_parent));
			assert_matches!(
				errs[0].1,
				JobsError::Utility(UtilError::JobNotFound(match_relay_parent)) if relay_parent == match_relay_parent
ordian's avatar
ordian committed
	#[test]
	fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
		let run_args = HashMap::new();

		test_harness(run_args, |mut overseer_handle, err_rx| async move {
			// send to a non running job
			overseer_handle
				.send(FromOverseer::Communication {
ordian's avatar
ordian committed
					msg: Default::default(),
				})
				.await;

			// the subsystem is still alive
			assert_matches!(
				overseer_handle.recv().await,
				AllMessages::CandidateSelection(_)
			);

			let errs: Vec<_> = err_rx.collect().await;
			assert_eq!(errs.len(), 0);
		});
	}

	#[test]
	fn test_subsystem_impl_and_name_derivation() {
		let pool = sp_core::testing::TaskExecutor::new();
		let (context, _) = make_subsystem_context::<CandidateSelectionMessage, _>(pool.clone());

		let SpawnedSubsystem { name, .. } =
			FakeCandidateSelectionSubsystem::new(pool, HashMap::new(), ()).start(context);