diff --git a/Cargo.lock b/Cargo.lock
index 9dfff599300d68098b089751b7b50eaed93b251b..ce0d70112042bc5cf4bb47e7425a0e212250d0d2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11508,6 +11508,7 @@ name = "polkadot-availability-recovery"
 version = "1.0.0"
 dependencies = [
  "assert_matches",
+ "async-trait",
  "env_logger 0.9.3",
  "fatality",
  "futures",
diff --git a/polkadot/node/network/availability-recovery/Cargo.toml b/polkadot/node/network/availability-recovery/Cargo.toml
index 07ff09c7e70e0c48bf144703b571acc21e9b1fb9..42c3abef547b93e4187ed92764a91b4415602f06 100644
--- a/polkadot/node/network/availability-recovery/Cargo.toml
+++ b/polkadot/node/network/availability-recovery/Cargo.toml
@@ -11,6 +11,7 @@ schnellru = "0.2.1"
 rand = "0.8.5"
 fatality = "0.0.6"
 thiserror = "1.0.48"
+async-trait = "0.1.73"
 gum = { package = "tracing-gum", path = "../../gum" }
 
 polkadot-erasure-coding = { path = "../../../erasure-coding" }
diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs
index 99f42f4bf9fe668132cb020dd3e74d4faca34c46..e2146981da926d2c7027c902ba5be4716382f4da 100644
--- a/polkadot/node/network/availability-recovery/src/lib.rs
+++ b/polkadot/node/network/availability-recovery/src/lib.rs
@@ -23,11 +23,10 @@ use std::{
 	iter::Iterator,
 	num::NonZeroUsize,
 	pin::Pin,
-	time::Duration,
 };
 
 use futures::{
-	channel::oneshot::{self, channel},
+	channel::oneshot,
 	future::{Future, FutureExt, RemoteHandle},
 	pin_mut,
 	prelude::*,
@@ -35,77 +34,55 @@ use futures::{
 	stream::{FuturesUnordered, StreamExt},
 	task::{Context, Poll},
 };
-use rand::seq::SliceRandom;
 use schnellru::{ByLength, LruMap};
+use task::{FetchChunks, FetchChunksParams, FetchFull, FetchFullParams};
 
 use fatality::Nested;
 use polkadot_erasure_coding::{
 	branch_hash, branches, obtain_chunks_v1, recovery_threshold, Error as ErasureEncodingError,
 };
-#[cfg(not(test))]
-use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT;
+use task::{RecoveryParams, RecoveryStrategy, RecoveryTask};
+
 use polkadot_node_network_protocol::{
-	request_response::{
-		self as req_res, outgoing::RequestError, v1 as request_v1, IncomingRequestReceiver,
-		OutgoingRequest, Recipient, Requests,
-	},
-	IfDisconnected, UnifiedReputationChange as Rep,
+	request_response::{v1 as request_v1, IncomingRequestReceiver},
+	UnifiedReputationChange as Rep,
 };
 use polkadot_node_primitives::{AvailableData, ErasureChunk};
 use polkadot_node_subsystem::{
 	errors::RecoveryError,
 	jaeger,
-	messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage, NetworkBridgeTxMessage},
-	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
-	SubsystemResult,
+	messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage},
+	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
+	SubsystemContext, SubsystemError, SubsystemResult,
 };
 use polkadot_node_subsystem_util::request_session_info;
 use polkadot_primitives::{
-	AuthorityDiscoveryId, BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex,
-	Hash, HashT, IndexedVec, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex,
+	BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex, Hash, HashT,
+	SessionIndex, SessionInfo, ValidatorIndex,
 };
 
 mod error;
 mod futures_undead;
 mod metrics;
+mod task;
 use metrics::Metrics;
 
-use futures_undead::FuturesUndead;
-use sc_network::{OutboundFailure, RequestFailure};
-
 #[cfg(test)]
 mod tests;
 
 const LOG_TARGET: &str = "parachain::availability-recovery";
 
-// How many parallel recovery tasks should be running at once.
-const N_PARALLEL: usize = 50;
-
 // Size of the LRU cache where we keep recovered data.
 const LRU_SIZE: u32 = 16;
 
 const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
 
-/// Time after which we consider a request to have failed
-///
-/// and we should try more peers. Note in theory the request times out at the network level,
-/// measurements have shown, that in practice requests might actually take longer to fail in
-/// certain occasions. (The very least, authority discovery is not part of the timeout.)
-///
-/// For the time being this value is the same as the timeout on the networking layer, but as this
-/// timeout is more soft than the networking one, it might make sense to pick different values as
-/// well.
-#[cfg(not(test))]
-const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
-#[cfg(test)]
-const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);
-
 /// PoV size limit in bytes for which prefer fetching from backers.
 const SMALL_POV_LIMIT: usize = 128 * 1024;
 
 #[derive(Clone, PartialEq)]
 /// The strategy we use to recover the PoV.
-pub enum RecoveryStrategy {
+pub enum RecoveryStrategyKind {
 	/// We always try the backing group first, then fallback to validator chunks.
 	BackersFirstAlways,
 	/// We try the backing group first if PoV size is lower than specified, then fallback to
@@ -113,101 +90,25 @@ pub enum RecoveryStrategy {
 	BackersFirstIfSizeLower(usize),
 	/// We always recover using validator chunks.
 	ChunksAlways,
-	/// Do not request data from the availability store.
-	/// This is the useful for nodes where the
-	/// availability-store subsystem is not expected to run,
-	/// such as collators.
-	BypassAvailabilityStore,
 }
 
-impl RecoveryStrategy {
-	/// Returns true if the strategy needs backing group index.
-	pub fn needs_backing_group(&self) -> bool {
-		match self {
-			RecoveryStrategy::BackersFirstAlways | RecoveryStrategy::BackersFirstIfSizeLower(_) =>
-				true,
-			_ => false,
-		}
-	}
-
-	/// Returns the PoV size limit in bytes for `BackersFirstIfSizeLower` strategy, otherwise
-	/// `None`.
-	pub fn pov_size_limit(&self) -> Option<usize> {
-		match *self {
-			RecoveryStrategy::BackersFirstIfSizeLower(limit) => Some(limit),
-			_ => None,
-		}
-	}
-}
 /// The Availability Recovery Subsystem.
 pub struct AvailabilityRecoverySubsystem {
 	/// PoV recovery strategy to use.
-	recovery_strategy: RecoveryStrategy,
+	recovery_strategy_kind: RecoveryStrategyKind,
+	/// If this is true, do not request data from the availability store.
+	/// This is useful for nodes where the
+	/// availability-store subsystem is not expected to run,
+	/// such as collators.
+	bypass_availability_store: bool,
 	/// Receiver for available data requests.
 	req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 	/// Metrics for this subsystem.
 	metrics: Metrics,
 }
 
-struct RequestFromBackers {
-	// a random shuffling of the validators from the backing group which indicates the order
-	// in which we connect to them and request the chunk.
-	shuffled_backers: Vec<ValidatorIndex>,
-	// channel to the erasure task handler.
-	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
-}
-
-struct RequestChunksFromValidators {
-	/// How many request have been unsuccessful so far.
-	error_count: usize,
-	/// Total number of responses that have been received.
-	///
-	/// including failed ones.
-	total_received_responses: usize,
-	/// a random shuffling of the validators which indicates the order in which we connect to the
-	/// validators and request the chunk from them.
-	shuffling: VecDeque<ValidatorIndex>,
-	/// Chunks received so far.
-	received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
-	/// Pending chunk requests with soft timeout.
-	requesting_chunks: FuturesUndead<Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
-	// channel to the erasure task handler.
-	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
-}
-
-struct RecoveryParams {
-	/// Discovery ids of `validators`.
-	validator_authority_keys: Vec<AuthorityDiscoveryId>,
-
-	/// Validators relevant to this `RecoveryTask`.
-	validators: IndexedVec<ValidatorIndex, ValidatorId>,
-
-	/// The number of pieces needed.
-	threshold: usize,
-
-	/// A hash of the relevant candidate.
-	candidate_hash: CandidateHash,
-
-	/// The root of the erasure encoding of the para block.
-	erasure_root: Hash,
-
-	/// Metrics to report
-	metrics: Metrics,
-
-	/// Do not request data from availability-store
-	bypass_availability_store: bool,
-}
-
-/// Source the availability data either by means
-/// of direct request response protocol to
-/// backers (a.k.a. fast-path), or recover from chunks.
-enum Source {
-	RequestFromBackers(RequestFromBackers),
-	RequestChunks(RequestChunksFromValidators),
-}
-
 /// Expensive erasure coding computations that we want to run on a blocking thread.
-enum ErasureTask {
+pub enum ErasureTask {
 	/// Reconstructs `AvailableData` from chunks given `n_validators`.
 	Reconstruct(
 		usize,
@@ -219,486 +120,6 @@ enum ErasureTask {
 	Reencode(usize, Hash, AvailableData, oneshot::Sender<Option<AvailableData>>),
 }
 
-/// A stateful reconstruction of availability data in reference to
-/// a candidate hash.
-struct RecoveryTask<Sender> {
-	sender: Sender,
-
-	/// The parameters of the recovery process.
-	params: RecoveryParams,
-
-	/// The source to obtain the availability data from.
-	source: Source,
-
-	// channel to the erasure task handler.
-	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
-}
-
-impl RequestFromBackers {
-	fn new(
-		mut backers: Vec<ValidatorIndex>,
-		erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
-	) -> Self {
-		backers.shuffle(&mut rand::thread_rng());
-
-		RequestFromBackers { shuffled_backers: backers, erasure_task_tx }
-	}
-
-	// Run this phase to completion.
-	async fn run(
-		&mut self,
-		params: &RecoveryParams,
-		sender: &mut impl overseer::AvailabilityRecoverySenderTrait,
-	) -> Result<AvailableData, RecoveryError> {
-		gum::trace!(
-			target: LOG_TARGET,
-			candidate_hash = ?params.candidate_hash,
-			erasure_root = ?params.erasure_root,
-			"Requesting from backers",
-		);
-		loop {
-			// Pop the next backer, and proceed to next phase if we're out.
-			let validator_index =
-				self.shuffled_backers.pop().ok_or_else(|| RecoveryError::Unavailable)?;
-
-			// Request data.
-			let (req, response) = OutgoingRequest::new(
-				Recipient::Authority(
-					params.validator_authority_keys[validator_index.0 as usize].clone(),
-				),
-				req_res::v1::AvailableDataFetchingRequest { candidate_hash: params.candidate_hash },
-			);
-
-			sender
-				.send_message(NetworkBridgeTxMessage::SendRequests(
-					vec![Requests::AvailableDataFetchingV1(req)],
-					IfDisconnected::ImmediateError,
-				))
-				.await;
-
-			match response.await {
-				Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
-					let (reencode_tx, reencode_rx) = channel();
-					self.erasure_task_tx
-						.send(ErasureTask::Reencode(
-							params.validators.len(),
-							params.erasure_root,
-							data,
-							reencode_tx,
-						))
-						.await
-						.map_err(|_| RecoveryError::ChannelClosed)?;
-
-					let reencode_response =
-						reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
-
-					if let Some(data) = reencode_response {
-						gum::trace!(
-							target: LOG_TARGET,
-							candidate_hash = ?params.candidate_hash,
-							"Received full data",
-						);
-
-						return Ok(data)
-					} else {
-						gum::debug!(
-							target: LOG_TARGET,
-							candidate_hash = ?params.candidate_hash,
-							?validator_index,
-							"Invalid data response",
-						);
-
-						// it doesn't help to report the peer with req/res.
-					}
-				},
-				Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {},
-				Err(e) => gum::debug!(
-					target: LOG_TARGET,
-					candidate_hash = ?params.candidate_hash,
-					?validator_index,
-					err = ?e,
-					"Error fetching full available data."
-				),
-			}
-		}
-	}
-}
-
-impl RequestChunksFromValidators {
-	fn new(
-		n_validators: u32,
-		erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
-	) -> Self {
-		let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect();
-		shuffling.shuffle(&mut rand::thread_rng());
-
-		RequestChunksFromValidators {
-			error_count: 0,
-			total_received_responses: 0,
-			shuffling: shuffling.into(),
-			received_chunks: HashMap::new(),
-			requesting_chunks: FuturesUndead::new(),
-			erasure_task_tx,
-		}
-	}
-
-	fn is_unavailable(&self, params: &RecoveryParams) -> bool {
-		is_unavailable(
-			self.chunk_count(),
-			self.requesting_chunks.total_len(),
-			self.shuffling.len(),
-			params.threshold,
-		)
-	}
-
-	fn chunk_count(&self) -> usize {
-		self.received_chunks.len()
-	}
-
-	fn insert_chunk(&mut self, validator_index: ValidatorIndex, chunk: ErasureChunk) {
-		self.received_chunks.insert(validator_index, chunk);
-	}
-
-	fn can_conclude(&self, params: &RecoveryParams) -> bool {
-		self.chunk_count() >= params.threshold || self.is_unavailable(params)
-	}
-
-	/// Desired number of parallel requests.
-	///
-	/// For the given threshold (total required number of chunks) get the desired number of
-	/// requests we want to have running in parallel at this time.
-	fn get_desired_request_count(&self, threshold: usize) -> usize {
-		// Upper bound for parallel requests.
-		// We want to limit this, so requests can be processed within the timeout and we limit the
-		// following feedback loop:
-		// 1. Requests fail due to timeout
-		// 2. We request more chunks to make up for it
-		// 3. Bandwidth is spread out even more, so we get even more timeouts
-		// 4. We request more chunks to make up for it ...
-		let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
-		// How many chunks are still needed?
-		let remaining_chunks = threshold.saturating_sub(self.chunk_count());
-		// What is the current error rate, so we can make up for it?
-		let inv_error_rate =
-			self.total_received_responses.checked_div(self.error_count).unwrap_or(0);
-		// Actual number of requests we want to have in flight in parallel:
-		std::cmp::min(
-			max_requests_boundary,
-			remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0),
-		)
-	}
-
-	async fn launch_parallel_requests<Sender>(
-		&mut self,
-		params: &RecoveryParams,
-		sender: &mut Sender,
-	) where
-		Sender: overseer::AvailabilityRecoverySenderTrait,
-	{
-		let num_requests = self.get_desired_request_count(params.threshold);
-		let candidate_hash = &params.candidate_hash;
-		let already_requesting_count = self.requesting_chunks.len();
-
-		gum::debug!(
-			target: LOG_TARGET,
-			?candidate_hash,
-			?num_requests,
-			error_count= ?self.error_count,
-			total_received = ?self.total_received_responses,
-			threshold = ?params.threshold,
-			?already_requesting_count,
-			"Requesting availability chunks for a candidate",
-		);
-		let mut requests = Vec::with_capacity(num_requests - already_requesting_count);
-
-		while self.requesting_chunks.len() < num_requests {
-			if let Some(validator_index) = self.shuffling.pop_back() {
-				let validator = params.validator_authority_keys[validator_index.0 as usize].clone();
-				gum::trace!(
-					target: LOG_TARGET,
-					?validator,
-					?validator_index,
-					?candidate_hash,
-					"Requesting chunk",
-				);
-
-				// Request data.
-				let raw_request = req_res::v1::ChunkFetchingRequest {
-					candidate_hash: params.candidate_hash,
-					index: validator_index,
-				};
-
-				let (req, res) = OutgoingRequest::new(Recipient::Authority(validator), raw_request);
-				requests.push(Requests::ChunkFetchingV1(req));
-
-				params.metrics.on_chunk_request_issued();
-				let timer = params.metrics.time_chunk_request();
-
-				self.requesting_chunks.push(Box::pin(async move {
-					let _timer = timer;
-					match res.await {
-						Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) =>
-							Ok(Some(chunk.recombine_into_chunk(&raw_request))),
-						Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None),
-						Err(e) => Err((validator_index, e)),
-					}
-				}));
-			} else {
-				break
-			}
-		}
-
-		sender
-			.send_message(NetworkBridgeTxMessage::SendRequests(
-				requests,
-				IfDisconnected::TryConnect,
-			))
-			.await;
-	}
-
-	/// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`.
-	async fn wait_for_chunks(&mut self, params: &RecoveryParams) {
-		let metrics = &params.metrics;
-
-		// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
-		// We also declare requests undead, once `TIMEOUT_START_NEW_REQUESTS` is reached and will
-		// return in that case for `launch_parallel_requests` to fill up slots again.
-		while let Some(request_result) =
-			self.requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await
-		{
-			self.total_received_responses += 1;
-
-			match request_result {
-				Ok(Some(chunk)) =>
-					if is_chunk_valid(params, &chunk) {
-						metrics.on_chunk_request_succeeded();
-						gum::trace!(
-							target: LOG_TARGET,
-							candidate_hash = ?params.candidate_hash,
-							validator_index = ?chunk.index,
-							"Received valid chunk",
-						);
-						self.insert_chunk(chunk.index, chunk);
-					} else {
-						metrics.on_chunk_request_invalid();
-						self.error_count += 1;
-					},
-				Ok(None) => {
-					metrics.on_chunk_request_no_such_chunk();
-					self.error_count += 1;
-				},
-				Err((validator_index, e)) => {
-					self.error_count += 1;
-
-					gum::trace!(
-						target: LOG_TARGET,
-						candidate_hash= ?params.candidate_hash,
-						err = ?e,
-						?validator_index,
-						"Failure requesting chunk",
-					);
-
-					match e {
-						RequestError::InvalidResponse(_) => {
-							metrics.on_chunk_request_invalid();
-
-							gum::debug!(
-								target: LOG_TARGET,
-								candidate_hash = ?params.candidate_hash,
-								err = ?e,
-								?validator_index,
-								"Chunk fetching response was invalid",
-							);
-						},
-						RequestError::NetworkError(err) => {
-							// No debug logs on general network errors - that became very spammy
-							// occasionally.
-							if let RequestFailure::Network(OutboundFailure::Timeout) = err {
-								metrics.on_chunk_request_timeout();
-							} else {
-								metrics.on_chunk_request_error();
-							}
-
-							self.shuffling.push_front(validator_index);
-						},
-						RequestError::Canceled(_) => {
-							metrics.on_chunk_request_error();
-
-							self.shuffling.push_front(validator_index);
-						},
-					}
-				},
-			}
-
-			// Stop waiting for requests when we either can already recover the data
-			// or have gotten firm 'No' responses from enough validators.
-			if self.can_conclude(params) {
-				gum::debug!(
-					target: LOG_TARGET,
-					candidate_hash = ?params.candidate_hash,
-					received_chunks_count = ?self.chunk_count(),
-					requested_chunks_count = ?self.requesting_chunks.len(),
-					threshold = ?params.threshold,
-					"Can conclude availability for a candidate",
-				);
-				break
-			}
-		}
-	}
-
-	async fn run<Sender>(
-		&mut self,
-		params: &RecoveryParams,
-		sender: &mut Sender,
-	) -> Result<AvailableData, RecoveryError>
-	where
-		Sender: overseer::AvailabilityRecoverySenderTrait,
-	{
-		let metrics = &params.metrics;
-
-		// First query the store for any chunks we've got.
-		if !params.bypass_availability_store {
-			let (tx, rx) = oneshot::channel();
-			sender
-				.send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx))
-				.await;
-
-			match rx.await {
-				Ok(chunks) => {
-					// This should either be length 1 or 0. If we had the whole data,
-					// we wouldn't have reached this stage.
-					let chunk_indices: Vec<_> = chunks.iter().map(|c| c.index).collect();
-					self.shuffling.retain(|i| !chunk_indices.contains(i));
-
-					for chunk in chunks {
-						if is_chunk_valid(params, &chunk) {
-							gum::trace!(
-								target: LOG_TARGET,
-								candidate_hash = ?params.candidate_hash,
-								validator_index = ?chunk.index,
-								"Found valid chunk on disk"
-							);
-							self.insert_chunk(chunk.index, chunk);
-						} else {
-							gum::error!(
-								target: LOG_TARGET,
-								"Loaded invalid chunk from disk! Disk/Db corruption _very_ likely - please fix ASAP!"
-							);
-						};
-					}
-				},
-				Err(oneshot::Canceled) => {
-					gum::warn!(
-						target: LOG_TARGET,
-						candidate_hash = ?params.candidate_hash,
-						"Failed to reach the availability store"
-					);
-				},
-			}
-		}
-
-		let _recovery_timer = metrics.time_full_recovery();
-
-		loop {
-			if self.is_unavailable(&params) {
-				gum::debug!(
-					target: LOG_TARGET,
-					candidate_hash = ?params.candidate_hash,
-					erasure_root = ?params.erasure_root,
-					received = %self.chunk_count(),
-					requesting = %self.requesting_chunks.len(),
-					total_requesting = %self.requesting_chunks.total_len(),
-					n_validators = %params.validators.len(),
-					"Data recovery is not possible",
-				);
-
-				metrics.on_recovery_failed();
-
-				return Err(RecoveryError::Unavailable)
-			}
-
-			self.launch_parallel_requests(params, sender).await;
-			self.wait_for_chunks(params).await;
-
-			// If received_chunks has more than threshold entries, attempt to recover the data.
-			// If that fails, or a re-encoding of it doesn't match the expected erasure root,
-			// return Err(RecoveryError::Invalid)
-			if self.chunk_count() >= params.threshold {
-				let recovery_duration = metrics.time_erasure_recovery();
-
-				// Send request to reconstruct available data from chunks.
-				let (avilable_data_tx, available_data_rx) = channel();
-				self.erasure_task_tx
-					.send(ErasureTask::Reconstruct(
-						params.validators.len(),
-						std::mem::take(&mut self.received_chunks),
-						avilable_data_tx,
-					))
-					.await
-					.map_err(|_| RecoveryError::ChannelClosed)?;
-
-				let available_data_response =
-					available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
-
-				return match available_data_response {
-					Ok(data) => {
-						// Send request to re-encode the chunks and check merkle root.
-						let (reencode_tx, reencode_rx) = channel();
-						self.erasure_task_tx
-							.send(ErasureTask::Reencode(
-								params.validators.len(),
-								params.erasure_root,
-								data,
-								reencode_tx,
-							))
-							.await
-							.map_err(|_| RecoveryError::ChannelClosed)?;
-
-						let reencode_response =
-							reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
-
-						if let Some(data) = reencode_response {
-							gum::trace!(
-								target: LOG_TARGET,
-								candidate_hash = ?params.candidate_hash,
-								erasure_root = ?params.erasure_root,
-								"Data recovery complete",
-							);
-							metrics.on_recovery_succeeded();
-
-							Ok(data)
-						} else {
-							recovery_duration.map(|rd| rd.stop_and_discard());
-							gum::trace!(
-								target: LOG_TARGET,
-								candidate_hash = ?params.candidate_hash,
-								erasure_root = ?params.erasure_root,
-								"Data recovery - root mismatch",
-							);
-							metrics.on_recovery_invalid();
-
-							Err(RecoveryError::Invalid)
-						}
-					},
-					Err(err) => {
-						recovery_duration.map(|rd| rd.stop_and_discard());
-						gum::trace!(
-							target: LOG_TARGET,
-							candidate_hash = ?params.candidate_hash,
-							erasure_root = ?params.erasure_root,
-							?err,
-							"Data recovery error ",
-						);
-						metrics.on_recovery_invalid();
-
-						Err(RecoveryError::Invalid)
-					},
-				}
-			}
-		}
-	}
-}
-
 const fn is_unavailable(
 	received_chunks: usize,
 	requesting_chunks: usize,
@@ -777,60 +198,6 @@ fn reconstructed_data_matches_root(
 	branches.root() == *expected_root
 }
 
-impl<Sender> RecoveryTask<Sender>
-where
-	Sender: overseer::AvailabilityRecoverySenderTrait,
-{
-	async fn run(mut self) -> Result<AvailableData, RecoveryError> {
-		// First just see if we have the data available locally.
-		if !self.params.bypass_availability_store {
-			let (tx, rx) = oneshot::channel();
-			self.sender
-				.send_message(AvailabilityStoreMessage::QueryAvailableData(
-					self.params.candidate_hash,
-					tx,
-				))
-				.await;
-
-			match rx.await {
-				Ok(Some(data)) => return Ok(data),
-				Ok(None) => {},
-				Err(oneshot::Canceled) => {
-					gum::warn!(
-						target: LOG_TARGET,
-						candidate_hash = ?self.params.candidate_hash,
-						"Failed to reach the availability store",
-					)
-				},
-			}
-		}
-
-		self.params.metrics.on_recovery_started();
-
-		loop {
-			// These only fail if we cannot reach the underlying subsystem, which case there is
-			// nothing meaningful we can do.
-			match self.source {
-				Source::RequestFromBackers(ref mut from_backers) => {
-					match from_backers.run(&self.params, &mut self.sender).await {
-						Ok(data) => break Ok(data),
-						Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid),
-						Err(RecoveryError::ChannelClosed) =>
-							break Err(RecoveryError::ChannelClosed),
-						Err(RecoveryError::Unavailable) =>
-							self.source = Source::RequestChunks(RequestChunksFromValidators::new(
-								self.params.validators.len() as _,
-								self.erasure_task_tx.clone(),
-							)),
-					}
-				},
-				Source::RequestChunks(ref mut from_all) =>
-					break from_all.run(&self.params, &mut self.sender).await,
-			}
-		}
-	}
-}
-
 /// Accumulate all awaiting sides for some particular `AvailableData`.
 struct RecoveryHandle {
 	candidate_hash: CandidateHash,
@@ -973,65 +340,23 @@ async fn launch_recovery_task<Context>(
 	ctx: &mut Context,
 	session_info: SessionInfo,
 	receipt: CandidateReceipt,
-	mut backing_group: Option<GroupIndex>,
 	response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
 	metrics: &Metrics,
-	recovery_strategy: &RecoveryStrategy,
-	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
+	recovery_strategies: VecDeque<Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>>,
+	bypass_availability_store: bool,
 ) -> error::Result<()> {
 	let candidate_hash = receipt.hash();
 	let params = RecoveryParams {
 		validator_authority_keys: session_info.discovery_keys.clone(),
-		validators: session_info.validators.clone(),
+		n_validators: session_info.validators.len(),
 		threshold: recovery_threshold(session_info.validators.len())?,
 		candidate_hash,
 		erasure_root: receipt.descriptor.erasure_root,
 		metrics: metrics.clone(),
-		bypass_availability_store: recovery_strategy == &RecoveryStrategy::BypassAvailabilityStore,
+		bypass_availability_store,
 	};
 
-	if let Some(small_pov_limit) = recovery_strategy.pov_size_limit() {
-		// Get our own chunk size to get an estimate of the PoV size.
-		let chunk_size: Result<Option<usize>, error::Error> =
-			query_chunk_size(ctx, candidate_hash).await;
-		if let Ok(Some(chunk_size)) = chunk_size {
-			let pov_size_estimate = chunk_size.saturating_mul(session_info.validators.len()) / 3;
-			let prefer_backing_group = pov_size_estimate < small_pov_limit;
-
-			gum::trace!(
-				target: LOG_TARGET,
-				?candidate_hash,
-				pov_size_estimate,
-				small_pov_limit,
-				enabled = prefer_backing_group,
-				"Prefer fetch from backing group",
-			);
-
-			backing_group = backing_group.filter(|_| {
-				// We keep the backing group only if `1/3` of chunks sum up to less than
-				// `small_pov_limit`.
-				prefer_backing_group
-			});
-		}
-	}
-
-	let phase = backing_group
-		.and_then(|g| session_info.validator_groups.get(g))
-		.map(|group| {
-			Source::RequestFromBackers(RequestFromBackers::new(
-				group.clone(),
-				erasure_task_tx.clone(),
-			))
-		})
-		.unwrap_or_else(|| {
-			Source::RequestChunks(RequestChunksFromValidators::new(
-				params.validators.len() as _,
-				erasure_task_tx.clone(),
-			))
-		});
-
-	let recovery_task =
-		RecoveryTask { sender: ctx.sender().clone(), params, source: phase, erasure_task_tx };
+	let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);
 
 	let (remote, remote_handle) = recovery_task.run().remote_handle();
 
@@ -1062,8 +387,9 @@ async fn handle_recover<Context>(
 	backing_group: Option<GroupIndex>,
 	response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
 	metrics: &Metrics,
-	recovery_strategy: &RecoveryStrategy,
 	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
+	recovery_strategy_kind: RecoveryStrategyKind,
+	bypass_availability_store: bool,
 ) -> error::Result<()> {
 	let candidate_hash = receipt.hash();
 
@@ -1098,19 +424,71 @@ async fn handle_recover<Context>(
 
 	let _span = span.child("session-info-ctx-received");
 	match session_info {
-		Some(session_info) =>
+		Some(session_info) => {
+			let mut recovery_strategies: VecDeque<
+				Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>,
+			> = VecDeque::with_capacity(2);
+
+			if let Some(backing_group) = backing_group {
+				if let Some(backing_validators) = session_info.validator_groups.get(backing_group) {
+					let mut small_pov_size = true;
+
+					if let RecoveryStrategyKind::BackersFirstIfSizeLower(small_pov_limit) =
+						recovery_strategy_kind
+					{
+						// Get our own chunk size to get an estimate of the PoV size.
+						let chunk_size: Result<Option<usize>, error::Error> =
+							query_chunk_size(ctx, candidate_hash).await;
+						if let Ok(Some(chunk_size)) = chunk_size {
+							let pov_size_estimate =
+								chunk_size.saturating_mul(session_info.validators.len()) / 3;
+							small_pov_size = pov_size_estimate < small_pov_limit;
+
+							gum::trace!(
+								target: LOG_TARGET,
+								?candidate_hash,
+								pov_size_estimate,
+								small_pov_limit,
+								enabled = small_pov_size,
+								"Prefer fetch from backing group",
+							);
+						} else {
+							// We have a PoV size limit but were not able to query the chunk
+							// size, so don't use the backing group.
+							small_pov_size = false;
+						}
+					};
+
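+					// Fetch the full data from the backing group first, either unconditionally
+					// or (for `BackersFirstIfSizeLower`) only when the estimated PoV size is
+					// below the limit.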
+					match (&recovery_strategy_kind, small_pov_size) {
+						(RecoveryStrategyKind::BackersFirstAlways, _) |
+						(RecoveryStrategyKind::BackersFirstIfSizeLower(_), true) => recovery_strategies.push_back(
+							Box::new(FetchFull::new(FetchFullParams {
+								validators: backing_validators.to_vec(),
+								erasure_task_tx: erasure_task_tx.clone(),
+							})),
+						),
+						_ => {},
+					};
+				}
+			}
+
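+			// Chunk recovery is always added as the last-resort strategy.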
+			recovery_strategies.push_back(Box::new(FetchChunks::new(FetchChunksParams {
+				n_validators: session_info.validators.len(),
+				erasure_task_tx,
+			})));
+
 			launch_recovery_task(
 				state,
 				ctx,
 				session_info,
 				receipt,
-				backing_group,
 				response_sender,
 				metrics,
-				recovery_strategy,
-				erasure_task_tx,
+				recovery_strategies,
+				bypass_availability_store,
 			)
-			.await,
+			.await
+		},
 		None => {
 			gum::warn!(target: LOG_TARGET, "SessionInfo is `None` at {:?}", state.live_block);
 			response_sender
@@ -1155,7 +533,12 @@ impl AvailabilityRecoverySubsystem {
 		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 		metrics: Metrics,
 	) -> Self {
-		Self { recovery_strategy: RecoveryStrategy::BypassAvailabilityStore, req_receiver, metrics }
+		Self {
+			recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
+			bypass_availability_store: true,
+			req_receiver,
+			metrics,
+		}
 	}
 
 	/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
@@ -1164,7 +547,12 @@ impl AvailabilityRecoverySubsystem {
 		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 		metrics: Metrics,
 	) -> Self {
-		Self { recovery_strategy: RecoveryStrategy::BackersFirstAlways, req_receiver, metrics }
+		Self {
+			recovery_strategy_kind: RecoveryStrategyKind::BackersFirstAlways,
+			bypass_availability_store: false,
+			req_receiver,
+			metrics,
+		}
 	}
 
 	/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
@@ -1172,7 +560,12 @@ impl AvailabilityRecoverySubsystem {
 		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 		metrics: Metrics,
 	) -> Self {
-		Self { recovery_strategy: RecoveryStrategy::ChunksAlways, req_receiver, metrics }
+		Self {
+			recovery_strategy_kind: RecoveryStrategyKind::ChunksAlways,
+			bypass_availability_store: false,
+			req_receiver,
+			metrics,
+		}
 	}
 
 	/// Create a new instance of `AvailabilityRecoverySubsystem` which requests chunks if PoV is
@@ -1182,7 +575,8 @@ impl AvailabilityRecoverySubsystem {
 		metrics: Metrics,
 	) -> Self {
 		Self {
-			recovery_strategy: RecoveryStrategy::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
+			recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
+			bypass_availability_store: false,
 			req_receiver,
 			metrics,
 		}
@@ -1190,7 +584,8 @@ impl AvailabilityRecoverySubsystem {
 
 	async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
 		let mut state = State::default();
-		let Self { recovery_strategy, mut req_receiver, metrics } = self;
+		let Self { mut req_receiver, metrics, recovery_strategy_kind, bypass_availability_store } =
+			self;
 
 		let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
 		let mut erasure_task_rx = erasure_task_rx.fuse();
@@ -1275,11 +670,12 @@ impl AvailabilityRecoverySubsystem {
 										&mut ctx,
 										receipt,
 										session_index,
-										maybe_backing_group.filter(|_| recovery_strategy.needs_backing_group()),
+										maybe_backing_group,
 										response_sender,
 										&metrics,
-										&recovery_strategy,
 										erasure_task_tx.clone(),
+										recovery_strategy_kind.clone(),
+										bypass_availability_store
 									).await {
 										gum::warn!(
 											target: LOG_TARGET,
@@ -1295,7 +691,7 @@ impl AvailabilityRecoverySubsystem {
 				in_req = recv_req => {
 					match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? {
 						Ok(req) => {
-							if recovery_strategy == RecoveryStrategy::BypassAvailabilityStore {
+							if bypass_availability_store {
 								gum::debug!(
 									target: LOG_TARGET,
 									"Skipping request to availability-store.",
diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs
new file mode 100644
index 0000000000000000000000000000000000000000..d5bc2da84944a3403f9d4a3bf9d5a11b0e772f40
--- /dev/null
+++ b/polkadot/node/network/availability-recovery/src/task.rs
@@ -0,0 +1,830 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Recovery task and associated strategies.
+
+#![warn(missing_docs)]
+
+use crate::{
+	futures_undead::FuturesUndead, is_chunk_valid, is_unavailable, metrics::Metrics, ErasureTask,
+	LOG_TARGET,
+};
+use futures::{channel::oneshot, SinkExt};
+#[cfg(not(test))]
+use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT;
+use polkadot_node_network_protocol::request_response::{
+	self as req_res, outgoing::RequestError, OutgoingRequest, Recipient, Requests,
+};
+use polkadot_node_primitives::{AvailableData, ErasureChunk};
+use polkadot_node_subsystem::{
+	messages::{AvailabilityStoreMessage, NetworkBridgeTxMessage},
+	overseer, RecoveryError,
+};
+use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, Hash, ValidatorIndex};
+use rand::seq::SliceRandom;
+use sc_network::{IfDisconnected, OutboundFailure, RequestFailure};
+use std::{
+	collections::{HashMap, VecDeque},
+	time::Duration,
+};
+
+// How many parallel recovery tasks should be running at once.
+const N_PARALLEL: usize = 50;
+
+/// Time after which we consider a request to have failed
+///
+/// and we should try more peers. Note that in theory the request times out at the network level;
+/// measurements have shown that in practice requests might actually take longer to fail on
+/// certain occasions. (At the very least, authority discovery is not part of the timeout.)
+///
+/// For the time being this value is the same as the timeout on the networking layer, but as this
+/// timeout is softer than the networking one, it might make sense to pick different values as
+/// well.
+#[cfg(not(test))]
+const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
+#[cfg(test)]
+const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);
+
+/// Common trait for runnable recovery strategies.
+#[async_trait::async_trait]
+pub trait RecoveryStrategy<Sender: overseer::AvailabilityRecoverySenderTrait>: Send {
+	/// Main entry point of the strategy.
+	async fn run(
+		&mut self,
+		state: &mut State,
+		sender: &mut Sender,
+		common_params: &RecoveryParams,
+	) -> Result<AvailableData, RecoveryError>;
+
+	/// Return the name of the strategy for logging purposes.
+	fn display_name(&self) -> &'static str;
+}
+
+/// Recovery parameters common to all strategies in a `RecoveryTask`.
+pub struct RecoveryParams {
+	/// Discovery ids of `validators`.
+	pub validator_authority_keys: Vec<AuthorityDiscoveryId>,
+
+	/// Number of validators.
+	pub n_validators: usize,
+
+	/// The number of chunks needed.
+	pub threshold: usize,
+
+	/// A hash of the relevant candidate.
+	pub candidate_hash: CandidateHash,
+
+	/// The root of the erasure encoding of the candidate.
+	pub erasure_root: Hash,
+
+	/// Metrics to report.
+	pub metrics: Metrics,
+
+	/// Do not request data from availability-store. Useful for collators.
+	pub bypass_availability_store: bool,
+}
+
+/// Intermediate/common data that must be passed between `RecoveryStrategy`s belonging to the
+/// same `RecoveryTask`.
+pub struct State {
+	/// Chunks received so far.
+	received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
+}
+
+impl State {
+	fn new() -> Self {
+		Self { received_chunks: HashMap::new() }
+	}
+
+	fn insert_chunk(&mut self, validator: ValidatorIndex, chunk: ErasureChunk) {
+		self.received_chunks.insert(validator, chunk);
+	}
+
+	fn chunk_count(&self) -> usize {
+		self.received_chunks.len()
+	}
+
+	/// Retrieve the local chunks held in the av-store (their number should be either 0 or 1).
+	async fn populate_from_av_store<Sender: overseer::AvailabilityRecoverySenderTrait>(
+		&mut self,
+		params: &RecoveryParams,
+		sender: &mut Sender,
+	) -> Vec<ValidatorIndex> {
+		let (tx, rx) = oneshot::channel();
+		sender
+			.send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx))
+			.await;
+
+		match rx.await {
+			Ok(chunks) => {
+				// This should either be length 1 or 0. If we had the whole data,
+				// we wouldn't have reached this stage.
+				let chunk_indices: Vec<_> = chunks.iter().map(|c| c.index).collect();
+
+				for chunk in chunks {
+					if is_chunk_valid(params, &chunk) {
+						gum::trace!(
+							target: LOG_TARGET,
+							candidate_hash = ?params.candidate_hash,
+							validator_index = ?chunk.index,
+							"Found valid chunk on disk"
+						);
+						self.insert_chunk(chunk.index, chunk);
+					} else {
+						gum::error!(
+							target: LOG_TARGET,
+							"Loaded invalid chunk from disk! Disk/Db corruption _very_ likely - please fix ASAP!"
+						);
+					};
+				}
+
+				chunk_indices
+			},
+			Err(oneshot::Canceled) => {
+				gum::warn!(
+					target: LOG_TARGET,
+					candidate_hash = ?params.candidate_hash,
+					"Failed to reach the availability store"
+				);
+
+				vec![]
+			},
+		}
+	}
+
+	/// Launch chunk requests in parallel, according to the parameters.
+	async fn launch_parallel_chunk_requests<Sender>(
+		&mut self,
+		params: &RecoveryParams,
+		sender: &mut Sender,
+		desired_requests_count: usize,
+		validators: &mut VecDeque<ValidatorIndex>,
+		requesting_chunks: &mut FuturesUndead<
+			Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>,
+		>,
+	) where
+		Sender: overseer::AvailabilityRecoverySenderTrait,
+	{
+		let candidate_hash = &params.candidate_hash;
+		let already_requesting_count = requesting_chunks.len();
+
+		let mut requests = Vec::with_capacity(desired_requests_count - already_requesting_count);
+
+		while requesting_chunks.len() < desired_requests_count {
+			if let Some(validator_index) = validators.pop_back() {
+				let validator = params.validator_authority_keys[validator_index.0 as usize].clone();
+				gum::trace!(
+					target: LOG_TARGET,
+					?validator,
+					?validator_index,
+					?candidate_hash,
+					"Requesting chunk",
+				);
+
+				// Request data.
+				let raw_request = req_res::v1::ChunkFetchingRequest {
+					candidate_hash: params.candidate_hash,
+					index: validator_index,
+				};
+
+				let (req, res) = OutgoingRequest::new(Recipient::Authority(validator), raw_request);
+				requests.push(Requests::ChunkFetchingV1(req));
+
+				params.metrics.on_chunk_request_issued();
+				let timer = params.metrics.time_chunk_request();
+
+				requesting_chunks.push(Box::pin(async move {
+					let _timer = timer;
+					match res.await {
+						Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) =>
+							Ok(Some(chunk.recombine_into_chunk(&raw_request))),
+						Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None),
+						Err(e) => Err((validator_index, e)),
+					}
+				}));
+			} else {
+				break
+			}
+		}
+
+		sender
+			.send_message(NetworkBridgeTxMessage::SendRequests(
+				requests,
+				IfDisconnected::TryConnect,
+			))
+			.await;
+	}
+
+	/// Wait for a sufficient number of chunks to reconstruct according to the provided `params`.
+	/// Returns the total number of received responses and the number of failed requests.
+	async fn wait_for_chunks(
+		&mut self,
+		params: &RecoveryParams,
+		validators: &mut VecDeque<ValidatorIndex>,
+		requesting_chunks: &mut FuturesUndead<
+			Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>,
+		>,
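+		// Closure that tells us when to stop waiting: it receives the number of unrequested
+		// validators, the number of in-flight requests, the number of received chunks, the
+		// common params and the error count so far.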
+		can_conclude: impl Fn(usize, usize, usize, &RecoveryParams, usize) -> bool,
+	) -> (usize, usize) {
+		let metrics = &params.metrics;
+
+		let mut total_received_responses = 0;
+		let mut error_count = 0;
+
+		// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
+		// We also declare requests undead, once `TIMEOUT_START_NEW_REQUESTS` is reached and will
+		// return in that case for `launch_parallel_requests` to fill up slots again.
+		while let Some(request_result) =
+			requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await
+		{
+			total_received_responses += 1;
+
+			match request_result {
+				Ok(Some(chunk)) =>
+					if is_chunk_valid(params, &chunk) {
+						metrics.on_chunk_request_succeeded();
+						gum::trace!(
+							target: LOG_TARGET,
+							candidate_hash = ?params.candidate_hash,
+							validator_index = ?chunk.index,
+							"Received valid chunk",
+						);
+						self.insert_chunk(chunk.index, chunk);
+					} else {
+						metrics.on_chunk_request_invalid();
+						error_count += 1;
+					},
+				Ok(None) => {
+					metrics.on_chunk_request_no_such_chunk();
+					error_count += 1;
+				},
+				Err((validator_index, e)) => {
+					error_count += 1;
+
+					gum::trace!(
+						target: LOG_TARGET,
+						candidate_hash= ?params.candidate_hash,
+						err = ?e,
+						?validator_index,
+						"Failure requesting chunk",
+					);
+
+					match e {
+						RequestError::InvalidResponse(_) => {
+							metrics.on_chunk_request_invalid();
+
+							gum::debug!(
+								target: LOG_TARGET,
+								candidate_hash = ?params.candidate_hash,
+								err = ?e,
+								?validator_index,
+								"Chunk fetching response was invalid",
+							);
+						},
+						RequestError::NetworkError(err) => {
+							// No debug logs on general network errors - that became very spammy
+							// occasionally.
+							if let RequestFailure::Network(OutboundFailure::Timeout) = err {
+								metrics.on_chunk_request_timeout();
+							} else {
+								metrics.on_chunk_request_error();
+							}
+
+							validators.push_front(validator_index);
+						},
+						RequestError::Canceled(_) => {
+							metrics.on_chunk_request_error();
+
+							validators.push_front(validator_index);
+						},
+					}
+				},
+			}
+
+			// Stop waiting for requests when we either can already recover the data
+			// or have gotten firm 'No' responses from enough validators.
+			if can_conclude(
+				validators.len(),
+				requesting_chunks.total_len(),
+				self.chunk_count(),
+				params,
+				error_count,
+			) {
+				gum::debug!(
+					target: LOG_TARGET,
+					candidate_hash = ?params.candidate_hash,
+					received_chunks_count = ?self.chunk_count(),
+					requested_chunks_count = ?requesting_chunks.len(),
+					threshold = ?params.threshold,
+					"Can conclude availability for a candidate",
+				);
+				break
+			}
+		}
+
+		(total_received_responses, error_count)
+	}
+}
+
+/// A stateful reconstruction of availability data in reference to
+/// a candidate hash.
+pub struct RecoveryTask<Sender: overseer::AvailabilityRecoverySenderTrait> {
+	sender: Sender,
+	params: RecoveryParams,
+	strategies: VecDeque<Box<dyn RecoveryStrategy<Sender>>>,
+	state: State,
+}
+
+impl<Sender> RecoveryTask<Sender>
+where
+	Sender: overseer::AvailabilityRecoverySenderTrait,
+{
+	/// Instantiate a new recovery task.
+	pub fn new(
+		sender: Sender,
+		params: RecoveryParams,
+		strategies: VecDeque<Box<dyn RecoveryStrategy<Sender>>>,
+	) -> Self {
+		Self { sender, params, strategies, state: State::new() }
+	}
+
+	async fn in_availability_store(&mut self) -> Option<AvailableData> {
+		if !self.params.bypass_availability_store {
+			let (tx, rx) = oneshot::channel();
+			self.sender
+				.send_message(AvailabilityStoreMessage::QueryAvailableData(
+					self.params.candidate_hash,
+					tx,
+				))
+				.await;
+
+			match rx.await {
+				Ok(Some(data)) => return Some(data),
+				Ok(None) => {},
+				Err(oneshot::Canceled) => {
+					gum::warn!(
+						target: LOG_TARGET,
+						candidate_hash = ?self.params.candidate_hash,
+						"Failed to reach the availability store",
+					)
+				},
+			}
+		}
+
+		None
+	}
+
+	/// Run this recovery task to completion. It will loop through the configured strategies
+	/// in-order and return whenever the first one recovers the full `AvailableData`.
+	pub async fn run(mut self) -> Result<AvailableData, RecoveryError> {
+		if let Some(data) = self.in_availability_store().await {
+			return Ok(data)
+		}
+
+		self.params.metrics.on_recovery_started();
+
+		let _timer = self.params.metrics.time_full_recovery();
+
+		while let Some(mut current_strategy) = self.strategies.pop_front() {
+			gum::debug!(
+				target: LOG_TARGET,
+				candidate_hash = ?self.params.candidate_hash,
+				"Starting `{}` strategy",
+				current_strategy.display_name(),
+			);
+
+			let res = current_strategy.run(&mut self.state, &mut self.sender, &self.params).await;
+
+			match res {
+				Err(RecoveryError::Unavailable) =>
+					if self.strategies.front().is_some() {
+						gum::debug!(
+							target: LOG_TARGET,
+							candidate_hash = ?self.params.candidate_hash,
+							"Recovery strategy `{}` did not conclude. Trying the next one.",
+							current_strategy.display_name(),
+						);
+						continue
+					},
+				Err(err) => {
+					match &err {
+						RecoveryError::Invalid => self.params.metrics.on_recovery_invalid(),
+						_ => self.params.metrics.on_recovery_failed(),
+					}
+					return Err(err)
+				},
+				Ok(data) => {
+					self.params.metrics.on_recovery_succeeded();
+					return Ok(data)
+				},
+			}
+		}
+
+		// We have no other strategies to try.
+		gum::warn!(
+			target: LOG_TARGET,
+			candidate_hash = ?self.params.candidate_hash,
+			"Recovery of available data failed.",
+		);
+		self.params.metrics.on_recovery_failed();
+
+		Err(RecoveryError::Unavailable)
+	}
+}
+
+/// `RecoveryStrategy` that sequentially tries to fetch the full `AvailableData` from
+/// already-connected validators in the configured validator set.
+pub struct FetchFull {
+	params: FetchFullParams,
+}
+
+pub struct FetchFullParams {
+	/// Validators that will be used for fetching the data.
+	pub validators: Vec<ValidatorIndex>,
+	/// Channel to the erasure task handler.
+	pub erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
+}
+
+impl FetchFull {
+	/// Create a new `FetchFull` recovery strategy.
+	pub fn new(mut params: FetchFullParams) -> Self {
+		params.validators.shuffle(&mut rand::thread_rng());
+		Self { params }
+	}
+}
+
+#[async_trait::async_trait]
+impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender> for FetchFull {
+	fn display_name(&self) -> &'static str {
+		"Full recovery from backers"
+	}
+
+	async fn run(
+		&mut self,
+		_: &mut State,
+		sender: &mut Sender,
+		common_params: &RecoveryParams,
+	) -> Result<AvailableData, RecoveryError> {
+		loop {
+			// Pop the next validator, and proceed to the next strategy if we're out.
+			let validator_index =
+				self.params.validators.pop().ok_or_else(|| RecoveryError::Unavailable)?;
+
+			// Request data.
+			let (req, response) = OutgoingRequest::new(
+				Recipient::Authority(
+					common_params.validator_authority_keys[validator_index.0 as usize].clone(),
+				),
+				req_res::v1::AvailableDataFetchingRequest {
+					candidate_hash: common_params.candidate_hash,
+				},
+			);
+
+			sender
+				.send_message(NetworkBridgeTxMessage::SendRequests(
+					vec![Requests::AvailableDataFetchingV1(req)],
+					IfDisconnected::ImmediateError,
+				))
+				.await;
+
+			match response.await {
+				Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
+					let (reencode_tx, reencode_rx) = oneshot::channel();
+					self.params
+						.erasure_task_tx
+						.send(ErasureTask::Reencode(
+							common_params.n_validators,
+							common_params.erasure_root,
+							data,
+							reencode_tx,
+						))
+						.await
+						.map_err(|_| RecoveryError::ChannelClosed)?;
+
+					let reencode_response =
+						reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
+
+					if let Some(data) = reencode_response {
+						gum::trace!(
+							target: LOG_TARGET,
+							candidate_hash = ?common_params.candidate_hash,
+							"Received full data",
+						);
+
+						return Ok(data)
+					} else {
+						gum::debug!(
+							target: LOG_TARGET,
+							candidate_hash = ?common_params.candidate_hash,
+							?validator_index,
+							"Invalid data response",
+						);
+
+						// it doesn't help to report the peer with req/res.
+					}
+				},
+				Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {},
+				Err(e) => gum::debug!(
+					target: LOG_TARGET,
+					candidate_hash = ?common_params.candidate_hash,
+					?validator_index,
+					err = ?e,
+					"Error fetching full available data."
+				),
+			}
+		}
+	}
+}
+
+/// `RecoveryStrategy` that requests chunks from validators, in parallel.
+pub struct FetchChunks {
+	/// How many requests have been unsuccessful so far.
+	error_count: usize,
+	/// Total number of responses that have been received, including failed ones.
+	total_received_responses: usize,
+	/// Collection of in-flight requests.
+	requesting_chunks: FuturesUndead<Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
+	/// A random shuffling of the validators which indicates the order in which we connect to the
+	/// validators and request the chunk from them.
+	validators: VecDeque<ValidatorIndex>,
+	/// Channel to the erasure task handler.
+	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
+}
+
+/// Parameters specific to the `FetchChunks` strategy.
+pub struct FetchChunksParams {
+	/// Total number of validators.
+	pub n_validators: usize,
+	/// Channel to the erasure task handler.
+	pub erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
+}
+
+impl FetchChunks {
+	/// Instantiate a new strategy.
+	pub fn new(params: FetchChunksParams) -> Self {
+		let mut shuffling: Vec<_> = (0..params.n_validators)
+			.map(|i| ValidatorIndex(i.try_into().expect("number of validators must fit in a u32")))
+			.collect();
+		shuffling.shuffle(&mut rand::thread_rng());
+
+		Self {
+			error_count: 0,
+			total_received_responses: 0,
+			requesting_chunks: FuturesUndead::new(),
+			validators: shuffling.into(),
+			erasure_task_tx: params.erasure_task_tx,
+		}
+	}
+
+	fn is_unavailable(
+		unrequested_validators: usize,
+		in_flight_requests: usize,
+		chunk_count: usize,
+		threshold: usize,
+	) -> bool {
+		is_unavailable(chunk_count, in_flight_requests, unrequested_validators, threshold)
+	}
+
+	/// Desired number of parallel requests.
+	///
+	/// For the given threshold (total required number of chunks) get the desired number of
+	/// requests we want to have running in parallel at this time.
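+	///
+	/// As a purely illustrative example (numbers not taken from production): with a threshold
+	/// of 34, 10 chunks already received and 30 responses of which 10 were errors,
+	/// `inv_error_rate` is 3, so we aim for `24 + 24 / 3 = 32` parallel requests, which is
+	/// still below the `N_PARALLEL`/threshold cap of 34.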
+	fn get_desired_request_count(&self, chunk_count: usize, threshold: usize) -> usize {
+		// Upper bound for parallel requests.
+		// We want to limit this, so requests can be processed within the timeout and we limit the
+		// following feedback loop:
+		// 1. Requests fail due to timeout
+		// 2. We request more chunks to make up for it
+		// 3. Bandwidth is spread out even more, so we get even more timeouts
+		// 4. We request more chunks to make up for it ...
+		let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
+		// How many chunks are still needed?
+		let remaining_chunks = threshold.saturating_sub(chunk_count);
+		// What is the current error rate, so we can make up for it?
+		let inv_error_rate =
+			self.total_received_responses.checked_div(self.error_count).unwrap_or(0);
+		// Actual number of requests we want to have in flight in parallel:
+		std::cmp::min(
+			max_requests_boundary,
+			remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0),
+		)
+	}
+
+	async fn attempt_recovery(
+		&mut self,
+		state: &mut State,
+		common_params: &RecoveryParams,
+	) -> Result<AvailableData, RecoveryError> {
+		let recovery_duration = common_params.metrics.time_erasure_recovery();
+
+		// Send request to reconstruct available data from chunks.
+		let (available_data_tx, available_data_rx) = oneshot::channel();
+		self.erasure_task_tx
+			.send(ErasureTask::Reconstruct(
+				common_params.n_validators,
+				// Safe to leave an empty map in place, as we're stopping the recovery process if
+				// this reconstruct fails.
+				std::mem::take(&mut state.received_chunks),
+				available_data_tx,
+			))
+			.await
+			.map_err(|_| RecoveryError::ChannelClosed)?;
+
+		let available_data_response =
+			available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
+
+		match available_data_response {
+			Ok(data) => {
+				// Send request to re-encode the chunks and check merkle root.
+				let (reencode_tx, reencode_rx) = oneshot::channel();
+				self.erasure_task_tx
+					.send(ErasureTask::Reencode(
+						common_params.n_validators,
+						common_params.erasure_root,
+						data,
+						reencode_tx,
+					))
+					.await
+					.map_err(|_| RecoveryError::ChannelClosed)?;
+
+				let reencode_response =
+					reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
+
+				if let Some(data) = reencode_response {
+					gum::trace!(
+						target: LOG_TARGET,
+						candidate_hash = ?common_params.candidate_hash,
+						erasure_root = ?common_params.erasure_root,
+						"Data recovery from chunks complete",
+					);
+
+					Ok(data)
+				} else {
+					recovery_duration.map(|rd| rd.stop_and_discard());
+					gum::trace!(
+						target: LOG_TARGET,
+						candidate_hash = ?common_params.candidate_hash,
+						erasure_root = ?common_params.erasure_root,
+						"Data recovery error - root mismatch",
+					);
+
+					Err(RecoveryError::Invalid)
+				}
+			},
+			Err(err) => {
+				recovery_duration.map(|rd| rd.stop_and_discard());
+				gum::trace!(
+					target: LOG_TARGET,
+					candidate_hash = ?common_params.candidate_hash,
+					erasure_root = ?common_params.erasure_root,
+					?err,
+					"Data recovery error ",
+				);
+
+				Err(RecoveryError::Invalid)
+			},
+		}
+	}
+}
+
+#[async_trait::async_trait]
+impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender> for FetchChunks {
+	fn display_name(&self) -> &'static str {
+		"Fetch chunks"
+	}
+
+	async fn run(
+		&mut self,
+		state: &mut State,
+		sender: &mut Sender,
+		common_params: &RecoveryParams,
+	) -> Result<AvailableData, RecoveryError> {
+		// First query the store for any chunks we've got.
+		if !common_params.bypass_availability_store {
+			let local_chunk_indices = state.populate_from_av_store(common_params, sender).await;
+			self.validators.retain(|i| !local_chunk_indices.contains(i));
+		}
+
+		// No need to query the validators that have the chunks we already received.
+		self.validators.retain(|i| !state.received_chunks.contains_key(i));
+
+		loop {
+			// If `received_chunks` has at least `threshold` entries, attempt to recover the data.
+			// If that fails, or a re-encoding of it doesn't match the expected erasure root,
+			// return Err(RecoveryError::Invalid).
+			// Do this before requesting any chunks, because we may already have enough of them
+			// from previous `RecoveryStrategy`s.
+			if state.chunk_count() >= common_params.threshold {
+				return self.attempt_recovery(state, common_params).await
+			}
+
+			if Self::is_unavailable(
+				self.validators.len(),
+				self.requesting_chunks.total_len(),
+				state.chunk_count(),
+				common_params.threshold,
+			) {
+				gum::debug!(
+					target: LOG_TARGET,
+					candidate_hash = ?common_params.candidate_hash,
+					erasure_root = ?common_params.erasure_root,
+					received = %state.chunk_count(),
+					requesting = %self.requesting_chunks.len(),
+					total_requesting = %self.requesting_chunks.total_len(),
+					n_validators = %common_params.n_validators,
+					"Data recovery from chunks is not possible",
+				);
+
+				return Err(RecoveryError::Unavailable)
+			}
+
+			let desired_requests_count =
+				self.get_desired_request_count(state.chunk_count(), common_params.threshold);
+			let already_requesting_count = self.requesting_chunks.len();
+			gum::debug!(
+				target: LOG_TARGET,
+				?common_params.candidate_hash,
+				?desired_requests_count,
+				error_count = ?self.error_count,
+				total_received = ?self.total_received_responses,
+				threshold = ?common_params.threshold,
+				?already_requesting_count,
+				"Requesting availability chunks for a candidate",
+			);
+			state
+				.launch_parallel_chunk_requests(
+					common_params,
+					sender,
+					desired_requests_count,
+					&mut self.validators,
+					&mut self.requesting_chunks,
+				)
+				.await;
+
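+			// Wait for responses. The closure passed below is the stop condition: stop waiting
+			// as soon as we either have enough chunks to attempt recovery or recovery has
+			// become impossible.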
+			let (total_responses, error_count) = state
+				.wait_for_chunks(
+					common_params,
+					&mut self.validators,
+					&mut self.requesting_chunks,
+					|unrequested_validators, reqs, chunk_count, params, _error_count| {
+						chunk_count >= params.threshold ||
+							Self::is_unavailable(
+								unrequested_validators,
+								reqs,
+								chunk_count,
+								params.threshold,
+							)
+					},
+				)
+				.await;
+
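+			// Update the running totals that feed the adaptive request count on the next
+			// loop iteration (see `get_desired_request_count`).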
+			self.total_received_responses += total_responses;
+			self.error_count += error_count;
+		}
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use polkadot_erasure_coding::recovery_threshold;
+
+	#[test]
+	fn parallel_request_calculation_works_as_expected() {
+		let num_validators = 100;
+		let threshold = recovery_threshold(num_validators).unwrap();
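+		// For 100 validators, the recovery threshold is 34 chunks.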
+		let (erasure_task_tx, _erasure_task_rx) = futures::channel::mpsc::channel(16);
+
+		let mut fetch_chunks_task =
+			FetchChunks::new(FetchChunksParams { n_validators: num_validators, erasure_task_tx });
+		assert_eq!(fetch_chunks_task.get_desired_request_count(0, threshold), threshold);
+		fetch_chunks_task.error_count = 1;
+		fetch_chunks_task.total_received_responses = 1;
+		// We saturate at threshold (34):
+		assert_eq!(fetch_chunks_task.get_desired_request_count(0, threshold), threshold);
+
+		fetch_chunks_task.total_received_responses = 2;
+		// With given error rate - still saturating:
+		assert_eq!(fetch_chunks_task.get_desired_request_count(1, threshold), threshold);
+		fetch_chunks_task.total_received_responses += 8;
+		// error rate: 1/10
+		// remaining chunks needed: threshold (34) - 9 = 25
+		// expected: 25 + 25 / 10 (integer division) = 27
+		assert_eq!(fetch_chunks_task.get_desired_request_count(9, threshold), 27);
+		fetch_chunks_task.error_count = 0;
+		// With error count zero - we should fetch exactly as needed:
+		assert_eq!(fetch_chunks_task.get_desired_request_count(10, threshold), threshold - 10);
+	}
+}
diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs
index 60c2d38ab31ba57eef9590af7b80591b24dfae48..63ccf0e94f91ebad2a8cba3158d6db24159683c0 100644
--- a/polkadot/node/network/availability-recovery/src/tests.rs
+++ b/polkadot/node/network/availability-recovery/src/tests.rs
@@ -21,15 +21,19 @@ use futures::{executor, future};
 use futures_timer::Delay;
 
 use parity_scale_codec::Encode;
-use polkadot_node_network_protocol::request_response::{IncomingRequest, ReqProtocolNames};
+use polkadot_node_network_protocol::request_response::{
+	self as req_res, IncomingRequest, Recipient, ReqProtocolNames, Requests,
+};
 
 use super::*;
 
-use sc_network::config::RequestResponseConfig;
+use sc_network::{config::RequestResponseConfig, IfDisconnected, OutboundFailure, RequestFailure};
 
 use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
 use polkadot_node_primitives::{BlockData, PoV, Proof};
-use polkadot_node_subsystem::messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest};
+use polkadot_node_subsystem::messages::{
+	AllMessages, NetworkBridgeTxMessage, RuntimeApiMessage, RuntimeApiRequest,
+};
 use polkadot_node_subsystem_test_helpers::{
 	make_subsystem_context, mock::new_leaf, TestSubsystemContextHandle,
 };
@@ -204,7 +208,7 @@ use sp_keyring::Sr25519Keyring;
 enum Has {
 	No,
 	Yes,
-	NetworkError(sc_network::RequestFailure),
+	NetworkError(RequestFailure),
 	/// Make request not return at all, instead the sender is returned from the function.
 	///
 	/// Note, if you use `DoesNotReturn` you have to keep the returned senders alive, otherwise the
@@ -214,7 +218,7 @@ enum Has {
 
 impl Has {
 	fn timeout() -> Self {
-		Has::NetworkError(sc_network::RequestFailure::Network(sc_network::OutboundFailure::Timeout))
+		Has::NetworkError(RequestFailure::Network(OutboundFailure::Timeout))
 	}
 }
 
@@ -393,7 +397,7 @@ impl TestState {
 		candidate_hash: CandidateHash,
 		virtual_overseer: &mut VirtualOverseer,
 		who_has: impl Fn(usize) -> Has,
-	) -> Vec<oneshot::Sender<std::result::Result<Vec<u8>, sc_network::RequestFailure>>> {
+	) -> Vec<oneshot::Sender<std::result::Result<Vec<u8>, RequestFailure>>> {
 		let mut senders = Vec::new();
 		for _ in 0..self.validators.len() {
 			// Receive a request for a chunk.
@@ -1010,7 +1014,7 @@ fn recovers_from_only_chunks_if_pov_large() {
 			AvailabilityRecoveryMessage::RecoverAvailableData(
 				new_candidate.clone(),
 				test_state.session_index,
-				None,
+				Some(GroupIndex(0)),
 				tx,
 			),
 		)
@@ -1546,36 +1550,3 @@ fn invalid_local_chunk_is_ignored() {
 		(virtual_overseer, req_cfg)
 	});
 }
-
-#[test]
-fn parallel_request_calculation_works_as_expected() {
-	let num_validators = 100;
-	let threshold = recovery_threshold(num_validators).unwrap();
-	let (erasure_task_tx, _erasure_task_rx) = futures::channel::mpsc::channel(16);
-
-	let mut phase = RequestChunksFromValidators::new(100, erasure_task_tx);
-	assert_eq!(phase.get_desired_request_count(threshold), threshold);
-	phase.error_count = 1;
-	phase.total_received_responses = 1;
-	// We saturate at threshold (34):
-	assert_eq!(phase.get_desired_request_count(threshold), threshold);
-
-	let dummy_chunk =
-		ErasureChunk { chunk: Vec::new(), index: ValidatorIndex(0), proof: Proof::dummy_proof() };
-	phase.insert_chunk(ValidatorIndex(0), dummy_chunk.clone());
-	phase.total_received_responses = 2;
-	// With given error rate - still saturating:
-	assert_eq!(phase.get_desired_request_count(threshold), threshold);
-	for i in 1..9 {
-		phase.insert_chunk(ValidatorIndex(i), dummy_chunk.clone());
-	}
-	phase.total_received_responses += 8;
-	// error rate: 1/10
-	// remaining chunks needed: threshold (34) - 9
-	// expected: 24 * (1+ 1/10) = (next greater integer) = 27
-	assert_eq!(phase.get_desired_request_count(threshold), 27);
-	phase.insert_chunk(ValidatorIndex(9), dummy_chunk.clone());
-	phase.error_count = 0;
-	// With error count zero - we should fetch exactly as needed:
-	assert_eq!(phase.get_desired_request_count(threshold), threshold - phase.chunk_count());
-}