diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 9f8d5c94f08cdb268dedf467da3f42b0268ada91..7f60ff2254f412eb0f02b3bd95a30a4782d44c94 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -4488,8 +4488,6 @@ dependencies = [
  "bitvec",
  "derive_more 0.99.9",
  "futures 0.3.5",
- "futures-timer 3.0.2",
- "log 0.4.8",
  "polkadot-erasure-coding",
  "polkadot-node-primitives",
  "polkadot-node-subsystem",
@@ -4502,7 +4500,6 @@ dependencies = [
  "sp-blockchain",
  "sp-core",
  "sp-keyring",
- "streamunordered",
 ]
 
 [[package]]
@@ -4547,11 +4544,19 @@ name = "polkadot-node-subsystem"
 version = "0.1.0"
 dependencies = [
  "async-trait",
+ "derive_more 0.99.9",
  "futures 0.3.5",
+ "futures-timer 3.0.2",
+ "log 0.4.8",
+ "parity-scale-codec",
+ "pin-project",
  "polkadot-node-primitives",
  "polkadot-primitives",
  "polkadot-statement-table",
+ "sc-keystore",
  "sc-network",
+ "sp-core",
+ "streamunordered",
 ]
 
 [[package]]
diff --git a/polkadot/node/core/backing/Cargo.toml b/polkadot/node/core/backing/Cargo.toml
index b25055293add0e3a92c8b9b7edda92cdb06e6438..720b8af418d25a54864d64171bf507536a412dd7 100644
--- a/polkadot/node/core/backing/Cargo.toml
+++ b/polkadot/node/core/backing/Cargo.toml
@@ -6,20 +6,16 @@ edition = "2018"
 
 [dependencies]
 futures = "0.3.5"
-log = "0.4.8"
 sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
 keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" }
 primitives = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" }
-
 polkadot-primitives = { path = "../../../primitives" }
 polkadot-node-primitives = { path = "../../primitives" }
 polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
 erasure-coding = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" }
 statement-table = { package = "polkadot-statement-table", path = "../../../statement-table" }
-futures-timer = "3.0.2"
-streamunordered = "0.5.1"
 derive_more = "0.99.9"
 bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
 
diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs
index 281147847e19ef678ec9e28acd9896bc607e00de..e0309ec8428e8106790839d06722e0de261a45e5 100644
--- a/polkadot/node/core/backing/src/lib.rs
+++ b/polkadot/node/core/backing/src/lib.rs
@@ -16,45 +16,43 @@
 
 //! Implements a `CandidateBackingSubsystem`.
 
-#![recursion_limit="256"]
-
 use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::time::Duration;
 
 use bitvec::vec::BitVec;
-use log;
 use futures::{
-	select, FutureExt, SinkExt, StreamExt,
-	channel::{oneshot, mpsc},
-	future::{self, Either},
-	task::{Spawn, SpawnError, SpawnExt},
+	channel::{mpsc, oneshot},
+	task::{Spawn, SpawnError},
+	Future, FutureExt, SinkExt, StreamExt,
 };
-use futures_timer::Delay;
-use streamunordered::{StreamUnordered, StreamYield};
 
-use primitives::Pair;
 use keystore::KeyStorePtr;
 use polkadot_primitives::v1::{
-	CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorPair, ValidatorId,
+	CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
 	ValidatorIndex, SigningContext, PoV, OmittedValidationData,
 	CandidateDescriptor, AvailableData, ErasureChunk, ValidatorSignature, Hash, CandidateReceipt,
 	CandidateCommitments,
 };
 use polkadot_node_primitives::{
-	FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
-	ValidationOutputs,
+	FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport,
+	ValidationOutputs, ValidationResult,
 };
 use polkadot_subsystem::{
-	FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem,
-};
-use polkadot_subsystem::messages::{
-	AllMessages, CandidateBackingMessage, CandidateSelectionMessage, SchedulerRoster,
-	RuntimeApiMessage, RuntimeApiRequest, CandidateValidationMessage, ValidationFailed,
-	StatementDistributionMessage, NewBackedCandidate, ProvisionerMessage, ProvisionableData,
-	PoVDistributionMessage, AvailabilityStoreMessage,
+	Subsystem, SubsystemContext, SpawnedSubsystem,
+	messages::{
+		AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
+		CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
+		ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
+	},
+	util::{
+		self,
+		request_signing_context,
+		request_validator_groups,
+		request_validators,
+		Validator,
+	},
 };
 use statement_table::{
 	generic::AttestedCandidate as TableAttestedCandidate,
@@ -68,9 +66,7 @@ use statement_table::{
 
 #[derive(Debug, derive_more::From)]
 enum Error {
-	NotInValidatorSet,
 	CandidateNotFound,
-	JobNotFound(Hash),
 	InvalidSignature,
 	#[from]
 	Erasure(erasure_coding::Error),
@@ -82,6 +78,8 @@ enum Error {
 	Mpsc(mpsc::SendError),
 	#[from]
 	Spawn(SpawnError),
+	#[from]
+	UtilError(util::Error),
 }
 
 /// Holds all data needed for candidate backing job operation.
@@ -92,7 +90,6 @@ struct CandidateBackingJob {
 	rx_to: mpsc::Receiver<ToJob>,
 	/// Outbound message channel sending part.
 	tx_from: mpsc::Sender<FromJob>,
-
 	/// The `ParaId`s assigned to this validator.
 	assignment: ParaId,
 	/// We issued `Valid` or `Invalid` statements on about these candidates.
@@ -101,7 +98,6 @@ struct CandidateBackingJob {
 	seconded: Option<Hash>,
 	/// We have already reported misbehaviors for these validators.
 	reported_misbehavior_for: HashSet<ValidatorIndex>,
-
 	table: Table<TableContext>,
 	table_context: TableContext,
 }
@@ -113,7 +109,7 @@ const fn group_quorum(n_validators: usize) -> usize {
 #[derive(Default)]
 struct TableContext {
 	signing_context: SigningContext,
-	key: Option<ValidatorPair>,
+	validator: Option<Validator>,
 	groups: HashMap<ParaId, Vec<ValidatorIndex>>,
 	validators: Vec<ValidatorId>,
 }
@@ -142,30 +138,40 @@ impl TableContextTrait for TableContext {
 	}
 }
 
-impl TableContext {
-	fn local_id(&self) -> Option<ValidatorId> {
-		self.key.as_ref().map(|k| k.public())
+/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`.
+pub enum ToJob {
+	/// A `CandidateBackingMessage`.
+	CandidateBacking(CandidateBackingMessage),
+	/// Stop working.
+	Stop,
+}
+
+impl TryFrom<AllMessages> for ToJob {
+	type Error = ();
+
+	fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
+		match msg {
+			AllMessages::CandidateBacking(msg) => Ok(ToJob::CandidateBacking(msg)),
+			_ => Err(()),
+		}
 	}
+}
 
-	fn local_index(&self) -> Option<ValidatorIndex> {
-		self.local_id().and_then(|id|
-			self.validators
-				.iter()
-				.enumerate()
-				.find(|(_, k)| k == &&id)
-				.map(|(i, _)| i as ValidatorIndex)
-		)
+impl From<CandidateBackingMessage> for ToJob {
+	fn from(msg: CandidateBackingMessage) -> Self {
+		Self::CandidateBacking(msg)
 	}
 }
 
-const CHANNEL_CAPACITY: usize = 64;
+impl util::ToJobTrait for ToJob {
+	const STOP: Self = ToJob::Stop;
 
-/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`.
-enum ToJob {
-	/// A `CandidateBackingMessage`.
-	CandidateBacking(CandidateBackingMessage),
-	/// Stop working.
-	Stop,
+	fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::CandidateBacking(cb) => cb.relay_parent(),
+			Self::Stop => None,
+		}
+	}
 }
 
 /// A message type that is sent from `CandidateBackingJob` to `CandidateBackingSubsystem`.
@@ -193,6 +199,23 @@ impl From<FromJob> for AllMessages {
 	}
 }
 
+impl TryFrom<AllMessages> for FromJob {
+	type Error = &'static str;
+
+	fn try_from(f: AllMessages) -> Result<Self, Self::Error> {
+		match f {
+			AllMessages::AvailabilityStore(msg) => Ok(FromJob::AvailabilityStore(msg)),
+			AllMessages::RuntimeApi(msg) => Ok(FromJob::RuntimeApiMessage(msg)),
+			AllMessages::CandidateValidation(msg) => Ok(FromJob::CandidateValidation(msg)),
+			AllMessages::CandidateSelection(msg) => Ok(FromJob::CandidateSelection(msg)),
+			AllMessages::StatementDistribution(msg) => Ok(FromJob::StatementDistribution(msg)),
+			AllMessages::PoVDistribution(msg) => Ok(FromJob::PoVDistribution(msg)),
+			AllMessages::Provisioner(msg) => Ok(FromJob::Provisioner(msg)),
+			_ => Err("can't convert this AllMessages variant to FromJob"),
+		}
+	}
+}
+
 // It looks like it's not possible to do an `impl From` given the current state of
 // the code. So this does the necessary conversion.
 fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
@@ -209,19 +232,9 @@ fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement
 	}
 }
 
-// finds the first key we are capable of signing with out of the given set of validators,
-// if any.
-fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<ValidatorPair> {
-	let keystore = keystore.read();
-	validators.iter()
-		.find_map(|v| {
-			keystore.key_pair::<ValidatorPair>(&v).ok()
-		})
-}
-
 impl CandidateBackingJob {
 	/// Run asynchronously.
-	async fn run(mut self) -> Result<(), Error> {
+	async fn run_loop(mut self) -> Result<(), Error> {
 		while let Some(msg) = self.rx_to.next().await {
 			match msg {
 				ToJob::CandidateBacking(msg) => {
@@ -328,9 +341,7 @@ impl CandidateBackingJob {
 				None => continue,
 			};
 
-			let mut validator_indices = BitVec::with_capacity(
-				group.len()
-			);
+			let mut validator_indices = BitVec::with_capacity(group.len());
 
 			validator_indices.resize(group.len(), false);
 
@@ -371,7 +382,7 @@ impl CandidateBackingJob {
 
 				if let Ok(report) = MisbehaviorReport::try_from(f) {
 					let message = ProvisionerMessage::ProvisionableData(
-						ProvisionableData::MisbehaviorReport(self.parent, report)
+						ProvisionableData::MisbehaviorReport(self.parent, report),
 					);
 
 					reports.push(message);
@@ -513,18 +524,7 @@ impl CandidateBackingJob {
 	}
 
 	fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
-		let local_index = self.table_context.local_index()?;
-
-		let signing_key = self.table_context.key.as_ref()?;
-
-		let signed_statement = SignedFullStatement::sign(
-			statement,
-			&self.table_context.signing_context,
-			local_index,
-			signing_key,
-		);
-
-		Some(signed_statement)
+		Some(self.table_context.validator.as_ref()?.sign(statement))
 	}
 
 	fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
@@ -657,329 +657,133 @@ impl CandidateBackingJob {
 	}
 }
 
-struct JobHandle {
-	abort_handle: future::AbortHandle,
-	to_job: mpsc::Sender<ToJob>,
-	finished: oneshot::Receiver<()>,
-	su_handle: usize,
-}
+impl util::JobTrait for CandidateBackingJob {
+	type ToJob = ToJob;
+	type FromJob = FromJob;
+	type Error = Error;
+	type RunArgs = KeyStorePtr;
 
-impl JobHandle {
-	async fn stop(mut self) {
-		let _ = self.to_job.send(ToJob::Stop).await;
-		let stop_timer = Delay::new(Duration::from_secs(1));
-
-		match future::select(stop_timer, self.finished).await {
-			Either::Left((_, _)) => {
-			},
-			Either::Right((_, _)) => {
-				self.abort_handle.abort();
-			},
-		}
-	}
+	const NAME: &'static str = "CandidateBackingJob";
 
-	async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
-		Ok(self.to_job.send(msg).await?)
-	}
-}
-
-struct Jobs<S> {
-	spawner: S,
-	running: HashMap<Hash, JobHandle>,
-	outgoing_msgs: StreamUnordered<mpsc::Receiver<FromJob>>,
-}
-
-async fn run_job(
-	parent: Hash,
-	keystore: KeyStorePtr,
-	rx_to: mpsc::Receiver<ToJob>,
-	mut tx_from: mpsc::Sender<FromJob>,
-) -> Result<(), Error> {
-	let (validators, roster) = futures::try_join!(
-		request_validators(parent, &mut tx_from).await?,
-		request_validator_groups(parent, &mut tx_from).await?,
-	)?;
-
-	let key = signing_key(&validators[..], &keystore).ok_or(Error::NotInValidatorSet)?;
-	let mut groups = HashMap::new();
-
-	for assignment in roster.scheduled {
-		if let Some(g) = roster.validator_groups.get(assignment.group_idx.0 as usize) {
-			groups.insert(
-				assignment.para_id,
-				g.clone(),
-			);
-		}
-	}
-
-	let mut assignment = Default::default();
-
-	if let Some(idx) = validators.iter().position(|k| *k == key.public()) {
-		let idx = idx as u32;
-		for (para_id, group) in groups.iter() {
-			if group.contains(&idx) {
-				assignment = *para_id;
-				break;
+	fn run(
+		parent: Hash,
+		keystore: KeyStorePtr,
+		rx_to: mpsc::Receiver<Self::ToJob>,
+		mut tx_from: mpsc::Sender<Self::FromJob>,
+	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
+		async move {
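+			// Each `request_*` helper sends its runtime API request and returns a oneshot
+			// receiver; the sends happen one after another (they share `tx_from`), but the
+			// responses are awaited concurrently by `try_join!`.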
+			let (validators, roster, signing_context) = futures::try_join!(
+				request_validators(parent, &mut tx_from).await?,
+				request_validator_groups(parent, &mut tx_from).await?,
+				request_signing_context(parent, &mut tx_from).await?,
+			)?;
+
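+			// `construct` fails with `NotAValidator` if none of our keystore keys belong to
+			// the validator set, ending this job early via `?`.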
+			let validator = Validator::construct(&validators, signing_context, keystore.clone())?;
+
+			let mut groups = HashMap::new();
+
+			for assignment in roster.scheduled {
+				if let Some(g) = roster.validator_groups.get(assignment.group_idx.0 as usize) {
+					groups.insert(assignment.para_id, g.clone());
+				}
 			}
-		}
-	}
-
-	let signing_context = request_signing_context(parent, &mut tx_from).await?.await?;
-
-	let table_context = TableContext {
-		signing_context,
-		key: Some(key),
-		groups,
-		validators,
-	};
-
-	let job = CandidateBackingJob {
-		parent,
-		rx_to,
-		tx_from,
-		assignment,
-		issued_statements: HashSet::new(),
-		seconded: None,
-		reported_misbehavior_for: HashSet::new(),
-		table: Table::default(),
-		table_context,
-	};
-
-	job.run().await
-}
-
-/// Request a validator set from the `RuntimeApi`.
-async fn request_validators(
-	parent: Hash,
-	s: &mut mpsc::Sender<FromJob>,
-) -> Result<oneshot::Receiver<Vec<ValidatorId>>, Error> {
-	let (tx, rx) = oneshot::channel();
 
-	s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request(
-			parent,
-			RuntimeApiRequest::Validators(tx),
-		)
-	)).await?;
+			let mut assignment = Default::default();
 
-	Ok(rx)
-}
-
-/// Request the scheduler roster from `RuntimeApi`.
-async fn request_validator_groups(
-	parent: Hash,
-	s: &mut mpsc::Sender<FromJob>,
-) -> Result<oneshot::Receiver<SchedulerRoster>, Error> {
-	let (tx, rx) = oneshot::channel();
-
-	s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request(
-			parent,
-			RuntimeApiRequest::ValidatorGroups(tx),
-		)
-	)).await?;
-
-	Ok(rx)
-}
-
-/// Request a `SigningContext` from the `RuntimeApi`.
-async fn request_signing_context(
-	parent: Hash,
-	s: &mut mpsc::Sender<FromJob>,
-) -> Result<oneshot::Receiver<SigningContext>, Error> {
-	let (tx, rx) = oneshot::channel();
-
-	s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request(
-			parent,
-			RuntimeApiRequest::SigningContext(tx),
-		)
-	)).await?;
-
-	Ok(rx)
-}
-
-impl<S: Spawn> Jobs<S> {
-	fn new(spawner: S) -> Self {
-		Self {
-			spawner,
-			running: HashMap::default(),
-			outgoing_msgs: StreamUnordered::new(),
-		}
-	}
-
-	fn spawn_job(&mut self, parent_hash: Hash, keystore: KeyStorePtr) -> Result<(), Error> {
-		let (to_job_tx, to_job_rx) = mpsc::channel(CHANNEL_CAPACITY);
-		let (from_job_tx, from_job_rx) = mpsc::channel(CHANNEL_CAPACITY);
-
-		let (future, abort_handle) = future::abortable(async move {
-			if let Err(e) = run_job(parent_hash, keystore, to_job_rx, from_job_tx).await {
-				log::error!(
-					"CandidateBackingJob({}) finished with an error {:?}",
-					parent_hash,
-					e,
-				);
+			if let Some(idx) = validators.iter().position(|k| *k == validator.id()) {
+				let idx = idx as u32;
+				for (para_id, group) in groups.iter() {
+					if group.contains(&idx) {
+						assignment = *para_id;
+						break;
+					}
+				}
 			}
-		});
-
-		let (finished_tx, finished) = oneshot::channel();
-
-		let future = async move {
-			let _ = future.await;
-			let _ = finished_tx.send(());
-		};
-		self.spawner.spawn(future)?;
-
-		let su_handle = self.outgoing_msgs.push(from_job_rx);
-
-		let handle = JobHandle {
-			abort_handle,
-			to_job: to_job_tx,
-			finished,
-			su_handle,
-		};
-
-		self.running.insert(parent_hash, handle);
 
-		Ok(())
-	}
+			let table_context = TableContext {
+				groups,
+				validators,
+				signing_context: validator.signing_context().clone(),
+				validator: Some(validator),
+			};
 
-	async fn stop_job(&mut self, parent_hash: Hash) -> Result<(), Error> {
-		match self.running.remove(&parent_hash) {
-			Some(handle) => {
-				Pin::new(&mut self.outgoing_msgs).remove(handle.su_handle);
-				handle.stop().await;
-				Ok(())
-			}
-			None => Err(Error::JobNotFound(parent_hash))
-		}
-	}
+			let job = CandidateBackingJob {
+				parent,
+				rx_to,
+				tx_from,
+				assignment,
+				issued_statements: HashSet::new(),
+				seconded: None,
+				reported_misbehavior_for: HashSet::new(),
+				table: Table::default(),
+				table_context,
+			};
 
-	async fn send_msg(&mut self, parent_hash: Hash, msg: ToJob) -> Result<(), Error> {
-		if let Some(job) = self.running.get_mut(&parent_hash) {
-			job.send_msg(msg).await?;
+			job.run_loop().await
 		}
-		Ok(())
-	}
-
-	async fn next(&mut self) -> Option<FromJob> {
-		self.outgoing_msgs.next().await.and_then(|(e, _)| match e {
-			StreamYield::Item(e) => Some(e),
-			_ => None,
-		})
+		.boxed()
 	}
 }
 
+/// Manager type for the `CandidateBackingSubsystem`.
+type Manager<Spawner, Context> = util::JobManager<Spawner, Context, CandidateBackingJob>;
+
 /// An implementation of the Candidate Backing subsystem.
-pub struct CandidateBackingSubsystem<S, Context> {
-	spawner: S,
-	keystore: KeyStorePtr,
-	_context: std::marker::PhantomData<Context>,
+pub struct CandidateBackingSubsystem<Spawner, Context> {
+	manager: Manager<Spawner, Context>,
 }
 
-impl<S, Context> CandidateBackingSubsystem<S, Context>
-	where
-		S: Spawn + Clone,
-		Context: SubsystemContext<Message=CandidateBackingMessage>,
+impl<Spawner, Context> CandidateBackingSubsystem<Spawner, Context>
+where
+	Spawner: Clone + Spawn + Send + Unpin,
+	Context: SubsystemContext,
+	ToJob: From<<Context as SubsystemContext>::Message>,
 {
 	/// Creates a new `CandidateBackingSubsystem`.
-	pub fn new(keystore: KeyStorePtr, spawner: S) -> Self {
-		Self {
-			spawner,
-			keystore,
-			_context: std::marker::PhantomData,
+	pub fn new(spawner: Spawner, keystore: KeyStorePtr) -> Self {
+		CandidateBackingSubsystem {
+			manager: util::JobManager::new(spawner, keystore)
 		}
 	}
 
-	async fn run(
-		mut ctx: Context,
-		keystore: KeyStorePtr,
-		spawner: S,
-	) {
-		let mut jobs = Jobs::new(spawner.clone());
-
-		loop {
-			select! {
-				incoming = ctx.recv().fuse() => {
-					match incoming {
-						Ok(msg) => match msg {
-							FromOverseer::Signal(OverseerSignal::StartWork(hash)) => {
-								if let Err(e) = jobs.spawn_job(hash, keystore.clone()) {
-									log::error!("Failed to spawn a job: {:?}", e);
-									break;
-								}
-							}
-							FromOverseer::Signal(OverseerSignal::StopWork(hash)) => {
-								if let Err(e) = jobs.stop_job(hash).await {
-									log::error!("Failed to spawn a job: {:?}", e);
-									break;
-								}
-							}
-							FromOverseer::Communication { msg } => {
-								match msg {
-									CandidateBackingMessage::Second(hash, _, _) |
-									CandidateBackingMessage::Statement(hash, _) |
-									CandidateBackingMessage::GetBackedCandidates(hash, _) => {
-										let res = jobs.send_msg(
-											hash.clone(),
-											ToJob::CandidateBacking(msg),
-										).await;
-
-										if let Err(e) = res {
-											log::error!(
-												"Failed to send a message to a job: {:?}",
-												e,
-											);
-
-											break;
-										}
-									}
-									_ => (),
-								}
-							}
-							_ => (),
-						},
-						Err(_) => break,
-					}
-				}
-				outgoing = jobs.next().fuse() => {
-					match outgoing {
-						Some(msg) => {
-							let _ = ctx.send_message(msg.into()).await;
-						}
-						None => break,
-					}
-				}
-				complete => break,
-			}
-		}
+	/// Run this subsystem
+	pub async fn run(ctx: Context, keystore: KeyStorePtr, spawner: Spawner) {
+		<Manager<Spawner, Context>>::run(ctx, keystore, spawner).await
 	}
 }
 
-impl<S, Context> Subsystem<Context> for CandidateBackingSubsystem<S, Context>
-	where
-		S: Spawn + Send + Clone + 'static,
-		Context: SubsystemContext<Message=CandidateBackingMessage>,
+impl<Spawner, Context> Subsystem<Context> for CandidateBackingSubsystem<Spawner, Context>
+where
+	Spawner: Spawn + Send + Clone + Unpin + 'static,
+	Context: SubsystemContext,
+	<Context as SubsystemContext>::Message: Into<ToJob>,
 {
 	fn start(self, ctx: Context) -> SpawnedSubsystem {
-		let keystore = self.keystore.clone();
-		let spawner = self.spawner.clone();
-
-		SpawnedSubsystem(Box::pin(async move {
-			Self::run(ctx, keystore, spawner).await;
-		}))
+		self.manager.start(ctx)
 	}
 }
 
 #[cfg(test)]
 mod tests {
 	use super::*;
-	use futures::{Future, executor::{self, ThreadPool}};
-	use std::collections::HashMap;
-	use std::sync::Arc;
-	use sp_keyring::Sr25519Keyring;
+	use assert_matches::assert_matches;
+	use futures::{
+		executor::{self, ThreadPool},
+		future, Future,
+	};
 	use polkadot_primitives::v1::{
-		AssignmentKind, CollatorId, CoreAssignment, BlockData, CoreIndex, GroupIndex, ValidityAttestation,
-		CandidateCommitments, LocalValidationData, GlobalValidationSchedule, HeadData,
+		AssignmentKind, BlockData, CandidateCommitments, CollatorId, CoreAssignment, CoreIndex,
+		LocalValidationData, GlobalValidationSchedule, GroupIndex, HeadData,
+		ValidatorPair, ValidityAttestation,
 	};
-	use assert_matches::assert_matches;
+	use polkadot_subsystem::{
+		messages::{RuntimeApiRequest, SchedulerRoster},
+		FromOverseer, OverseerSignal,
+	};
+	use sp_keyring::Sr25519Keyring;
+	use std::collections::HashMap;
 
 	fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
 		val_ids.iter().map(|v| v.public().into()).collect()
@@ -1545,7 +1349,6 @@ mod tests {
 					).unwrap();
 				}
 			);
-
 		});
 	}
 
@@ -1626,8 +1429,6 @@ mod tests {
 
 			virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;
 
-			let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();
-
 			assert_matches!(
 				virtual_overseer.recv().await,
 				AllMessages::CandidateValidation(
diff --git a/polkadot/node/subsystem/Cargo.toml b/polkadot/node/subsystem/Cargo.toml
index 43712319cb716c0f48e266eb5904f5d0f2b9daf0..188e7cbfa7645c6fdd7067f87109cd1c5c02b6fd 100644
--- a/polkadot/node/subsystem/Cargo.toml
+++ b/polkadot/node/subsystem/Cargo.toml
@@ -6,9 +6,17 @@ edition = "2018"
 description = "Subsystem traits and message definitions"
 
 [dependencies]
+async-trait = "0.1"
+derive_more = "0.99.9"
+futures = "0.3.5"
+futures-timer = "3.0.2"
+keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" }
+log = "0.4.8"
+parity-scale-codec = "1.3.0"
+pin-project = "0.4.22"
+polkadot-node-primitives = { path = "../primitives" }
 polkadot-primitives = { path = "../../primitives" }
 polkadot-statement-table = { path = "../../statement-table" }
-polkadot-node-primitives = { path = "../primitives" }
 sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
-futures = "0.3.5"
-async-trait = "0.1"
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+streamunordered = "0.5.1"
diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs
index e374eb9cfcdd60a631893617fb3f74f607aaf59a..b6c3a79ef3d97e40383dbbc8d4a789152a8d9c9b 100644
--- a/polkadot/node/subsystem/src/lib.rs
+++ b/polkadot/node/subsystem/src/lib.rs
@@ -20,6 +20,8 @@
 //! that communicate via message-passing. They are coordinated by an overseer, provided by a
 //! separate crate.
 
+#![warn(missing_docs)]
+
 use std::pin::Pin;
 
 use futures::prelude::*;
@@ -32,6 +34,7 @@ use async_trait::async_trait;
 use crate::messages::AllMessages;
 
 pub mod messages;
+pub mod util;
 
 /// Signals sent by an overseer to a subsystem.
 #[derive(PartialEq, Clone, Debug)]
@@ -56,6 +59,7 @@ pub enum FromOverseer<M> {
 
 	/// Some other `Subsystem`'s message.
 	Communication {
+		/// Contained message
 		msg: M,
 	},
 }
diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs
index 10c861f1410cf8e7bbb15b26cb3bcd1c21b18837..d3c630cb56f09a58e8c3d122866b70338171f05d 100644
--- a/polkadot/node/subsystem/src/messages.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -51,6 +51,15 @@ pub enum CandidateSelectionMessage {
 	Invalid(Hash, CandidateReceipt),
 }
 
+impl CandidateSelectionMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::Invalid(hash, _) => Some(*hash),
+		}
+	}
+}
+
 /// Messages received by the Candidate Backing subsystem.
 #[derive(Debug)]
 pub enum CandidateBackingMessage {
@@ -65,6 +74,18 @@ pub enum CandidateBackingMessage {
 	Statement(Hash, SignedFullStatement),
 }
 
+
+impl CandidateBackingMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::GetBackedCandidates(hash, _) => Some(*hash),
+			Self::Second(hash, _, _) => Some(*hash),
+			Self::Statement(hash, _) => Some(*hash),
+		}
+	}
+}
+
 /// Blanket error for validation failing.
 #[derive(Debug)]
 pub struct ValidationFailed;
@@ -102,6 +123,16 @@ pub enum CandidateValidationMessage {
 	),
 }
 
+impl CandidateValidationMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::ValidateFromChainState(_, _, _) => None,
+			Self::ValidateFromExhaustive(_, _, _, _, _) => None,
+		}
+	}
+}
+
 /// Events from network.
 #[derive(Debug, Clone)]
 pub enum NetworkBridgeEvent {
@@ -134,6 +165,17 @@ pub enum NetworkBridgeMessage {
 	SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>),
 }
 
+impl NetworkBridgeMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::RegisterEventProducer(_, _) => None,
+			Self::ReportPeer(_, _) => None,
+			Self::SendMessage(_, _, _) => None,
+		}
+	}
+}
+
 /// Availability Distribution Message.
 #[derive(Debug)]
 pub enum AvailabilityDistributionMessage {
@@ -147,6 +189,17 @@ pub enum AvailabilityDistributionMessage {
 	NetworkBridgeUpdate(NetworkBridgeEvent),
 }
 
+impl AvailabilityDistributionMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::DistributeChunk(hash, _) => Some(*hash),
+			Self::FetchChunk(hash, _) => Some(*hash),
+			Self::NetworkBridgeUpdate(_) => None,
+		}
+	}
+}
+
 /// Bitfield distribution message.
 #[derive(Debug)]
 pub enum BitfieldDistributionMessage {
@@ -157,6 +210,16 @@ pub enum BitfieldDistributionMessage {
 	NetworkBridgeUpdate(NetworkBridgeEvent),
 }
 
+impl BitfieldDistributionMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::DistributeBitfield(hash, _) => Some(*hash),
+			Self::NetworkBridgeUpdate(_) => None,
+		}
+	}
+}
+
 /// Availability store subsystem message.
 #[derive(Debug)]
 pub enum AvailabilityStoreMessage {
@@ -170,6 +233,17 @@ pub enum AvailabilityStoreMessage {
 	StoreChunk(Hash, ValidatorIndex, ErasureChunk),
 }
 
+impl AvailabilityStoreMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::QueryPoV(hash, _) => Some(*hash),
+			Self::QueryChunk(hash, _, _) => Some(*hash),
+			Self::StoreChunk(hash, _, _) => Some(*hash),
+		}
+	}
+}
+
 /// The information on scheduler assignments that some somesystems may be querying.
 #[derive(Debug, Clone)]
 pub struct SchedulerRoster {
@@ -207,6 +281,15 @@ pub enum RuntimeApiMessage {
 	Request(Hash, RuntimeApiRequest),
 }
 
+impl RuntimeApiMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::Request(hash, _) => Some(*hash),
+		}
+	}
+}
+
 /// Statement distribution message.
 #[derive(Debug)]
 pub enum StatementDistributionMessage {
@@ -217,6 +300,16 @@ pub enum StatementDistributionMessage {
 	NetworkBridgeUpdate(NetworkBridgeEvent),
 }
 
+impl StatementDistributionMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::Share(hash, _) => Some(*hash),
+			Self::NetworkBridgeUpdate(_) => None,
+		}
+	}
+}
+
 /// This data becomes intrinsics or extrinsics which should be included in a future relay chain block.
 #[derive(Debug)]
 pub enum ProvisionableData {
@@ -253,6 +346,17 @@ pub enum ProvisionerMessage {
 	ProvisionableData(ProvisionableData),
 }
 
+impl ProvisionerMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::RequestBlockAuthorshipData(hash, _) => Some(*hash),
+			Self::RequestInherentData(hash, _) => Some(*hash),
+			Self::ProvisionableData(_) => None,
+		}
+	}
+}
+
 /// Message to the PoV Distribution Subsystem.
 #[derive(Debug)]
 pub enum PoVDistributionMessage {
@@ -268,6 +372,17 @@ pub enum PoVDistributionMessage {
 	NetworkBridgeUpdate(NetworkBridgeEvent),
 }
 
+impl PoVDistributionMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::FetchPoV(hash, _, _) => Some(*hash),
+			Self::DistributePoV(hash, _, _) => Some(*hash),
+			Self::NetworkBridgeUpdate(_) => None,
+		}
+	}
+}
+
 /// A message type tying together all message types that are used across Subsystems.
 #[derive(Debug)]
 pub enum AllMessages {
diff --git a/polkadot/node/subsystem/src/util.rs b/polkadot/node/subsystem/src/util.rs
new file mode 100644
index 0000000000000000000000000000000000000000..1b89bfb053a6e2a2b733cc395038d5e705fbce62
--- /dev/null
+++ b/polkadot/node/subsystem/src/util.rs
@@ -0,0 +1,613 @@
+// Copyright 2017-2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Utility module for subsystems
+//!
+//! Many subsystems have common interests such as canceling a bunch of spawned jobs,
+//! or determining what their validator ID is. These common interests are factored into
+//! this module.
+
+use crate::{
+	messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, SchedulerRoster},
+	FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
+};
+use futures::{
+	channel::{mpsc, oneshot},
+	future::Either,
+	prelude::*,
+	select,
+	stream::Stream,
+	task::{self, Spawn, SpawnError, SpawnExt},
+};
+use futures_timer::Delay;
+use keystore::KeyStorePtr;
+use parity_scale_codec::Encode;
+use pin_project::{pin_project, pinned_drop};
+use polkadot_primitives::v1::{
+	EncodeAs, Hash, HeadData, Id as ParaId, Signed, SigningContext,
+	ValidatorId, ValidatorIndex, ValidatorPair,
+};
+use sp_core::Pair;
+use std::{
+	collections::HashMap,
+	convert::{TryFrom, TryInto},
+	marker::Unpin,
+	pin::Pin,
+	time::Duration,
+};
+use streamunordered::{StreamUnordered, StreamYield};
+
+/// Duration a job will wait after sending a stop signal before hard-aborting.
+pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
+/// Capacity of channels to and from individual jobs
+pub const JOB_CHANNEL_CAPACITY: usize = 64;
+
+/// Utility errors
+#[derive(Debug, derive_more::From)]
+pub enum Error {
+	/// Attempted to send or receive on a oneshot channel which had been canceled
+	#[from]
+	Oneshot(oneshot::Canceled),
+	/// Attempted to send on a MPSC channel which has been canceled
+	#[from]
+	Mpsc(mpsc::SendError),
+	/// Attempted to spawn a new task, and failed
+	#[from]
+	Spawn(SpawnError),
+	/// Attempted to convert from an AllMessages to a FromJob, and failed.
+	SenderConversion(String),
+	/// The local node is not a validator.
+	NotAValidator,
+	/// The desired job is not present in the jobs list.
+	JobNotFound(Hash),
+}
+
+/// Request some data from the `RuntimeApi`.
+pub async fn request_from_runtime<RequestBuilder, Response, FromJob>(
+	parent: Hash,
+	sender: &mut mpsc::Sender<FromJob>,
+	request_builder: RequestBuilder,
+) -> Result<oneshot::Receiver<Response>, Error>
+where
+	RequestBuilder: FnOnce(oneshot::Sender<Response>) -> RuntimeApiRequest,
+	FromJob: TryFrom<AllMessages>,
+	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
+{
+	let (tx, rx) = oneshot::channel();
+
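+	// Wrap the request in `AllMessages`, then narrow it into the job's own `FromJob`
+	// type; a failed conversion means that enum cannot carry runtime API requests.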
+	sender
+		.send(
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
+				.try_into()
+				.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
+		)
+		.await?;
+
+	Ok(rx)
+}
+
+/// Request a validator set from the `RuntimeApi`.
+pub async fn request_validators<FromJob>(
+	parent: Hash,
+	s: &mut mpsc::Sender<FromJob>,
+) -> Result<oneshot::Receiver<Vec<ValidatorId>>, Error>
+where
+	FromJob: TryFrom<AllMessages>,
+	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
+{
+	request_from_runtime(parent, s, |tx| RuntimeApiRequest::Validators(tx)).await
+}
+
+/// Request the scheduler roster from `RuntimeApi`.
+pub async fn request_validator_groups<FromJob>(
+	parent: Hash,
+	s: &mut mpsc::Sender<FromJob>,
+) -> Result<oneshot::Receiver<SchedulerRoster>, Error>
+where
+	FromJob: TryFrom<AllMessages>,
+	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
+{
+	request_from_runtime(parent, s, |tx| RuntimeApiRequest::ValidatorGroups(tx)).await
+}
+
+/// Request a `SigningContext` from the `RuntimeApi`.
+pub async fn request_signing_context<FromJob>(
+	parent: Hash,
+	s: &mut mpsc::Sender<FromJob>,
+) -> Result<oneshot::Receiver<SigningContext>, Error>
+where
+	FromJob: TryFrom<AllMessages>,
+	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
+{
+	request_from_runtime(parent, s, |tx| RuntimeApiRequest::SigningContext(tx)).await
+}
+
+/// Request `HeadData` for some `ParaId` from `RuntimeApi`.
+pub async fn request_head_data<FromJob>(
+	parent: Hash,
+	s: &mut mpsc::Sender<FromJob>,
+	id: ParaId,
+) -> Result<oneshot::Receiver<HeadData>, Error>
+where
+	FromJob: TryFrom<AllMessages>,
+	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
+{
+	request_from_runtime(parent, s, |tx| RuntimeApiRequest::HeadData(id, tx)).await
+}
+
+/// From the given set of validators, find the first key we can sign with, if any.
+pub fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<ValidatorPair> {
+	let keystore = keystore.read();
+	validators
+		.iter()
+		.find_map(|v| keystore.key_pair::<ValidatorPair>(&v).ok())
+}
+
+/// Local validator information
+///
+/// It can be created if the local node is a validator in the context of a particular
+/// relay chain block.
+pub struct Validator {
+	signing_context: SigningContext,
+	key: ValidatorPair,
+	index: ValidatorIndex,
+}
+
+impl Validator {
+	/// Get a struct representing this node's validator if this node is in fact a validator in the context of the given block.
+	pub async fn new<FromJob>(
+		parent: Hash,
+		keystore: KeyStorePtr,
+		mut sender: mpsc::Sender<FromJob>,
+	) -> Result<Self, Error>
+	where
+		FromJob: TryFrom<AllMessages>,
+		<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
+	{
+		// Note: request_validators and request_signing_context do not and cannot run concurrently: they both
+		// have a mutable handle to the same sender.
+		// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
+		let (validators, signing_context) = futures::try_join!(
+			request_validators(parent, &mut sender).await?,
+			request_signing_context(parent, &mut sender).await?,
+		)?;
+
+		Self::construct(&validators, signing_context, keystore)
+	}
+
+	/// Construct a validator instance without performing runtime fetches.
+	///
+	/// This can be useful if external code also needs the same data.
+	pub fn construct(
+		validators: &[ValidatorId],
+		signing_context: SigningContext,
+		keystore: KeyStorePtr,
+	) -> Result<Self, Error> {
+		let key = signing_key(validators, &keystore).ok_or(Error::NotAValidator)?;
+		let index = validators
+			.iter()
+			.enumerate()
+			.find(|(_, k)| k == &&key.public())
+			.map(|(idx, _)| idx as ValidatorIndex)
+			.expect("signing_key would have already returned NotAValidator if the item we're searching for isn't in this list; qed");
+
+		Ok(Validator {
+			signing_context,
+			key,
+			index,
+		})
+	}
+
+	/// Get this validator's id.
+	pub fn id(&self) -> ValidatorId {
+		self.key.public()
+	}
+
+	/// Get this validator's local index.
+	pub fn index(&self) -> ValidatorIndex {
+		self.index
+	}
+
+	/// Get the current signing context.
+	pub fn signing_context(&self) -> &SigningContext {
+		&self.signing_context
+	}
+
+	/// Sign a payload with this validator
+	pub fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
+		&self,
+		payload: Payload,
+	) -> Signed<Payload, RealPayload> {
+		Signed::sign(payload, &self.signing_context, self.index, &self.key)
+	}
+
+	/// Validate the payload with this validator
+	///
+	/// Validation can only succeed if `signed.validator_index() == self.index()`.
+	/// Normally, this will always be the case for a properly operating program,
+	/// but it's double-checked here anyway.
+	pub fn check_payload<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
+		&self,
+		signed: Signed<Payload, RealPayload>,
+	) -> Result<(), ()> {
+		if signed.validator_index() != self.index {
+			return Err(());
+		}
+		signed.check_signature(&self.signing_context, &self.id())
+	}
+}
+
+/// ToJob is expected to be an enum declaring the set of messages of interest to a particular job.
+///
+/// Normally, this will be some subset of `AllMessages`, and a `Stop` variant.
+pub trait ToJobTrait: TryFrom<AllMessages> {
+	/// The `Stop` variant of the ToJob enum.
+	const STOP: Self;
+
+	/// If the message variant contains its relay parent, return it here
+	fn relay_parent(&self) -> Option<Hash>;
+}
+
+/// A JobHandle manages a particular job for a subsystem.
+pub struct JobHandle<ToJob> {
+	abort_handle: future::AbortHandle,
+	to_job: mpsc::Sender<ToJob>,
+	finished: oneshot::Receiver<()>,
+	outgoing_msgs_handle: usize,
+}
+
+impl<ToJob> JobHandle<ToJob> {
+	/// Send a message to the job.
+	pub async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
+		self.to_job.send(msg).await.map_err(Into::into)
+	}
+
+	/// Abort the job without waiting for a graceful shutdown
+	pub fn abort(self) {
+		self.abort_handle.abort();
+	}
+}
+
+impl<ToJob: ToJobTrait> JobHandle<ToJob> {
+	/// Stop this job gracefully.
+	///
+	/// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it.
+	pub async fn stop(mut self) {
+		// we don't actually care if the message couldn't be sent
+		let _ = self.to_job.send(ToJob::STOP).await;
+		let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION);
+
+		// `select` resolves to `Left` when the timer fires first (the job outlived its
+		// grace period, so abort it), and to `Right` when the job finished in time.
+		match future::select(stop_timer, self.finished).await {
+			Either::Left((_, _)) => {
+				self.abort_handle.abort();
+			}
+			Either::Right((_, _)) => {}
+		}
+	}
+}
+
+/// This trait governs jobs.
+///
+/// Jobs are instantiated and killed automatically on appropriate overseer messages.
+/// Other messages are passed along to and from the job via the overseer to other
+/// subsystems.
+pub trait JobTrait: Unpin {
+	/// Message type to the job. Typically a subset of AllMessages.
+	type ToJob: 'static + ToJobTrait + Send;
+	/// Message type from the job. Typically a subset of AllMessages.
+	type FromJob: 'static + Into<AllMessages> + Send;
+	/// Job runtime error.
+	type Error: std::fmt::Debug;
+	/// Extra arguments this job needs to run properly.
+	///
+	/// If no extra information is needed, it is perfectly acceptable to set it to `()`.
+	type RunArgs: 'static + Send;
+
+	/// Name of the job, e.g. `CandidateBackingJob`.
+	const NAME: &'static str;
+
+	/// Run a job for the parent block indicated
+	fn run(
+		parent: Hash,
+		run_args: Self::RunArgs,
+		rx_to: mpsc::Receiver<Self::ToJob>,
+		tx_from: mpsc::Sender<Self::FromJob>,
+	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
+
+	/// Handle a message which has no relay parent, and therefore can't be dispatched to a particular job
+	///
+	/// By default, this is implemented with a NOP function. However, if
+	/// ToJob occasionally has messages which do not correspond to a particular
+	/// parent relay hash, then this function will be spawned as a one-off
+	/// task to handle those messages.
+	// TODO: the API here is likely not precisely what we want; figure it out more
+	// once we're implementing a subsystem which actually needs this feature.
+	// In particular, we're quite likely to want this to return a future instead of
+	// interrupting the active thread for the duration of the handler.
+	fn handle_unanchored_msg(_msg: Self::ToJob) -> Result<(), Self::Error> {
+		Ok(())
+	}
+}
+
+/// Jobs manager for a subsystem
+///
+/// - Spawns new jobs for a given relay-parent on demand.
+/// - Closes old jobs for a given relay-parent on demand.
+/// - Dispatches messages to the appropriate job for a given relay-parent.
+/// - When dropped, aborts all remaining jobs.
+/// - Implements `Stream<Item=Job::FromJob>`, collecting all messages from subordinate jobs.
+#[pin_project(PinnedDrop)]
+pub struct Jobs<Spawner, Job: JobTrait> {
+	spawner: Spawner,
+	running: HashMap<Hash, JobHandle<Job::ToJob>>,
+	#[pin]
+	outgoing_msgs: StreamUnordered<mpsc::Receiver<Job::FromJob>>,
+	job: std::marker::PhantomData<Job>,
+}
+
+impl<Spawner: Spawn, Job: JobTrait> Jobs<Spawner, Job> {
+	/// Create a new Jobs manager which handles spawning appropriate jobs.
+	pub fn new(spawner: Spawner) -> Self {
+		Self {
+			spawner,
+			running: HashMap::new(),
+			outgoing_msgs: StreamUnordered::new(),
+			job: std::marker::PhantomData,
+		}
+	}
+
+	/// Spawn a new job for this `parent_hash`, with whatever args are appropriate.
+	fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs) -> Result<(), Error> {
+		let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
+		let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
+		let (finished_tx, finished) = oneshot::channel();
+
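+		// `future::abortable` wraps the job's future so the handle can cancel it later
+		// without any cooperation from the job itself.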
+		let (future, abort_handle) = future::abortable(async move {
+			if let Err(e) = Job::run(parent_hash, run_args, to_job_rx, from_job_tx).await {
+				log::error!(
+					"{}({}) finished with an error {:?}",
+					Job::NAME,
+					parent_hash,
+					e,
+				);
+			}
+		});
+
+		// discard the job's output and signal on the `finished` channel once it stops
+		let future = async move {
+			let _ = future.await;
+			let _ = finished_tx.send(());
+		};
+		self.spawner.spawn(future)?;
+
+		// this handle lets us remove the appropriate receiver from self.outgoing_msgs
+		// when it's time to stop the job.
+		let outgoing_msgs_handle = self.outgoing_msgs.push(from_job_rx);
+
+		let handle = JobHandle {
+			abort_handle,
+			to_job: to_job_tx,
+			finished,
+			outgoing_msgs_handle,
+		};
+
+		self.running.insert(parent_hash, handle);
+
+		Ok(())
+	}
+
+	/// Stop the job associated with this `parent_hash`.
+	pub async fn stop_job(&mut self, parent_hash: Hash) -> Result<(), Error> {
+		match self.running.remove(&parent_hash) {
+			Some(handle) => {
+				Pin::new(&mut self.outgoing_msgs).remove(handle.outgoing_msgs_handle);
+				handle.stop().await;
+				Ok(())
+			}
+			None => Err(Error::JobNotFound(parent_hash)),
+		}
+	}
+
+	/// Send a message to the appropriate job for this `parent_hash`.
+	async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) -> Result<(), Error> {
+		match self.running.get_mut(&parent_hash) {
+			Some(job) => job.send_msg(msg).await?,
+			None => return Err(Error::JobNotFound(parent_hash)),
+		}
+		Ok(())
+	}
+}
+
+// Note that on drop, we don't have the chance to gracefully spin down each of the remaining handles;
+// we just abort them all. Still better than letting them dangle.
+#[pinned_drop]
+impl<Spawner, Job: JobTrait> PinnedDrop for Jobs<Spawner, Job> {
+	fn drop(self: Pin<&mut Self>) {
+		for job_handle in self.running.values() {
+			job_handle.abort_handle.abort();
+		}
+	}
+}
+
+impl<Spawner, Job> Stream for Jobs<Spawner, Job>
+where
+	Spawner: Spawn,
+	Job: JobTrait,
+{
+	type Item = Job::FromJob;
+
+	fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> {
+		// pin-project the outgoing messages
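+		// a `Finished` yield means one job's channel has closed; mapping it to `None`
+		// ends this combined stream.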
+		self.project()
+			.outgoing_msgs
+			.poll_next(cx)
+			.map(|opt| opt.and_then(|(stream_yield, _)| match stream_yield {
+				StreamYield::Item(msg) => Some(msg),
+				StreamYield::Finished(_) => None,
+			}))
+	}
+}
+
+/// A basic implementation of a subsystem.
+///
+/// This struct is responsible for handling message traffic between
+/// this subsystem and the overseer. It spawns and kills jobs on the
+/// appropriate Overseer messages, and dispatches standard traffic to
+/// the appropriate job the rest of the time.
+pub struct JobManager<Spawner, Context, Job: JobTrait> {
+	spawner: Spawner,
+	run_args: Job::RunArgs,
+	context: std::marker::PhantomData<Context>,
+	job: std::marker::PhantomData<Job>,
+}
+
+impl<Spawner, Context, Job> JobManager<Spawner, Context, Job>
+where
+	Spawner: Spawn + Clone + Send + Unpin,
+	Context: SubsystemContext,
+	Job: JobTrait,
+	Job::RunArgs: Clone,
+	Job::ToJob: TryFrom<AllMessages> + TryFrom<<Context as SubsystemContext>::Message> + Sync,
+{
+	/// Creates a new `JobManager`.
+	pub fn new(spawner: Spawner, run_args: Job::RunArgs) -> Self {
+		Self {
+			spawner,
+			run_args,
+			context: std::marker::PhantomData,
+			job: std::marker::PhantomData,
+		}
+	}
+
+	/// Run this subsystem
+	///
+	/// Conceptually, this is very simple: it just loops forever.
+	///
+	/// - On incoming overseer messages, it starts or stops jobs as appropriate.
+	/// - On other incoming messages, if they can be converted into Job::ToJob and
+	///   include a hash, then they're forwarded to the appropriate individual job.
+	/// - On outgoing messages from the jobs, it forwards them to the overseer.
+	pub async fn run(mut ctx: Context, run_args: Job::RunArgs, spawner: Spawner) {
+		let mut jobs = Jobs::new(spawner.clone());
+
+		loop {
+			select! {
+				incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args).await { break },
+				outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx).await { break },
+				complete => break,
+			}
+		}
+	}
+
+	// handle an incoming message. return true if we should break afterwards.
+	async fn handle_incoming(
+		incoming: SubsystemResult<FromOverseer<Context::Message>>,
+		jobs: &mut Jobs<Spawner, Job>,
+		run_args: &Job::RunArgs,
+	) -> bool {
+		use crate::FromOverseer::{Communication, Signal};
+		use crate::OverseerSignal::{Conclude, StartWork, StopWork};
+
+		match incoming {
+			Ok(Signal(StartWork(hash))) => {
+				if let Err(e) = jobs.spawn_job(hash, run_args.clone()) {
+					log::error!("Failed to spawn a job: {:?}", e);
+					return true;
+				}
+			}
+			Ok(Signal(StopWork(hash))) => {
+				if let Err(e) = jobs.stop_job(hash).await {
+					log::error!("Failed to stop a job: {:?}", e);
+					return true;
+				}
+			}
+			Ok(Signal(Conclude)) => {
+				// Breaking the loop ends fn run, which drops `jobs`, which immediately drops all ongoing work.
+				// We can afford to wait a little while to shut them all down properly before doing that.
+				//
+				// Forwarding the stream to a drain means we wait until all of the items in the stream
+				// have completed. Contrast with `into_future`, which turns it into a future of `(head, rest_stream)`.
+				use futures::stream::StreamExt;
+				use futures::stream::FuturesUnordered;
+
+				let unordered = jobs.running
+					.drain()
+					.map(|(_, handle)| handle.stop())
+					.collect::<FuturesUnordered<_>>();
+				// now wait for all the futures to complete; collect a vector of their results
+				// this is strictly less efficient than draining them into oblivion, but this compiles, and that doesn't
+				// https://github.com/paritytech/polkadot/pull/1376#pullrequestreview-446488645
+				let _ = unordered.collect::<Vec<_>>().await;
+
+				return true;
+			}
+			Ok(Communication { msg }) => {
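+				// messages which cannot be converted into `Job::ToJob` are silently ignored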
+				if let Ok(to_job) = <Job::ToJob>::try_from(msg) {
+					match to_job.relay_parent() {
+						Some(hash) => {
+							if let Err(err) = jobs.send_msg(hash, to_job).await {
+								log::error!("Failed to send a message to a job: {:?}", err);
+								return true;
+							}
+						}
+						None => {
+							if let Err(err) = Job::handle_unanchored_msg(to_job) {
+								log::error!("Failed to handle unanchored message: {:?}", err);
+								return true;
+							}
+						}
+					}
+				}
+			}
+			Err(err) => {
+				log::error!("error receiving message from subsystem context: {:?}", err);
+				return true;
+			}
+		}
+		false
+	}
+
+	// handle an outgoing message. return true if we should break afterwards.
+	async fn handle_outgoing(outgoing: Option<Job::FromJob>, ctx: &mut Context) -> bool {
+		match outgoing {
+			Some(msg) => {
+				// discard errors when sending the message upstream
+				let _ = ctx.send_message(msg.into()).await;
+			}
+			None => return true,
+		}
+		false
+	}
+}
+
+impl<Spawner, Context, Job> Subsystem<Context> for JobManager<Spawner, Context, Job>
+where
+	Spawner: Spawn + Send + Clone + Unpin + 'static,
+	Context: SubsystemContext,
+	<Context as SubsystemContext>::Message: Into<Job::ToJob>,
+	Job: JobTrait + Send,
+	Job::RunArgs: Clone + Sync,
+	Job::ToJob: TryFrom<AllMessages> + Sync,
+{
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
+		let spawner = self.spawner.clone();
+		let run_args = self.run_args.clone();
+
+		SpawnedSubsystem(Box::pin(async move {
+			Self::run(ctx, run_args, spawner).await;
+		}))
+	}
+}