From 6f00edbc5567facd655a3f9162daf7786381ec5c Mon Sep 17 00:00:00 2001
From: Alin Dima <alin@parity.io>
Date: Wed, 20 Sep 2023 15:56:43 +0300
Subject: [PATCH] Refactor availability-recovery strategies (#1457)

Refactors availability-recovery strategies to allow for easily adding
new hotpaths and failover mechanisms.

The new interface allows chaining multiple `RecoveryStrategy` implementations
together, to cleanly express the relationship between them and share
state and code where necessary/possible.
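
A rough sketch of the new interface, as defined in `task.rs` in this diff
(trait shown verbatim; the chaining snippet is illustrative and elides the
surrounding subsystem code):

```rust
#[async_trait::async_trait]
pub trait RecoveryStrategy<Sender: overseer::AvailabilityRecoverySenderTrait>: Send {
	/// Main entry point of the strategy.
	async fn run(
		&mut self,
		state: &mut State,
		sender: &mut Sender,
		common_params: &RecoveryParams,
	) -> Result<AvailableData, RecoveryError>;

	/// Return the name of the strategy for logging purposes.
	fn display_name(&self) -> &'static str;
}

// Inside `handle_recover` (simplified): strategies are queued in order and the
// task tries each one until recovery succeeds.
let mut recovery_strategies: VecDeque<
	Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>,
> = VecDeque::with_capacity(2);

recovery_strategies.push_back(Box::new(FetchFull::new(FetchFullParams {
	validators: backing_validators.to_vec(),
	erasure_task_tx: erasure_task_tx.clone(),
})));
recovery_strategies.push_back(Box::new(FetchChunks::new(FetchChunksParams {
	n_validators: session_info.validators.len(),
	erasure_task_tx,
})));

let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);
let available_data = recovery_task.run().await;
```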

This was done to make it easier to implement new hotpaths like
[systematic chunks
recovery](https://github.com/paritytech/polkadot-sdk/issues/598) and
[fetching from approval
checkers](https://github.com/paritytech/polkadot-sdk/issues/575).

Thanks to this design, intermediate state can be shared between the
strategies. For example, if the systematic chunks recovery retrieved fewer
chunks than needed, they are passed over to the next FetchChunks strategy,
which will only need to recover the remaining chunks, as shown in the
excerpt below.
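
The hand-off happens through the shared `State` (its `received_chunks` map
outlives each individual strategy within a `RecoveryTask`). An annotated
excerpt from `FetchChunks::run` as added in this diff; the systematic-chunks
strategy mentioned above is hypothetical and not part of this PR:

```rust
// A hypothetical earlier strategy (e.g. systematic chunk recovery) would already
// have called `state.insert_chunk(validator_index, chunk)` for whatever it fetched.

// No need to query the validators that have the chunks we already received.
self.validators.retain(|i| !state.received_chunks.contains_key(i));

// If enough chunks were already gathered by previous strategies, attempt to
// reconstruct the data right away; otherwise only the remainder is requested.
if state.chunk_count() >= common_params.threshold {
	return self.attempt_recovery(state, common_params).await
}
```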

Draft example of how a systematic chunk recovery strategy would look:
https://github.com/paritytech/polkadot-sdk/commit/667d870bdf1470525d66c13929d5eac7249dd995
(notice how easy it was to add and reuse code)

Note that this PR doesn't itself add any new strategy; it should fully
preserve backwards compatibility in terms of functionality. Follow-up PRs
that add new strategies will come.
---
 Cargo.lock                                    |   1 +
 .../network/availability-recovery/Cargo.toml  |   1 +
 .../network/availability-recovery/src/lib.rs  | 824 +++--------------
 .../network/availability-recovery/src/task.rs | 830 ++++++++++++++++++
 .../availability-recovery/src/tests.rs        |  51 +-
 5 files changed, 953 insertions(+), 754 deletions(-)
 create mode 100644 polkadot/node/network/availability-recovery/src/task.rs

diff --git a/Cargo.lock b/Cargo.lock
index 9dfff599300..ce0d7011204 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11508,6 +11508,7 @@ name = "polkadot-availability-recovery"
 version = "1.0.0"
 dependencies = [
  "assert_matches",
+ "async-trait",
  "env_logger 0.9.3",
  "fatality",
  "futures",
diff --git a/polkadot/node/network/availability-recovery/Cargo.toml b/polkadot/node/network/availability-recovery/Cargo.toml
index 07ff09c7e70..42c3abef547 100644
--- a/polkadot/node/network/availability-recovery/Cargo.toml
+++ b/polkadot/node/network/availability-recovery/Cargo.toml
@@ -11,6 +11,7 @@ schnellru = "0.2.1"
 rand = "0.8.5"
 fatality = "0.0.6"
 thiserror = "1.0.48"
+async-trait = "0.1.73"
 gum = { package = "tracing-gum", path = "../../gum" }
 
 polkadot-erasure-coding = { path = "../../../erasure-coding" }
diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs
index 99f42f4bf9f..e2146981da9 100644
--- a/polkadot/node/network/availability-recovery/src/lib.rs
+++ b/polkadot/node/network/availability-recovery/src/lib.rs
@@ -23,11 +23,10 @@ use std::{
 	iter::Iterator,
 	num::NonZeroUsize,
 	pin::Pin,
-	time::Duration,
 };
 
 use futures::{
-	channel::oneshot::{self, channel},
+	channel::oneshot,
 	future::{Future, FutureExt, RemoteHandle},
 	pin_mut,
 	prelude::*,
@@ -35,77 +34,55 @@ use futures::{
 	stream::{FuturesUnordered, StreamExt},
 	task::{Context, Poll},
 };
-use rand::seq::SliceRandom;
 use schnellru::{ByLength, LruMap};
+use task::{FetchChunks, FetchChunksParams, FetchFull, FetchFullParams};
 
 use fatality::Nested;
 use polkadot_erasure_coding::{
 	branch_hash, branches, obtain_chunks_v1, recovery_threshold, Error as ErasureEncodingError,
 };
-#[cfg(not(test))]
-use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT;
+use task::{RecoveryParams, RecoveryStrategy, RecoveryTask};
+
 use polkadot_node_network_protocol::{
-	request_response::{
-		self as req_res, outgoing::RequestError, v1 as request_v1, IncomingRequestReceiver,
-		OutgoingRequest, Recipient, Requests,
-	},
-	IfDisconnected, UnifiedReputationChange as Rep,
+	request_response::{v1 as request_v1, IncomingRequestReceiver},
+	UnifiedReputationChange as Rep,
 };
 use polkadot_node_primitives::{AvailableData, ErasureChunk};
 use polkadot_node_subsystem::{
 	errors::RecoveryError,
 	jaeger,
-	messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage, NetworkBridgeTxMessage},
-	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
-	SubsystemResult,
+	messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage},
+	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
+	SubsystemContext, SubsystemError, SubsystemResult,
 };
 use polkadot_node_subsystem_util::request_session_info;
 use polkadot_primitives::{
-	AuthorityDiscoveryId, BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex,
-	Hash, HashT, IndexedVec, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex,
+	BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex, Hash, HashT,
+	SessionIndex, SessionInfo, ValidatorIndex,
 };
 
 mod error;
 mod futures_undead;
 mod metrics;
+mod task;
 use metrics::Metrics;
 
-use futures_undead::FuturesUndead;
-use sc_network::{OutboundFailure, RequestFailure};
-
 #[cfg(test)]
 mod tests;
 
 const LOG_TARGET: &str = "parachain::availability-recovery";
 
-// How many parallel recovery tasks should be running at once.
-const N_PARALLEL: usize = 50;
-
 // Size of the LRU cache where we keep recovered data.
 const LRU_SIZE: u32 = 16;
 
 const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
 
-/// Time after which we consider a request to have failed
-///
-/// and we should try more peers. Note in theory the request times out at the network level,
-/// measurements have shown, that in practice requests might actually take longer to fail in
-/// certain occasions. (The very least, authority discovery is not part of the timeout.)
-///
-/// For the time being this value is the same as the timeout on the networking layer, but as this
-/// timeout is more soft than the networking one, it might make sense to pick different values as
-/// well.
-#[cfg(not(test))]
-const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
-#[cfg(test)]
-const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);
-
 /// PoV size limit in bytes for which prefer fetching from backers.
 const SMALL_POV_LIMIT: usize = 128 * 1024;
 
 #[derive(Clone, PartialEq)]
 /// The strategy we use to recover the PoV.
-pub enum RecoveryStrategy {
+pub enum RecoveryStrategyKind {
 	/// We always try the backing group first, then fallback to validator chunks.
 	BackersFirstAlways,
 	/// We try the backing group first if PoV size is lower than specified, then fallback to
@@ -113,101 +90,25 @@ pub enum RecoveryStrategy {
 	BackersFirstIfSizeLower(usize),
 	/// We always recover using validator chunks.
 	ChunksAlways,
-	/// Do not request data from the availability store.
-	/// This is the useful for nodes where the
-	/// availability-store subsystem is not expected to run,
-	/// such as collators.
-	BypassAvailabilityStore,
 }
 
-impl RecoveryStrategy {
-	/// Returns true if the strategy needs backing group index.
-	pub fn needs_backing_group(&self) -> bool {
-		match self {
-			RecoveryStrategy::BackersFirstAlways | RecoveryStrategy::BackersFirstIfSizeLower(_) =>
-				true,
-			_ => false,
-		}
-	}
-
-	/// Returns the PoV size limit in bytes for `BackersFirstIfSizeLower` strategy, otherwise
-	/// `None`.
-	pub fn pov_size_limit(&self) -> Option<usize> {
-		match *self {
-			RecoveryStrategy::BackersFirstIfSizeLower(limit) => Some(limit),
-			_ => None,
-		}
-	}
-}
 /// The Availability Recovery Subsystem.
 pub struct AvailabilityRecoverySubsystem {
 	/// PoV recovery strategy to use.
-	recovery_strategy: RecoveryStrategy,
+	recovery_strategy_kind: RecoveryStrategyKind,
+	// If this is true, do not request data from the availability store.
+	/// This is the useful for nodes where the
+	/// availability-store subsystem is not expected to run,
+	/// such as collators.
+	bypass_availability_store: bool,
 	/// Receiver for available data requests.
 	req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 	/// Metrics for this subsystem.
 	metrics: Metrics,
 }
 
-struct RequestFromBackers {
-	// a random shuffling of the validators from the backing group which indicates the order
-	// in which we connect to them and request the chunk.
-	shuffled_backers: Vec<ValidatorIndex>,
-	// channel to the erasure task handler.
-	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
-}
-
-struct RequestChunksFromValidators {
-	/// How many request have been unsuccessful so far.
-	error_count: usize,
-	/// Total number of responses that have been received.
-	///
-	/// including failed ones.
-	total_received_responses: usize,
-	/// a random shuffling of the validators which indicates the order in which we connect to the
-	/// validators and request the chunk from them.
-	shuffling: VecDeque<ValidatorIndex>,
-	/// Chunks received so far.
-	received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
-	/// Pending chunk requests with soft timeout.
-	requesting_chunks: FuturesUndead<Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
-	// channel to the erasure task handler.
-	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
-}
-
-struct RecoveryParams {
-	/// Discovery ids of `validators`.
-	validator_authority_keys: Vec<AuthorityDiscoveryId>,
-
-	/// Validators relevant to this `RecoveryTask`.
-	validators: IndexedVec<ValidatorIndex, ValidatorId>,
-
-	/// The number of pieces needed.
-	threshold: usize,
-
-	/// A hash of the relevant candidate.
-	candidate_hash: CandidateHash,
-
-	/// The root of the erasure encoding of the para block.
-	erasure_root: Hash,
-
-	/// Metrics to report
-	metrics: Metrics,
-
-	/// Do not request data from availability-store
-	bypass_availability_store: bool,
-}
-
-/// Source the availability data either by means
-/// of direct request response protocol to
-/// backers (a.k.a. fast-path), or recover from chunks.
-enum Source {
-	RequestFromBackers(RequestFromBackers),
-	RequestChunks(RequestChunksFromValidators),
-}
-
 /// Expensive erasure coding computations that we want to run on a blocking thread.
-enum ErasureTask {
+pub enum ErasureTask {
 	/// Reconstructs `AvailableData` from chunks given `n_validators`.
 	Reconstruct(
 		usize,
@@ -219,486 +120,6 @@ enum ErasureTask {
 	Reencode(usize, Hash, AvailableData, oneshot::Sender<Option<AvailableData>>),
 }
 
-/// A stateful reconstruction of availability data in reference to
-/// a candidate hash.
-struct RecoveryTask<Sender> {
-	sender: Sender,
-
-	/// The parameters of the recovery process.
-	params: RecoveryParams,
-
-	/// The source to obtain the availability data from.
-	source: Source,
-
-	// channel to the erasure task handler.
-	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
-}
-
-impl RequestFromBackers {
-	fn new(
-		mut backers: Vec<ValidatorIndex>,
-		erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
-	) -> Self {
-		backers.shuffle(&mut rand::thread_rng());
-
-		RequestFromBackers { shuffled_backers: backers, erasure_task_tx }
-	}
-
-	// Run this phase to completion.
-	async fn run(
-		&mut self,
-		params: &RecoveryParams,
-		sender: &mut impl overseer::AvailabilityRecoverySenderTrait,
-	) -> Result<AvailableData, RecoveryError> {
-		gum::trace!(
-			target: LOG_TARGET,
-			candidate_hash = ?params.candidate_hash,
-			erasure_root = ?params.erasure_root,
-			"Requesting from backers",
-		);
-		loop {
-			// Pop the next backer, and proceed to next phase if we're out.
-			let validator_index =
-				self.shuffled_backers.pop().ok_or_else(|| RecoveryError::Unavailable)?;
-
-			// Request data.
-			let (req, response) = OutgoingRequest::new(
-				Recipient::Authority(
-					params.validator_authority_keys[validator_index.0 as usize].clone(),
-				),
-				req_res::v1::AvailableDataFetchingRequest { candidate_hash: params.candidate_hash },
-			);
-
-			sender
-				.send_message(NetworkBridgeTxMessage::SendRequests(
-					vec![Requests::AvailableDataFetchingV1(req)],
-					IfDisconnected::ImmediateError,
-				))
-				.await;
-
-			match response.await {
-				Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
-					let (reencode_tx, reencode_rx) = channel();
-					self.erasure_task_tx
-						.send(ErasureTask::Reencode(
-							params.validators.len(),
-							params.erasure_root,
-							data,
-							reencode_tx,
-						))
-						.await
-						.map_err(|_| RecoveryError::ChannelClosed)?;
-
-					let reencode_response =
-						reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
-
-					if let Some(data) = reencode_response {
-						gum::trace!(
-							target: LOG_TARGET,
-							candidate_hash = ?params.candidate_hash,
-							"Received full data",
-						);
-
-						return Ok(data)
-					} else {
-						gum::debug!(
-							target: LOG_TARGET,
-							candidate_hash = ?params.candidate_hash,
-							?validator_index,
-							"Invalid data response",
-						);
-
-						// it doesn't help to report the peer with req/res.
-					}
-				},
-				Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {},
-				Err(e) => gum::debug!(
-					target: LOG_TARGET,
-					candidate_hash = ?params.candidate_hash,
-					?validator_index,
-					err = ?e,
-					"Error fetching full available data."
-				),
-			}
-		}
-	}
-}
-
-impl RequestChunksFromValidators {
-	fn new(
-		n_validators: u32,
-		erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
-	) -> Self {
-		let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect();
-		shuffling.shuffle(&mut rand::thread_rng());
-
-		RequestChunksFromValidators {
-			error_count: 0,
-			total_received_responses: 0,
-			shuffling: shuffling.into(),
-			received_chunks: HashMap::new(),
-			requesting_chunks: FuturesUndead::new(),
-			erasure_task_tx,
-		}
-	}
-
-	fn is_unavailable(&self, params: &RecoveryParams) -> bool {
-		is_unavailable(
-			self.chunk_count(),
-			self.requesting_chunks.total_len(),
-			self.shuffling.len(),
-			params.threshold,
-		)
-	}
-
-	fn chunk_count(&self) -> usize {
-		self.received_chunks.len()
-	}
-
-	fn insert_chunk(&mut self, validator_index: ValidatorIndex, chunk: ErasureChunk) {
-		self.received_chunks.insert(validator_index, chunk);
-	}
-
-	fn can_conclude(&self, params: &RecoveryParams) -> bool {
-		self.chunk_count() >= params.threshold || self.is_unavailable(params)
-	}
-
-	/// Desired number of parallel requests.
-	///
-	/// For the given threshold (total required number of chunks) get the desired number of
-	/// requests we want to have running in parallel at this time.
-	fn get_desired_request_count(&self, threshold: usize) -> usize {
-		// Upper bound for parallel requests.
-		// We want to limit this, so requests can be processed within the timeout and we limit the
-		// following feedback loop:
-		// 1. Requests fail due to timeout
-		// 2. We request more chunks to make up for it
-		// 3. Bandwidth is spread out even more, so we get even more timeouts
-		// 4. We request more chunks to make up for it ...
-		let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
-		// How many chunks are still needed?
-		let remaining_chunks = threshold.saturating_sub(self.chunk_count());
-		// What is the current error rate, so we can make up for it?
-		let inv_error_rate =
-			self.total_received_responses.checked_div(self.error_count).unwrap_or(0);
-		// Actual number of requests we want to have in flight in parallel:
-		std::cmp::min(
-			max_requests_boundary,
-			remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0),
-		)
-	}
-
-	async fn launch_parallel_requests<Sender>(
-		&mut self,
-		params: &RecoveryParams,
-		sender: &mut Sender,
-	) where
-		Sender: overseer::AvailabilityRecoverySenderTrait,
-	{
-		let num_requests = self.get_desired_request_count(params.threshold);
-		let candidate_hash = &params.candidate_hash;
-		let already_requesting_count = self.requesting_chunks.len();
-
-		gum::debug!(
-			target: LOG_TARGET,
-			?candidate_hash,
-			?num_requests,
-			error_count= ?self.error_count,
-			total_received = ?self.total_received_responses,
-			threshold = ?params.threshold,
-			?already_requesting_count,
-			"Requesting availability chunks for a candidate",
-		);
-		let mut requests = Vec::with_capacity(num_requests - already_requesting_count);
-
-		while self.requesting_chunks.len() < num_requests {
-			if let Some(validator_index) = self.shuffling.pop_back() {
-				let validator = params.validator_authority_keys[validator_index.0 as usize].clone();
-				gum::trace!(
-					target: LOG_TARGET,
-					?validator,
-					?validator_index,
-					?candidate_hash,
-					"Requesting chunk",
-				);
-
-				// Request data.
-				let raw_request = req_res::v1::ChunkFetchingRequest {
-					candidate_hash: params.candidate_hash,
-					index: validator_index,
-				};
-
-				let (req, res) = OutgoingRequest::new(Recipient::Authority(validator), raw_request);
-				requests.push(Requests::ChunkFetchingV1(req));
-
-				params.metrics.on_chunk_request_issued();
-				let timer = params.metrics.time_chunk_request();
-
-				self.requesting_chunks.push(Box::pin(async move {
-					let _timer = timer;
-					match res.await {
-						Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) =>
-							Ok(Some(chunk.recombine_into_chunk(&raw_request))),
-						Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None),
-						Err(e) => Err((validator_index, e)),
-					}
-				}));
-			} else {
-				break
-			}
-		}
-
-		sender
-			.send_message(NetworkBridgeTxMessage::SendRequests(
-				requests,
-				IfDisconnected::TryConnect,
-			))
-			.await;
-	}
-
-	/// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`.
-	async fn wait_for_chunks(&mut self, params: &RecoveryParams) {
-		let metrics = &params.metrics;
-
-		// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
-		// We also declare requests undead, once `TIMEOUT_START_NEW_REQUESTS` is reached and will
-		// return in that case for `launch_parallel_requests` to fill up slots again.
-		while let Some(request_result) =
-			self.requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await
-		{
-			self.total_received_responses += 1;
-
-			match request_result {
-				Ok(Some(chunk)) =>
-					if is_chunk_valid(params, &chunk) {
-						metrics.on_chunk_request_succeeded();
-						gum::trace!(
-							target: LOG_TARGET,
-							candidate_hash = ?params.candidate_hash,
-							validator_index = ?chunk.index,
-							"Received valid chunk",
-						);
-						self.insert_chunk(chunk.index, chunk);
-					} else {
-						metrics.on_chunk_request_invalid();
-						self.error_count += 1;
-					},
-				Ok(None) => {
-					metrics.on_chunk_request_no_such_chunk();
-					self.error_count += 1;
-				},
-				Err((validator_index, e)) => {
-					self.error_count += 1;
-
-					gum::trace!(
-						target: LOG_TARGET,
-						candidate_hash= ?params.candidate_hash,
-						err = ?e,
-						?validator_index,
-						"Failure requesting chunk",
-					);
-
-					match e {
-						RequestError::InvalidResponse(_) => {
-							metrics.on_chunk_request_invalid();
-
-							gum::debug!(
-								target: LOG_TARGET,
-								candidate_hash = ?params.candidate_hash,
-								err = ?e,
-								?validator_index,
-								"Chunk fetching response was invalid",
-							);
-						},
-						RequestError::NetworkError(err) => {
-							// No debug logs on general network errors - that became very spammy
-							// occasionally.
-							if let RequestFailure::Network(OutboundFailure::Timeout) = err {
-								metrics.on_chunk_request_timeout();
-							} else {
-								metrics.on_chunk_request_error();
-							}
-
-							self.shuffling.push_front(validator_index);
-						},
-						RequestError::Canceled(_) => {
-							metrics.on_chunk_request_error();
-
-							self.shuffling.push_front(validator_index);
-						},
-					}
-				},
-			}
-
-			// Stop waiting for requests when we either can already recover the data
-			// or have gotten firm 'No' responses from enough validators.
-			if self.can_conclude(params) {
-				gum::debug!(
-					target: LOG_TARGET,
-					candidate_hash = ?params.candidate_hash,
-					received_chunks_count = ?self.chunk_count(),
-					requested_chunks_count = ?self.requesting_chunks.len(),
-					threshold = ?params.threshold,
-					"Can conclude availability for a candidate",
-				);
-				break
-			}
-		}
-	}
-
-	async fn run<Sender>(
-		&mut self,
-		params: &RecoveryParams,
-		sender: &mut Sender,
-	) -> Result<AvailableData, RecoveryError>
-	where
-		Sender: overseer::AvailabilityRecoverySenderTrait,
-	{
-		let metrics = &params.metrics;
-
-		// First query the store for any chunks we've got.
-		if !params.bypass_availability_store {
-			let (tx, rx) = oneshot::channel();
-			sender
-				.send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx))
-				.await;
-
-			match rx.await {
-				Ok(chunks) => {
-					// This should either be length 1 or 0. If we had the whole data,
-					// we wouldn't have reached this stage.
-					let chunk_indices: Vec<_> = chunks.iter().map(|c| c.index).collect();
-					self.shuffling.retain(|i| !chunk_indices.contains(i));
-
-					for chunk in chunks {
-						if is_chunk_valid(params, &chunk) {
-							gum::trace!(
-								target: LOG_TARGET,
-								candidate_hash = ?params.candidate_hash,
-								validator_index = ?chunk.index,
-								"Found valid chunk on disk"
-							);
-							self.insert_chunk(chunk.index, chunk);
-						} else {
-							gum::error!(
-								target: LOG_TARGET,
-								"Loaded invalid chunk from disk! Disk/Db corruption _very_ likely - please fix ASAP!"
-							);
-						};
-					}
-				},
-				Err(oneshot::Canceled) => {
-					gum::warn!(
-						target: LOG_TARGET,
-						candidate_hash = ?params.candidate_hash,
-						"Failed to reach the availability store"
-					);
-				},
-			}
-		}
-
-		let _recovery_timer = metrics.time_full_recovery();
-
-		loop {
-			if self.is_unavailable(&params) {
-				gum::debug!(
-					target: LOG_TARGET,
-					candidate_hash = ?params.candidate_hash,
-					erasure_root = ?params.erasure_root,
-					received = %self.chunk_count(),
-					requesting = %self.requesting_chunks.len(),
-					total_requesting = %self.requesting_chunks.total_len(),
-					n_validators = %params.validators.len(),
-					"Data recovery is not possible",
-				);
-
-				metrics.on_recovery_failed();
-
-				return Err(RecoveryError::Unavailable)
-			}
-
-			self.launch_parallel_requests(params, sender).await;
-			self.wait_for_chunks(params).await;
-
-			// If received_chunks has more than threshold entries, attempt to recover the data.
-			// If that fails, or a re-encoding of it doesn't match the expected erasure root,
-			// return Err(RecoveryError::Invalid)
-			if self.chunk_count() >= params.threshold {
-				let recovery_duration = metrics.time_erasure_recovery();
-
-				// Send request to reconstruct available data from chunks.
-				let (avilable_data_tx, available_data_rx) = channel();
-				self.erasure_task_tx
-					.send(ErasureTask::Reconstruct(
-						params.validators.len(),
-						std::mem::take(&mut self.received_chunks),
-						avilable_data_tx,
-					))
-					.await
-					.map_err(|_| RecoveryError::ChannelClosed)?;
-
-				let available_data_response =
-					available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
-
-				return match available_data_response {
-					Ok(data) => {
-						// Send request to re-encode the chunks and check merkle root.
-						let (reencode_tx, reencode_rx) = channel();
-						self.erasure_task_tx
-							.send(ErasureTask::Reencode(
-								params.validators.len(),
-								params.erasure_root,
-								data,
-								reencode_tx,
-							))
-							.await
-							.map_err(|_| RecoveryError::ChannelClosed)?;
-
-						let reencode_response =
-							reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
-
-						if let Some(data) = reencode_response {
-							gum::trace!(
-								target: LOG_TARGET,
-								candidate_hash = ?params.candidate_hash,
-								erasure_root = ?params.erasure_root,
-								"Data recovery complete",
-							);
-							metrics.on_recovery_succeeded();
-
-							Ok(data)
-						} else {
-							recovery_duration.map(|rd| rd.stop_and_discard());
-							gum::trace!(
-								target: LOG_TARGET,
-								candidate_hash = ?params.candidate_hash,
-								erasure_root = ?params.erasure_root,
-								"Data recovery - root mismatch",
-							);
-							metrics.on_recovery_invalid();
-
-							Err(RecoveryError::Invalid)
-						}
-					},
-					Err(err) => {
-						recovery_duration.map(|rd| rd.stop_and_discard());
-						gum::trace!(
-							target: LOG_TARGET,
-							candidate_hash = ?params.candidate_hash,
-							erasure_root = ?params.erasure_root,
-							?err,
-							"Data recovery error ",
-						);
-						metrics.on_recovery_invalid();
-
-						Err(RecoveryError::Invalid)
-					},
-				}
-			}
-		}
-	}
-}
-
 const fn is_unavailable(
 	received_chunks: usize,
 	requesting_chunks: usize,
@@ -777,60 +198,6 @@ fn reconstructed_data_matches_root(
 	branches.root() == *expected_root
 }
 
-impl<Sender> RecoveryTask<Sender>
-where
-	Sender: overseer::AvailabilityRecoverySenderTrait,
-{
-	async fn run(mut self) -> Result<AvailableData, RecoveryError> {
-		// First just see if we have the data available locally.
-		if !self.params.bypass_availability_store {
-			let (tx, rx) = oneshot::channel();
-			self.sender
-				.send_message(AvailabilityStoreMessage::QueryAvailableData(
-					self.params.candidate_hash,
-					tx,
-				))
-				.await;
-
-			match rx.await {
-				Ok(Some(data)) => return Ok(data),
-				Ok(None) => {},
-				Err(oneshot::Canceled) => {
-					gum::warn!(
-						target: LOG_TARGET,
-						candidate_hash = ?self.params.candidate_hash,
-						"Failed to reach the availability store",
-					)
-				},
-			}
-		}
-
-		self.params.metrics.on_recovery_started();
-
-		loop {
-			// These only fail if we cannot reach the underlying subsystem, which case there is
-			// nothing meaningful we can do.
-			match self.source {
-				Source::RequestFromBackers(ref mut from_backers) => {
-					match from_backers.run(&self.params, &mut self.sender).await {
-						Ok(data) => break Ok(data),
-						Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid),
-						Err(RecoveryError::ChannelClosed) =>
-							break Err(RecoveryError::ChannelClosed),
-						Err(RecoveryError::Unavailable) =>
-							self.source = Source::RequestChunks(RequestChunksFromValidators::new(
-								self.params.validators.len() as _,
-								self.erasure_task_tx.clone(),
-							)),
-					}
-				},
-				Source::RequestChunks(ref mut from_all) =>
-					break from_all.run(&self.params, &mut self.sender).await,
-			}
-		}
-	}
-}
-
 /// Accumulate all awaiting sides for some particular `AvailableData`.
 struct RecoveryHandle {
 	candidate_hash: CandidateHash,
@@ -973,65 +340,23 @@ async fn launch_recovery_task<Context>(
 	ctx: &mut Context,
 	session_info: SessionInfo,
 	receipt: CandidateReceipt,
-	mut backing_group: Option<GroupIndex>,
 	response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
 	metrics: &Metrics,
-	recovery_strategy: &RecoveryStrategy,
-	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
+	recovery_strategies: VecDeque<Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>>,
+	bypass_availability_store: bool,
 ) -> error::Result<()> {
 	let candidate_hash = receipt.hash();
 	let params = RecoveryParams {
 		validator_authority_keys: session_info.discovery_keys.clone(),
-		validators: session_info.validators.clone(),
+		n_validators: session_info.validators.len(),
 		threshold: recovery_threshold(session_info.validators.len())?,
 		candidate_hash,
 		erasure_root: receipt.descriptor.erasure_root,
 		metrics: metrics.clone(),
-		bypass_availability_store: recovery_strategy == &RecoveryStrategy::BypassAvailabilityStore,
+		bypass_availability_store,
 	};
 
-	if let Some(small_pov_limit) = recovery_strategy.pov_size_limit() {
-		// Get our own chunk size to get an estimate of the PoV size.
-		let chunk_size: Result<Option<usize>, error::Error> =
-			query_chunk_size(ctx, candidate_hash).await;
-		if let Ok(Some(chunk_size)) = chunk_size {
-			let pov_size_estimate = chunk_size.saturating_mul(session_info.validators.len()) / 3;
-			let prefer_backing_group = pov_size_estimate < small_pov_limit;
-
-			gum::trace!(
-				target: LOG_TARGET,
-				?candidate_hash,
-				pov_size_estimate,
-				small_pov_limit,
-				enabled = prefer_backing_group,
-				"Prefer fetch from backing group",
-			);
-
-			backing_group = backing_group.filter(|_| {
-				// We keep the backing group only if `1/3` of chunks sum up to less than
-				// `small_pov_limit`.
-				prefer_backing_group
-			});
-		}
-	}
-
-	let phase = backing_group
-		.and_then(|g| session_info.validator_groups.get(g))
-		.map(|group| {
-			Source::RequestFromBackers(RequestFromBackers::new(
-				group.clone(),
-				erasure_task_tx.clone(),
-			))
-		})
-		.unwrap_or_else(|| {
-			Source::RequestChunks(RequestChunksFromValidators::new(
-				params.validators.len() as _,
-				erasure_task_tx.clone(),
-			))
-		});
-
-	let recovery_task =
-		RecoveryTask { sender: ctx.sender().clone(), params, source: phase, erasure_task_tx };
+	let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);
 
 	let (remote, remote_handle) = recovery_task.run().remote_handle();
 
@@ -1062,8 +387,9 @@ async fn handle_recover<Context>(
 	backing_group: Option<GroupIndex>,
 	response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
 	metrics: &Metrics,
-	recovery_strategy: &RecoveryStrategy,
 	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
+	recovery_strategy_kind: RecoveryStrategyKind,
+	bypass_availability_store: bool,
 ) -> error::Result<()> {
 	let candidate_hash = receipt.hash();
 
@@ -1098,19 +424,71 @@ async fn handle_recover<Context>(
 
 	let _span = span.child("session-info-ctx-received");
 	match session_info {
-		Some(session_info) =>
+		Some(session_info) => {
+			let mut recovery_strategies: VecDeque<
+				Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>,
+			> = VecDeque::with_capacity(2);
+
+			if let Some(backing_group) = backing_group {
+				if let Some(backing_validators) = session_info.validator_groups.get(backing_group) {
+					let mut small_pov_size = true;
+
+					if let RecoveryStrategyKind::BackersFirstIfSizeLower(small_pov_limit) =
+						recovery_strategy_kind
+					{
+						// Get our own chunk size to get an estimate of the PoV size.
+						let chunk_size: Result<Option<usize>, error::Error> =
+							query_chunk_size(ctx, candidate_hash).await;
+						if let Ok(Some(chunk_size)) = chunk_size {
+							let pov_size_estimate =
+								chunk_size.saturating_mul(session_info.validators.len()) / 3;
+							small_pov_size = pov_size_estimate < small_pov_limit;
+
+							gum::trace!(
+								target: LOG_TARGET,
+								?candidate_hash,
+								pov_size_estimate,
+								small_pov_limit,
+								enabled = small_pov_size,
+								"Prefer fetch from backing group",
+							);
+						} else {
+							// we have a POV limit but were not able to query the chunk size, so
+							// don't use the backing group.
+							small_pov_size = false;
+						}
+					};
+
+					match (&recovery_strategy_kind, small_pov_size) {
+						(RecoveryStrategyKind::BackersFirstAlways, _) |
+						(RecoveryStrategyKind::BackersFirstIfSizeLower(_), true) => recovery_strategies.push_back(
+							Box::new(FetchFull::new(FetchFullParams {
+								validators: backing_validators.to_vec(),
+								erasure_task_tx: erasure_task_tx.clone(),
+							})),
+						),
+						_ => {},
+					};
+				}
+			}
+
+			recovery_strategies.push_back(Box::new(FetchChunks::new(FetchChunksParams {
+				n_validators: session_info.validators.len(),
+				erasure_task_tx,
+			})));
+
 			launch_recovery_task(
 				state,
 				ctx,
 				session_info,
 				receipt,
-				backing_group,
 				response_sender,
 				metrics,
-				recovery_strategy,
-				erasure_task_tx,
+				recovery_strategies,
+				bypass_availability_store,
 			)
-			.await,
+			.await
+		},
 		None => {
 			gum::warn!(target: LOG_TARGET, "SessionInfo is `None` at {:?}", state.live_block);
 			response_sender
@@ -1155,7 +533,12 @@ impl AvailabilityRecoverySubsystem {
 		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 		metrics: Metrics,
 	) -> Self {
-		Self { recovery_strategy: RecoveryStrategy::BypassAvailabilityStore, req_receiver, metrics }
+		Self {
+			recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
+			bypass_availability_store: true,
+			req_receiver,
+			metrics,
+		}
 	}
 
 	/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
@@ -1164,7 +547,12 @@ impl AvailabilityRecoverySubsystem {
 		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 		metrics: Metrics,
 	) -> Self {
-		Self { recovery_strategy: RecoveryStrategy::BackersFirstAlways, req_receiver, metrics }
+		Self {
+			recovery_strategy_kind: RecoveryStrategyKind::BackersFirstAlways,
+			bypass_availability_store: false,
+			req_receiver,
+			metrics,
+		}
 	}
 
 	/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
@@ -1172,7 +560,12 @@ impl AvailabilityRecoverySubsystem {
 		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 		metrics: Metrics,
 	) -> Self {
-		Self { recovery_strategy: RecoveryStrategy::ChunksAlways, req_receiver, metrics }
+		Self {
+			recovery_strategy_kind: RecoveryStrategyKind::ChunksAlways,
+			bypass_availability_store: false,
+			req_receiver,
+			metrics,
+		}
 	}
 
 	/// Create a new instance of `AvailabilityRecoverySubsystem` which requests chunks if PoV is
@@ -1182,7 +575,8 @@ impl AvailabilityRecoverySubsystem {
 		metrics: Metrics,
 	) -> Self {
 		Self {
-			recovery_strategy: RecoveryStrategy::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
+			recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
+			bypass_availability_store: false,
 			req_receiver,
 			metrics,
 		}
@@ -1190,7 +584,8 @@ impl AvailabilityRecoverySubsystem {
 
 	async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
 		let mut state = State::default();
-		let Self { recovery_strategy, mut req_receiver, metrics } = self;
+		let Self { mut req_receiver, metrics, recovery_strategy_kind, bypass_availability_store } =
+			self;
 
 		let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
 		let mut erasure_task_rx = erasure_task_rx.fuse();
@@ -1275,11 +670,12 @@ impl AvailabilityRecoverySubsystem {
 										&mut ctx,
 										receipt,
 										session_index,
-										maybe_backing_group.filter(|_| recovery_strategy.needs_backing_group()),
+										maybe_backing_group,
 										response_sender,
 										&metrics,
-										&recovery_strategy,
 										erasure_task_tx.clone(),
+										recovery_strategy_kind.clone(),
+										bypass_availability_store
 									).await {
 										gum::warn!(
 											target: LOG_TARGET,
@@ -1295,7 +691,7 @@ impl AvailabilityRecoverySubsystem {
 				in_req = recv_req => {
 					match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? {
 						Ok(req) => {
-							if recovery_strategy == RecoveryStrategy::BypassAvailabilityStore {
+							if bypass_availability_store {
 								gum::debug!(
 									target: LOG_TARGET,
 									"Skipping request to availability-store.",
diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs
new file mode 100644
index 00000000000..d5bc2da8494
--- /dev/null
+++ b/polkadot/node/network/availability-recovery/src/task.rs
@@ -0,0 +1,830 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Recovery task and associated strategies.
+
+#![warn(missing_docs)]
+
+use crate::{
+	futures_undead::FuturesUndead, is_chunk_valid, is_unavailable, metrics::Metrics, ErasureTask,
+	LOG_TARGET,
+};
+use futures::{channel::oneshot, SinkExt};
+#[cfg(not(test))]
+use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT;
+use polkadot_node_network_protocol::request_response::{
+	self as req_res, outgoing::RequestError, OutgoingRequest, Recipient, Requests,
+};
+use polkadot_node_primitives::{AvailableData, ErasureChunk};
+use polkadot_node_subsystem::{
+	messages::{AvailabilityStoreMessage, NetworkBridgeTxMessage},
+	overseer, RecoveryError,
+};
+use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, Hash, ValidatorIndex};
+use rand::seq::SliceRandom;
+use sc_network::{IfDisconnected, OutboundFailure, RequestFailure};
+use std::{
+	collections::{HashMap, VecDeque},
+	time::Duration,
+};
+
+// How many parallel recovery tasks should be running at once.
+const N_PARALLEL: usize = 50;
+
+/// Time after which we consider a request to have failed
+///
+/// and we should try more peers. Note that in theory the request times out at the network level,
+/// but measurements have shown that in practice requests might actually take longer to fail on
+/// certain occasions. (At the very least, authority discovery is not part of the timeout.)
+///
+/// For the time being this value is the same as the timeout on the networking layer, but as this
+/// timeout is softer than the networking one, it might make sense to pick different values as
+/// well.
+#[cfg(not(test))]
+const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
+#[cfg(test)]
+const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);
+
+#[async_trait::async_trait]
+/// Common trait for runnable recovery strategies.
+pub trait RecoveryStrategy<Sender: overseer::AvailabilityRecoverySenderTrait>: Send {
+	/// Main entry point of the strategy.
+	async fn run(
+		&mut self,
+		state: &mut State,
+		sender: &mut Sender,
+		common_params: &RecoveryParams,
+	) -> Result<AvailableData, RecoveryError>;
+
+	/// Return the name of the strategy for logging purposes.
+	fn display_name(&self) -> &'static str;
+}
+
+/// Recovery parameters common to all strategies in a `RecoveryTask`.
+pub struct RecoveryParams {
+	/// Discovery ids of `validators`.
+	pub validator_authority_keys: Vec<AuthorityDiscoveryId>,
+
+	/// Number of validators.
+	pub n_validators: usize,
+
+	/// The number of chunks needed.
+	pub threshold: usize,
+
+	/// A hash of the relevant candidate.
+	pub candidate_hash: CandidateHash,
+
+	/// The root of the erasure encoding of the candidate.
+	pub erasure_root: Hash,
+
+	/// Metrics to report.
+	pub metrics: Metrics,
+
+	/// Do not request data from availability-store. Useful for collators.
+	pub bypass_availability_store: bool,
+}
+
+/// Intermediate/common data that must be passed between `RecoveryStrategy`s belonging to the
+/// same `RecoveryTask`.
+pub struct State {
+	/// Chunks received so far.
+	received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
+}
+
+impl State {
+	fn new() -> Self {
+		Self { received_chunks: HashMap::new() }
+	}
+
+	fn insert_chunk(&mut self, validator: ValidatorIndex, chunk: ErasureChunk) {
+		self.received_chunks.insert(validator, chunk);
+	}
+
+	fn chunk_count(&self) -> usize {
+		self.received_chunks.len()
+	}
+
+	/// Retrieve the local chunks held in the av-store (either 0 or 1).
+	async fn populate_from_av_store<Sender: overseer::AvailabilityRecoverySenderTrait>(
+		&mut self,
+		params: &RecoveryParams,
+		sender: &mut Sender,
+	) -> Vec<ValidatorIndex> {
+		let (tx, rx) = oneshot::channel();
+		sender
+			.send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx))
+			.await;
+
+		match rx.await {
+			Ok(chunks) => {
+				// This should either be length 1 or 0. If we had the whole data,
+				// we wouldn't have reached this stage.
+				let chunk_indices: Vec<_> = chunks.iter().map(|c| c.index).collect();
+
+				for chunk in chunks {
+					if is_chunk_valid(params, &chunk) {
+						gum::trace!(
+							target: LOG_TARGET,
+							candidate_hash = ?params.candidate_hash,
+							validator_index = ?chunk.index,
+							"Found valid chunk on disk"
+						);
+						self.insert_chunk(chunk.index, chunk);
+					} else {
+						gum::error!(
+							target: LOG_TARGET,
+							"Loaded invalid chunk from disk! Disk/Db corruption _very_ likely - please fix ASAP!"
+						);
+					};
+				}
+
+				chunk_indices
+			},
+			Err(oneshot::Canceled) => {
+				gum::warn!(
+					target: LOG_TARGET,
+					candidate_hash = ?params.candidate_hash,
+					"Failed to reach the availability store"
+				);
+
+				vec![]
+			},
+		}
+	}
+
+	/// Launch chunk requests in parallel, according to the parameters.
+	async fn launch_parallel_chunk_requests<Sender>(
+		&mut self,
+		params: &RecoveryParams,
+		sender: &mut Sender,
+		desired_requests_count: usize,
+		validators: &mut VecDeque<ValidatorIndex>,
+		requesting_chunks: &mut FuturesUndead<
+			Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>,
+		>,
+	) where
+		Sender: overseer::AvailabilityRecoverySenderTrait,
+	{
+		let candidate_hash = &params.candidate_hash;
+		let already_requesting_count = requesting_chunks.len();
+
+		let mut requests = Vec::with_capacity(desired_requests_count - already_requesting_count);
+
+		while requesting_chunks.len() < desired_requests_count {
+			if let Some(validator_index) = validators.pop_back() {
+				let validator = params.validator_authority_keys[validator_index.0 as usize].clone();
+				gum::trace!(
+					target: LOG_TARGET,
+					?validator,
+					?validator_index,
+					?candidate_hash,
+					"Requesting chunk",
+				);
+
+				// Request data.
+				let raw_request = req_res::v1::ChunkFetchingRequest {
+					candidate_hash: params.candidate_hash,
+					index: validator_index,
+				};
+
+				let (req, res) = OutgoingRequest::new(Recipient::Authority(validator), raw_request);
+				requests.push(Requests::ChunkFetchingV1(req));
+
+				params.metrics.on_chunk_request_issued();
+				let timer = params.metrics.time_chunk_request();
+
+				requesting_chunks.push(Box::pin(async move {
+					let _timer = timer;
+					match res.await {
+						Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) =>
+							Ok(Some(chunk.recombine_into_chunk(&raw_request))),
+						Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None),
+						Err(e) => Err((validator_index, e)),
+					}
+				}));
+			} else {
+				break
+			}
+		}
+
+		sender
+			.send_message(NetworkBridgeTxMessage::SendRequests(
+				requests,
+				IfDisconnected::TryConnect,
+			))
+			.await;
+	}
+
+	/// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`.
+	async fn wait_for_chunks(
+		&mut self,
+		params: &RecoveryParams,
+		validators: &mut VecDeque<ValidatorIndex>,
+		requesting_chunks: &mut FuturesUndead<
+			Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>,
+		>,
+		can_conclude: impl Fn(usize, usize, usize, &RecoveryParams, usize) -> bool,
+	) -> (usize, usize) {
+		let metrics = &params.metrics;
+
+		let mut total_received_responses = 0;
+		let mut error_count = 0;
+
+		// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
+		// We also declare requests undead, once `TIMEOUT_START_NEW_REQUESTS` is reached and will
+		// return in that case for `launch_parallel_requests` to fill up slots again.
+		while let Some(request_result) =
+			requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await
+		{
+			total_received_responses += 1;
+
+			match request_result {
+				Ok(Some(chunk)) =>
+					if is_chunk_valid(params, &chunk) {
+						metrics.on_chunk_request_succeeded();
+						gum::trace!(
+							target: LOG_TARGET,
+							candidate_hash = ?params.candidate_hash,
+							validator_index = ?chunk.index,
+							"Received valid chunk",
+						);
+						self.insert_chunk(chunk.index, chunk);
+					} else {
+						metrics.on_chunk_request_invalid();
+						error_count += 1;
+					},
+				Ok(None) => {
+					metrics.on_chunk_request_no_such_chunk();
+					error_count += 1;
+				},
+				Err((validator_index, e)) => {
+					error_count += 1;
+
+					gum::trace!(
+						target: LOG_TARGET,
+						candidate_hash= ?params.candidate_hash,
+						err = ?e,
+						?validator_index,
+						"Failure requesting chunk",
+					);
+
+					match e {
+						RequestError::InvalidResponse(_) => {
+							metrics.on_chunk_request_invalid();
+
+							gum::debug!(
+								target: LOG_TARGET,
+								candidate_hash = ?params.candidate_hash,
+								err = ?e,
+								?validator_index,
+								"Chunk fetching response was invalid",
+							);
+						},
+						RequestError::NetworkError(err) => {
+							// No debug logs on general network errors - that became very spammy
+							// occasionally.
+							if let RequestFailure::Network(OutboundFailure::Timeout) = err {
+								metrics.on_chunk_request_timeout();
+							} else {
+								metrics.on_chunk_request_error();
+							}
+
+							validators.push_front(validator_index);
+						},
+						RequestError::Canceled(_) => {
+							metrics.on_chunk_request_error();
+
+							validators.push_front(validator_index);
+						},
+					}
+				},
+			}
+
+			// Stop waiting for requests when we either can already recover the data
+			// or have gotten firm 'No' responses from enough validators.
+			if can_conclude(
+				validators.len(),
+				requesting_chunks.total_len(),
+				self.chunk_count(),
+				params,
+				error_count,
+			) {
+				gum::debug!(
+					target: LOG_TARGET,
+					candidate_hash = ?params.candidate_hash,
+					received_chunks_count = ?self.chunk_count(),
+					requested_chunks_count = ?requesting_chunks.len(),
+					threshold = ?params.threshold,
+					"Can conclude availability for a candidate",
+				);
+				break
+			}
+		}
+
+		(total_received_responses, error_count)
+	}
+}
+
+/// A stateful reconstruction of availability data in reference to
+/// a candidate hash.
+pub struct RecoveryTask<Sender: overseer::AvailabilityRecoverySenderTrait> {
+	sender: Sender,
+	params: RecoveryParams,
+	strategies: VecDeque<Box<dyn RecoveryStrategy<Sender>>>,
+	state: State,
+}
+
+impl<Sender> RecoveryTask<Sender>
+where
+	Sender: overseer::AvailabilityRecoverySenderTrait,
+{
+	/// Instantiate a new recovery task.
+	pub fn new(
+		sender: Sender,
+		params: RecoveryParams,
+		strategies: VecDeque<Box<dyn RecoveryStrategy<Sender>>>,
+	) -> Self {
+		Self { sender, params, strategies, state: State::new() }
+	}
+
+	async fn in_availability_store(&mut self) -> Option<AvailableData> {
+		if !self.params.bypass_availability_store {
+			let (tx, rx) = oneshot::channel();
+			self.sender
+				.send_message(AvailabilityStoreMessage::QueryAvailableData(
+					self.params.candidate_hash,
+					tx,
+				))
+				.await;
+
+			match rx.await {
+				Ok(Some(data)) => return Some(data),
+				Ok(None) => {},
+				Err(oneshot::Canceled) => {
+					gum::warn!(
+						target: LOG_TARGET,
+						candidate_hash = ?self.params.candidate_hash,
+						"Failed to reach the availability store",
+					)
+				},
+			}
+		}
+
+		None
+	}
+
+	/// Run this recovery task to completion. It will loop through the configured strategies
+	/// in-order and return whenever the first one recovers the full `AvailableData`.
+	pub async fn run(mut self) -> Result<AvailableData, RecoveryError> {
+		if let Some(data) = self.in_availability_store().await {
+			return Ok(data)
+		}
+
+		self.params.metrics.on_recovery_started();
+
+		let _timer = self.params.metrics.time_full_recovery();
+
+		while let Some(mut current_strategy) = self.strategies.pop_front() {
+			gum::debug!(
+				target: LOG_TARGET,
+				candidate_hash = ?self.params.candidate_hash,
+				"Starting `{}` strategy",
+				current_strategy.display_name(),
+			);
+
+			let res = current_strategy.run(&mut self.state, &mut self.sender, &self.params).await;
+
+			match res {
+				Err(RecoveryError::Unavailable) =>
+					if self.strategies.front().is_some() {
+						gum::debug!(
+							target: LOG_TARGET,
+							candidate_hash = ?self.params.candidate_hash,
+							"Recovery strategy `{}` did not conclude. Trying the next one.",
+							current_strategy.display_name(),
+						);
+						continue
+					},
+				Err(err) => {
+					match &err {
+						RecoveryError::Invalid => self.params.metrics.on_recovery_invalid(),
+						_ => self.params.metrics.on_recovery_failed(),
+					}
+					return Err(err)
+				},
+				Ok(data) => {
+					self.params.metrics.on_recovery_succeeded();
+					return Ok(data)
+				},
+			}
+		}
+
+		// We have no other strategies to try.
+		gum::warn!(
+			target: LOG_TARGET,
+			candidate_hash = ?self.params.candidate_hash,
+			"Recovery of available data failed.",
+		);
+		self.params.metrics.on_recovery_failed();
+
+		Err(RecoveryError::Unavailable)
+	}
+}
+
+/// `RecoveryStrategy` that sequentially tries to fetch the full `AvailableData` from
+/// already-connected validators in the configured validator set.
+pub struct FetchFull {
+	params: FetchFullParams,
+}
+
+pub struct FetchFullParams {
+	/// Validators that will be used for fetching the data.
+	pub validators: Vec<ValidatorIndex>,
+	/// Channel to the erasure task handler.
+	pub erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
+}
+
+impl FetchFull {
+	/// Create a new `FetchFull` recovery strategy.
+	pub fn new(mut params: FetchFullParams) -> Self {
+		params.validators.shuffle(&mut rand::thread_rng());
+		Self { params }
+	}
+}
+
+#[async_trait::async_trait]
+impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender> for FetchFull {
+	fn display_name(&self) -> &'static str {
+		"Full recovery from backers"
+	}
+
+	async fn run(
+		&mut self,
+		_: &mut State,
+		sender: &mut Sender,
+		common_params: &RecoveryParams,
+	) -> Result<AvailableData, RecoveryError> {
+		loop {
+			// Pop the next validator, and proceed to the next strategy if we're out.
+			let validator_index =
+				self.params.validators.pop().ok_or_else(|| RecoveryError::Unavailable)?;
+
+			// Request data.
+			let (req, response) = OutgoingRequest::new(
+				Recipient::Authority(
+					common_params.validator_authority_keys[validator_index.0 as usize].clone(),
+				),
+				req_res::v1::AvailableDataFetchingRequest {
+					candidate_hash: common_params.candidate_hash,
+				},
+			);
+
+			sender
+				.send_message(NetworkBridgeTxMessage::SendRequests(
+					vec![Requests::AvailableDataFetchingV1(req)],
+					IfDisconnected::ImmediateError,
+				))
+				.await;
+
+			match response.await {
+				Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
+					let (reencode_tx, reencode_rx) = oneshot::channel();
+					self.params
+						.erasure_task_tx
+						.send(ErasureTask::Reencode(
+							common_params.n_validators,
+							common_params.erasure_root,
+							data,
+							reencode_tx,
+						))
+						.await
+						.map_err(|_| RecoveryError::ChannelClosed)?;
+
+					let reencode_response =
+						reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
+
+					if let Some(data) = reencode_response {
+						gum::trace!(
+							target: LOG_TARGET,
+							candidate_hash = ?common_params.candidate_hash,
+							"Received full data",
+						);
+
+						return Ok(data)
+					} else {
+						gum::debug!(
+							target: LOG_TARGET,
+							candidate_hash = ?common_params.candidate_hash,
+							?validator_index,
+							"Invalid data response",
+						);
+
+						// it doesn't help to report the peer with req/res.
+					}
+				},
+				Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {},
+				Err(e) => gum::debug!(
+					target: LOG_TARGET,
+					candidate_hash = ?common_params.candidate_hash,
+					?validator_index,
+					err = ?e,
+					"Error fetching full available data."
+				),
+			}
+		}
+	}
+}
+
+/// `RecoveryStrategy` that requests chunks from validators, in parallel.
+pub struct FetchChunks {
+	/// How many requests have been unsuccessful so far.
+	error_count: usize,
+	/// Total number of responses that have been received, including failed ones.
+	total_received_responses: usize,
+	/// Collection of in-flight requests.
+	requesting_chunks: FuturesUndead<Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
+	/// A random shuffling of the validators which indicates the order in which we connect to the
+	/// validators and request the chunk from them.
+	validators: VecDeque<ValidatorIndex>,
+	/// Channel to the erasure task handler.
+	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
+}
+
+/// Parameters specific to the `FetchChunks` strategy.
+pub struct FetchChunksParams {
+	/// Total number of validators.
+	pub n_validators: usize,
+	/// Channel to the erasure task handler.
+	pub erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
+}
+
+impl FetchChunks {
+	/// Instantiate a new strategy.
+	pub fn new(params: FetchChunksParams) -> Self {
+		let mut shuffling: Vec<_> = (0..params.n_validators)
+			.map(|i| ValidatorIndex(i.try_into().expect("number of validators must fit in a u32")))
+			.collect();
+		shuffling.shuffle(&mut rand::thread_rng());
+
+		Self {
+			error_count: 0,
+			total_received_responses: 0,
+			requesting_chunks: FuturesUndead::new(),
+			validators: shuffling.into(),
+			erasure_task_tx: params.erasure_task_tx,
+		}
+	}
+
+	fn is_unavailable(
+		unrequested_validators: usize,
+		in_flight_requests: usize,
+		chunk_count: usize,
+		threshold: usize,
+	) -> bool {
+		is_unavailable(chunk_count, in_flight_requests, unrequested_validators, threshold)
+	}
+
+	/// Desired number of parallel requests.
+	///
+	/// For the given threshold (total required number of chunks) get the desired number of
+	/// requests we want to have running in parallel at this time.
+	fn get_desired_request_count(&self, chunk_count: usize, threshold: usize) -> usize {
+		// Upper bound for parallel requests.
+		// We want to limit this, so requests can be processed within the timeout and we limit the
+		// following feedback loop:
+		// 1. Requests fail due to timeout
+		// 2. We request more chunks to make up for it
+		// 3. Bandwidth is spread out even more, so we get even more timeouts
+		// 4. We request more chunks to make up for it ...
+		let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
+		// How many chunks are still needed?
+		let remaining_chunks = threshold.saturating_sub(chunk_count);
+		// What is the current error rate, so we can make up for it?
+		let inv_error_rate =
+			self.total_received_responses.checked_div(self.error_count).unwrap_or(0);
+		// Actual number of requests we want to have in flight in parallel:
+		std::cmp::min(
+			max_requests_boundary,
+			remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0),
+		)
+	}
+
+	async fn attempt_recovery(
+		&mut self,
+		state: &mut State,
+		common_params: &RecoveryParams,
+	) -> Result<AvailableData, RecoveryError> {
+		let recovery_duration = common_params.metrics.time_erasure_recovery();
+
+		// Send request to reconstruct available data from chunks.
+		let (available_data_tx, available_data_rx) = oneshot::channel();
+		self.erasure_task_tx
+			.send(ErasureTask::Reconstruct(
+				common_params.n_validators,
+				// Safe to leave an empty vec in place, as we're stopping the recovery process if
+				// this reconstruct fails.
+				std::mem::take(&mut state.received_chunks),
+				available_data_tx,
+			))
+			.await
+			.map_err(|_| RecoveryError::ChannelClosed)?;
+
+		let available_data_response =
+			available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
+
+		match available_data_response {
+			Ok(data) => {
+				// Send request to re-encode the chunks and check merkle root.
+				let (reencode_tx, reencode_rx) = oneshot::channel();
+				self.erasure_task_tx
+					.send(ErasureTask::Reencode(
+						common_params.n_validators,
+						common_params.erasure_root,
+						data,
+						reencode_tx,
+					))
+					.await
+					.map_err(|_| RecoveryError::ChannelClosed)?;
+
+				let reencode_response =
+					reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;
+
+				if let Some(data) = reencode_response {
+					gum::trace!(
+						target: LOG_TARGET,
+						candidate_hash = ?common_params.candidate_hash,
+						erasure_root = ?common_params.erasure_root,
+						"Data recovery from chunks complete",
+					);
+
+					Ok(data)
+				} else {
+					recovery_duration.map(|rd| rd.stop_and_discard());
+					gum::trace!(
+						target: LOG_TARGET,
+						candidate_hash = ?common_params.candidate_hash,
+						erasure_root = ?common_params.erasure_root,
+						"Data recovery error - root mismatch",
+					);
+
+					Err(RecoveryError::Invalid)
+				}
+			},
+			Err(err) => {
+				recovery_duration.map(|rd| rd.stop_and_discard());
+				gum::trace!(
+					target: LOG_TARGET,
+					candidate_hash = ?common_params.candidate_hash,
+					erasure_root = ?common_params.erasure_root,
+					?err,
+					"Data recovery error",
+				);
+
+				Err(RecoveryError::Invalid)
+			},
+		}
+	}
+}
+
+#[async_trait::async_trait]
+impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender> for FetchChunks {
+	fn display_name(&self) -> &'static str {
+		"Fetch chunks"
+	}
+
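+	// Outline: skip validators whose chunks we already hold (from the availability
+	// store or from previous strategies), then loop: once `threshold` chunks are
+	// available attempt reconstruction, bail out if recovery became impossible,
+	// otherwise launch more parallel chunk requests and wait for their responses.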
+	async fn run(
+		&mut self,
+		state: &mut State,
+		sender: &mut Sender,
+		common_params: &RecoveryParams,
+	) -> Result<AvailableData, RecoveryError> {
+		// First query the store for any chunks we've got.
+		if !common_params.bypass_availability_store {
+			let local_chunk_indices = state.populate_from_av_store(common_params, sender).await;
+			self.validators.retain(|i| !local_chunk_indices.contains(i));
+		}
+
+		// No need to query the validators that have the chunks we already received.
+		self.validators.retain(|i| !state.received_chunks.contains_key(i));
+
+		loop {
+			// If received_chunks has more than threshold entries, attempt to recover the data.
+			// If that fails, or a re-encoding of it doesn't match the expected erasure root,
+			// return Err(RecoveryError::Invalid).
+			// Do this before requesting any chunks because we may have enough of them coming from
+			// past RecoveryStrategies.
+			if state.chunk_count() >= common_params.threshold {
+				return self.attempt_recovery(state, common_params).await
+			}
+
+			if Self::is_unavailable(
+				self.validators.len(),
+				self.requesting_chunks.total_len(),
+				state.chunk_count(),
+				common_params.threshold,
+			) {
+				gum::debug!(
+					target: LOG_TARGET,
+					candidate_hash = ?common_params.candidate_hash,
+					erasure_root = ?common_params.erasure_root,
+					received = %state.chunk_count(),
+					requesting = %self.requesting_chunks.len(),
+					total_requesting = %self.requesting_chunks.total_len(),
+					n_validators = %common_params.n_validators,
+					"Data recovery from chunks is not possible",
+				);
+
+				return Err(RecoveryError::Unavailable)
+			}
+
+			let desired_requests_count =
+				self.get_desired_request_count(state.chunk_count(), common_params.threshold);
+			let already_requesting_count = self.requesting_chunks.len();
+			gum::debug!(
+				target: LOG_TARGET,
+				?common_params.candidate_hash,
+				?desired_requests_count,
+				error_count = ?self.error_count,
+				total_received = ?self.total_received_responses,
+				threshold = ?common_params.threshold,
+				?already_requesting_count,
+				"Requesting availability chunks for a candidate",
+			);
+			state
+				.launch_parallel_chunk_requests(
+					common_params,
+					sender,
+					desired_requests_count,
+					&mut self.validators,
+					&mut self.requesting_chunks,
+				)
+				.await;
+
+			let (total_responses, error_count) = state
+				.wait_for_chunks(
+					common_params,
+					&mut self.validators,
+					&mut self.requesting_chunks,
+					|unrequested_validators, reqs, chunk_count, params, _error_count| {
+						chunk_count >= params.threshold ||
+							Self::is_unavailable(
+								unrequested_validators,
+								reqs,
+								chunk_count,
+								params.threshold,
+							)
+					},
+				)
+				.await;
+
+			self.total_received_responses += total_responses;
+			self.error_count += error_count;
+		}
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use polkadot_erasure_coding::recovery_threshold;
+
+	#[test]
+	fn parallel_request_calculation_works_as_expected() {
+		let num_validators = 100;
+		let threshold = recovery_threshold(num_validators).unwrap();
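+		// For 100 validators the recovery threshold is 34 chunks.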
+		let (erasure_task_tx, _erasure_task_rx) = futures::channel::mpsc::channel(16);
+
+		let mut fetch_chunks_task =
+			FetchChunks::new(FetchChunksParams { n_validators: 100, erasure_task_tx });
+		assert_eq!(fetch_chunks_task.get_desired_request_count(0, threshold), threshold);
+		fetch_chunks_task.error_count = 1;
+		fetch_chunks_task.total_received_responses = 1;
+		// We saturate at threshold (34):
+		assert_eq!(fetch_chunks_task.get_desired_request_count(0, threshold), threshold);
+
+		fetch_chunks_task.total_received_responses = 2;
+		// With given error rate - still saturating:
+		assert_eq!(fetch_chunks_task.get_desired_request_count(1, threshold), threshold);
+		fetch_chunks_task.total_received_responses += 8;
+		// error rate: 1/10
+		// remaining chunks needed: threshold (34) - 9 = 25
+		// expected: 25 + 25 / 10 = 27
+		assert_eq!(fetch_chunks_task.get_desired_request_count(9, threshold), 27);
+		fetch_chunks_task.error_count = 0;
+		// With error count zero - we should fetch exactly as needed:
+		assert_eq!(fetch_chunks_task.get_desired_request_count(10, threshold), threshold - 10);
+	}
+}
diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs
index 60c2d38ab31..63ccf0e94f9 100644
--- a/polkadot/node/network/availability-recovery/src/tests.rs
+++ b/polkadot/node/network/availability-recovery/src/tests.rs
@@ -21,15 +21,19 @@ use futures::{executor, future};
 use futures_timer::Delay;
 
 use parity_scale_codec::Encode;
-use polkadot_node_network_protocol::request_response::{IncomingRequest, ReqProtocolNames};
+use polkadot_node_network_protocol::request_response::{
+	self as req_res, IncomingRequest, Recipient, ReqProtocolNames, Requests,
+};
 
 use super::*;
 
-use sc_network::config::RequestResponseConfig;
+use sc_network::{config::RequestResponseConfig, IfDisconnected, OutboundFailure, RequestFailure};
 
 use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
 use polkadot_node_primitives::{BlockData, PoV, Proof};
-use polkadot_node_subsystem::messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest};
+use polkadot_node_subsystem::messages::{
+	AllMessages, NetworkBridgeTxMessage, RuntimeApiMessage, RuntimeApiRequest,
+};
 use polkadot_node_subsystem_test_helpers::{
 	make_subsystem_context, mock::new_leaf, TestSubsystemContextHandle,
 };
@@ -204,7 +208,7 @@ use sp_keyring::Sr25519Keyring;
 enum Has {
 	No,
 	Yes,
-	NetworkError(sc_network::RequestFailure),
+	NetworkError(RequestFailure),
 	/// Make request not return at all, instead the sender is returned from the function.
 	///
 	/// Note, if you use `DoesNotReturn` you have to keep the returned senders alive, otherwise the
@@ -214,7 +218,7 @@ enum Has {
 
 impl Has {
 	fn timeout() -> Self {
-		Has::NetworkError(sc_network::RequestFailure::Network(sc_network::OutboundFailure::Timeout))
+		Has::NetworkError(RequestFailure::Network(OutboundFailure::Timeout))
 	}
 }
 
@@ -393,7 +397,7 @@ impl TestState {
 		candidate_hash: CandidateHash,
 		virtual_overseer: &mut VirtualOverseer,
 		who_has: impl Fn(usize) -> Has,
-	) -> Vec<oneshot::Sender<std::result::Result<Vec<u8>, sc_network::RequestFailure>>> {
+	) -> Vec<oneshot::Sender<std::result::Result<Vec<u8>, RequestFailure>>> {
 		let mut senders = Vec::new();
 		for _ in 0..self.validators.len() {
 			// Receive a request for a chunk.
@@ -1010,7 +1014,7 @@ fn recovers_from_only_chunks_if_pov_large() {
 			AvailabilityRecoveryMessage::RecoverAvailableData(
 				new_candidate.clone(),
 				test_state.session_index,
-				None,
+				Some(GroupIndex(0)),
 				tx,
 			),
 		)
@@ -1546,36 +1550,3 @@ fn invalid_local_chunk_is_ignored() {
 		(virtual_overseer, req_cfg)
 	});
 }
-
-#[test]
-fn parallel_request_calculation_works_as_expected() {
-	let num_validators = 100;
-	let threshold = recovery_threshold(num_validators).unwrap();
-	let (erasure_task_tx, _erasure_task_rx) = futures::channel::mpsc::channel(16);
-
-	let mut phase = RequestChunksFromValidators::new(100, erasure_task_tx);
-	assert_eq!(phase.get_desired_request_count(threshold), threshold);
-	phase.error_count = 1;
-	phase.total_received_responses = 1;
-	// We saturate at threshold (34):
-	assert_eq!(phase.get_desired_request_count(threshold), threshold);
-
-	let dummy_chunk =
-		ErasureChunk { chunk: Vec::new(), index: ValidatorIndex(0), proof: Proof::dummy_proof() };
-	phase.insert_chunk(ValidatorIndex(0), dummy_chunk.clone());
-	phase.total_received_responses = 2;
-	// With given error rate - still saturating:
-	assert_eq!(phase.get_desired_request_count(threshold), threshold);
-	for i in 1..9 {
-		phase.insert_chunk(ValidatorIndex(i), dummy_chunk.clone());
-	}
-	phase.total_received_responses += 8;
-	// error rate: 1/10
-	// remaining chunks needed: threshold (34) - 9
-	// expected: 24 * (1+ 1/10) = (next greater integer) = 27
-	assert_eq!(phase.get_desired_request_count(threshold), 27);
-	phase.insert_chunk(ValidatorIndex(9), dummy_chunk.clone());
-	phase.error_count = 0;
-	// With error count zero - we should fetch exactly as needed:
-	assert_eq!(phase.get_desired_request_count(threshold), threshold - phase.chunk_count());
-}
-- 
GitLab