diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs
index 3f7db15727eafb48667061258de50bc6408bb03d..1f4b4f84b74f2f028b252f2da78fd664b4bd01a8 100644
--- a/polkadot/node/core/approval-voting/src/lib.rs
+++ b/polkadot/node/core/approval-voting/src/lib.rs
@@ -35,7 +35,7 @@ use polkadot_primitives::v1::{
 	ValidatorIndex, Hash, SessionIndex, SessionInfo, CandidateHash,
 	CandidateReceipt, BlockNumber, PersistedValidationData,
 	ValidationCode, CandidateDescriptor, PoV, ValidatorPair, ValidatorSignature, ValidatorId,
-	CandidateIndex,
+	CandidateIndex, GroupIndex,
 };
 use polkadot_node_primitives::ValidationResult;
 use polkadot_node_primitives::approval::{
@@ -268,6 +268,7 @@ enum Action {
 		candidate_index: CandidateIndex,
 		session: SessionIndex,
 		candidate: CandidateReceipt,
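+		/// The backing group of the candidate, used for the recovery fast path.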
+		backing_group: GroupIndex,
 	},
 	Conclude,
 }
@@ -391,6 +392,7 @@ async fn handle_actions(
 				candidate_index,
 				session,
 				candidate,
+				backing_group,
 			} => {
 				let block_hash = indirect_cert.block_hash;
 				let validator_index = indirect_cert.validator;
@@ -408,6 +410,7 @@ async fn handle_actions(
 					validator_index,
 					block_hash,
 					candidate_index as _,
+					backing_group,
 				).await?
 			}
 			Action::Conclude => { conclude = true; }
@@ -1050,7 +1053,7 @@ fn process_wakeup(
 
 	let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());
 
-	let should_trigger = {
+	let (should_trigger, backing_group) = {
 		let approval_entry = match candidate_entry.approval_entry(&relay_block) {
 			Some(e) => e,
 			None => return Ok(Vec::new()),
@@ -1065,12 +1068,14 @@ fn process_wakeup(
 			session_info.needed_approvals as _,
 		);
 
-		should_trigger_assignment(
+		let should_trigger = should_trigger_assignment(
 			&approval_entry,
 			&candidate_entry,
 			tranches_to_approve,
 			tranche_now,
-		)
+		);
+
+		(should_trigger, approval_entry.backing_group())
 	};
 
 	let (mut actions, maybe_cert) = if should_trigger {
@@ -1105,6 +1110,7 @@ fn process_wakeup(
 				candidate_index: i as _,
 				session: block_entry.session(),
 				candidate: candidate_entry.candidate_receipt().clone(),
+				backing_group,
 			});
 		}
 	}
@@ -1142,6 +1148,7 @@ async fn launch_approval(
 	validator_index: ValidatorIndex,
 	block_hash: Hash,
 	candidate_index: usize,
+	backing_group: GroupIndex,
 ) -> SubsystemResult<()> {
 	let (a_tx, a_rx) = oneshot::channel();
 	let (code_tx, code_rx) = oneshot::channel();
@@ -1150,6 +1157,7 @@ async fn launch_approval(
 	ctx.send_message(AvailabilityRecoveryMessage::RecoverAvailableData(
 		candidate.clone(),
 		session_index,
+		Some(backing_group),
 		a_tx,
 	).into()).await;
 
diff --git a/polkadot/node/network/availability-recovery/src/error.rs b/polkadot/node/network/availability-recovery/src/error.rs
index 70545020dcee72a3e96d6ac5c32d75ecf237700e..d33f496917c9ca2764a4d19cc421ec7126ffdb0c 100644
--- a/polkadot/node/network/availability-recovery/src/error.rs
+++ b/polkadot/node/network/availability-recovery/src/error.rs
@@ -28,6 +28,9 @@ pub enum Error {
 	#[error("failed to query a chunk from store")]
 	CanceledQueryChunk(#[source] oneshot::Canceled),
 
+	#[error("failed to query full data from store")]
+	CanceledQueryFullData(#[source] oneshot::Canceled),
+
 	#[error("failed to query session info")]
 	CanceledSessionInfo(#[source] oneshot::Canceled),
 
diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs
index 6af4e45a0e9e291f1b72039788d6d1fd6489ebc2..114bbd48a4cddc42f6c8f227245dfc5a23a0c63e 100644
--- a/polkadot/node/network/availability-recovery/src/lib.rs
+++ b/polkadot/node/network/availability-recovery/src/lib.rs
@@ -25,13 +25,13 @@ use std::pin::Pin;
 use futures::{channel::{oneshot, mpsc}, prelude::*, stream::FuturesUnordered};
 use futures_timer::Delay;
 use lru::LruCache;
-use rand::{seq::SliceRandom, thread_rng};
+use rand::seq::SliceRandom;
 use streamunordered::{StreamUnordered, StreamYield};
 
 use polkadot_primitives::v1::{
 	AuthorityDiscoveryId, AvailableData, CandidateReceipt, CandidateHash,
 	Hash, ErasureChunk, ValidatorId, ValidatorIndex,
-	SessionInfo, SessionIndex, BlakeTwo256, HashT,
+	SessionInfo, SessionIndex, BlakeTwo256, HashT, GroupIndex,
 };
 use polkadot_subsystem::{
 	SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer,
@@ -59,6 +59,7 @@ const LOG_TARGET: &str = "availability_recovery";
 
 const COST_MERKLE_PROOF_INVALID: Rep = Rep::CostMinor("Merkle proof was invalid");
 const COST_UNEXPECTED_CHUNK: Rep = Rep::CostMinor("Peer has sent an unexpected chunk");
+const COST_INVALID_AVAILABLE_DATA: Rep = Rep::CostMinor("Peer provided invalid available data");
 
 // How many parallel requests interaction should have going at once.
 const N_PARALLEL: usize = 50;
@@ -67,18 +68,54 @@ const N_PARALLEL: usize = 50;
 const LRU_SIZE: usize = 16;
 
 // A timeout for a chunk request.
+#[cfg(not(test))]
 const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
 
-// A period to poll and clean AwaitedChunks.
-const AWAITED_CHUNKS_CLEANUP_INTERVAL: Duration = Duration::from_secs(1);
+#[cfg(test)]
+const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
+
+// A timeout for a full data request.
+#[cfg(not(test))]
+const FULL_DATA_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
+
+#[cfg(test)]
+const FULL_DATA_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
+
+// A period to poll and clean awaited data.
+const AWAITED_CLEANUP_INTERVAL: Duration = Duration::from_secs(1);
 
 /// The Availability Recovery Subsystem.
-pub struct AvailabilityRecoverySubsystem;
+pub struct AvailabilityRecoverySubsystem {
+	fast_path: bool,
+}
 
-type ChunkResponse = Result<(PeerId, ErasureChunk), RecoveryError>;
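+/// A response from the network: the responding peer, the validator index the request was addressed to, and the payload.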
+type DataResponse<T> = (PeerId, ValidatorIndex, T);
 
-/// Data we keep around for every chunk that we are awaiting.
-struct AwaitedChunk {
+/// Awaited data from the network.
+enum Awaited {
+	Chunk(AwaitedData<ErasureChunk>),
+	FullData(AwaitedData<AvailableData>),
+}
+
+impl Awaited {
+	fn is_canceled(&self) -> bool {
+		match *self {
+			Awaited::Chunk(ref c) => c.response.is_canceled(),
+			Awaited::FullData(ref fd) => fd.response.is_canceled(),
+		}
+	}
+
+	/// Token to cancel the connection request to the validator.
+	fn token(&self) -> usize {
+		match *self {
+			Awaited::Chunk(ref c) => c.token,
+			Awaited::FullData(ref fd) => fd.token,
+		}
+	}
+}
+
+/// Bookkeeping for a single piece of data that we are awaiting from the network.
+struct AwaitedData<T> {
 	/// Index of the validator we have requested this chunk from.
 	validator_index: ValidatorIndex,
 
@@ -89,7 +126,7 @@ struct AwaitedChunk {
 	token: usize,
 
 	/// Result sender.
-	response: oneshot::Sender<ChunkResponse>,
+	response: oneshot::Sender<DataResponse<T>>,
 }
 
 /// Accumulate all awaiting sides for some particular `AvailableData`.
@@ -104,11 +141,19 @@ enum FromInteraction {
 	Concluded(CandidateHash, Result<AvailableData, RecoveryError>),
 
 	/// Make a request of a particular chunk from a particular validator.
-	MakeRequest(
+	MakeChunkRequest(
+		AuthorityDiscoveryId,
+		CandidateHash,
+		ValidatorIndex,
+		oneshot::Sender<DataResponse<ErasureChunk>>,
+	),
+
+	/// Make a request of the full data from a particular validator.
+	MakeFullDataRequest(
 		AuthorityDiscoveryId,
 		CandidateHash,
 		ValidatorIndex,
-		oneshot::Sender<ChunkResponse>,
+		oneshot::Sender<DataResponse<AvailableData>>,
 	),
 
 	/// Report a peer.
@@ -118,21 +163,27 @@ enum FromInteraction {
 	),
 }
 
-/// A state of a single interaction reconstructing an available data.
-struct Interaction {
-	/// A communication channel with the `State`.
-	to_state: mpsc::Sender<FromInteraction>,
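+/// The first phase of an interaction: requesting the full `AvailableData` from backers.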
+struct RequestFromBackersPhase {
+	// a random shuffling of the validators from the backing group which indicates the order
+	// in which we connect to them and request the chunk.
+	shuffled_backers: Vec<ValidatorIndex>,
+}
+
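+/// The second phase of an interaction: gathering erasure chunks from individual validators.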
+struct RequestChunksPhase {
+	// A random shuffling of the validators which indicates the order in which
+	// we connect to the validators and request the chunk from them.
+	shuffling: Vec<ValidatorIndex>,
+	received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
+	requesting_chunks: FuturesUnordered<Timeout<oneshot::Receiver<DataResponse<ErasureChunk>>>>,
+}
 
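+/// The parameters of an interaction, common to both phases.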
+struct InteractionParams {
 	/// Discovery ids of `validators`.
 	validator_authority_keys: Vec<AuthorityDiscoveryId>,
 
 	/// Validators relevant to this `Interaction`.
 	validators: Vec<ValidatorId>,
 
-	/// A random shuffling of the validators which indicates the order in which we connect
-	/// to the validators and request the chunk from them.
-	shuffling: Vec<ValidatorIndex>,
-
 	/// The number of pieces needed.
 	threshold: usize,
 
@@ -141,35 +192,118 @@ struct Interaction {
 
 	/// The root of the erasure encoding of the para block.
 	erasure_root: Hash,
+}
+
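+/// The phase of an interaction: try the backing group for the full data first, then fall back to chunks.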
+enum InteractionPhase {
+	RequestFromBackers(RequestFromBackersPhase),
+	RequestChunks(RequestChunksPhase),
+}
+
+/// A state of a single interaction reconstructing an available data.
+struct Interaction {
+	/// A communication channel with the `State`.
+	to_state: mpsc::Sender<FromInteraction>,
 
-	/// The chunks that we have received from peers.
-	received_chunks: HashMap<PeerId, ErasureChunk>,
+	/// The parameters of the interaction.
+	params: InteractionParams,
 
-	/// The chunk requests that are waiting to complete.
-	requesting_chunks: FuturesUnordered<Timeout<oneshot::Receiver<ChunkResponse>>>,
+	/// The phase of the interaction.
+	phase: InteractionPhase,
 }
 
-const fn is_unavailable(
-	received_chunks: usize,
-	requesting_chunks: usize,
-	n_validators: usize,
-	threshold: usize,
-) -> bool {
-	received_chunks + requesting_chunks + n_validators < threshold
+impl RequestFromBackersPhase {
+	fn new(mut backers: Vec<ValidatorIndex>) -> Self {
+		backers.shuffle(&mut rand::thread_rng());
+
+		RequestFromBackersPhase {
+			shuffled_backers: backers,
+		}
+	}
+
+	// Run this phase to completion, returning `true` if data was successfully recovered and
+	// `false` otherwise.
+	async fn run(
+		&mut self,
+		params: &InteractionParams,
+		to_state: &mut mpsc::Sender<FromInteraction>
+	) -> Result<bool, mpsc::SendError> {
+		loop {
+			// Pop the next backer, and proceed to the next phase if we're out.
+			let validator_index = match self.shuffled_backers.pop() {
+				None => return Ok(false),
+				Some(i) => i,
+			};
+
+			let (tx, rx) = oneshot::channel();
+
+			// Request data.
+			to_state.send(FromInteraction::MakeFullDataRequest(
+				params.validator_authority_keys[validator_index as usize].clone(),
+				params.candidate_hash.clone(),
+				validator_index,
+				tx,
+			)).await?;
+
+			match rx.timeout(FULL_DATA_REQUEST_TIMEOUT).await {
+				Some(Ok((peer_id, _validator_index, data))) => {
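+					// Re-encode the data and verify it against the expected erasure root;
+					// the responding backer is not trusted blindly.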
+					if reconstructed_data_matches_root(params.validators.len(), &params.erasure_root, &data) {
+						to_state.send(
+							FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
+						).await?;
+
+						return Ok(true);
+					} else {
+						to_state.send(FromInteraction::ReportPeer(
+							peer_id.clone(),
+							COST_INVALID_AVAILABLE_DATA,
+						)).await?;
+					}
+				}
+				Some(Err(e)) => {
+					tracing::debug!(
+						target: LOG_TARGET,
+						err = ?e,
+						"A response channel was cancelled while waiting for full data",
+					);
+				}
+				None => {
+					tracing::debug!(
+						target: LOG_TARGET,
+						"A full data request has timed out",
+					);
+				}
+			}
+		}
+	}
 }
 
-impl Interaction {
-	async fn launch_parallel_requests(&mut self) -> error::Result<()> {
+impl RequestChunksPhase {
+	fn new(n_validators: ValidatorIndex) -> Self {
+		let mut shuffling: Vec<_> = (0..n_validators).collect();
+		shuffling.shuffle(&mut rand::thread_rng());
+
+		RequestChunksPhase {
+			shuffling,
+			received_chunks: HashMap::new(),
+			requesting_chunks: FuturesUnordered::new(),
+		}
+	}
+
+	async fn launch_parallel_requests(
+		&mut self,
+		params: &InteractionParams,
+		to_state: &mut mpsc::Sender<FromInteraction>,
+	) -> Result<(), mpsc::SendError> {
 		while self.requesting_chunks.len() < N_PARALLEL {
 			if let Some(validator_index) = self.shuffling.pop() {
 				let (tx, rx) = oneshot::channel();
 
-				self.to_state.send(FromInteraction::MakeRequest(
-					self.validator_authority_keys[validator_index as usize].clone(),
-					self.candidate_hash.clone(),
+				to_state.send(FromInteraction::MakeChunkRequest(
+					params.validator_authority_keys[validator_index as usize].clone(),
+					params.candidate_hash.clone(),
 					validator_index,
 					tx,
-				)).await.map_err(error::Error::ClosedToState)?;
+				)).await?;
 
 				self.requesting_chunks.push(rx.timeout(CHUNK_REQUEST_TIMEOUT));
 			} else {
@@ -180,7 +314,11 @@ impl Interaction {
 		Ok(())
 	}
 
-	async fn wait_for_chunks(&mut self) -> error::Result<()> {
+	async fn wait_for_chunks(
+		&mut self,
+		params: &InteractionParams,
+		to_state: &mut mpsc::Sender<FromInteraction>,
+	) -> Result<(), mpsc::SendError> {
 		// Check if the requesting chunks is not empty not to poll to completion.
 		if self.requesting_chunks.is_empty() {
 			return Ok(());
@@ -189,43 +327,49 @@ impl Interaction {
 		// Poll for new updates from requesting_chunks.
 		while let Some(request_result) = self.requesting_chunks.next().await {
 			match request_result {
-				Some(Ok(Ok((peer_id, chunk)))) => {
+				Some(Ok((peer_id, validator_index, chunk))) => {
 					// Check merkle proofs of any received chunks, and any failures should
 					// lead to issuance of a FromInteraction::ReportPeer message.
+
+					// We need to check that the validator index matches the chunk index and
+					// not blindly trust the data from an untrusted peer.
+					if validator_index != chunk.index {
+						to_state.send(FromInteraction::ReportPeer(
+							peer_id.clone(),
+							COST_MERKLE_PROOF_INVALID,
+						)).await?;
+
+						continue;
+					}
+
 					if let Ok(anticipated_hash) = branch_hash(
-						&self.erasure_root,
+						&params.erasure_root,
 						&chunk.proof,
 						chunk.index as usize,
 					) {
 						let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
 
 						if erasure_chunk_hash != anticipated_hash {
-							self.to_state.send(FromInteraction::ReportPeer(
-									peer_id.clone(),
-									COST_MERKLE_PROOF_INVALID,
-							)).await.map_err(error::Error::ClosedToState)?;
-						}
-					} else {
-						self.to_state.send(FromInteraction::ReportPeer(
+							to_state.send(FromInteraction::ReportPeer(
 								peer_id.clone(),
 								COST_MERKLE_PROOF_INVALID,
-						)).await.map_err(error::Error::ClosedToState)?;
+							)).await?;
+						} else {
+							self.received_chunks.insert(validator_index, chunk);
+						}
+					} else {
+						to_state.send(FromInteraction::ReportPeer(
+							peer_id.clone(),
+							COST_MERKLE_PROOF_INVALID,
+						)).await?;
 					}
-
-					self.received_chunks.insert(peer_id, chunk);
 				}
 				Some(Err(e)) => {
 					tracing::debug!(
 						target: LOG_TARGET,
 						err = ?e,
-						"A response channel was cacelled while waiting for a chunk",
-					);
-				}
-				Some(Ok(Err(e))) => {
-					tracing::debug!(
-						target: LOG_TARGET,
-						err = ?e,
-						"A chunk request ended with an error",
+						"A response channel was cancelled while waiting for a chunk",
 					);
 				}
 				None => {
@@ -233,8 +377,6 @@ impl Interaction {
 						target: LOG_TARGET,
 						"A chunk request has timed out",
 					);
-					// we break here to launch another request.
-					break;
 				}
 			}
 		}
@@ -242,58 +384,70 @@ impl Interaction {
 		Ok(())
 	}
 
-	async fn run(mut self) -> error::Result<()> {
+	async fn run(
+		&mut self,
+		params: &InteractionParams,
+		to_state: &mut mpsc::Sender<FromInteraction>,
+	) -> Result<(), mpsc::SendError> {
 		loop {
 			if is_unavailable(
 				self.received_chunks.len(),
 				self.requesting_chunks.len(),
 				self.shuffling.len(),
-				self.threshold,
+				params.threshold,
 			) {
-				self.to_state.send(FromInteraction::Concluded(
-					self.candidate_hash,
+				to_state.send(FromInteraction::Concluded(
+					params.candidate_hash,
 					Err(RecoveryError::Unavailable),
-				)).await.map_err(error::Error::ClosedToState)?;
+				)).await?;
 
 				return Ok(());
 			}
 
-			self.launch_parallel_requests().await?;
-
-			self.wait_for_chunks().await?;
+			self.launch_parallel_requests(params, to_state).await?;
+			self.wait_for_chunks(params, to_state).await?;
 
 			// If received_chunks has more than threshold entries, attempt to recover the data.
 			// If that fails, or a re-encoding of it doesn't match the expected erasure root,
 			// break and issue a FromInteraction::Concluded(RecoveryError::Invalid).
 			// Otherwise, issue a FromInteraction::Concluded(Ok(())).
-			if self.received_chunks.len() >= self.threshold {
+			if self.received_chunks.len() >= params.threshold {
 				let concluded = match polkadot_erasure_coding::reconstruct_v1(
-					self.validators.len(),
+					params.validators.len(),
 					self.received_chunks.values().map(|c| (&c.chunk[..], c.index as usize)),
 				) {
 					Ok(data) => {
-						if reconstructed_data_matches_root(self.validators.len(), &self.erasure_root, &data) {
-							FromInteraction::Concluded(self.candidate_hash.clone(), Ok(data))
+						if reconstructed_data_matches_root(params.validators.len(), &params.erasure_root, &data) {
+							FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
 						} else {
 							FromInteraction::Concluded(
-								self.candidate_hash.clone(),
+								params.candidate_hash.clone(),
 								Err(RecoveryError::Invalid),
 							)
 						}
 					}
 					Err(_) => FromInteraction::Concluded(
-						self.candidate_hash.clone(),
+						params.candidate_hash.clone(),
 						Err(RecoveryError::Invalid),
 					),
 				};
 
-				self.to_state.send(concluded).await.map_err(error::Error::ClosedToState)?;
+				to_state.send(concluded).await?;
 				return Ok(());
 			}
 		}
 	}
 }
 
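+/// Returns `true` when the chunks we hold, the requests in flight, and the validators
+/// left to ask can no longer reach the threshold together.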
+const fn is_unavailable(
+	received_chunks: usize,
+	requesting_chunks: usize,
+	n_validators: usize,
+	threshold: usize,
+) -> bool {
+	received_chunks + requesting_chunks + n_validators < threshold
+}
+
 fn reconstructed_data_matches_root(
 	n_validators: usize,
 	expected_root: &Hash,
@@ -316,6 +470,32 @@ fn reconstructed_data_matches_root(
 	branches.root() == *expected_root
 }
 
+impl Interaction {
+	async fn run(mut self) -> error::Result<()> {
+		loop {
+			// These only fail if we cannot reach the underlying subsystem, in which
+			// case there is nothing meaningful we can do.
+			match self.phase {
+				InteractionPhase::RequestFromBackers(ref mut from_backers) => {
+					if from_backers.run(&self.params, &mut self.to_state).await
+						.map_err(error::Error::ClosedToState)?
+					{
+						break Ok(())
+					} else {
+						self.phase = InteractionPhase::RequestChunks(
+							RequestChunksPhase::new(self.params.validators.len() as _)
+						);
+					}
+				}
+				InteractionPhase::RequestChunks(ref mut from_all) => {
+					break from_all.run(&self.params, &mut self.to_state).await
+						.map_err(error::Error::ClosedToState)
+				}
+			}
+		}
+	}
+}
+
 struct State {
 	/// Each interaction is implemented as its own async task,
 	/// and these handles are for communicating with them.
@@ -325,12 +505,12 @@ struct State {
 	live_block_hash: Hash,
 
 	/// We are waiting for these validators to connect and as soon as they
-	/// do to request the needed chunks we are awaitinf for.
-	discovering_validators: HashMap<AuthorityDiscoveryId, Vec<AwaitedChunk>>,
+	/// do, request the needed data we are waiting for.
+	discovering_validators: HashMap<AuthorityDiscoveryId, Vec<Awaited>>,
 
 	/// Requests that we have issued to the already connected validators
-	/// about the chunks we are interested in.
-	live_chunk_requests: HashMap<RequestId, (PeerId, AwaitedChunk)>,
+	/// about the data we are interested in.
+	live_requests: HashMap<RequestId, (PeerId, Awaited)>,
 
 	/// Derive request ids from this.
 	next_request_id: RequestId,
@@ -357,7 +537,7 @@ impl Default for State {
 			interactions: HashMap::new(),
 			live_block_hash: Hash::default(),
 			discovering_validators: HashMap::new(),
-			live_chunk_requests: HashMap::new(),
+			live_requests: HashMap::new(),
 			next_request_id: 0,
 			connecting_validators: StreamUnordered::new(),
 			availability_lru: LruCache::new(LRU_SIZE),
@@ -394,7 +574,7 @@ async fn handle_signal(
 
 			Ok(false)
 		}
-	    OverseerSignal::BlockFinalized(_, _) => Ok(false)
+		OverseerSignal::BlockFinalized(_, _) => Ok(false)
 	}
 }
 
@@ -415,16 +595,12 @@ async fn launch_interaction(
 	session_index: SessionIndex,
 	session_info: SessionInfo,
 	receipt: CandidateReceipt,
+	backing_group: Option<GroupIndex>,
 	response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
 ) -> error::Result<()> {
-	let threshold = recovery_threshold(session_info.validators.len())?;
 	let to_state = state.from_interaction_tx.clone();
-	let candidate_hash = receipt.hash();
-	let erasure_root = receipt.descriptor.erasure_root;
-	let validators = session_info.validators.clone();
-	let validator_authority_keys = session_info.discovery_keys.clone();
-	let mut shuffling: Vec<_> = (0..validators.len() as ValidatorIndex).collect();
 
+	let candidate_hash = receipt.hash();
 	state.interactions.insert(
 		candidate_hash.clone(),
 		InteractionHandle {
@@ -432,22 +608,27 @@ async fn launch_interaction(
 		}
 	);
 
-	{
-		// make borrow checker happy.
-		let mut rng = thread_rng();
-		shuffling.shuffle(&mut rng);
-	}
+	let params = InteractionParams {
+		validator_authority_keys: session_info.discovery_keys.clone(),
+		validators: session_info.validators.clone(),
+		threshold: recovery_threshold(session_info.validators.len())?,
+		candidate_hash,
+		erasure_root: receipt.descriptor.erasure_root,
+	};
+
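+	// Take the fast path if the backing group is known; otherwise go straight to chunk recovery.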
+	let phase = backing_group
+		.and_then(|g| session_info.validator_groups.get(g.0 as usize))
+		.map(|group| InteractionPhase::RequestFromBackers(
+			RequestFromBackersPhase::new(group.clone())
+		))
+		.unwrap_or_else(|| InteractionPhase::RequestChunks(
+			RequestChunksPhase::new(params.validators.len() as _)
+		));
 
 	let interaction = Interaction {
 		to_state,
-		validator_authority_keys,
-		validators,
-		shuffling,
-		threshold,
-		candidate_hash,
-		erasure_root,
-		received_chunks: HashMap::new(),
-		requesting_chunks: FuturesUnordered::new(),
+		params,
+		phase,
 	};
 
 	let future = async move {
@@ -478,6 +659,7 @@ async fn handle_recover(
 	ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
 	receipt: CandidateReceipt,
 	session_index: SessionIndex,
+	backing_group: Option<GroupIndex>,
 	response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
 ) -> error::Result<()> {
 	let candidate_hash = receipt.hash();
@@ -512,6 +694,7 @@ async fn handle_recover(
 				session_index,
 				session_info,
 				receipt,
+				backing_group,
 				response_sender,
 			).await
 		}
@@ -543,6 +726,20 @@ async fn query_chunk(
 	Ok(rx.await.map_err(error::Error::CanceledQueryChunk)?)
 }
 
+/// Queries a chunk from av-store.
+#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
+async fn query_full_data(
+	ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
+	candidate_hash: CandidateHash,
+) -> error::Result<Option<AvailableData>> {
+	let (tx, rx) = oneshot::channel();
+	ctx.send_message(AllMessages::AvailabilityStore(
+		AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx),
+	)).await;
+
+	Ok(rx.await.map_err(error::Error::CanceledQueryFullData)?)
+}
+
 /// Handles message from interaction.
 #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
 async fn handle_from_interaction(
@@ -574,7 +771,27 @@ async fn handle_from_interaction(
 
 			state.availability_lru.put(candidate_hash, result);
 		}
-		FromInteraction::MakeRequest(id, candidate_hash, validator_index, response) => {
+		FromInteraction::MakeChunkRequest(id, candidate_hash, validator_index, response) => {
+			let (tx, rx) = mpsc::channel(2);
+
+			let message = NetworkBridgeMessage::ConnectToValidators {
+				validator_ids: vec![id.clone()],
+				peer_set: PeerSet::Validation,
+				connected: tx,
+			};
+
+			ctx.send_message(AllMessages::NetworkBridge(message)).await;
+
+			let token = state.connecting_validators.push(rx);
+
+			state.discovering_validators.entry(id).or_default().push(Awaited::Chunk(AwaitedData {
+				validator_index,
+				candidate_hash,
+				token,
+				response,
+			}));
+		}
+		FromInteraction::MakeFullDataRequest(id, candidate_hash, validator_index, response) => {
 			let (tx, rx) = mpsc::channel(2);
 
 			let message = NetworkBridgeMessage::ConnectToValidators {
@@ -587,12 +804,13 @@ async fn handle_from_interaction(
 
 			let token = state.connecting_validators.push(rx);
 
-			state.discovering_validators.entry(id).or_default().push(AwaitedChunk {
+			state.discovering_validators.entry(id).or_default().push(Awaited::FullData(AwaitedData {
 				validator_index,
 				candidate_hash,
 				token,
 				response,
-			});
+			}));
 		}
 		FromInteraction::ReportPeer(peer_id, rep) => {
 			report_peer(ctx, peer_id, rep).await;
@@ -637,16 +855,18 @@ async fn handle_network_update(
 					)).await;
 				}
 				protocol_v1::AvailabilityRecoveryMessage::Chunk(request_id, chunk) => {
-					match state.live_chunk_requests.remove(&request_id) {
+					match state.live_requests.remove(&request_id) {
 						None => {
 							// If there doesn't exist one, report the peer and return.
 							report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
 						}
-						Some((peer_id, awaited_chunk)) if peer_id == peer => {
+						Some((peer_id, Awaited::Chunk(awaited_chunk))) if peer_id == peer => {
 							// If there exists an entry under r_id, remove it.
 							// Send the chunk response on the awaited_chunk for the interaction to handle.
 							if let Some(chunk) = chunk {
-								if awaited_chunk.response.send(Ok((peer_id, chunk))).is_err() {
+								if awaited_chunk.response.send(
+									(peer_id, awaited_chunk.validator_index, chunk)
+								).is_err() {
 									tracing::debug!(
 										target: LOG_TARGET,
 										"A sending side of the recovery request is closed",
@@ -657,14 +877,59 @@ async fn handle_network_update(
 						Some(a) => {
 							// If the peer in the entry doesn't match the sending peer,
 							// reinstate the entry, report the peer, and return
-							state.live_chunk_requests.insert(request_id, a);
+							state.live_requests.insert(request_id, a);
 							report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
 						}
 					}
 				}
-				protocol_v1::AvailabilityRecoveryMessage::RequestFullData(_, _) |
-				protocol_v1::AvailabilityRecoveryMessage::FullData(_, _) => {
-					// handled in https://github.com/paritytech/polkadot/pull/2453
+				protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
+					request_id,
+					candidate_hash,
+				) => {
+					// Issue an
+					// AvailabilityStoreMessage::QueryAvailableData(candidate_hash, response)
+					// message.
+					let full_data = query_full_data(ctx, candidate_hash).await?;
+
+					// Whatever the result, issue an
+					// AvailabilityRecoveryMessage::FullData(request_id, full_data) message.
+					let wire_message = protocol_v1::AvailabilityRecoveryMessage::FullData(
+						request_id,
+						full_data,
+					);
+
+					ctx.send_message(AllMessages::NetworkBridge(
+						NetworkBridgeMessage::SendValidationMessage(
+							vec![peer],
+							protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
+						),
+					)).await;
+				}
+				protocol_v1::AvailabilityRecoveryMessage::FullData(request_id, data) => {
+					match state.live_requests.remove(&request_id) {
+						None => {
+							// If there doesn't exist one, report the peer and return.
+							report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
+						}
+						Some((peer_id, Awaited::FullData(awaited))) if peer_id == peer => {
+							// If there exists an entry under r_id, remove it.
+							// Send the response on the awaited for the interaction to handle.
+							if let Some(data) = data {
+								if awaited.response.send((peer_id, awaited.validator_index, data)).is_err() {
+									tracing::debug!(
+										target: LOG_TARGET,
+										"A sending side of the recovery request is closed",
+									);
+								}
+							}
+						}
+						Some(a) => {
+							// If the peer in the entry doesn't match the sending peer,
+							// reinstate the entry, report the peer, and return
+							state.live_requests.insert(request_id, a);
+							report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
+						}
+					}
 				}
 			}
 		}
@@ -680,21 +945,27 @@ async fn handle_network_update(
 	Ok(())
 }
 
-/// Issues a chunk request to the validator we've been waiting for to connect to us.
-async fn issue_chunk_request(
+/// Issues a request to the validator we've been waiting for to connect to us.
+async fn issue_request(
 	state: &mut State,
 	ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
 	peer_id: PeerId,
-	awaited_chunk: AwaitedChunk,
+	awaited: Awaited,
 ) -> error::Result<()> {
 	let request_id = state.next_request_id;
 	state.next_request_id += 1;
 
-	let wire_message = protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
-		request_id,
-		awaited_chunk.candidate_hash,
-		awaited_chunk.validator_index,
-	);
+	let wire_message = match awaited {
+		Awaited::Chunk(ref awaited_chunk) => protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
+			request_id,
+			awaited_chunk.candidate_hash,
+			awaited_chunk.validator_index,
+		),
+		Awaited::FullData(ref awaited_data) => protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
+			request_id,
+			awaited_data.candidate_hash,
+		),
+	};
 
 	ctx.send_message(AllMessages::NetworkBridge(
 		NetworkBridgeMessage::SendValidationMessage(
@@ -703,7 +974,7 @@ async fn issue_chunk_request(
 		),
 	)).await;
 
-	state.live_chunk_requests.insert(request_id, (peer_id, awaited_chunk));
+	state.live_requests.insert(request_id, (peer_id, awaited));
 
 	Ok(())
 }
@@ -716,23 +987,23 @@ async fn handle_validator_connected(
 	peer_id: PeerId,
 ) -> error::Result<()> {
 	if let Some(discovering) = state.discovering_validators.remove(&authority_id) {
-		for chunk in discovering {
-			issue_chunk_request(state, ctx, peer_id.clone(), chunk).await?;
+		for awaited in discovering {
+			issue_request(state, ctx, peer_id.clone(), awaited).await?;
 		}
 	}
 
 	Ok(())
 }
 
-/// Awaited chunks info that `State` holds has to be cleaned up
+/// Awaited info that `State` holds has to be cleaned up
 /// periodically since there is no way `Interaction` can communicate
 /// a timedout request.
-fn cleanup_awaited_chunks(state: &mut State) {
+fn cleanup_awaited(state: &mut State) {
 	let mut removed_tokens = Vec::new();
 
 	for (_, v) in state.discovering_validators.iter_mut() {
-		v.retain(|e| if e.response.is_canceled() {
-			removed_tokens.push(e.token);
+		v.retain(|e| if e.is_canceled() {
+			removed_tokens.push(e.token());
 			false
 		} else {
 			true
@@ -744,13 +1015,18 @@ fn cleanup_awaited_chunks(state: &mut State) {
 	}
 
 	state.discovering_validators.retain(|_, v| !v.is_empty());
-	state.live_chunk_requests.retain(|_, v| !v.1.response.is_canceled());
+	state.live_requests.retain(|_, v| !v.1.is_canceled());
 }
 
 impl AvailabilityRecoverySubsystem {
-	/// Create a new instance of `AvailabilityRecoverySubsystem`.
-	pub fn new() -> Self {
-		Self
+	/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to request data from backers.
+	pub fn with_fast_path() -> Self {
+		Self { fast_path: true }
+	}
+
+	/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks.
+	pub fn with_chunks_only() -> Self {
+		Self { fast_path: false }
 	}
 
 	async fn run(
@@ -759,16 +1035,16 @@ impl AvailabilityRecoverySubsystem {
 	) -> SubsystemResult<()> {
 		let mut state = State::default();
 
-		let awaited_chunk_cleanup_interval = futures::stream::repeat(()).then(|_| async move {
-			Delay::new(AWAITED_CHUNKS_CLEANUP_INTERVAL).await;
+		let awaited_cleanup_interval = futures::stream::repeat(()).then(|_| async move {
+			Delay::new(AWAITED_CLEANUP_INTERVAL).await;
 		});
 
-		futures::pin_mut!(awaited_chunk_cleanup_interval);
+		futures::pin_mut!(awaited_cleanup_interval);
 
 		loop {
 			futures::select_biased! {
-				_v = awaited_chunk_cleanup_interval.next() => {
-					cleanup_awaited_chunks(&mut state);
+				_v = awaited_cleanup_interval.next() => {
+					cleanup_awaited(&mut state);
 				}
 				v = state.connecting_validators.next() => {
 					if let Some((v, token)) = v {
@@ -806,6 +1082,7 @@ impl AvailabilityRecoverySubsystem {
 								AvailabilityRecoveryMessage::RecoverAvailableData(
 									receipt,
 									session_index,
+									maybe_backing_group,
 									response_sender,
 								) => {
 									if let Err(e) = handle_recover(
@@ -813,6 +1090,7 @@ impl AvailabilityRecoverySubsystem {
 										&mut ctx,
 										receipt,
 										session_index,
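+										// Only hint the backing group when the fast path is enabled.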
+										maybe_backing_group.filter(|_| self.fast_path),
 										response_sender,
 									).await {
 										tracing::warn!(
diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs
index d2ec29ccc857581e65fbb5cc56a9c674acb6341f..8a7c255ce9bc31e605067e82c1917cbc308ae72a 100644
--- a/polkadot/node/network/availability-recovery/src/tests.rs
+++ b/polkadot/node/network/availability-recovery/src/tests.rs
@@ -38,7 +38,7 @@ struct TestHarness {
 	virtual_overseer: VirtualOverseer,
 }
 
-fn test_harness<T: Future<Output = ()>>(
+fn test_harness_fast_path<T: Future<Output = ()>>(
 	test: impl FnOnce(TestHarness) -> T,
 ) {
 	let _ = env_logger::builder()
@@ -53,7 +53,33 @@ fn test_harness<T: Future<Output = ()>>(
 
 	let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
 
-	let subsystem = AvailabilityRecoverySubsystem::new();
+	let subsystem = AvailabilityRecoverySubsystem::with_fast_path();
+	let subsystem = subsystem.run(context);
+
+	let test_fut = test(TestHarness { virtual_overseer });
+
+	futures::pin_mut!(test_fut);
+	futures::pin_mut!(subsystem);
+
+	executor::block_on(future::select(test_fut, subsystem));
+}
+
+fn test_harness_chunks_only<T: Future<Output = ()>>(
+	test: impl FnOnce(TestHarness) -> T,
+) {
+	let _ = env_logger::builder()
+		.is_test(true)
+		.filter(
+			Some("polkadot_availability_recovery"),
+			log::LevelFilter::Trace,
+		)
+		.try_init();
+
+	let pool = sp_core::testing::TaskExecutor::new();
+
+	let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
+
+	let subsystem = AvailabilityRecoverySubsystem::with_chunks_only();
 	let subsystem = subsystem.run(context);
 
 	let test_fut = test(TestHarness { virtual_overseer });
@@ -112,6 +138,14 @@ async fn overseer_recv(
 
 use sp_keyring::Sr25519Keyring;
 
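+/// How a test validator answers a full-data request: it has no data, has the data,
+/// never answers, or returns different (invalid) data.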
+#[derive(Debug, Clone)]
+enum HasAvailableData {
+	No,
+	Yes,
+	Timeout,
+	Other(AvailableData),
+}
+
 #[derive(Clone)]
 struct TestState {
 	validators: Vec<Sr25519Keyring>,
@@ -149,21 +183,31 @@ impl TestState {
 				tx.send(Ok(Some(SessionInfo {
 					validators: self.validator_public.clone(),
 					discovery_keys: self.validator_authority_id.clone(),
+					// All validators are in the same group.
+					validator_groups: vec![(0..self.validators.len()).map(|i| i as ValidatorIndex).collect()],
 					..Default::default()
 				}))).unwrap();
 			}
 		);
 	}
 
+	async fn test_connect_to_all_validators(
+		&self,
+		virtual_overseer: &mut VirtualOverseer,
+	) {
+		self.test_connect_to_validators(virtual_overseer, self.validator_public.len()).await;
+	}
+
 	async fn test_connect_to_validators(
 		&self,
 		virtual_overseer: &mut VirtualOverseer,
+		n: usize,
 	) {
 		// Channels by AuthorityDiscoveryId to send results to.
 		// Gather them here and send in batch after the loop not to race.
 		let mut results = HashMap::new();
 
-		for _ in 0..self.validator_public.len() {
+		for _ in 0..n {
 			// Connect to shuffled validators one by one.
 			assert_matches!(
 				overseer_recv(virtual_overseer).await,
@@ -305,6 +349,84 @@ impl TestState {
 			}
 		}
 	}
+
+	async fn test_full_data_requests(
+		&self,
+		candidate_hash: CandidateHash,
+		virtual_overseer: &mut VirtualOverseer,
+		who_has: &[HasAvailableData],
+	) {
+		for _ in 0..self.validator_public.len() {
+			self.test_connect_to_validators(virtual_overseer, 1).await;
+
+			// Receive a request for full data.
+			assert_matches!(
+				overseer_recv(virtual_overseer).await,
+				AllMessages::NetworkBridge(
+					NetworkBridgeMessage::SendValidationMessage(
+						peers,
+						protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
+					)
+				) => {
+					let (request_id, validator_index) = assert_matches!(
+						wire_message,
+						protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
+							request_id,
+							candidate_hash_recvd,
+						) => {
+							assert_eq!(candidate_hash_recvd, candidate_hash);
+							assert_eq!(peers.len(), 1);
+
+							let validator_index = self.validator_peer_id.iter().position(|p| p == &peers[0]).unwrap();
+							(request_id, validator_index)
+						}
+					);
+
+					let available_data = match who_has[validator_index] {
+						HasAvailableData::No => Some(None),
+						HasAvailableData::Yes => Some(Some(self.available_data.clone())),
+						HasAvailableData::Timeout => None,
+						HasAvailableData::Other(ref other) => Some(Some(other.clone())),
+					};
+
+					if let Some(maybe_data) = available_data {
+						overseer_send(
+							virtual_overseer,
+							AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(
+								NetworkBridgeEvent::PeerMessage(
+									self.validator_peer_id[validator_index].clone(),
+									protocol_v1::AvailabilityRecoveryMessage::FullData(
+										request_id,
+										maybe_data,
+									)
+								)
+							)
+						).await;
+					}
+
+					match who_has[validator_index] {
+						HasAvailableData::Yes => break, // done
+						HasAvailableData::No => {}
+						HasAvailableData::Timeout => { Delay::new(FULL_DATA_REQUEST_TIMEOUT).await }
+						HasAvailableData::Other(_) => {
+							assert_matches!(
+								overseer_recv(virtual_overseer).await,
+								AllMessages::NetworkBridge(
+									NetworkBridgeMessage::ReportPeer(
+										p,
+										rep,
+									)
+								) => {
+									assert_eq!(p, self.validator_peer_id[validator_index]);
+									assert_eq!(rep, COST_INVALID_AVAILABLE_DATA);
+								}
+							);
+						}
+					}
+				}
+			);
+		}
+	}
 }
 
 
@@ -319,8 +441,13 @@ fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryI
 fn derive_erasure_chunks_with_proofs_and_root(
 	n_validators: usize,
 	available_data: &AvailableData,
+	alter_chunk: impl Fn(usize, &mut Vec<u8>),
 ) -> (Vec<ErasureChunk>, Hash) {
-	let chunks: Vec<Vec<u8>> = obtain_chunks(n_validators, available_data).unwrap();
+	let mut chunks: Vec<Vec<u8>> = obtain_chunks(n_validators, available_data).unwrap();
+
+	for (i, chunk) in chunks.iter_mut().enumerate() {
+		alter_chunk(i, chunk)
+	}
 
 	// create proofs for each erasure chunk
 	let branches = branches(chunks.as_ref());
@@ -379,6 +506,7 @@ impl Default for TestState {
 		let (chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root(
 			validators.len(),
 			&available_data,
+			|_, _| {},
 		);
 
 		candidate.descriptor.erasure_root = erasure_root;
@@ -400,10 +528,10 @@ impl Default for TestState {
 }
 
 #[test]
-fn availability_is_recovered() {
+fn availability_is_recovered_from_chunks_if_no_group_provided() {
 	let test_state = TestState::default();
 
-	test_harness(|test_harness| async move {
+	test_harness_fast_path(|test_harness| async move {
 		let TestHarness { mut virtual_overseer } = test_harness;
 
 		overseer_signal(
@@ -421,13 +549,14 @@ fn availability_is_recovered() {
 			AvailabilityRecoveryMessage::RecoverAvailableData(
 				test_state.candidate.clone(),
 				test_state.session_index,
+				None,
 				tx,
 			)
 		).await;
 
 		test_state.test_runtime_api(&mut virtual_overseer).await;
 
-		test_state.test_connect_to_validators(&mut virtual_overseer).await;
+		test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
 
 		let candidate_hash = test_state.candidate.hash();
 
@@ -448,13 +577,14 @@ fn availability_is_recovered() {
 			AvailabilityRecoveryMessage::RecoverAvailableData(
 				new_candidate,
 				test_state.session_index,
+				None,
 				tx,
 			)
 		).await;
 
 		test_state.test_runtime_api(&mut virtual_overseer).await;
 
-		test_state.test_connect_to_validators(&mut virtual_overseer).await;
+		test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
 
 		// A request times out with `Unavailable` error.
 		assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
@@ -462,10 +592,10 @@ fn availability_is_recovered() {
 }
 
 #[test]
-fn a_faulty_chunk_leads_to_recovery_error() {
-	let mut test_state = TestState::default();
+fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunks_only() {
+	let test_state = TestState::default();
 
-	test_harness(|test_harness| async move {
+	test_harness_chunks_only(|test_harness| async move {
 		let TestHarness { mut virtual_overseer } = test_harness;
 
 		overseer_signal(
@@ -483,22 +613,90 @@ fn a_faulty_chunk_leads_to_recovery_error() {
 			AvailabilityRecoveryMessage::RecoverAvailableData(
 				test_state.candidate.clone(),
 				test_state.session_index,
+				Some(GroupIndex(0)),
 				tx,
 			)
 		).await;
 
 		test_state.test_runtime_api(&mut virtual_overseer).await;
+		test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
 
-		test_state.test_connect_to_validators(&mut virtual_overseer).await;
+		let candidate_hash = test_state.candidate.hash();
+
+		test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await;
+
+		// Recovered data should match the original one.
+		assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
+
+		let (tx, rx) = oneshot::channel();
+
+		// Test another candidate, send no chunks.
+		let mut new_candidate = CandidateReceipt::default();
+
+		new_candidate.descriptor.relay_parent = test_state.candidate.descriptor.relay_parent;
+
+		overseer_send(
+			&mut virtual_overseer,
+			AvailabilityRecoveryMessage::RecoverAvailableData(
+				new_candidate,
+				test_state.session_index,
+				None,
+				tx,
+			)
+		).await;
+
+		test_state.test_runtime_api(&mut virtual_overseer).await;
+
+		test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
+
+		// A request times out with `Unavailable` error.
+		assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
+	});
+}
+
+#[test]
+fn bad_merkle_path_leads_to_recovery_error() {
+	let mut test_state = TestState::default();
+
+	test_harness_fast_path(|test_harness| async move {
+		let TestHarness { mut virtual_overseer } = test_harness;
+
+		overseer_signal(
+			&mut virtual_overseer,
+			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+				activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
+				deactivated: smallvec![],
+			}),
+		).await;
+
+		let (tx, rx) = oneshot::channel();
+
+		overseer_send(
+			&mut virtual_overseer,
+			AvailabilityRecoveryMessage::RecoverAvailableData(
+				test_state.candidate.clone(),
+				test_state.session_index,
+				None,
+				tx,
+			)
+		).await;
+
+		test_state.test_runtime_api(&mut virtual_overseer).await;
+		test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
 
 		let candidate_hash = test_state.candidate.hash();
 
 		// Create some faulty chunks.
-		test_state.chunks[0].chunk = vec![1; 32];
-		test_state.chunks[1].chunk = vec![2; 32];
+		test_state.chunks[0].chunk = vec![0; 32];
+		test_state.chunks[1].chunk = vec![1; 32];
+		test_state.chunks[2].chunk = vec![2; 32];
+		test_state.chunks[3].chunk = vec![3; 32];
+
 		let mut faulty = vec![false; test_state.chunks.len()];
 		faulty[0] = true;
 		faulty[1] = true;
+		faulty[2] = true;
+		faulty[3] = true;
 
 		test_state.test_faulty_chunk_requests(
 			candidate_hash,
@@ -507,15 +705,15 @@ fn a_faulty_chunk_leads_to_recovery_error() {
 		).await;
 
 		// A request times out with `Unavailable` error.
-		assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Invalid);
+		assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
 	});
 }
 
 #[test]
-fn a_wrong_chunk_leads_to_recovery_error() {
+fn wrong_chunk_index_leads_to_recovery_error() {
 	let mut test_state = TestState::default();
 
-	test_harness(|test_harness| async move {
+	test_harness_fast_path(|test_harness| async move {
 		let TestHarness { mut virtual_overseer } = test_harness;
 
 		overseer_signal(
@@ -533,23 +731,25 @@ fn a_wrong_chunk_leads_to_recovery_error() {
 			AvailabilityRecoveryMessage::RecoverAvailableData(
 				test_state.candidate.clone(),
 				test_state.session_index,
+				None,
 				tx,
 			)
 		).await;
 
 		test_state.test_runtime_api(&mut virtual_overseer).await;
 
-		test_state.test_connect_to_validators(&mut virtual_overseer).await;
+		test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
 
 		let candidate_hash = test_state.candidate.hash();
 
-		// Send a wrong chunk so it passes proof check but fails to reconstruct.
+		// These chunks should fail the index check, as they do not carry the correct index for their validator.
 		test_state.chunks[1] = test_state.chunks[0].clone();
 		test_state.chunks[2] = test_state.chunks[0].clone();
 		test_state.chunks[3] = test_state.chunks[0].clone();
 		test_state.chunks[4] = test_state.chunks[0].clone();
 
-		let faulty = vec![false; test_state.chunks.len()];
+		let mut faulty = vec![true; test_state.chunks.len()];
+		faulty[0] = false;
 
 		test_state.test_faulty_chunk_requests(
 			candidate_hash,
@@ -557,7 +757,206 @@ fn a_wrong_chunk_leads_to_recovery_error() {
 			&faulty,
 		).await;
 
-		// A request times out with `Unavailable` error.
+		// A request times out with `Unavailable` error as there are no good peers.
+		assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
+	});
+}
+
+#[test]
+fn invalid_erasure_coding_leads_to_invalid_error() {
+	let mut test_state = TestState::default();
+
+	test_harness_fast_path(|test_harness| async move {
+		let TestHarness { mut virtual_overseer } = test_harness;
+
+		let pov = PoV {
+			block_data: BlockData(vec![69; 64]),
+		};
+
+		let (bad_chunks, bad_erasure_root) = derive_erasure_chunks_with_proofs_and_root(
+			test_state.chunks.len(),
+			&AvailableData {
+				validation_data: test_state.persisted_validation_data.clone(),
+				pov: Arc::new(pov),
+			},
+			|i, chunk| *chunk = vec![i as u8; 32],
+		);
+
+		test_state.chunks = bad_chunks;
+		test_state.candidate.descriptor.erasure_root = bad_erasure_root;
+
+		let candidate_hash = test_state.candidate.hash();
+
+		overseer_signal(
+			&mut virtual_overseer,
+			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+				activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
+				deactivated: smallvec![],
+			}),
+		).await;
+
+		let (tx, rx) = oneshot::channel();
+
+		overseer_send(
+			&mut virtual_overseer,
+			AvailabilityRecoveryMessage::RecoverAvailableData(
+				test_state.candidate.clone(),
+				test_state.session_index,
+				None,
+				tx,
+			)
+		).await;
+
+		test_state.test_runtime_api(&mut virtual_overseer).await;
+
+		test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
+
+		test_state.test_chunk_requests(
+			candidate_hash,
+			&mut virtual_overseer,
+		).await;
+
+		// Recovery concludes with an `Invalid` error: the proofs check out, but the re-encoded data does not match the erasure root.
 		assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Invalid);
 	});
 }
+
+#[test]
+fn fast_path_backing_group_recovers() {
+	let test_state = TestState::default();
+
+	test_harness_fast_path(|test_harness| async move {
+		let TestHarness { mut virtual_overseer } = test_harness;
+
+		overseer_signal(
+			&mut virtual_overseer,
+			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+				activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
+				deactivated: smallvec![],
+			}),
+		).await;
+
+		let (tx, rx) = oneshot::channel();
+
+		overseer_send(
+			&mut virtual_overseer,
+			AvailabilityRecoveryMessage::RecoverAvailableData(
+				test_state.candidate.clone(),
+				test_state.session_index,
+				Some(GroupIndex(0)),
+				tx,
+			)
+		).await;
+
+		test_state.test_runtime_api(&mut virtual_overseer).await;
+
+		let candidate_hash = test_state.candidate.hash();
+
+		let mut who_has: Vec<_> = (0..test_state.validators.len()).map(|_| HasAvailableData::No).collect();
+		who_has[3] = HasAvailableData::Yes;
+
+		test_state.test_full_data_requests(
+			candidate_hash,
+			&mut virtual_overseer,
+			&who_has,
+		).await;
+
+		// Recovered data should match the original one.
+		assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
+	});
+}
+
+#[test]
+fn wrong_data_from_fast_path_peer_leads_to_punishment() {
+	let test_state = TestState::default();
+
+	test_harness_fast_path(|test_harness| async move {
+		let TestHarness { mut virtual_overseer } = test_harness;
+
+		overseer_signal(
+			&mut virtual_overseer,
+			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+				activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
+				deactivated: smallvec![],
+			}),
+		).await;
+
+		let (tx, _rx) = oneshot::channel();
+
+		overseer_send(
+			&mut virtual_overseer,
+			AvailabilityRecoveryMessage::RecoverAvailableData(
+				test_state.candidate.clone(),
+				test_state.session_index,
+				Some(GroupIndex(0)),
+				tx,
+			)
+		).await;
+
+		test_state.test_runtime_api(&mut virtual_overseer).await;
+
+		let candidate_hash = test_state.candidate.hash();
+
+		let mut a = test_state.available_data.clone();
+		a.pov = Arc::new(PoV { block_data: BlockData(vec![69; 420]) });
+
+		let who_has: Vec<_> = (0..test_state.validators.len()).map(|_| HasAvailableData::Other(a.clone())).collect();
+
+		// This call implicitly asserts that every peer is reported for providing invalid data.
+		test_state.test_full_data_requests(
+			candidate_hash,
+			&mut virtual_overseer,
+			&who_has,
+		).await;
+	});
+}
+
+#[test]
+fn no_answers_in_fast_path_causes_chunk_requests() {
+	let test_state = TestState::default();
+
+	test_harness_fast_path(|test_harness| async move {
+		let TestHarness { mut virtual_overseer } = test_harness;
+
+		overseer_signal(
+			&mut virtual_overseer,
+			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+				activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
+				deactivated: smallvec![],
+			}),
+		).await;
+
+		let (tx, rx) = oneshot::channel();
+
+		overseer_send(
+			&mut virtual_overseer,
+			AvailabilityRecoveryMessage::RecoverAvailableData(
+				test_state.candidate.clone(),
+				test_state.session_index,
+				Some(GroupIndex(0)),
+				tx,
+			)
+		).await;
+
+		test_state.test_runtime_api(&mut virtual_overseer).await;
+
+		let candidate_hash = test_state.candidate.hash();
+
+		// A mix of timeouts and negative answers.
+		let mut who_has: Vec<_> = (0..test_state.validators.len()).map(|_| HasAvailableData::Timeout).collect();
+		who_has[0] = HasAvailableData::No;
+		who_has[3] = HasAvailableData::No;
+
+		test_state.test_full_data_requests(
+			candidate_hash,
+			&mut virtual_overseer,
+			&who_has,
+		).await;
+
+		test_state.test_connect_to_all_validators(&mut virtual_overseer).await;
+		test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await;
+
+		// Recovered data should match the original one.
+		assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
+	});
+}
diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 4f105f9e01190290821690d5579cb7c12fa263d4..3d18ac737a8ab3dfafcfb671ac4a26938e4b69b7 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -2726,6 +2726,7 @@ mod tests {
 		AvailabilityRecoveryMessage::RecoverAvailableData(
 			Default::default(),
 			Default::default(),
+			None,
 			sender,
 		)
 	}
diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs
index 02df6a9c2bfb54dd3bb2ca9cc7503440918c9225..1cb4b1ce08720fbd6b89113e779fae378f7b7844 100644
--- a/polkadot/node/service/src/lib.rs
+++ b/polkadot/node/service/src/lib.rs
@@ -419,7 +419,7 @@ where
 			keystore.clone(),
 			Metrics::register(registry)?,
 		),
-		availability_recovery: AvailabilityRecoverySubsystem::new(
+		availability_recovery: AvailabilityRecoverySubsystem::with_chunks_only(
 		),
 		availability_store: AvailabilityStoreSubsystem::new_on_disk(
 			availability_config,
diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs
index 7706b12739199efe12c419fa442c1fa280039fb8..857b1e90ae96dc733629416640db0b4881ce846b 100644
--- a/polkadot/node/subsystem/src/messages.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -41,7 +41,7 @@ use polkadot_primitives::v1::{
 	PersistedValidationData, PoV, SessionIndex, SignedAvailabilityBitfield,
 	ValidationCode, ValidatorId, CandidateHash,
 	ValidatorIndex, ValidatorSignature, InboundDownwardMessage, InboundHrmpMessage,
-	CandidateIndex,
+	CandidateIndex, GroupIndex,
 };
 use polkadot_statement_table::v1::Misbehavior;
 use std::{sync::Arc, collections::btree_map::BTreeMap};
@@ -282,6 +282,7 @@ pub enum AvailabilityRecoveryMessage {
 	RecoverAvailableData(
 		CandidateReceipt,
 		SessionIndex,
+		Option<GroupIndex>, // Optional backing group to request from first.
 		oneshot::Sender<Result<AvailableData, crate::errors::RecoveryError>>,
 	),
 	/// Event from the network bridge.
diff --git a/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md b/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md
index db71720e4c4daed7b3f73d1b8bad66291234af10..0ff0db72dc40809bed6a3883449195410b347965 100644
--- a/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md
+++ b/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md
@@ -267,9 +267,9 @@ On receiving an `ApprovedAncestor(Hash, BlockNumber, response_channel)`:
   * If `RequiredTranches::Exact { next_no_show, .. } - set a wakeup for the next no-show tick.
 
 #### Launch Approval Work
-  * Requires `(SessionIndex, SessionInfo, CandidateReceipt, ValidatorIndex, block_hash, candidate_index)`
+  * Requires `(SessionIndex, SessionInfo, CandidateReceipt, ValidatorIndex, backing_group, block_hash, candidate_index)`
   * Extract the public key of the `ValidatorIndex` from the `SessionInfo` for the session.
-  * Issue an `AvailabilityRecoveryMessage::RecoverAvailableData(candidate, session_index, response_sender)`
+  * Issue an `AvailabilityRecoveryMessage::RecoverAvailableData(candidate, session_index, Some(backing_group), response_sender)`
   * Load the historical validation code of the parachain by dispatching a `RuntimeApiRequest::HistoricalValidationCode(`descriptor.para_id`, `descriptor.relay_parent`)` against the state of `block_hash`.
   * Spawn a background task with a clone of `background_tx`
     * Wait for the available data
diff --git a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md
index a05b9e9c17499a3794943c81f1b3b50eed3f3f0f..31ec01897514038af707150a942dce57e9d683b2 100644
--- a/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md
+++ b/polkadot/roadmap/implementers-guide/src/node/availability/availability-recovery.md
@@ -1,7 +1,5 @@
 # Availability Recovery
 
-> TODO: <https://github.com/paritytech/polkadot/issues/1597>
-
 This subsystem is the inverse of the [Availability Distribution](availability-distribution.md) subsystem: validators will serve the availability chunks kept in the availability store to nodes who connect to them. And the subsystem will also implement the other side: the logic for nodes to connect to validators, request availability pieces, and reconstruct the `AvailableData`.
 
 This version of the availability recovery subsystem is based off of direct connections to validators. In order to recover any given `AvailableData`, we must recover at least `f + 1` pieces from validators of the session. Thus, we will connect to and query randomly chosen validators until we have received `f + 1` pieces.
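+
+As a simplified sketch of the threshold rule used above (the real `recovery_threshold` helper also validates the validator count and returns a `Result`):
+
+```rust
+/// The number of pieces needed to reconstruct the data: `f + 1`, where
+/// `f` is the maximum number of faulty validators tolerated (`n >= 3f + 1`).
+fn recovery_threshold(n_validators: usize) -> usize {
+    let f = n_validators.saturating_sub(1) / 3;
+    f + 1
+}
+```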
@@ -13,7 +11,7 @@ This version of the availability recovery subsystem is based off of direct conne
 Input:
 
 - NetworkBridgeUpdateV1(update)
-- AvailabilityRecoveryMessage::RecoverAvailableData(candidate, session, response)
+- AvailabilityRecoveryMessage::RecoverAvailableData(candidate, session, backing_group, response)
 
 Output:
 
@@ -26,13 +24,18 @@ Output:
 We hold a state which tracks the current recovery interactions we have live, as well as which request IDs correspond to which interactions. An interaction is a structure encapsulating all interaction with the network necessary to recover the available data.
 
 ```rust
-type ChunkResponse = Result<(PeerId, ErasureChunk), Unavailable>;
+type DataResponse<T> = Result<(PeerId, ValidatorIndex, T), Unavailable>;
+
+enum Awaited {
+    Chunk(AwaitedData<ErasureChunk>),
+    FullData(AwaitedData<AvailableData>),
+}
 
-struct AwaitedChunk {
+struct AwaitedData<T> {
     issued_at: Instant,
     validator_index: ValidatorIndex,
     candidate_hash: CandidateHash,
-    response: ResponseChannel<ChunkResponse>,
+    response: ResponseChannel<DataResponse<T>>,
 }
 
 struct State {
@@ -40,8 +43,8 @@ struct State {
     interactions: Map<CandidateHash, InteractionHandle>,
     /// A recent block hash for which state should be available.
     live_block_hash: Hash,
-    discovering_validators: Map<AuthorityDiscoveryId, Vec<AwaitedChunk>>,
-    live_chunk_requests: Map<RequestId, (PeerId, AwaitedChunk)>,
+    discovering_validators: Map<AuthorityDiscoveryId, Vec<Awaited>>,
+    live_requests: Map<RequestId, (PeerId, Awaited)>,
     next_request_id: RequestId,
     connecting_validators: Stream<(AuthorityDiscoveryId, PeerId)>,
 
@@ -63,12 +66,19 @@ enum FromInteraction {
     // An interaction concluded.
     Concluded(CandidateHash, Result<AvailableData, RecoveryError>),
     // Make a request of a particular chunk from a particular validator.
-    MakeRequest(
+    MakeChunkRequest(
         AuthorityDiscoveryId, 
         CandidateHash, 
         ValidatorIndex, 
-        ResponseChannel<ChunkResponse>,
+        ResponseChannel<DataResponse<ErasureChunk>>,
     ),
+    // Make a request of the full data from a particular validator.
+    MakeDataRequest(
+        AuthorityDiscoveryId,
+        CandidateHash,
+        ValidatorIndex,
+        ResponseChannel<DataResponse<AvailableData>>,
+    ),
     // Report a peer.
     ReportPeer(
         PeerId,
@@ -76,19 +86,35 @@ enum FromInteraction {
     ),
 }
 
-struct Interaction {
-    to_state: Sender<FromInteraction>,
+struct InteractionParams {
     validator_authority_keys: Vec<AuthorityId>,
     validators: Vec<ValidatorId>,
-    // a random shuffling of the validators which indicates the order in which we connect to the validators and
-    // request the chunk from them.
-    shuffling: Vec<ValidatorIndex>, 
     // The number of pieces needed.
     threshold: usize, 
     candidate_hash: Hash,
     erasure_root: Hash,
-    received_chunks: Map<ValidatorIndex, ErasureChunk>,
-    requesting_chunks: FuturesUnordered<Receiver<ChunkResponse>>,
+}
+
+enum InteractionPhase {
+    RequestFromBackers {
+        // a random shuffling of the validators from the backing group which indicates the order
+        // in which we connect to them and request the chunk.
+        shuffled_backers: Vec<ValidatorIndex>,
+        requesting_pov: Option<Receiver<DataResponse<AvailableData>>>
+    },
+    RequestChunks {
+        // a random shuffling of the validators which indicates the order in which we connect to the validators and
+        // request the chunk from them.
+        shuffling: Vec<ValidatorIndex>, 
+        received_chunks: Map<ValidatorIndex, ErasureChunk>,
+        requesting_chunks: FuturesUnordered<Receiver<DataResponse<ErasureChunk>>>,
+    }
+}
+
+struct Interaction {
+    to_state: Sender<FromInteraction>,
+    params: InteractionParams,
+    phase: InteractionPhase,
 }
 ```
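+
+As a hypothetical illustration of how these types compose (`response_channel` is a stand-in helper, not a real API), a full-data request would be tracked as:
+
+```rust
+// Sketch only: wrap the generic `AwaitedData<T>` in the `Awaited` enum so that
+// chunk requests and full-data requests share the same discovery plumbing.
+let (tx, rx) = response_channel::<DataResponse<AvailableData>>();
+let awaited = Awaited::FullData(AwaitedData {
+    issued_at: Instant::now(),
+    validator_index,
+    candidate_hash,
+    response: tx,
+});
+```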
 
@@ -100,7 +126,7 @@ Ignore `BlockFinalized` signals.
 
 On `Conclude`, shut down the subsystem.
 
-#### `AvailabilityRecoveryMessage::RecoverAvailableData(receipt, session, response)`
+#### `AvailabilityRecoveryMessage::RecoverAvailableData(receipt, session, Option<backing_group_index>, response)`
 
-1. Check the `availability_lru` for the candidate and return the data if so.
+1. Check the `availability_lru` for the candidate and, if present, return the data.
 1. Check if there is already an interaction handle for the request. If so, add the response handle to it.
@@ -114,9 +140,15 @@ On `Conclude`, shut down the subsystem.
-1. Load the entry from the `interactions` map. It should always exist, if not for logic errors. Send the result to each member of `awaiting`.
-1. Add the entry to the availability_lru.
+1. Load the entry from the `interactions` map. It should always exist, barring logic errors. Send the result to each member of `awaiting`.
+1. Add the entry to the `availability_lru`.
 
-#### `FromInteraction::MakeRequest(discovery_pub, candidate_hash, validator_index, response)`
+#### `FromInteraction::MakeChunkRequest(discovery_pub, candidate_hash, validator_index, response)`
+
+1. Add an `Awaited::Chunk` to the `discovering_validators` map under `discovery_pub`.
+1. Issue a `NetworkBridgeMessage::ConnectToValidators`.
+1. Add the stream of connected validator events to `state.connecting_validators`.
+
+#### `FromInteraction::MakeDataRequest(discovery_pub, candidate_hash, validator_index, response)`
 
-1. Add an `AwaitedRequest` to the `discovering_validators` map under `discovery_pub`.
+1. Add an `Awaited::FullData` to the `discovering_validators` map under `discovery_pub`.
 1. Issue a `NetworkBridgeMessage::ConnectToValidators`.
 1. Add the stream of connected validator events to `state.connecting_validators`.
 
@@ -129,24 +161,39 @@ On `Conclude`, shut down the subsystem.
 #### On `connecting_validators` event:
 
 1. If the validator exists under `discovering_validators`, remove the entry.
-1. For each `AwaitedChunk` in the entry, issue a `AvailabilityRecoveryV1Message::RequestChunk(next_request_id, candidate_hash, validator_index)` and make an entry in the `live_chunk_requests` map.
+1. For each `Awaited` in the entry (see the sketch below):
+  1. If `Awaited::Chunk`, issue a `AvailabilityRecoveryV1Message::RequestChunk(next_request_id, candidate_hash, validator_index)` and make an entry in the `live_requests` map.
+  1. If `Awaited::FullData`, issue a `AvailabilityRecoveryV1Message::RequestFullData(next_request_id, candidate_hash)` and make an entry in the `live_requests` map.
+  1. Increment `next_request_id`.
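+
+A hypothetical sketch of this dispatch, assuming the `Awaited` enum above, the validator's pending `entry` from `discovering_validators`, and a stand-in `send_to_peer` helper:
+
+```rust
+// For each pending request against the now-connected validator, build the
+// right wire message, record it under a fresh request id, and send it.
+for awaited in entry {
+    let msg = match &awaited {
+        Awaited::Chunk(a) => AvailabilityRecoveryV1Message::RequestChunk(
+            next_request_id,
+            a.candidate_hash,
+            a.validator_index,
+        ),
+        Awaited::FullData(a) => AvailabilityRecoveryV1Message::RequestFullData(
+            next_request_id,
+            a.candidate_hash,
+        ),
+    };
+    send_to_peer(peer_id, msg); // stand-in for the network-bridge send
+    live_requests.insert(next_request_id, (peer_id, awaited));
+    next_request_id += 1;
+}
+```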
 
 #### On receiving `AvailabilityRecoveryV1::RequestChunk(r_id, candidate_hash, validator_index)`
 
-1. Issue a `AvailabilityStore::QueryChunk(candidate-hash, validator_index, response)` message.
+1. Issue a `AvailabilityStore::QueryChunk(candidate_hash, validator_index, response)` message.
 1. Whatever the result, issue a `AvailabilityRecoveryV1Message::Chunk(r_id, response)` message.
 
 #### On receiving `AvailabilityRecoveryV1::Chunk(r_id, chunk)`
 
-1. If there exists an entry under `r_id`, remove it. If there doesn't exist one, report the peer and return. If the peer in the entry doesn't match the sending peer, reinstate the entry, report the peer, and return.
+1. If there exists an entry under `r_id`, remove it. If none exists, report the peer and return. If the entry is not `Awaited::Chunk`, or the peer in the entry doesn't match the sending peer, reinstate the entry, report the peer, and return.
-1. Send the chunk response on the `awaited_chunk` for the interaction to handle.
+1. Send the chunk response on the `response` channel for the interaction to handle.
 
+#### On receiving `AvailabilityRecoveryV1::RequestFullData(r_id, candidate_hash)`
+
+1. Issue a `AvailabilityStore::QueryAvailableData(candidate_hash, response)` message.
+1. Whatever the result, issue a `AvailabilityRecoveryV1Message::FullData(r_id, response)` message.
+
+#### On receiving `AvailabilityRecoveryV1::FullData(r_id, data)`
+
+1. If there exists an entry under `r_id`, remove it. If none exists, report the peer and return. If the entry is not `Awaited::FullData`, or the peer in the entry doesn't match the sending peer, reinstate the entry, report the peer, and return (this validation is sketched below).
+1. Send the data response on the `response` channel for the interaction to handle.
+
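+A hypothetical sketch of the entry validation shared by the `Chunk` and `FullData` handlers (`Map` is the spec's map type from above; `expect_full_data` distinguishes the two cases):
+
+```rust
+// Remove the entry for `r_id`, but reinstate it when the response is of the
+// wrong kind or comes from the wrong peer; the caller reports the sender in
+// every failure case.
+fn take_live_request(
+    live_requests: &mut Map<RequestId, (PeerId, Awaited)>,
+    r_id: RequestId,
+    sender: PeerId,
+    expect_full_data: bool,
+) -> Option<Awaited> {
+    let (peer, awaited) = live_requests.remove(&r_id)?;
+    let kind_matches = matches!(awaited, Awaited::FullData(_)) == expect_full_data;
+    if peer != sender || !kind_matches {
+        live_requests.insert(r_id, (peer, awaited));
+        return None;
+    }
+    Some(awaited)
+}
+```
+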
 ### Interaction logic
 
-#### `launch_interaction(session_index, session_info, candidate_receipt, candidate_hash)`
+#### `launch_interaction(session_index, session_info, candidate_receipt, candidate_hash, Option<backing_group_index>)`
 
-1. Compute the threshold from the session info. It should be `f + 1`, where `n = 3f + k`, where `k in {1, 2, 3}`, and `n` is the number of validators.
+1. Compute the threshold from the session info. It should be `f + 1`, where `n = 3f + k`, `k in {1, 2, 3}`, and `n` is the number of validators.
-1. Set the various fields of `Interaction` based on the validator lists in `session_info`. Compute a random shuffling of the validator indices.
+1. Set the various fields of `InteractionParams` based on the validator lists in `session_info` and information about the candidate.
+1. If the `backing_group_index` is `Some`, start in the `RequestFromBackers` phase with a shuffling of the backing group validator indices and `requesting_pov` set to `None` (see the sketch after this list).
+1. Otherwise, start in the `RequestChunks` phase with `received_chunks` and `requesting_chunks` both empty.
 1. Set the `to_state` sender to be equal to a clone of `state.from_interaction_tx`.
-1. Initialize `received_chunks` to an empty set, as well as `requesting_chunks`.
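+
+A minimal sketch of interaction start-up under these rules; `shuffle`, `backing_group_indices`, and `all_validator_indices` are hypothetical helpers, not real APIs:
+
+```rust
+// Threshold: with `n = 3f + k`, `k in {1, 2, 3}`, we need `f + 1` pieces.
+let n = session_info.validators.len();
+let threshold = (n - 1) / 3 + 1;
+
+// The starting phase depends on whether a backing group was supplied.
+let phase = match backing_group_index {
+    Some(group) => InteractionPhase::RequestFromBackers {
+        shuffled_backers: shuffle(backing_group_indices(&session_info, group)),
+        requesting_pov: None,
+    },
+    None => InteractionPhase::RequestChunks {
+        shuffling: shuffle(all_validator_indices(n)),
+        received_chunks: Map::new(),
+        requesting_chunks: FuturesUnordered::new(),
+    },
+};
+```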
 
@@ -160,10 +207,20 @@ const N_PARALLEL: usize = 50;
 ```
 
 Loop:
-  * Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks, and any failures should lead to issuance of a `FromInteraction::ReportPeer` message.
-  * If `received_chunks` has more than `threshold` entries, attempt to recover the data. If that fails, or a re-encoding of it doesn't match the expected erasure root, break and issue a `FromInteraction::Concluded(RecoveryError::Invalid)`. Otherwise, issue a `FromInteraction::Concluded(Ok(()))`.
-  * While there are fewer than `N_PARALLEL` entries in `requesting_chunks`,
-    * Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, break and issue a `FromInteraction::Concluded(RecoveryError::Unavailable)`.
-    * Initialize `(tx, rx)`.
-    * Issue a `FromInteraction::MakeRequest(validator, candidate_hash, validator_index, tx)`.
-    * Add `rx` to `requesting_chunks`.
+  * If the phase is `InteractionPhase::RequestFromBackers`:
+    * If the `requesting_pov` is `Some`, poll for updates on it. If it concludes, set `requesting_pov` to `None`.
+        * If it concludes with a `None` result, return to the beginning of the loop.
+        * If it concludes with available data, attempt a re-encoding.
+            * If it has the correct erasure-root, break and issue a `Concluded(Ok(available_data))`.
+            * If it has an incorrect erasure-root, issue a `FromInteraction::ReportPeer` message and return to the beginning of the loop.
+    * If the `requesting_pov` is `None`, take the next backer off the `shuffled_backers`.
+        * If the backer is `Some`, initialize `(tx, rx)`, issue a `FromInteraction::MakeDataRequest(validator, candidate_hash, validator_index, tx)`, set `requesting_pov` to `Some(rx)`, and return.
+        * If the backer is `None`, set the phase to `InteractionPhase::RequestChunks` with a random shuffling of the validators and empty `received_chunks` and `requesting_chunks`.
+  * If the phase is `InteractionPhase::RequestChunks`:
+    * Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks, and any failures should lead to issuance of a `FromInteraction::ReportPeer` message.
+    * If `received_chunks` has at least `threshold` entries, attempt to recover the data. If that fails, or a re-encoding of it produces an incorrect erasure-root, break and issue a `Concluded(RecoveryError::Invalid)`. If correct, break and issue a `Concluded(Ok(available_data))` (the re-encoding check is sketched after this list).
+    * While there are fewer than `N_PARALLEL` entries in `requesting_chunks`,
+        * Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, break and issue a `Concluded(RecoveryError::Unavailable)`.
+        * Initialize `(tx, rx)`.
+        * Issue a `FromInteraction::MakeChunkRequest(validator, candidate_hash, validator_index, tx)`.
+        * Add `rx` to `requesting_chunks`.
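+
+The re-encoding check referenced in both phases could look roughly like the following, assuming the `obtain_chunks_v1` and `branches` helpers from the `polkadot-erasure-coding` crate (a sketch, not the actual implementation):
+
+```rust
+// Re-encode the candidate data into chunks and compare the resulting erasure
+// root against the one committed to on-chain; a mismatch means the data is
+// invalid (or the responding peer misbehaved).
+fn reencoding_matches(n_validators: usize, data: &AvailableData, erasure_root: &Hash) -> bool {
+    match polkadot_erasure_coding::obtain_chunks_v1(n_validators, data) {
+        Ok(chunks) => polkadot_erasure_coding::branches(&chunks).root() == *erasure_root,
+        Err(_) => false,
+    }
+}
+```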
diff --git a/polkadot/roadmap/implementers-guide/src/types/network.md b/polkadot/roadmap/implementers-guide/src/types/network.md
index a2da009d7cf78390205a1460118b1d70b7158d63..cee03f70e07670d9294415aba193d267cfc7675d 100644
--- a/polkadot/roadmap/implementers-guide/src/types/network.md
+++ b/polkadot/roadmap/implementers-guide/src/types/network.md
@@ -58,6 +58,12 @@ enum AvailabilityRecoveryV1Message {
 	/// Respond with chunk for a given candidate hash and validator index.
 	/// The response may be `None` if the requestee does not have the chunk.
 	Chunk(RequestId, Option<ErasureChunk>),
+	/// Request the full data for a given candidate hash.
+	RequestFullData(RequestId, CandidateHash),
+	/// Respond with the full data for a given candidate hash.
+	/// The response may be `None` if the requestee does not have the data.
+	FullData(RequestId, Option<AvailableData>),
 }
 ```
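+
+As a hypothetical wire exchange using the new variants (the request id and payload are illustrative):
+
+```rust
+// Requester -> holder: ask for the full `AvailableData` of a candidate.
+let request = AvailabilityRecoveryV1Message::RequestFullData(request_id, candidate_hash);
+
+// Holder -> requester: reply with the data, or `None` if it is not stored.
+let response = AvailabilityRecoveryV1Message::FullData(request_id, None);
+```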
 
diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
index 81ca8ef49026a5af21aea5b01243fc43ac547187..8f01459140db6faf7d50938dd7b2a5b73a7f06e2 100644
--- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
+++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
@@ -160,6 +160,7 @@ enum AvailabilityRecoveryMessage {
     RecoverAvailableData(
         CandidateReceipt,
         SessionIndex,
+        Option<GroupIndex>, // Backing validator group to request the data directly from.
         ResponseChannel<Result<AvailableData, RecoveryError>>,
     ),
 }