diff --git a/polkadot/.gitlab-ci.yml b/polkadot/.gitlab-ci.yml
index 11a9e42d8037df6e1b8cdfc76f5d8dceaa141b31..726d9879b4d62a53c9b6488cde8685de210ab4f1 100644
--- a/polkadot/.gitlab-ci.yml
+++ b/polkadot/.gitlab-ci.yml
@@ -208,7 +208,7 @@ generate-impl-guide:
   <<:                              *rules-test
   <<:                              *docker-env
   image:
-    name: michaelfbryan/mdbook-docker-image:latest
+    name: michaelfbryan/mdbook-docker-image:v0.4.4
     entrypoint: [""]
   script:
     - mdbook build roadmap/implementers-guide
diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 7ae0baa81ed655bd41e4dd06bf3e75709a274320..3f44da84ade4814f94c44b34537fe025ebf3b8a5 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -5153,6 +5153,7 @@ version = "0.1.0"
 dependencies = [
  "assert_matches",
  "futures 0.3.12",
+ "futures-timer 3.0.2",
  "lru",
  "maplit",
  "parity-scale-codec",
@@ -5166,6 +5167,7 @@ dependencies = [
  "rand 0.8.3",
  "sc-keystore",
  "sc-network",
+ "smallvec 1.6.1",
  "sp-application-crypto",
  "sp-core",
  "sp-keyring",
diff --git a/polkadot/node/network/availability-distribution/Cargo.toml b/polkadot/node/network/availability-distribution/Cargo.toml
index 5483d87deff74d56a51bb9a583dda150aa922504..fc0f2574e4243a66f75732ccf005c6c1a4c1221f 100644
--- a/polkadot/node/network/availability-distribution/Cargo.toml
+++ b/polkadot/node/network/availability-distribution/Cargo.toml
@@ -29,5 +29,7 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master
 sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
+futures-timer = "3.0.2"
 assert_matches = "1.4.0"
 maplit = "1.0"
+smallvec = "1.6.1"
diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs
index dbe3ad56db16305c22fed8f06655c9dffee451e0..3206d20c8231ab958dee4d226a20e72a7afe2a95 100644
--- a/polkadot/node/network/availability-distribution/src/error.rs
+++ b/polkadot/node/network/availability-distribution/src/error.rs
@@ -57,6 +57,10 @@ pub enum Error {
 	/// We tried accessing a session that was not cached.
 	#[error("Session is not cached.")]
 	NoSuchCachedSession,
+
+	/// We tried reporting bad validators, although we are not a validator ourselves.
+	#[error("Not a validator.")]
+	NotAValidator,
 
 	/// Requester stream exhausted.
 	#[error("Erasure chunk requester stream exhausted")]
diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs
index 4e8683a0920ee5c70f9465005a1e012238b4689b..a447204880e19fd2b41ad2a1f6eee995c12bcdff 100644
--- a/polkadot/node/network/availability-distribution/src/lib.rs
+++ b/polkadot/node/network/availability-distribution/src/lib.rs
@@ -43,6 +43,9 @@ mod metrics;
 /// Prometheus `Metrics` for availability distribution.
 pub use metrics::Metrics;
 
+#[cfg(test)]
+mod tests;
+
 const LOG_TARGET: &'static str = "availability_distribution";
 
 /// The availability distribution subsystem.
diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs
index b4254850563c0dae012787255e20b9f829bc5aa8..1f12000621c2671b2fb31379ef78d1e14eead7cc 100644
--- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs
@@ -15,7 +15,6 @@
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
 use std::collections::HashMap;
-use std::sync::Arc;
 
 use parity_scale_codec::Encode;
 
@@ -23,15 +23,15 @@ use futures::channel::{mpsc, oneshot};
 use futures::{executor, Future, FutureExt, StreamExt, select};
 use futures::task::{Poll, Context, noop_waker};
 
-use polkadot_erasure_coding::{obtain_chunks_v1 as obtain_chunks, branches};
 use sc_network as network;
 use sp_keyring::Sr25519Keyring;
 
-use polkadot_primitives::v1::{AvailableData, BlockData, CandidateHash, HeadData, PersistedValidationData, PoV, ValidatorIndex};
+use polkadot_primitives::v1::{BlockData, CandidateHash, PoV, ValidatorIndex};
 use polkadot_node_network_protocol::request_response::v1;
 use polkadot_subsystem::messages::AllMessages;
 
 use crate::metrics::Metrics;
+use crate::tests::mock::get_valid_chunk_data;
 use super::*;
 
 #[test]
@@ -74,7 +74,10 @@ fn task_does_not_accept_invalid_chunk() {
 #[test]
 fn task_stores_valid_chunk() {
 	let (mut task, rx) = get_test_running_task();
-	let (root_hash, chunk) = get_valid_chunk_data();
+	let pov = PoV {
+		block_data: BlockData(vec![45, 46, 47]),
+	};
+	let (root_hash, chunk) = get_valid_chunk_data(pov);
 	task.erasure_root = root_hash;
 	task.request.index = chunk.index;
 
@@ -107,7 +110,10 @@ fn task_stores_valid_chunk() {
 #[test]
 fn task_does_not_accept_wrongly_indexed_chunk() {
 	let (mut task, rx) = get_test_running_task();
-	let (root_hash, chunk) = get_valid_chunk_data();
+	let pov = PoV {
+		block_data: BlockData(vec![45, 46, 47]),
+	};
+	let (root_hash, chunk) = get_valid_chunk_data(pov);
 	task.erasure_root = root_hash;
 	task.request.index = ValidatorIndex(chunk.index.0+1);
 
@@ -137,7 +143,10 @@ fn task_does_not_accept_wrongly_indexed_chunk() {
 #[test]
 fn task_stores_valid_chunk_if_there_is_one() {
 	let (mut task, rx) = get_test_running_task();
-	let (root_hash, chunk) = get_valid_chunk_data();
+	let pov = PoV {
+		block_data: BlockData(vec![45, 46, 47]),
+	};
+	let (root_hash, chunk) = get_valid_chunk_data(pov);
 	task.erasure_root = root_hash;
 	task.request.index = chunk.index;
 
@@ -287,29 +296,3 @@ fn get_test_running_task() -> (RunningTask, mpsc::Receiver<FromFetchTask>) {
 	)
 }
 
-fn get_valid_chunk_data() -> (Hash, ErasureChunk) {
-	let fake_validator_count = 10;
-	let persisted = PersistedValidationData {
-		parent_head: HeadData(vec![7, 8, 9]),
-		relay_parent_number: Default::default(),
-		max_pov_size: 1024,
-		relay_parent_storage_root: Default::default(),
-	};
-	let pov_block = PoV {
-		block_data: BlockData(vec![45, 46, 47]),
-	};
-	let available_data = AvailableData {
-		validation_data: persisted, pov: Arc::new(pov_block),
-	};
-	let chunks = obtain_chunks(fake_validator_count, &available_data).unwrap();
-	let branches = branches(chunks.as_ref());
-	let root = branches.root();
-	let chunk = branches.enumerate()
-			.map(|(index, (proof, chunk))| ErasureChunk {
-				chunk: chunk.to_vec(),
-				index: ValidatorIndex(index as _),
-				proof,
-			})
-			.next().expect("There really should be 10 chunks.");
-	(root, chunk)
-}
diff --git a/polkadot/node/network/availability-distribution/src/session_cache.rs b/polkadot/node/network/availability-distribution/src/session_cache.rs
index 395d2ae78384ba516cbf4db56323b16f27882f14..15f5cad3c757495dcf554a6d5d4df9abf47725ce 100644
--- a/polkadot/node/network/availability-distribution/src/session_cache.rs
+++ b/polkadot/node/network/availability-distribution/src/session_cache.rs
@@ -54,7 +54,10 @@ pub struct SessionCache {
 	/// to get any existing cache entry, before fetching new information, as we should not mess up
 	/// the order of validators in `SessionInfo::validator_groups`. (We want live TCP connections
 	/// wherever possible.)
-	session_info_cache: LruCache<SessionIndex, SessionInfo>,
+	///
+	/// We store `None` in case we are not a validator, so we won't do needless fetches for
+	/// non-validator nodes.
+	session_info_cache: LruCache<SessionIndex, Option<SessionInfo>>,
 
 	/// Key store for determining whether we are a validator and what `ValidatorIndex` we have.
 	keystore: SyncCryptoStorePtr,
@@ -134,19 +137,31 @@ impl SessionCache {
 			}
 		};
 
-		if let Some(info) = self.session_info_cache.get(&session_index) {
-			return Ok(Some(with_info(info)));
+		if let Some(o_info) = self.session_info_cache.get(&session_index) {
+			tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
+			if let Some(info) = o_info {
+				return Ok(Some(with_info(info)));
+			} else {
+				// Info was cached, but we are not a validator, so return early:
+				return Ok(None);
+			}
 		}
 
 		if let Some(info) = self
 			.query_info_from_runtime(ctx, parent, session_index)
 			.await?
 		{
+			tracing::trace!(target: LOG_TARGET, session_index, "Calling `with_info`");
 			let r = with_info(&info);
-			self.session_info_cache.put(session_index, info);
-			return Ok(Some(r));
+			tracing::trace!(target: LOG_TARGET, session_index, "Storing session info in lru!");
+			self.session_info_cache.put(session_index, Some(info));
+			Ok(Some(r))
+		} else {
+			// Avoid needless fetches if we are not a validator:
+			self.session_info_cache.put(session_index, None);
+			tracing::trace!(target: LOG_TARGET, session_index, "No session info found!");
+			Ok(None)
 		}
-		Ok(None)
 	}
 
 	/// Variant of `report_bad` that never fails, but just logs errors.
@@ -172,7 +187,10 @@
 		let session = self
 			.session_info_cache
 			.get_mut(&report.session_index)
-			.ok_or(Error::NoSuchCachedSession)?;
+			.ok_or(Error::NoSuchCachedSession)?
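+			// A cached `None` means we are not a validator ourselves: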
+			.as_mut()
+			.ok_or(Error::NotAValidator)?;
 		let group = session
 			.validator_groups
 			.get_mut(report.group_index.0 as usize)
@@ -194,6 +211,8 @@ impl SessionCache {
 	/// We need to pass in the relay parent for our call to `request_session_info_ctx`. We
 	/// actually should not need it: I suppose it is used for internal caching based on relay
 	/// parents, which we don't use here. It should not do any harm though.
+	///
+	/// Returns: `None` if not a validator.
 	async fn query_info_from_runtime<Context>(
 		&self,
 		ctx: &mut Context,
diff --git a/polkadot/node/network/availability-distribution/src/tests.rs b/polkadot/node/network/availability-distribution/src/tests.rs
deleted file mode 100644
index 473e47899438f9bb6fa354e0f00bb609f75f89c7..0000000000000000000000000000000000000000
--- a/polkadot/node/network/availability-distribution/src/tests.rs
+++ /dev/null
@@ -1,1278 +0,0 @@
-// Copyright 2020 Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
-
-use super::*;
-use assert_matches::assert_matches;
-use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
-use polkadot_node_network_protocol::{view, ObservedRole, our_view};
-use polkadot_node_subsystem_util::TimeoutExt;
-use polkadot_primitives::v1::{
-	AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GroupIndex,
-	GroupRotationInfo, HeadData, OccupiedCore, PersistedValidationData, PoV, ScheduledCore, Id as ParaId,
-	CommittedCandidateReceipt,
-};
-use polkadot_subsystem_testhelpers as test_helpers;
-
-use futures::{executor, future, Future};
-use sc_keystore::LocalKeystore;
-use sp_application_crypto::AppKey;
-use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
-use sp_keyring::Sr25519Keyring;
-use std::{sync::Arc, time::Duration};
-use maplit::hashmap;
-
-macro_rules! view {
-	( $( $hash:expr ),* $(,)? ) => {
-		// Finalized number unimportant for availability distribution.
-		View::new(vec![ $( $hash.clone() ),* ], 0)
-	};
-}
-
-fn chunk_protocol_message(
-	message: AvailabilityGossipMessage,
-) -> protocol_v1::AvailabilityDistributionMessage {
-	protocol_v1::AvailabilityDistributionMessage::Chunk(
-		message.candidate_hash,
-		message.erasure_chunk,
-	)
-}
-
-fn make_per_candidate() -> PerCandidate {
-	PerCandidate {
-		live_in: HashSet::new(),
-		message_vault: HashMap::new(),
-		received_messages: HashMap::new(),
-		sent_messages: HashMap::new(),
-		validators: Vec::new(),
-		validator_index: None,
-		descriptor: Default::default(),
-		span: jaeger::Span::Disabled,
-	}
-}
-
-struct TestHarness {
-	virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
-}
-
-fn test_harness<T: Future<Output = ()>>(
-	keystore: SyncCryptoStorePtr,
-	test_fx: impl FnOnce(TestHarness) -> T,
-) -> ProtocolState {
-	sp_tracing::try_init_simple();
-
-	let pool = sp_core::testing::TaskExecutor::new();
-	let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
-
-	let subsystem = AvailabilityDistributionSubsystem::new(keystore, Default::default());
-	let mut state = ProtocolState::default();
-	{
-		let subsystem = subsystem.run_inner(context, &mut state);
-
-		let test_fut = test_fx(TestHarness { virtual_overseer });
-
-		futures::pin_mut!(test_fut);
-		futures::pin_mut!(subsystem);
-
-		executor::block_on(future::select(test_fut, subsystem));
-	}
-
-	state
-}
-
-async fn overseer_send(
-	overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
-	msg: impl Into<AvailabilityDistributionMessage>,
-) {
-	let msg = msg.into();
-	tracing::trace!(msg = ?msg, "sending message");
-	overseer.send(FromOverseer::Communication { msg }).await
-}
-
-async fn overseer_recv(
-	overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
-) -> AllMessages {
-	tracing::trace!("waiting for message ...");
-	let msg = overseer.recv().await;
-	tracing::trace!(msg = ?msg, "received message");
-	msg
-}
-
-fn occupied_core_from_candidate(receipt: &CommittedCandidateReceipt) -> CoreState {
-	CoreState::Occupied(OccupiedCore {
-		next_up_on_available: None,
-		occupied_since: 0,
-		time_out_at: 5,
-		next_up_on_time_out: None,
-		availability: Default::default(),
-		group_responsible: GroupIndex::from(0),
-		candidate_hash: receipt.hash(),
-		candidate_descriptor: receipt.descriptor().clone(),
-	})
-}
-
-#[derive(Clone)]
-struct TestState {
-	chain_ids: Vec<ParaId>,
-	validators: Vec<Sr25519Keyring>,
-	validator_public: Vec<ValidatorId>,
-	validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
-	head_data: HashMap<ParaId, HeadData>,
-	keystore: SyncCryptoStorePtr,
-	relay_parent: Hash,
-	ancestors: Vec<Hash>,
-	availability_cores: Vec<CoreState>,
-	persisted_validation_data: PersistedValidationData,
-	candidates: Vec<CommittedCandidateReceipt>,
-	pov_blocks: Vec<PoV>,
-}
-
-fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
-	val_ids.iter().map(|v| v.public().into()).collect()
-}
-
-impl Default for TestState {
-	fn default() -> Self {
-		let chain_a = ParaId::from(1);
-		let chain_b = ParaId::from(2);
-
-		let chain_ids = vec![chain_a, chain_b];
-
-		let validators = vec![
-			Sr25519Keyring::Ferdie, // <- this node, role: validator
-			Sr25519Keyring::Alice,
-			Sr25519Keyring::Bob,
-			Sr25519Keyring::Charlie,
-			Sr25519Keyring::Dave,
-		];
-
-		let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
-
-		SyncCryptoStore::sr25519_generate_new(
-			&*keystore,
-			ValidatorId::ID,
-			Some(&validators[0].to_seed()),
-		)
-		.expect("Insert key into keystore");
-
-		let validator_public = validator_pubkeys(&validators);
-
-		let validator_groups = vec![vec![2, 0, 4], vec![1], vec![3]];
-		let group_rotation_info = GroupRotationInfo {
-			session_start_block: 0,
-			group_rotation_frequency: 100,
-			now: 1,
-		};
-		let validator_groups = (validator_groups, group_rotation_info);
-
-		let availability_cores = vec![
-			CoreState::Scheduled(ScheduledCore {
-				para_id: chain_ids[0],
-				collator: None,
-			}),
-			CoreState::Scheduled(ScheduledCore {
-				para_id: chain_ids[1],
-				collator: None,
-			}),
-		];
-
-		let mut head_data = HashMap::new();
-		head_data.insert(chain_a, HeadData(vec![4, 5, 6]));
-		head_data.insert(chain_b, HeadData(vec![7, 8, 9]));
-
-		let ancestors = vec![
-			Hash::repeat_byte(0x44),
-			Hash::repeat_byte(0x33),
-			Hash::repeat_byte(0x22),
-		];
-		let relay_parent = Hash::repeat_byte(0x05);
-
-		let persisted_validation_data = PersistedValidationData {
-			parent_head: HeadData(vec![7, 8, 9]),
-			relay_parent_number: Default::default(),
-			max_pov_size: 1024,
-			relay_parent_storage_root: Default::default(),
-		};
-
-		let pov_block_a = PoV {
-			block_data: BlockData(vec![42, 43, 44]),
-		};
-
-		let pov_block_b = PoV {
-			block_data: BlockData(vec![45, 46, 47]),
-		};
-
-		let candidates = vec![
-			TestCandidateBuilder {
-				para_id: chain_ids[0],
-				relay_parent: relay_parent,
-				pov_hash: pov_block_a.hash(),
-				erasure_root: make_erasure_root(persisted_validation_data.clone(), validators.len(), pov_block_a.clone()),
-				head_data: head_data.get(&chain_ids[0]).unwrap().clone(),
-				..Default::default()
-			}
-			.build(),
-			TestCandidateBuilder {
-				para_id: chain_ids[1],
-				relay_parent: relay_parent,
-				pov_hash: pov_block_b.hash(),
-				erasure_root: make_erasure_root(persisted_validation_data.clone(), validators.len(), pov_block_b.clone()),
-				head_data: head_data.get(&chain_ids[1]).unwrap().clone(),
-				..Default::default()
-			}
-			.build(),
-		];
-
-		let pov_blocks = vec![pov_block_a, pov_block_b];
-
-		Self {
-			chain_ids,
-			keystore,
-			validators,
-			validator_public,
-			validator_groups,
-			availability_cores,
-			head_data,
-			persisted_validation_data,
-			relay_parent,
-			ancestors,
-			candidates,
-			pov_blocks,
-		}
-	}
-}
-
-fn make_available_data(validation_data: PersistedValidationData, pov: PoV) -> AvailableData {
-	AvailableData {
-		validation_data,
-		pov: Arc::new(pov),
-	}
-}
-
-fn make_erasure_root(peristed: PersistedValidationData, validator_count: usize, pov: PoV) -> Hash {
-	let available_data = make_available_data(peristed, pov);
-
-	let chunks = obtain_chunks(validator_count, &available_data).unwrap();
-	branches(&chunks).root()
-}
-
-fn make_erasure_chunks(peristed: PersistedValidationData, validator_count: usize, pov: PoV) -> Vec<ErasureChunk> {
-	let available_data = make_available_data(peristed, pov);
-
-	derive_erasure_chunks_with_proofs(validator_count, &available_data)
-}
-
-fn make_valid_availability_gossip(
-	test: &TestState,
-	candidate: usize,
-	erasure_chunk_index: u32,
-) -> AvailabilityGossipMessage {
-	let erasure_chunks = make_erasure_chunks(
-		test.persisted_validation_data.clone(),
-		test.validator_public.len(),
-		test.pov_blocks[candidate].clone(),
-	);
-
-	let erasure_chunk: ErasureChunk = erasure_chunks
-		.get(erasure_chunk_index as usize)
-		.expect("Must be valid or input is oob")
-		.clone();
-
-	AvailabilityGossipMessage {
-		candidate_hash: test.candidates[candidate].hash(),
-		erasure_chunk,
-	}
-}
-
-#[derive(Default)]
-struct TestCandidateBuilder {
-	para_id: ParaId,
-	head_data: HeadData,
-	pov_hash: Hash,
-	relay_parent: Hash,
-	erasure_root: Hash,
-}
-
-impl TestCandidateBuilder {
-	fn build(self) -> CommittedCandidateReceipt {
-		CommittedCandidateReceipt {
-			descriptor: CandidateDescriptor {
-				para_id: self.para_id,
-				pov_hash: self.pov_hash,
-				relay_parent: self.relay_parent,
-				erasure_root: self.erasure_root,
-				..Default::default()
-			},
-			commitments: CandidateCommitments {
-				head_data: self.head_data,
-				..Default::default()
-			},
-		}
-	}
-}
-
-#[test]
-fn helper_integrity() {
-	let test_state = TestState::default();
-
-	let message = make_valid_availability_gossip(
-		&test_state,
-		0,
-		2,
-	);
-
-	let root = &test_state.candidates[0].descriptor.erasure_root;
-
-	let anticipated_hash = branch_hash(
-		root,
-		&message.erasure_chunk.proof,
-		dbg!(message.erasure_chunk.index as usize),
-	)
-	.expect("Must be able to derive branch hash");
-	assert_eq!(
-		anticipated_hash,
-		BlakeTwo256::hash(&message.erasure_chunk.chunk)
-	);
-}
-
-fn derive_erasure_chunks_with_proofs(
-	n_validators: usize,
-	available_data: &AvailableData,
-) -> Vec<ErasureChunk> {
-	let chunks: Vec<Vec<u8>> = obtain_chunks(n_validators, available_data).unwrap();
-
-	// create proofs for each erasure chunk
-	let branches = branches(chunks.as_ref());
-
-	let erasure_chunks = branches
-		.enumerate()
-		.map(|(index, (proof, chunk))| ErasureChunk {
-			chunk: chunk.to_vec(),
-			index: index as _,
-			proof,
-		})
-		.collect::<Vec<ErasureChunk>>();
-
-	erasure_chunks
-}
-
-async fn expect_chunks_network_message(
-	virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
-	peers: &[Vec<PeerId>],
-	candidates: &[CandidateHash],
-	chunks: &[ErasureChunk],
-) {
-	if chunks.is_empty() { return }
-
-	assert_matches!(
-		overseer_recv(virtual_overseer).await,
-		AllMessages::NetworkBridge(
-			NetworkBridgeMessage::SendValidationMessages(msgs)
-		) => {
-			assert_eq!(msgs.len(), chunks.len());
-			for (send_peers, msg) in msgs {
-				assert_matches!(
-					msg,
-					protocol_v1::ValidationProtocol::AvailabilityDistribution(
-						protocol_v1::AvailabilityDistributionMessage::Chunk(send_candidate, send_chunk)
-					) => {
-						let i = chunks.iter().position(|c| c == &send_chunk).unwrap();
-						assert!(candidates.contains(&send_candidate), format!("Could not find candidate: {:?}", send_candidate));
-						assert_eq!(&peers[i], &send_peers);
-					}
-				);
-			}
-		}
-	)
-}
-
-async fn change_our_view(
-	virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
-	view: OurView,
-	validator_public: &[ValidatorId],
-	ancestors: Vec<Hash>,
-	session_per_relay_parent: HashMap<Hash, SessionIndex>,
-	availability_cores_per_relay_parent: HashMap<Hash, Vec<CoreState>>,
-	data_availability: HashMap<CandidateHash, bool>,
-	chunk_data_per_candidate: HashMap<CandidateHash, (PoV, PersistedValidationData)>,
-	send_chunks_to: HashMap<CandidateHash, Vec<PeerId>>,
-) {
-	overseer_send(virtual_overseer, NetworkBridgeEvent::OurViewChange(view.clone())).await;
-
-	// obtain the validators per relay parent
-	assert_matches!(
-		overseer_recv(virtual_overseer).await,
-		AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-			relay_parent,
-			RuntimeApiRequest::Validators(tx),
-		)) => {
-			assert!(view.contains(&relay_parent));
-			tx.send(Ok(validator_public.to_vec())).unwrap();
-		}
-	);
-
-	// query of k ancestors, we only provide one
-	assert_matches!(
-		overseer_recv(virtual_overseer).await,
-		AllMessages::ChainApi(ChainApiMessage::Ancestors {
-			hash: relay_parent,
-			k,
-			response_channel: tx,
-		}) => {
-			assert!(view.contains(&relay_parent));
-			assert_eq!(k, AvailabilityDistributionSubsystem::K + 1);
-			tx.send(Ok(ancestors.clone())).unwrap();
-		}
-	);
-
-	for _ in 0..session_per_relay_parent.len() {
-		assert_matches!(
-			overseer_recv(virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				relay_parent,
-				RuntimeApiRequest::SessionIndexForChild(tx)
-			)) => {
-				let index = session_per_relay_parent.get(&relay_parent)
-					.expect(&format!("Session index for relay parent {:?} does not exist", relay_parent));
-				tx.send(Ok(*index)).unwrap();
-			}
-		);
-	}
-
-	for _ in 0..availability_cores_per_relay_parent.len() {
-		assert_matches!(
-			overseer_recv(virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				relay_parent,
-				RuntimeApiRequest::AvailabilityCores(tx)
-			)) => {
-				let cores = availability_cores_per_relay_parent.get(&relay_parent)
-					.expect(&format!("Availability core for relay parent {:?} does not exist", relay_parent));
-
-				tx.send(Ok(cores.clone())).unwrap();
-			}
-		);
-	}
-
-	let mut send_peers = Vec::new();
-	let mut send_chunks = Vec::new();
-	let mut candidates = Vec::new();
-	for _ in 0..data_availability.len() {
-		let (available, candidate_hash) = assert_matches!(
-			overseer_recv(virtual_overseer).await,
-			AllMessages::AvailabilityStore(
-				AvailabilityStoreMessage::QueryDataAvailability(
-					candidate_hash,
-					tx,
-				)
-			) => {
-				let available = data_availability.get(&candidate_hash)
-					.expect(&format!("No data availability for candidate {:?}", candidate_hash));
-
-				tx.send(*available).unwrap();
-				(available, candidate_hash)
-			}
-		);
-
-		if !available {
-			continue;
-		}
-
-		candidates.push(candidate_hash);
-		if let Some((pov, persisted)) = chunk_data_per_candidate.get(&candidate_hash) {
-			let chunks = make_erasure_chunks(persisted.clone(), validator_public.len(), pov.clone());
-
-			for _ in 0..chunks.len() {
-				let chunk = assert_matches!(
-					overseer_recv(virtual_overseer).await,
-					AllMessages::AvailabilityStore(
-						AvailabilityStoreMessage::QueryChunk(
-							candidate_hash,
-							index,
-							tx,
-						)
-					) => {
-						tracing::trace!("Query chunk {} for candidate {:?}", index, candidate_hash);
-						let chunk = chunks[index as usize].clone();
-						tx.send(Some(chunk.clone())).unwrap();
-						chunk
-					}
-				);
-
-				if let Some(peers) = send_chunks_to.get(&candidate_hash) {
-					send_peers.push(peers.clone());
-					send_chunks.push(chunk);
-				}
-			}
-
-		}
-	}
-
-	expect_chunks_network_message(virtual_overseer, &send_peers, &candidates, &send_chunks).await;
-}
-
-async fn setup_peer_with_view(
-	virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
-	peer: PeerId,
-	view: View,
-) {
-	overseer_send(virtual_overseer, NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full)).await;
-
-	overseer_send(virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer, view)).await;
-}
-
-async fn peer_send_message(
-	virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
-	peer: PeerId,
-	message: AvailabilityGossipMessage,
-	expected_reputation_change: Rep,
-) {
-	overseer_send(virtual_overseer, NetworkBridgeEvent::PeerMessage(peer.clone(), chunk_protocol_message(message))).await;
-
-	assert_matches!(
-		overseer_recv(virtual_overseer).await,
-		AllMessages::NetworkBridge(
-			NetworkBridgeMessage::ReportPeer(
-				rep_peer,
-				rep,
-			)
-		) => {
-			assert_eq!(peer, rep_peer);
-			assert_eq!(expected_reputation_change, rep);
-		}
-	);
-}
-
-#[test]
-fn check_views() {
-	let test_state = TestState::default();
-
-	let peer_a = PeerId::random();
-	let peer_a_2 = peer_a.clone();
-	let peer_b = PeerId::random();
-	let peer_b_2 = peer_b.clone();
-	assert_ne!(&peer_a, &peer_b);
-
-	let keystore = test_state.keystore.clone();
-	let current = test_state.relay_parent;
-	let ancestors = test_state.ancestors.clone();
-
-	let state = test_harness(keystore, move |test_harness| async move {
-		let mut virtual_overseer = test_harness.virtual_overseer;
-
-		let TestState {
-			validator_public,
-			relay_parent: current,
-			ancestors,
-			candidates,
-			pov_blocks,
-			..
-		} = test_state.clone();
-
-		let genesis = Hash::repeat_byte(0xAA);
-		change_our_view(
-			&mut virtual_overseer,
-			our_view![current],
-			&validator_public,
-			vec![ancestors[0], genesis],
-			hashmap! { current => 1, genesis => 1 },
-			hashmap! {
-				ancestors[0] => vec![
-					occupied_core_from_candidate(&candidates[0]),
-					occupied_core_from_candidate(&candidates[1]),
-				],
-				current => vec![
-					CoreState::Occupied(OccupiedCore {
-						next_up_on_available: None,
-						occupied_since: 0,
-						time_out_at: 10,
-						next_up_on_time_out: None,
-						availability: Default::default(),
-						group_responsible: GroupIndex::from(0),
-						candidate_hash: candidates[0].hash(),
-						candidate_descriptor: candidates[0].descriptor().clone(),
-					}),
-					CoreState::Free,
-					CoreState::Free,
-					CoreState::Occupied(OccupiedCore {
-						next_up_on_available: None,
-						occupied_since: 1,
-						time_out_at: 7,
-						next_up_on_time_out: None,
-						availability: Default::default(),
-						group_responsible: GroupIndex::from(0),
-						candidate_hash: candidates[1].hash(),
-						candidate_descriptor: candidates[1].descriptor().clone(),
-					}),
-					CoreState::Free,
-					CoreState::Free,
-				]
-			},
-			hashmap! {
-				candidates[0].hash() => true,
-				candidates[1].hash() => false,
-			},
-			hashmap! {
-				candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone()),
-			},
-			hashmap! {},
-		).await;
-
-		// setup peer a with interest in current
-		setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await;
-
-		// setup peer b with interest in ancestor
-		setup_peer_with_view(&mut virtual_overseer, peer_b.clone(), view![ancestors[0]]).await;
-	});
-
-	assert_matches! {
-		state,
-		ProtocolState {
-			peer_views,
-			view,
-			..
-		} => {
-			assert_eq!(
-				peer_views,
-				hashmap! {
-					peer_a_2 => view![current],
-					peer_b_2 => view![ancestors[0]],
-				},
-			);
-			assert_eq!(view, our_view![current]);
-		}
-	};
-}
-
-#[test]
-fn reputation_verification() {
-	let test_state = TestState::default();
-
-	let peer_a = PeerId::random();
-	let peer_b = PeerId::random();
-	assert_ne!(&peer_a, &peer_b);
-
-	let keystore = test_state.keystore.clone();
-
-	test_harness(keystore, move |test_harness| async move {
-		let mut virtual_overseer = test_harness.virtual_overseer;
-
-		let TestState {
-			relay_parent: current,
-			validator_public,
-			ancestors,
-			candidates,
-			pov_blocks,
-			..
-		} = test_state.clone();
-
-		let valid = make_valid_availability_gossip(
-			&test_state,
-			0,
-			2,
-		);
-
-		change_our_view(
-			&mut virtual_overseer,
-			our_view![current],
-			&validator_public,
-			vec![ancestors[0]],
-			hashmap! { current => 1 },
-			hashmap! {
-				current => vec![
-					occupied_core_from_candidate(&candidates[0]),
-					occupied_core_from_candidate(&candidates[1]),
-				],
-			},
-			hashmap! { candidates[0].hash() => true, candidates[1].hash() => false },
-			hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())},
-			hashmap! {},
-		).await;
-
-		// valid (first, from b)
-		peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await;
-
-		// valid (duplicate, from b)
-		peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), COST_PEER_DUPLICATE_MESSAGE).await;
-
-		// valid (second, from a)
-		peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await;
-
-		// send the a message again, so we should detect the duplicate
-		peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), COST_PEER_DUPLICATE_MESSAGE).await;
-
-		// peer b sends a message before we have the view
-		// setup peer a with interest in parent x
-		overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerDisconnected(peer_b.clone())).await;
-
-		overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)).await;
-
-		{
-			// send another message
-			let valid = make_valid_availability_gossip(&test_state, 1, 2);
-
-			// Make peer a and b listen on `current`
-			overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current])).await;
-
-			let mut chunks = make_erasure_chunks(
-				test_state.persisted_validation_data.clone(),
-				validator_public.len(),
-				pov_blocks[0].clone(),
-			);
-
-			// Both peers send us this chunk already
-			chunks.remove(2);
-
-			let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::<Vec<_>>();
-			expect_chunks_network_message(&mut virtual_overseer, &send_peers, &[candidates[0].hash()], &chunks).await;
-
-			overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![current])).await;
-
-			let send_peers = chunks.iter().map(|_| vec![peer_b.clone()]).collect::<Vec<_>>();
-			expect_chunks_network_message(&mut virtual_overseer, &send_peers, &[candidates[0].hash()], &chunks).await;
-
-			peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE_FIRST).await;
-
-			expect_chunks_network_message(
-				&mut virtual_overseer,
-				&[vec![peer_b.clone()]],
-				&[candidates[1].hash()],
-				&[valid.erasure_chunk.clone()],
-			).await;
-
-			// Let B send the same message
-			peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await;
-		}
-	});
-}
-
-#[test]
-fn not_a_live_candidate_is_detected() {
-	let test_state = TestState::default();
-
-	let peer_a = PeerId::random();
-
-	let keystore = test_state.keystore.clone();
-
-	test_harness(keystore, move |test_harness| async move {
-		let mut virtual_overseer = test_harness.virtual_overseer;
-
-		let TestState {
-			relay_parent: current,
-			validator_public,
-			ancestors,
-			candidates,
-			pov_blocks,
-			..
-		} = test_state.clone();
-
-		change_our_view(
-			&mut virtual_overseer,
-			our_view![current],
-			&validator_public,
-			vec![ancestors[0]],
-			hashmap! { current => 1 },
-			hashmap! {
-				current => vec![
-					occupied_core_from_candidate(&candidates[0]),
-				],
-			},
-			hashmap! { candidates[0].hash() => true },
-			hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())},
-			hashmap! {},
-		).await;
-
-		let valid = make_valid_availability_gossip(
-			&test_state,
-			1,
-			1,
-		);
-
-		peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), COST_NOT_A_LIVE_CANDIDATE).await;
-	});
-}
-
-#[test]
-fn peer_change_view_before_us() {
-	let test_state = TestState::default();
-
-	let peer_a = PeerId::random();
-
-	let keystore = test_state.keystore.clone();
-
-	test_harness(keystore, move |test_harness| async move {
-		let mut virtual_overseer = test_harness.virtual_overseer;
-
-		let TestState {
-			relay_parent: current,
-			validator_public,
-			ancestors,
-			candidates,
-			pov_blocks,
-			..
-		} = test_state.clone();
-
-		setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await;
-
-		change_our_view(
-			&mut virtual_overseer,
-			our_view![current],
-			&validator_public,
-			vec![ancestors[0]],
-			hashmap! { current => 1 },
-			hashmap! {
-				current => vec![
-					occupied_core_from_candidate(&candidates[0]),
-				],
-			},
-			hashmap! { candidates[0].hash() => true },
-			hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())},
-			hashmap! { candidates[0].hash() => vec![peer_a.clone()] },
-		).await;
-
-		let valid = make_valid_availability_gossip(
-			&test_state,
-			0,
-			0,
-		);
-
-		// We send peer a all the chunks of candidate0, so we just benefit him for sending a valid message
-		peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await;
-	});
-}
-
-#[test]
-fn candidate_chunks_are_put_into_message_vault_when_candidate_is_first_seen() {
-	let test_state = TestState::default();
-
-	let peer_a = PeerId::random();
-
-	let keystore = test_state.keystore.clone();
-
-	test_harness(keystore, move |test_harness| async move {
-		let mut virtual_overseer = test_harness.virtual_overseer;
-
-		let TestState {
-			relay_parent: current,
-			validator_public,
-			ancestors,
-			candidates,
-			pov_blocks,
-			..
-		} = test_state.clone();
-
-		change_our_view(
-			&mut virtual_overseer,
-			our_view![ancestors[0]],
-			&validator_public,
-			vec![ancestors[1]],
-			hashmap! { ancestors[0] => 1 },
-			hashmap! {
-				ancestors[0] => vec![
-					occupied_core_from_candidate(&candidates[0]),
-				],
-			},
-			hashmap! { candidates[0].hash() => true },
-			hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())},
-			hashmap! {},
-		).await;
-
-		change_our_view(
-			&mut virtual_overseer,
-			our_view![current],
-			&validator_public,
-			vec![ancestors[0]],
-			hashmap! { current => 1 },
-			hashmap! {
-				current => vec![
-					occupied_core_from_candidate(&candidates[0]),
-				],
-			},
-			hashmap! { candidates[0].hash() => true },
-			hashmap! {},
-			hashmap! {},
-		).await;
-
-		// Let peera connect, we should send him all the chunks of the candidate
-		setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await;
-
-		let chunks = make_erasure_chunks(
-			test_state.persisted_validation_data.clone(),
-			validator_public.len(),
-			pov_blocks[0].clone(),
-		);
-		let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::<Vec<_>>();
-		expect_chunks_network_message(
-			&mut virtual_overseer,
-			&send_peers,
-			&[candidates[0].hash()],
-			&chunks,
-		).await;
-	});
-}
-
-#[test]
-fn k_ancestors_in_session() {
-	let pool = sp_core::testing::TaskExecutor::new();
-	let (mut ctx, mut virtual_overseer) =
-		test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, _>(pool);
-
-	const DATA: &[(Hash, SessionIndex)] = &[
-		(Hash::repeat_byte(0x32), 3), // relay parent
-		(Hash::repeat_byte(0x31), 3), // grand parent
-		(Hash::repeat_byte(0x30), 3), // great ...
-		(Hash::repeat_byte(0x20), 2),
-		(Hash::repeat_byte(0x12), 1),
-		(Hash::repeat_byte(0x11), 1),
-		(Hash::repeat_byte(0x10), 1),
-	];
-	const K: usize = 5;
-
-	const EXPECTED: &[Hash] = &[DATA[1].0, DATA[2].0];
-
-	let test_fut = async move {
-		assert_matches!(
-			overseer_recv(&mut virtual_overseer).await,
-			AllMessages::ChainApi(ChainApiMessage::Ancestors {
-				hash: relay_parent,
-				k,
-				response_channel: tx,
-			}) => {
-				assert_eq!(k, K+1);
-				assert_eq!(relay_parent, DATA[0].0);
-				tx.send(Ok(DATA[1..=k].into_iter().map(|x| x.0).collect::<Vec<_>>())).unwrap();
-			}
-		);
-
-		// query the desired session index of the relay parent
-		assert_matches!(
-			overseer_recv(&mut virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				relay_parent,
-				RuntimeApiRequest::SessionIndexForChild(tx),
-			)) => {
-				assert_eq!(relay_parent, DATA[0].0);
-				let session: SessionIndex = DATA[0].1;
-				tx.send(Ok(session)).unwrap();
-			}
-		);
-
-		// query ancestors
-		for i in 2usize..=(EXPECTED.len() + 1 + 1) {
-			assert_matches!(
-				overseer_recv(&mut virtual_overseer).await,
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					relay_parent,
-					RuntimeApiRequest::SessionIndexForChild(tx),
-				)) => {
-					// query is for ancestor_parent
-					let x = &DATA[i];
-					assert_eq!(relay_parent, x.0);
-					// but needs to yield ancestor_parent's child's session index
-					let x = &DATA[i-1];
-					tx.send(Ok(x.1)).unwrap();
-				}
-			);
-		}
-	};
-
-	let sut = async move {
-		let ancestors = query_up_to_k_ancestors_in_same_session(&mut ctx, DATA[0].0, K)
-			.await
-			.unwrap();
-		assert_eq!(ancestors, EXPECTED.to_vec());
-	};
-
-	futures::pin_mut!(test_fut);
-	futures::pin_mut!(sut);
-
-	executor::block_on(future::join(test_fut, sut).timeout(Duration::from_millis(1000)));
-}
-
-#[test]
-fn clean_up_receipts_cache_unions_ancestors_and_view() {
-	let mut state = ProtocolState::default();
-
-	let hash_a = [0u8; 32].into();
-	let hash_b = [1u8; 32].into();
-	let hash_c = [2u8; 32].into();
-	let hash_d = [3u8; 32].into();
-
-	state.live_under.insert(hash_a, HashSet::new());
-	state.live_under.insert(hash_b, HashSet::new());
-	state.live_under.insert(hash_c, HashSet::new());
-	state.live_under.insert(hash_d, HashSet::new());
-
-	state.per_relay_parent.insert(hash_a, PerRelayParent {
-		ancestors: vec![hash_b],
-		live_candidates: HashSet::new(),
-		span: PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
-	});
-
-	state.per_relay_parent.insert(hash_c, PerRelayParent {
-		ancestors: Vec::new(),
-		live_candidates: HashSet::new(),
-		span: PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
-	});
-
-	state.clean_up_live_under_cache();
-
-	assert_eq!(state.live_under.len(), 3);
-	assert!(state.live_under.contains_key(&hash_a));
-	assert!(state.live_under.contains_key(&hash_b));
-	assert!(state.live_under.contains_key(&hash_c));
-	assert!(!state.live_under.contains_key(&hash_d));
-}
-
-#[test]
-fn remove_relay_parent_only_removes_per_candidate_if_final() {
-	let mut state = ProtocolState::default();
-
-	let hash_a = [0u8; 32].into();
-	let hash_b = [1u8; 32].into();
-
-	let candidate_hash_a = CandidateHash([46u8; 32].into());
-
-	state.per_relay_parent.insert(hash_a, PerRelayParent {
-		ancestors: vec![],
-		live_candidates: std::iter::once(candidate_hash_a).collect(),
-		span: PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
-	});
-
-	state.per_relay_parent.insert(hash_b, PerRelayParent {
-		ancestors: vec![],
-		live_candidates: std::iter::once(candidate_hash_a).collect(),
-		span: PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
-	});
-
-	state.per_candidate.insert(candidate_hash_a, {
-		let mut per_candidate = make_per_candidate();
-		per_candidate.live_in = vec![hash_a, hash_b].into_iter().collect();
-		per_candidate
-	});
-
-	state.remove_relay_parent(&hash_a);
-
-	assert!(!state.per_relay_parent.contains_key(&hash_a));
-	assert!(!state.per_candidate.get(&candidate_hash_a).unwrap().live_in.contains(&hash_a));
-	assert!(state.per_candidate.get(&candidate_hash_a).unwrap().live_in.contains(&hash_b));
-
-	state.remove_relay_parent(&hash_b);
-
-	assert!(!state.per_relay_parent.contains_key(&hash_b));
-	assert!(!state.per_candidate.contains_key(&candidate_hash_a));
-}
-
-#[test]
-fn add_relay_parent_includes_all_live_candidates() {
-	let relay_parent = [0u8; 32].into();
-
-	let mut state = ProtocolState::default();
-
-	let ancestor_a = [1u8; 32].into();
-
-	let candidate_hash_a = CandidateHash([10u8; 32].into());
-	let candidate_hash_b = CandidateHash([11u8; 32].into());
-
-	state.per_candidate.insert(candidate_hash_b, make_per_candidate());
-
-	let candidates = vec![
-		(candidate_hash_a, FetchedLiveCandidate::Fresh(Default::default())),
-		(candidate_hash_b, FetchedLiveCandidate::Cached),
-	].into_iter().collect();
-
-	state.add_relay_parent(
-		relay_parent,
-		Vec::new(),
-		None,
-		candidates,
-		vec![ancestor_a],
-		PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
-	);
-
-	assert!(
-		state.per_candidate.get(&candidate_hash_a).unwrap().live_in.contains(&relay_parent)
-	);
-	assert!(
-		state.per_candidate.get(&candidate_hash_b).unwrap().live_in.contains(&relay_parent)
-	);
-
-	let per_relay_parent = state.per_relay_parent.get(&relay_parent).unwrap();
-
-	assert!(per_relay_parent.live_candidates.contains(&candidate_hash_a));
-	assert!(per_relay_parent.live_candidates.contains(&candidate_hash_b));
-}
-
-#[test]
-fn query_pending_availability_at_pulls_from_and_updates_receipts() {
-	let hash_a = [0u8; 32].into();
-	let hash_b = [1u8; 32].into();
-
-	let para_a = ParaId::from(1);
-	let para_b = ParaId::from(2);
-	let para_c = ParaId::from(3);
-
-	let make_candidate = |para_id| {
-		let mut candidate = CommittedCandidateReceipt::default();
-		candidate.descriptor.para_id = para_id;
-		candidate.descriptor.relay_parent = [69u8; 32].into();
-		candidate
-	};
-
-	let candidate_a = make_candidate(para_a);
-	let candidate_b = make_candidate(para_b);
-	let candidate_c = make_candidate(para_c);
-
-	let candidate_hash_a = candidate_a.hash();
-	let candidate_hash_b = candidate_b.hash();
-	let candidate_hash_c = candidate_c.hash();
-
-	// receipts has an initial entry for hash_a but not hash_b.
-	let mut receipts = HashMap::new();
-	receipts.insert(hash_a, vec![candidate_hash_a, candidate_hash_b].into_iter().collect());
-
-	let pool = sp_core::testing::TaskExecutor::new();
-
-	let (mut ctx, mut virtual_overseer) =
-		test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, _>(pool);
-
-	let test_fut = async move {
-		let live_candidates = query_pending_availability_at(
-			&mut ctx,
-			vec![hash_a, hash_b],
-			&mut receipts,
-		).await.unwrap();
-
-		// although 'b' is cached from the perspective of hash_a, it gets overwritten when we query what's happening in
-		//
-		assert_eq!(live_candidates.len(), 3);
-		assert_matches!(live_candidates.get(&candidate_hash_a).unwrap(), FetchedLiveCandidate::Cached);
-		assert_matches!(live_candidates.get(&candidate_hash_b).unwrap(), FetchedLiveCandidate::Cached);
-		assert_matches!(live_candidates.get(&candidate_hash_c).unwrap(), FetchedLiveCandidate::Fresh(_));
-
-		assert!(receipts.get(&hash_b).unwrap().contains(&candidate_hash_b));
-		assert!(receipts.get(&hash_b).unwrap().contains(&candidate_hash_c));
-	};
-
-	let answer = async move {
-		// hash_a should be answered out of cache, so we should just have
-		// queried for hash_b.
-		assert_matches!(
-			overseer_recv(&mut virtual_overseer).await,
-			AllMessages::RuntimeApi(
-				RuntimeApiMessage::Request(
-					r,
-					RuntimeApiRequest::AvailabilityCores(tx),
-				)
-			) if r == hash_b => {
-				let _ = tx.send(Ok(vec![
-					CoreState::Occupied(OccupiedCore {
-						next_up_on_available: None,
-						occupied_since: 0,
-						time_out_at: 0,
-						next_up_on_time_out: None,
-						availability: Default::default(),
-						group_responsible: GroupIndex::from(0),
-						candidate_hash: candidate_hash_b,
-						candidate_descriptor: candidate_b.descriptor.clone(),
-					}),
-					CoreState::Occupied(OccupiedCore {
-						next_up_on_available: None,
-						occupied_since: 0,
-						time_out_at: 0,
-						next_up_on_time_out: None,
-						availability: Default::default(),
-						group_responsible: GroupIndex::from(0),
-						candidate_hash: candidate_hash_c,
-						candidate_descriptor: candidate_c.descriptor.clone(),
-					}),
-				]));
-			}
-		);
-	};
-
-	futures::pin_mut!(test_fut);
-	futures::pin_mut!(answer);
-
-	executor::block_on(future::join(test_fut, answer));
-}
-
-#[test]
-fn new_peer_gets_all_chunks_send() {
-	let test_state = TestState::default();
-
-	let peer_a = PeerId::random();
-	let peer_b = PeerId::random();
-	assert_ne!(&peer_a, &peer_b);
-
-	let keystore = test_state.keystore.clone();
-
-	test_harness(keystore, move |test_harness| async move {
-		let mut virtual_overseer = test_harness.virtual_overseer;
-
-		let TestState {
-			relay_parent: current,
-			validator_public,
-			ancestors,
-			candidates,
-			pov_blocks,
-			..
-		} = test_state.clone();
-
-		let valid = make_valid_availability_gossip(
-			&test_state,
-			1,
-			2,
-		);
-
-		change_our_view(
-			&mut virtual_overseer,
-			our_view![current],
-			&validator_public,
-			vec![ancestors[0]],
-			hashmap! { current => 1 },
-			hashmap! {
-				current => vec![
-					occupied_core_from_candidate(&candidates[0]),
-					occupied_core_from_candidate(&candidates[1])
-				],
-			},
-			hashmap! { candidates[0].hash() => true, candidates[1].hash() => false },
-			hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())},
-			hashmap! {},
-		).await;
-
-		peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), BENEFIT_VALID_MESSAGE_FIRST).await;
-
-		setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await;
-
-		let mut chunks = make_erasure_chunks(
-			test_state.persisted_validation_data.clone(),
-			validator_public.len(),
-			pov_blocks[0].clone(),
-		);
-
-		chunks.push(valid.erasure_chunk);
-
-		let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::<Vec<_>>();
-
-		expect_chunks_network_message(
-			&mut virtual_overseer,
-			&send_peers,
-			&[candidates[0].hash(), candidates[1].hash()],
-			&chunks,
-		).await;
-	});
-}
diff --git a/polkadot/node/network/availability-distribution/src/tests/mock.rs b/polkadot/node/network/availability-distribution/src/tests/mock.rs
new file mode 100644
index 0000000000000000000000000000000000000000..9d9896d8fd59b893bc28d77e21de317070d1c471
--- /dev/null
+++ b/polkadot/node/network/availability-distribution/src/tests/mock.rs
@@ -0,0 +1,150 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Helper functions and tools to generate mock data useful for testing this subsystem.
+
+use std::sync::Arc;
+
+use sp_keyring::Sr25519Keyring;
+
+use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
+use polkadot_primitives::v1::{
+	AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, CandidateHash,
+	CommittedCandidateReceipt, ErasureChunk, GroupIndex, Hash, HeadData, Id as ParaId,
+	OccupiedCore, PersistedValidationData, PoV, SessionInfo, ValidatorIndex,
+};
+
+/// Create dummy session info with two validator groups.
+pub fn make_session_info() -> SessionInfo {
+	let validators = vec![
+		Sr25519Keyring::Ferdie, // <- this node, role: validator
+		Sr25519Keyring::Alice,
+		Sr25519Keyring::Bob,
+		Sr25519Keyring::Charlie,
+		Sr25519Keyring::Dave,
+		Sr25519Keyring::Eve,
+		Sr25519Keyring::One,
+	];
+
+	let validator_groups: Vec<Vec<ValidatorIndex>> = [vec![5, 0, 3], vec![1, 6, 2, 4]]
+		.iter().map(|g| g.into_iter().map(|v| ValidatorIndex(*v)).collect()).collect();
+
+	SessionInfo {
+		discovery_keys: validators.iter().map(|k| k.public().into()).collect(),
+		// Not used:
+		n_cores: validator_groups.len() as u32,
+		validator_groups,
+		// Not used values:
+		validators: validators.iter().map(|k| k.public().into()).collect(),
+		assignment_keys: Vec::new(),
+		zeroth_delay_tranche_width: 0,
+		relay_vrf_modulo_samples: 0,
+		n_delay_tranches: 0,
+		no_show_slots: 0,
+		needed_approvals: 0,
+	}
+}
+
+/// Builder for constructing occupied cores.
+///
+/// Takes all the values we care about and fills the rest with dummy values on `build`.
+pub struct OccupiedCoreBuilder {
+	pub group_responsible: GroupIndex,
+	pub para_id: ParaId,
+	pub relay_parent: Hash,
+}
+
+impl OccupiedCoreBuilder {
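+	/// Build the occupied core together with the contained candidate's hash and a valid chunk for it.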
+	pub fn build(self) -> (OccupiedCore, (CandidateHash, ErasureChunk)) {
+		let pov = PoV {
+			block_data: BlockData(vec![45, 46, 47]),
+		};
+		let pov_hash = pov.hash();
+		let (erasure_root, chunk) = get_valid_chunk_data(pov.clone());
+		let candidate_receipt = TestCandidateBuilder {
+			para_id: self.para_id,
+			pov_hash,
+			relay_parent: self.relay_parent,
+			erasure_root,
+			..Default::default()
+		}.build();
+		let core = OccupiedCore {
+			next_up_on_available: None,
+			occupied_since: 0,
+			time_out_at: 0,
+			next_up_on_time_out: None,
+			availability: Default::default(),
+			group_responsible: self.group_responsible,
+			candidate_hash: candidate_receipt.hash(),
+			candidate_descriptor: candidate_receipt.descriptor().clone(),
+		};
+		(core, (candidate_receipt.hash(), chunk))
+	}
+}
+
+#[derive(Default)]
+pub struct TestCandidateBuilder {
+	para_id: ParaId,
+	head_data: HeadData,
+	pov_hash: Hash,
+	relay_parent: Hash,
+	erasure_root: Hash,
+}
+
+impl TestCandidateBuilder {
+	pub fn build(self) -> CommittedCandidateReceipt {
+		CommittedCandidateReceipt {
+			descriptor: CandidateDescriptor {
+				para_id: self.para_id,
+				pov_hash: self.pov_hash,
+				relay_parent: self.relay_parent,
+				erasure_root: self.erasure_root,
+				..Default::default()
+			},
+			commitments: CandidateCommitments {
+				head_data: self.head_data,
+				..Default::default()
+			},
+		}
+	}
+}
+
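+/// Erasure-code the given `PoV` for a fake validator set of size 10, returning the erasure
+/// root together with the first chunk (including its Merkle proof).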
+pub fn get_valid_chunk_data(pov: PoV) -> (Hash, ErasureChunk) {
+	let fake_validator_count = 10;
+	let persisted = PersistedValidationData {
+		parent_head: HeadData(vec![7, 8, 9]),
+		relay_parent_number: Default::default(),
+		max_pov_size: 1024,
+		relay_parent_storage_root: Default::default(),
+	};
+	let available_data = AvailableData {
+		validation_data: persisted, pov: Arc::new(pov),
+	};
+	let chunks = obtain_chunks(fake_validator_count, &available_data).unwrap();
+	let branches = branches(chunks.as_ref());
+	let root = branches.root();
+	let chunk = branches.enumerate()
+		.map(|(index, (proof, chunk))| ErasureChunk {
+			chunk: chunk.to_vec(),
+			index: ValidatorIndex(index as _),
+			proof,
+		})
+		.next().expect("There really should be 10 chunks.");
+	(root, chunk)
+}
diff --git a/polkadot/node/network/availability-distribution/src/tests/mod.rs b/polkadot/node/network/availability-distribution/src/tests/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..be9d4c7d1e69be655ea06ea8033b81411ab8e261
--- /dev/null
+++ b/polkadot/node/network/availability-distribution/src/tests/mod.rs
@@ -0,0 +1,65 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use futures::{executor, future, Future};
+
+use sp_keystore::SyncCryptoStorePtr;
+
+use polkadot_subsystem_testhelpers as test_helpers;
+
+use super::*;
+
+/// State for test harnesses.
+mod state;
+use state::{TestState, TestHarness};
+
+/// Mock data useful for testing.
+pub(crate) mod mock;
+
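+/// Run the subsystem under test together with the provided test future until one of them
+/// completes.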
+fn test_harness<T: Future<Output = ()>>(
+	keystore: SyncCryptoStorePtr,
+	test_fx: impl FnOnce(TestHarness) -> T,
+) {
+	sp_tracing::try_init_simple();
+
+	let pool = sp_core::testing::TaskExecutor::new();
+	let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
+
+	let subsystem = AvailabilityDistributionSubsystem::new(keystore, Default::default());
+	{
+		let subsystem = subsystem.run(context);
+
+		let test_fut = test_fx(TestHarness { virtual_overseer, pool });
+
+		futures::pin_mut!(test_fut);
+		futures::pin_mut!(subsystem);
+
+		executor::block_on(future::select(test_fut, subsystem));
+	}
+}
+
+/// Basic check that the subsystem works as expected.
+///
+/// Exceptional cases are tested as unit tests in `fetch_task`.
+#[test]
+fn check_basic() {
+	let state = TestState::default();
+	test_harness(state.keystore.clone(), move |harness| {
+		state.run(harness)
+	});
+}
diff --git a/polkadot/node/network/availability-distribution/src/tests/state.rs b/polkadot/node/network/availability-distribution/src/tests/state.rs
new file mode 100644
index 0000000000000000000000000000000000000000..1d96dd1adf4cad4c27e59c3e64ace5faf9bd67da
--- /dev/null
+++ b/polkadot/node/network/availability-distribution/src/tests/state.rs
@@ -0,0 +1,318 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration};
+
+use polkadot_node_subsystem_util::TimeoutExt;
+use polkadot_subsystem_testhelpers::TestSubsystemContextHandle;
+use smallvec::smallvec;
+
+use futures::{channel::{mpsc, oneshot}, FutureExt, SinkExt, StreamExt};
+use futures_timer::Delay;
+
+use sc_keystore::LocalKeystore;
+use sp_application_crypto::AppKey;
+use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
+use sp_keyring::Sr25519Keyring;
+use sp_core::{traits::SpawnNamed, testing::TaskExecutor};
+use sc_network as network;
+use sc_network::config as netconfig;
+
+use polkadot_subsystem::{
+	ActiveLeavesUpdate, FromOverseer, OverseerSignal,
+	messages::{
+		AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage,
+		NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
+	},
+};
+use polkadot_primitives::v1::{
+	CandidateHash, CoreState, ErasureChunk, GroupIndex, Hash, Id as ParaId, ScheduledCore,
+	SessionInfo, ValidatorId, ValidatorIndex,
+};
+use polkadot_node_network_protocol::{
+	jaeger,
+	request_response::{v1, IncomingRequest, OutgoingRequest, Requests},
+};
+use polkadot_subsystem_testhelpers as test_helpers;
+use test_helpers::SingleItemSink;
+
+use super::mock::{make_session_info, OccupiedCoreBuilder};
+use crate::LOG_TARGET;
+
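+/// Handles needed to drive the subsystem under test.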
+pub struct TestHarness {
+	pub virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
+	pub pool: TaskExecutor,
+}
+
+/// `TestState` for mocking execution of this subsystem.
+///
+/// The `Default` instance provides data which makes the subsystem succeed: a couple of valid
+/// occupied cores. You can tune the data before calling `TestState::run`, e.g. make some chunks
+/// invalid; the test will then still pass as long as you also remove those chunks from
+/// `valid_chunks`.
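+///
+/// A minimal sketch of such tuning (hypothetical usage, not exercised by the tests below):
+///
+/// ```ignore
+/// let mut state = TestState::default();
+/// // Drop one chunk entirely, and no longer expect it to get stored:
+/// let key = state.valid_chunks.iter().next().cloned().expect("There are valid chunks");
+/// state.chunks.remove(&key);
+/// state.valid_chunks.remove(&key);
+/// ```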
+#[derive(Clone)]
+pub struct TestState {
+	/// Simulated relay chain heads.
+	pub relay_chain: Vec<Hash>,
+	/// Chunks the mocked availability store serves on `QueryChunk` requests.
+	pub chunks: HashMap<(CandidateHash, ValidatorIndex), ErasureChunk>,
+	/// All chunks that are valid and should be accepted.
+	pub valid_chunks: HashSet<(CandidateHash, ValidatorIndex)>,
+	/// Session info served on runtime API requests.
+	pub session_info: SessionInfo,
+	/// Cores per relay chain block.
+	pub cores: HashMap<Hash, Vec<CoreState>>,
+	/// Keystore holding our validator key.
+	pub keystore: SyncCryptoStorePtr,
+}
+
+impl Default for TestState {
+	fn default() -> Self {
+		let relay_chain: Vec<_> = (1u8..10).map(Hash::repeat_byte).collect();
+		let chain_a = ParaId::from(1);
+		let chain_b = ParaId::from(2);
+
+		let chain_ids = vec![chain_a, chain_b];
+
+		let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
+
+		let session_info = make_session_info();
+
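+		// Insert our validator key, so the subsystem will see us as a validator: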
+		SyncCryptoStore::sr25519_generate_new(
+			&*keystore,
+			ValidatorId::ID,
+			Some(&Sr25519Keyring::Ferdie.to_seed()),
+		)
+		.expect("Insert key into keystore");
+
+		let (cores, chunks) = {
+			let mut cores = HashMap::new();
+			let mut chunks = HashMap::new();
+
+			cores.insert(relay_chain[0],
+				vec![
+					CoreState::Scheduled(ScheduledCore {
+						para_id: chain_ids[0],
+						collator: None,
+					}),
+					CoreState::Scheduled(ScheduledCore {
+						para_id: chain_ids[1],
+						collator: None,
+					}),
+				]
+			);
+
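+			// Pair each block with its direct child by zipping the chain with itself,
+			// advanced by one: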
+			let heads = {
+				let mut advanced = relay_chain.iter();
+				advanced.next();
+				relay_chain.iter().zip(advanced)
+			};
+			for (relay_parent, relay_child) in heads {
+				let (p_cores, p_chunks): (Vec<_>, Vec<_>) = chain_ids.iter().enumerate()
+					.map(|(i, para_id)| {
+						let (core, chunk) = OccupiedCoreBuilder {
+							group_responsible: GroupIndex(i as _),
+							para_id: *para_id,
+							relay_parent: relay_parent.clone(),
+						}.build();
+						(CoreState::Occupied(core), chunk)
+					})
+					.unzip();
+				cores.insert(relay_child.clone(), p_cores);
+				// Skip chunks for our own group (won't get fetched):
+				let mut chunks_other_groups = p_chunks.into_iter();
+				chunks_other_groups.next();
+				for (candidate_hash, chunk) in chunks_other_groups {
+					chunks.insert((candidate_hash, chunk.index), chunk);
+				}
+			}
+			(cores, chunks)
+		};
+		Self {
+			relay_chain,
+			valid_chunks: chunks.keys().cloned().collect(),
+			chunks,
+			session_info,
+			cores,
+			keystore,
+		}
+	}
+}
+
+impl TestState {
+
+	/// Run, but fail after some timeout.
+	pub async fn run(self, harness: TestHarness) {
+		// Make sure test won't run forever.
+		let f = self.run_inner(harness.pool, harness.virtual_overseer).timeout(Duration::from_secs(10));
+		assert!(f.await.is_some(), "Test ran into timeout");
+	}
+
+	/// Run the test with the given mock values in `TestState`.
+	///
+	/// This simply advances through the simulated chain and checks whether the subsystem behaves
+	/// as expected: it succeeds if all valid chunks of other backing groups get stored, and no
+	/// others do.
+	async fn run_inner(
+		self,
+		executor: TaskExecutor,
+		virtual_overseer: TestSubsystemContextHandle<AvailabilityDistributionMessage>,
+	) {
+		// We skip genesis here (in reality `ActiveLeavesUpdate` can also skip blocks):
+		let updates = {
+			let mut advanced = self.relay_chain.iter();
+			advanced.next();
+			self.relay_chain.iter().zip(advanced)
+			.map(|(old, new)| ActiveLeavesUpdate {
+				activated: smallvec![(new.clone(), Arc::new(jaeger::Span::Disabled))],
+				deactivated: smallvec![old.clone()],
+			}).collect::<Vec<_>>()
+		};
+
+		// We should be storing all valid chunks during execution:
+		//
+		// The test will fail if this has not happened before the timeout.
+		let mut remaining_stores = self.valid_chunks.len();
+
+		let TestSubsystemContextHandle { tx, mut rx } = virtual_overseer;
+
+		// Spawning is necessary, as the incoming queue can only hold a single item and we don't
+		// want to deadlock.
+		let update_tx = tx.clone();
+		executor.spawn("Sending active leaves updates", async move {
+			for update in updates {
+				overseer_signal(
+					update_tx.clone(),
+					OverseerSignal::ActiveLeaves(update)
+				).await;
+				// We need to give the subsystem a little time to do its job, otherwise it will
+				// cancel jobs as obsolete:
+				Delay::new(Duration::from_millis(20)).await;
+			}
+		}.boxed());
+
+		while remaining_stores > 0 {
+			tracing::trace!(target: LOG_TARGET, remaining_stores, "Stores left to go");
+			let msg = overseer_recv(&mut rx).await;
+			match msg {
+				AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs)) => {
+					for req in reqs {
+						// Forward requests:
+						let in_req = to_incoming_req(&executor, req);
+
+						executor.spawn("Request forwarding",
+									overseer_send(
+										tx.clone(),
+										AvailabilityDistributionMessage::AvailabilityFetchingRequest(in_req)
+									).boxed()
+						);
+					}
+				}
+				AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx)) => {
+					let chunk = self.chunks.get(&(candidate_hash, validator_index));
+					tx.send(chunk.cloned())
+						.expect("Receiver is expected to be alive");
+				}
+				AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreChunk { candidate_hash, chunk, tx, .. }) => {
+					assert!(
+						self.valid_chunks.contains(&(candidate_hash, chunk.index)),
+						"Only valid chunks should ever get stored."
+					);
+					tx.send(Ok(()))
+						.expect("Receiver is expected to be alive");
+					tracing::trace!(target: LOG_TARGET, "'Stored' fetched chunk.");
+					remaining_stores -= 1;
+				}
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, req)) => {
+					match req {
+						RuntimeApiRequest::SessionIndexForChild(tx) => {
+							// Always session index 1 for now:
+							tx.send(Ok(1))
+								.expect("Receiver should still be alive");
+						}
+						RuntimeApiRequest::SessionInfo(_, tx) => {
+							tx.send(Ok(Some(self.session_info.clone())))
+								.expect("Receiver should still be alive");
+						}
+						RuntimeApiRequest::AvailabilityCores(tx) => {
+							tracing::trace!(target: LOG_TARGET, cores = ?self.cores[&hash], hash = ?hash, "Sending out cores for hash");
+							tx.send(Ok(self.cores[&hash].clone()))
+								.expect("Receiver should still be alive");
+						}
+						_ => {
+							panic!("Unexpected runtime request: {:?}", req);
+						}
+					}
+				}
+				_ => {
+					panic!("Unexpected message received: {:?}", msg);
+				}
+			}
+		}
+	}
+}
+
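+/// Send an overseer signal to the subsystem under test.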
+async fn overseer_signal(
+	mut tx: SingleItemSink<FromOverseer<AvailabilityDistributionMessage>>,
+	msg: impl Into<OverseerSignal>,
+) {
+	let msg = msg.into();
+	tracing::trace!(target: LOG_TARGET, msg = ?msg, "sending message");
+	tx.send(FromOverseer::Signal(msg))
+		.await
+		.expect("Test subsystem no longer live");
+}
+
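+/// Send a message to the subsystem under test.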
+async fn overseer_send(
+	mut tx: SingleItemSink<FromOverseer<AvailabilityDistributionMessage>>,
+	msg: impl Into<AvailabilityDistributionMessage>,
+) {
+	let msg = msg.into();
+	tracing::trace!(target: LOG_TARGET, msg = ?msg, "sending message");
+	tx.send(FromOverseer::Communication { msg }).await
+		.expect("Test subsystem no longer live");
+	tracing::trace!(target: LOG_TARGET, "sent message");
+}
+
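+/// Receive the next message coming out of the subsystem under test.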
+async fn overseer_recv(
+	rx: &mut mpsc::UnboundedReceiver<AllMessages>,
+) -> AllMessages {
+	tracing::trace!(target: LOG_TARGET, "waiting for message ...");
+	rx.next().await.expect("Test subsystem no longer live")
+}
+
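+/// Convert an outgoing `Requests` into an `IncomingRequest`, mocking the network bridge:
+/// the response the subsystem sends to the returned request gets forwarded to the original
+/// request's `pending_response` channel.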
+fn to_incoming_req(
+	executor: &TaskExecutor,
+	outgoing: Requests,
+) -> IncomingRequest<v1::AvailabilityFetchingRequest> {
+	match outgoing {
+		Requests::AvailabilityFetching(OutgoingRequest { payload, pending_response, .. }) => {
+			let (tx, rx): (oneshot::Sender<netconfig::OutgoingResponse>, oneshot::Receiver<_>) =
+				oneshot::channel();
+			executor.spawn("Message forwarding", async {
+				let response = rx.await;
+				let payload = response.expect("Unexpected canceled request").result;
+				pending_response.send(payload.map_err(|_| network::RequestFailure::Refused))
+					.expect("Sending response is expected to work");
+			}.boxed());
+
+			IncomingRequest::new(
+				// We don't really care about the peer id here:
+				network::PeerId::random(),
+				payload,
+				tx,
+			)
+		}
+	}
+}
diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs
index 512d761afed17797f40622e5dd97583a5ffeeda3..71d9631fd8ad65106a374f49b4a7386181f796d3 100644
--- a/polkadot/node/subsystem-test-helpers/src/lib.rs
+++ b/polkadot/node/subsystem-test-helpers/src/lib.rs
@@ -51,6 +51,13 @@ enum SinkState<T> {
 /// The sink half of a single-item sink that does not resolve until the item has been read.
 pub struct SingleItemSink<T>(Arc<Mutex<SinkState<T>>>);
 
+// Deriving `Clone` is not possible, as it would put a `Clone` constraint on `T`, which is not sensible here.
+impl<T> Clone for SingleItemSink<T> {
+	fn clone(&self) -> Self {
+		Self(self.0.clone())
+	}
+}
+
 /// The stream half of a single-item sink.
 pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
 
@@ -213,8 +220,14 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
 
 /// A handle for interacting with the subsystem context.
 pub struct TestSubsystemContextHandle<M> {
-	tx: SingleItemSink<FromOverseer<M>>,
-	rx: mpsc::UnboundedReceiver<AllMessages>,
+	/// Direct access to the sender of messages.
+	///
+	/// Useful for shared ownership situations (one can have multiple senders, but only one
+	/// receiver).
+	pub tx: SingleItemSink<FromOverseer<M>>,
+
+	/// Direct access to the receiver.
+	pub rx: mpsc::UnboundedReceiver<AllMessages>,
 }
 
 impl<M> TestSubsystemContextHandle<M> {