Newer
Older
Peter Goodspeed-Niklaus
committed
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
// Minimal example job used by the tests further down in this file.
//
// Job structs are constructed within JobTrait::run.
// Most will want to retain the sender and receiver, as well as whatever other data they like;
// this one keeps only the receiver.
struct FakeCandidateSelectionJob {
// incoming messages for this job; polled by `run_loop`
receiver: mpsc::Receiver<ToJob>,
}
// ToJob implementations require the following properties:
//
// - have a Stop variant (to impl ToJobTrait)
// - impl ToJobTrait
// - impl TryFrom<AllMessages>
// - impl From<CandidateSelectionMessage> (from SubsystemContext::Message)
//
// Mostly, they are just a type-safe subset of AllMessages that this job is prepared to receive.
enum ToJob {
// wraps a CandidateSelectionMessage addressed to this job
CandidateSelection(CandidateSelectionMessage),
// stop request; `run_loop` leaves its loop when it receives this variant
Stop,
}
// Job-message contract for `ToJob`: names the stop message and reports which relay parent,
// if any, a given message refers to.
impl ToJobTrait for ToJob {
    // the designated stop message
    const STOP: Self = Self::Stop;

    // A stop request carries no relay parent; a candidate selection message answers with
    // whatever the wrapped CandidateSelectionMessage reports.
    fn relay_parent(&self) -> Option<Hash> {
        match *self {
            Self::Stop => None,
            Self::CandidateSelection(ref message) => message.relay_parent(),
        }
    }
}
// Attempts to narrow an AllMessages value down to the subset of messages this job accepts.
//
// NOTE(review): only the CandidateSelection arm is visible below. The two lines that follow it
// are not Rust (they look like blame-view residue), so the remaining arm(s) of this match
// cannot be seen from here.
impl TryFrom<AllMessages> for ToJob {
// failure carries no further detail
type Error = ();
fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
match msg {
// re-wrap the payload in the job-local message type
AllMessages::CandidateSelection(csm) => Ok(ToJob::CandidateSelection(csm)),
Peter Goodspeed-Niklaus
committed
}
}
}
impl From<CandidateSelectionMessage> for ToJob {
fn from(csm: CandidateSelectionMessage) -> ToJob {
ToJob::CandidateSelection(csm)
}
}
// FromJob must be infallibly convertible into AllMessages.
//
// It exists to be a type-safe subset of AllMessages that this job is specified to send.
//
// Note: the Clone impl here is not generally required; it's just useful for this test context
// because we include it in the RunArgs.
#[derive(Clone)]
enum FromJob {
Peter Goodspeed-Niklaus
committed
// the only variant visible here; the From<FromJob> impl for AllMessages turns it into a
// default CandidateSelectionMessage
Test,
Peter Goodspeed-Niklaus
committed
}
// Infallible conversion from the job's outgoing message type into AllMessages.
impl From<FromJob> for AllMessages {
fn from(from_job: FromJob) -> AllMessages {
match from_job {
Peter Goodspeed-Niklaus
committed
// Test carries no payload, so a default CandidateSelectionMessage stands in for it
FromJob::Test => AllMessages::CandidateSelection(CandidateSelectionMessage::default()),
Peter Goodspeed-Niklaus
committed
}
}
}
// Error will mostly be a wrapper to make the try operator more convenient;
// deriving From implementations for most variants is recommended.
// It must implement Debug for logging.
//
// NOTE(review): no variant is visible after `#[from]` below; the two lines that follow it are
// not Rust (they look like blame-view residue). The `?` applied to `sender.send_all(..)` in
// `run` suggests a variant wrapping the channel's send error belongs here -- confirm against
// the real file.
#[derive(Debug, derive_more::From)]
enum Error {
#[from]
Peter Goodspeed-Niklaus
committed
}
// JobTrait implementation for the fake job: wires up its message, error and argument types and
// provides the `run` entry point.
//
// NOTE(review): the `Peter Goodspeed-Niklaus` / `committed` line pairs inside this block are
// not Rust (they look like blame-view residue), and lines appear to be missing next to some of
// them: the async block in `run` is never closed before the function ends, and `test_harness`
// passes an extra `()` argument that has no visible counterpart here.
impl JobTrait for FakeCandidateSelectionJob {
// messages this job can receive
type ToJob = ToJob;
// messages this job can send
type FromJob = FromJob;
// error type produced by `run`
type Error = Error;
// RunArgs can be anything that a particular job needs supplied from its external context
// in order to create the Job. In this case, they're a hashmap of parents to the mock outputs
// expected from that job.
//
// Note that it's not recommended to use something as heavy as a hashmap in production: the
// RunArgs get cloned so that each job gets its own owned copy. If you need that, wrap it in
// an Arc. Within a testing context, that efficiency is less important.
type RunArgs = HashMap<Hash, Vec<FromJob>>;
Peter Goodspeed-Niklaus
committed
// name of this job type
const NAME: &'static str = "FakeCandidateSelectionJob";
/// Run a job for the parent block indicated
//
// this function is in charge of creating and executing the job's main loop
//
// Arguments visible here:
// - parent: relay parent this job instance runs for; also the key into run_args
// - run_args: this job's own copy of the mock outputs, keyed by relay parent
// - receiver: channel on which ToJob messages arrive
// - sender: channel on which FromJob messages are sent
//
// Returns a boxed, Send future resolving to Ok(()) once the run loop ends, or to an Error if
// sending the mock output fails.
fn run(
parent: Hash,
mut run_args: Self::RunArgs,
Peter Goodspeed-Niklaus
committed
receiver: mpsc::Receiver<ToJob>,
mut sender: mpsc::Sender<FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let job = FakeCandidateSelectionJob { receiver };
Peter Goodspeed-Niklaus
committed
// most jobs will have a request-response cycle at the heart of their run loop.
// however, in this case, we never receive valid messages, so we may as well
// just send all of our (mock) output messages now
//
// a parent with no configured entry yields an empty list, so nothing is sent for it
let mock_output = run_args.remove(&parent).unwrap_or_default();
// each item is wrapped in Ok before being handed to send_all
let mut stream = stream::iter(mock_output.into_iter().map(Ok));
sender.send_all(&mut stream).await?;
// it isn't necessary to break run_loop into its own function,
// but it's convenient to separate the concerns in this way
job.run_loop().await
Peter Goodspeed-Niklaus
committed
}
}
impl FakeCandidateSelectionJob {
    // Main loop of the job: keeps pulling messages off the receiver until either the channel
    // is exhausted or a stop request arrives, then finishes successfully.
    //
    // Handling of real candidate selection messages is not implemented; receiving one panics.
    async fn run_loop(mut self) -> Result<(), Error> {
        loop {
            match self.receiver.next().await {
                // channel closed or explicit stop: both end the job cleanly
                None | Some(ToJob::Stop) => return Ok(()),
                Some(ToJob::CandidateSelection(_)) => {
                    unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet");
                }
            }
        }
    }
}
// With the job defined, it's straightforward to get a subsystem implementation:
// JobManager is simply instantiated with FakeCandidateSelectionJob as its job type, leaving
// the spawner and context types generic.
type FakeCandidateSelectionSubsystem<Spawner, Context> =
JobManager<Spawner, Context, FakeCandidateSelectionJob>;
Peter Goodspeed-Niklaus
committed
// This type lets us pretend to be the overseer: the tests use it to send signals and messages
// to the subsystem and to receive what the subsystem sends back.
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CandidateSelectionMessage>;
// Shared driver for the tests below.
//
// Starts the fake subsystem with `run_args`, hands the overseer-side handle and the error
// receiver to `test`, and then blocks until both the subsystem and the test future have
// finished. Panics if that takes longer than two seconds.
//
// - run_args: mock outputs per relay parent, passed through to the subsystem
// - test: closure producing the test future from the overseer handle and the error receiver
//
// NOTE(review): the `Peter Goodspeed-Niklaus` / `committed` line pairs inside this function are
// not Rust; they look like blame-view residue.
fn test_harness<T: Future<Output = ()>>(
run_args: HashMap<Hash, Vec<FromJob>>,
test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T,
) {
// trace-level logging for the test run; the result of try_init is deliberately discarded
// (presumably because a logger may already have been installed by another test)
let _ = env_logger::builder()
.is_test(true)
.filter(
None,
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new();
Peter Goodspeed-Niklaus
committed
let (context, overseer_handle) = make_subsystem_context(pool.clone());
// errors reported by the subsystem are delivered on this channel, tagged with an optional hash
let (err_tx, err_rx) = mpsc::channel(16);
let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, (), pool, Some(err_tx));
Peter Goodspeed-Niklaus
committed
let test_future = test(overseer_handle, err_rx);
let timeout = Delay::new(Duration::from_secs(2));
futures::pin_mut!(subsystem, test_future, timeout);
Peter Goodspeed-Niklaus
committed
futures::executor::block_on(
futures::future::select(
// wait for both to finish
futures::future::join(subsystem, test_future),
timeout,
).then(|either| match either {
// the timeout won the race
futures::future::Either::Right(_) => panic!("test timed out instead of completing"),
// subsystem and test both completed in time
futures::future::Either::Left(_) => futures::future::ready(()),
})
);
Peter Goodspeed-Niklaus
committed
}
// Starts work on a relay parent, expects a candidate selection message to reach the overseer
// side, stops work again, and expects that no errors were reported.
//
// NOTE(review): the `Peter Goodspeed-Niklaus` / `committed` line pairs below are not Rust (they
// look like blame-view residue), and the `run_args.insert(` call is never closed in the text
// visible here.
#[test]
fn starting_and_stopping_job_works() {
let relay_parent: Hash = [0; 32].into();
// configure a single mock output for the relay parent used in this test
let mut run_args = HashMap::new();
run_args.insert(
relay_parent.clone(),
Peter Goodspeed-Niklaus
committed
vec![FromJob::Test],
Peter Goodspeed-Niklaus
committed
test_harness(run_args, |mut overseer_handle, err_rx| async move {
// signal that work should start for relay_parent
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(relay_parent),
)))
.await;
Peter Goodspeed-Niklaus
committed
// presumably the converted FromJob::Test output configured above
assert_matches!(
overseer_handle.recv().await,
Peter Goodspeed-Niklaus
committed
AllMessages::CandidateSelection(_)
Peter Goodspeed-Niklaus
committed
);
// signal that work should stop for the same relay parent
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::stop_work(relay_parent),
)))
.await;
Peter Goodspeed-Niklaus
committed
// collect everything that arrived on the error channel; nothing should have
let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 0);
});
}
// Sends a stop-work signal for a relay parent that was never started and expects exactly one
// reported error: a JobNotFound for that relay parent.
//
// NOTE(review): the `Peter Goodspeed-Niklaus` / `committed` line pairs below are not Rust; they
// look like blame-view residue.
#[test]
fn stopping_non_running_job_fails() {
let relay_parent: Hash = [0; 32].into();
// no mock outputs are needed for this test
let run_args = HashMap::new();
test_harness(run_args, |mut overseer_handle, err_rx| async move {
// no start_work was sent beforehand
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::stop_work(relay_parent),
)))
.await;
Peter Goodspeed-Niklaus
committed
let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 1);
// the error is tagged with the relay parent it concerns
assert_eq!(errs[0].0, Some(relay_parent));
// and the error itself names the same relay parent
assert_matches!(
errs[0].1,
Peter Goodspeed-Niklaus
committed
JobsError::Utility(UtilError::JobNotFound(match_relay_parent)) if relay_parent == match_relay_parent
Peter Goodspeed-Niklaus
committed
);
});
}
Peter Goodspeed-Niklaus
committed
// A message addressed to a job that was never started must not bring the subsystem down:
// the overseer side still receives a candidate selection message afterwards and no errors
// are reported.
#[test]
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
    // no mock outputs are configured for this test
    test_harness(HashMap::new(), |mut handle, errors| async move {
        // send to a non running job
        let stray = FromOverseer::Communication {
            msg: Default::default(),
        };
        handle.send(stray).await;

        // the subsystem is still alive
        let reply = handle.recv().await;
        assert_matches!(reply, AllMessages::CandidateSelection(_));

        let reported = errors.collect::<Vec<_>>().await;
        assert_eq!(reported.len(), 0);
    });
}
Peter Goodspeed-Niklaus
committed
// Builds the subsystem through `new`, starts it, and checks the name of the spawned subsystem.
//
// The expected name differs from JobTrait::NAME ("FakeCandidateSelectionJob"); presumably the
// name is derived by dropping the `Job` suffix -- confirm against JobManager.
//
// NOTE(review): the `Peter Goodspeed-Niklaus` / `committed` line pair below is not Rust; it
// looks like blame-view residue.
#[test]
fn test_subsystem_impl_and_name_derivation() {
let pool = sp_core::testing::TaskExecutor::new();
// the overseer-side handle is not needed here
let (context, _) = make_subsystem_context::<CandidateSelectionMessage, _>(pool.clone());
// only the name of the spawned subsystem is inspected
let SpawnedSubsystem { name, .. } =
FakeCandidateSelectionSubsystem::new(pool, HashMap::new(), ()).start(context);
Peter Goodspeed-Niklaus
committed
assert_eq!(name, "FakeCandidateSelection");
}