Fill up requests slots via `launch_parallel_requests` (#3681)

in case waiting for the next response takes too long.
......@@ -21,6 +21,7 @@
use std::{
collections::{HashMap, VecDeque},
use futures::{
......@@ -43,7 +44,7 @@ use polkadot_node_network_protocol::{
IfDisconnected, UnifiedReputationChange as Rep,
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_node_subsystem_util::request_session_info;
use polkadot_node_subsystem_util::{request_session_info, TimeoutExt};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex,
Hash, HashT, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex,
......@@ -72,6 +73,10 @@ const LRU_SIZE: usize = 16;
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
/// Max time we want to wait for responses, before calling `launch_parallel_requests` again to fill
/// up slots.
const MAX_CHUNK_WAIT: Duration = Duration::from_secs(1);
/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
fast_path: bool,
......@@ -285,7 +290,11 @@ impl RequestChunksPhase {
async fn wait_for_chunks(&mut self, params: &InteractionParams) {
// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
while let Some(request_result) = self.requesting_chunks.next().await {
// We will also stop, if there has not been a response for `MAX_CHUNK_WAIT`, so
// `launch_parallel_requests` cann fill up slots again.
while let Some(request_result) =
match request_result {
Ok(Some(chunk)) => {
// Check merkle proofs of any received chunks.
