From b13e07bc47073c2972b1d1d82321cb359803874c Mon Sep 17 00:00:00 2001
From: Chris Sosnin <48099298+slumber@users.noreply.github.com>
Date: Wed, 5 Oct 2022 11:48:50 +0400
Subject: [PATCH] Buffered connection management for collator-protocol (#6022)

* Extract metrics into a separate module

* Introduce validators buffer

* Integrate buffer into the subsystem

* Only reconnect on new advertisements

* Test

* comma

* doc comment

* Make capacity buffer compile time non-zero

* Add doc comments

* nits

* remove derives

* review

* better naming

* check timeout

* Extract interval stream into lib

* Ensure collator disconnects after timeout

* spellcheck

* rename buf

* Remove double interval

* Add a log on timeout

* Cleanup buffer on timeout
---
 polkadot/Cargo.lock                           |   5 +-
 .../node/network/collator-protocol/Cargo.toml |   1 +
 .../src/collator_side/metrics.rs              | 123 +++++++
 .../src/collator_side/mod.rs                  | 254 +++++++-------
 .../src/collator_side/tests.rs                | 116 ++++++-
 .../src/collator_side/validators_buffer.rs    | 317 ++++++++++++++++++
 .../node/network/collator-protocol/src/lib.rs |  27 +-
 .../src/validator_side/mod.rs                 |  29 +-
 8 files changed, 707 insertions(+), 165 deletions(-)
 create mode 100644 polkadot/node/network/collator-protocol/src/collator_side/metrics.rs
 create mode 100644 polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index cb59e04104f..015056f42a3 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -555,9 +555,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 
 [[package]]
 name = "bitvec"
-version = "1.0.0"
+version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1489fcb93a5bb47da0462ca93ad252ad6af2145cce58d10d46a83931ba9f016b"
+checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c"
 dependencies = [
  "funty",
  "radium",
@@ -6290,6 +6290,7 @@ version = "0.9.29"
 dependencies = [
  "always-assert",
  "assert_matches",
+ "bitvec",
  "env_logger 0.9.0",
  "fatality",
  "futures",
diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml
index df9e75c9e95..e089719106b 100644
--- a/polkadot/node/network/collator-protocol/Cargo.toml
+++ b/polkadot/node/network/collator-protocol/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
 
 [dependencies]
 always-assert = "0.1.2"
+bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] }
 futures = "0.3.21"
 futures-timer = "3"
 gum = { package = "tracing-gum", path = "../../gum" }
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/metrics.rs b/polkadot/node/network/collator-protocol/src/collator_side/metrics.rs
new file mode 100644
index 00000000000..85e00406b9b
--- /dev/null
+++ b/polkadot/node/network/collator-protocol/src/collator_side/metrics.rs
@@ -0,0 +1,123 @@
+// Copyright 2017-2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use polkadot_node_subsystem_util::metrics::{self, prometheus};
+
+#[derive(Clone, Default)]
+pub struct Metrics(Option<MetricsInner>);
+
+impl Metrics {
+	pub fn on_advertisment_made(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.advertisements_made.inc();
+		}
+	}
+
+	pub fn on_collation_sent_requested(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.collations_send_requested.inc();
+		}
+	}
+
+	pub fn on_collation_sent(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.collations_sent.inc();
+		}
+	}
+
+	/// Provide a timer for `process_msg` which observes on drop.
+	pub fn time_process_msg(&self) -> Option<prometheus::prometheus::HistogramTimer> {
+		self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
+	}
+
+	/// Provide a timer for `distribute_collation` which observes on drop.
+	pub fn time_collation_distribution(
+		&self,
+		label: &'static str,
+	) -> Option<prometheus::prometheus::HistogramTimer> {
+		self.0.as_ref().map(|metrics| {
+			metrics.collation_distribution_time.with_label_values(&[label]).start_timer()
+		})
+	}
+}
+
+#[derive(Clone)]
+struct MetricsInner {
+	advertisements_made: prometheus::Counter<prometheus::U64>,
+	collations_sent: prometheus::Counter<prometheus::U64>,
+	collations_send_requested: prometheus::Counter<prometheus::U64>,
+	process_msg: prometheus::Histogram,
+	collation_distribution_time: prometheus::HistogramVec,
+}
+
+impl metrics::Metrics for Metrics {
+	fn try_register(
+		registry: &prometheus::Registry,
+	) -> std::result::Result<Self, prometheus::PrometheusError> {
+		let metrics = MetricsInner {
+			advertisements_made: prometheus::register(
+				prometheus::Counter::new(
+					"polkadot_parachain_collation_advertisements_made_total",
+					"A number of collation advertisements sent to validators.",
+				)?,
+				registry,
+			)?,
+			collations_send_requested: prometheus::register(
+				prometheus::Counter::new(
+					"polkadot_parachain_collations_sent_requested_total",
+					"A number of collations requested to be sent to validators.",
+				)?,
+				registry,
+			)?,
+			collations_sent: prometheus::register(
+				prometheus::Counter::new(
+					"polkadot_parachain_collations_sent_total",
+					"A number of collations sent to validators.",
+				)?,
+				registry,
+			)?,
+			process_msg: prometheus::register(
+				prometheus::Histogram::with_opts(
+					prometheus::HistogramOpts::new(
+						"polkadot_parachain_collator_protocol_collator_process_msg",
+						"Time spent within `collator_protocol_collator::process_msg`",
+					)
+					.buckets(vec![
+						0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
+						1.0,
+					]),
+				)?,
+				registry,
+			)?,
+			collation_distribution_time: prometheus::register(
+				prometheus::HistogramVec::new(
+					prometheus::HistogramOpts::new(
+						"polkadot_parachain_collator_protocol_collator_distribution_time",
+						"Time spent within `collator_protocol_collator::distribute_collation`",
+					)
+					.buckets(vec![
+						0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
+						1.0,
+					]),
+					&["state"],
+				)?,
+				registry,
+			)?,
+		};
+
+		Ok(Metrics(Some(metrics)))
+	}
+}
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
index c1a20a2a670..4f2eea2ca74 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
@@ -17,7 +17,7 @@
 use std::{
 	collections::{HashMap, HashSet, VecDeque},
 	pin::Pin,
-	time::Duration,
+	time::{Duration, Instant},
 };
 
 use futures::{
@@ -44,19 +44,25 @@ use polkadot_node_subsystem::{
 	overseer, FromOrchestra, OverseerSignal, PerLeafSpan,
 };
 use polkadot_node_subsystem_util::{
-	metrics::{self, prometheus},
 	runtime::{get_availability_cores, get_group_rotation_info, RuntimeInfo},
 	TimeoutExt,
 };
 use polkadot_primitives::v2::{
 	AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState,
-	Hash, Id as ParaId,
+	GroupIndex, Hash, Id as ParaId, SessionIndex,
 };
 
 use super::LOG_TARGET;
 use crate::error::{log_error, Error, FatalError, Result};
 use fatality::Split;
 
+mod metrics;
+mod validators_buffer;
+
+use validators_buffer::{ValidatorGroupsBuffer, VALIDATORS_BUFFER_CAPACITY};
+
+pub use metrics::Metrics;
+
 #[cfg(test)]
 mod tests;
 
@@ -73,111 +79,16 @@ const COST_APPARENT_FLOOD: Rep =
 /// For considerations on this value, see: https://github.com/paritytech/polkadot/issues/4386
 const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150);
 
-#[derive(Clone, Default)]
-pub struct Metrics(Option<MetricsInner>);
-
-impl Metrics {
-	fn on_advertisment_made(&self) {
-		if let Some(metrics) = &self.0 {
-			metrics.advertisements_made.inc();
-		}
-	}
-
-	fn on_collation_sent_requested(&self) {
-		if let Some(metrics) = &self.0 {
-			metrics.collations_send_requested.inc();
-		}
-	}
-
-	fn on_collation_sent(&self) {
-		if let Some(metrics) = &self.0 {
-			metrics.collations_sent.inc();
-		}
-	}
-
-	/// Provide a timer for `process_msg` which observes on drop.
-	fn time_process_msg(&self) -> Option<prometheus::prometheus::HistogramTimer> {
-		self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
-	}
-
-	/// Provide a timer for `distribute_collation` which observes on drop.
-	fn time_collation_distribution(
-		&self,
-		label: &'static str,
-	) -> Option<prometheus::prometheus::HistogramTimer> {
-		self.0.as_ref().map(|metrics| {
-			metrics.collation_distribution_time.with_label_values(&[label]).start_timer()
-		})
-	}
-}
-
-#[derive(Clone)]
-struct MetricsInner {
-	advertisements_made: prometheus::Counter<prometheus::U64>,
-	collations_sent: prometheus::Counter<prometheus::U64>,
-	collations_send_requested: prometheus::Counter<prometheus::U64>,
-	process_msg: prometheus::Histogram,
-	collation_distribution_time: prometheus::HistogramVec,
-}
+/// Ensure that the collator issues a connection request at least once every this many seconds.
+/// Usually it's done when advertising new collation. However, if the core stays occupied or
+/// it's not our turn to produce a candidate, it's important to disconnect from previous
+/// peers.
+///
+/// Validators are obtained from [`ValidatorGroupsBuffer::validators_to_connect`].
+const RECONNECT_TIMEOUT: Duration = Duration::from_secs(12);
 
-impl metrics::Metrics for Metrics {
-	fn try_register(
-		registry: &prometheus::Registry,
-	) -> std::result::Result<Self, prometheus::PrometheusError> {
-		let metrics = MetricsInner {
-			advertisements_made: prometheus::register(
-				prometheus::Counter::new(
-					"polkadot_parachain_collation_advertisements_made_total",
-					"A number of collation advertisements sent to validators.",
-				)?,
-				registry,
-			)?,
-			collations_send_requested: prometheus::register(
-				prometheus::Counter::new(
-					"polkadot_parachain_collations_sent_requested_total",
-					"A number of collations requested to be sent to validators.",
-				)?,
-				registry,
-			)?,
-			collations_sent: prometheus::register(
-				prometheus::Counter::new(
-					"polkadot_parachain_collations_sent_total",
-					"A number of collations sent to validators.",
-				)?,
-				registry,
-			)?,
-			process_msg: prometheus::register(
-				prometheus::Histogram::with_opts(
-					prometheus::HistogramOpts::new(
-						"polkadot_parachain_collator_protocol_collator_process_msg",
-						"Time spent within `collator_protocol_collator::process_msg`",
-					)
-					.buckets(vec![
-						0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
-						1.0,
-					]),
-				)?,
-				registry,
-			)?,
-			collation_distribution_time: prometheus::register(
-				prometheus::HistogramVec::new(
-					prometheus::HistogramOpts::new(
-						"polkadot_parachain_collator_protocol_collator_distribution_time",
-						"Time spent within `collator_protocol_collator::distribute_collation`",
-					)
-					.buckets(vec![
-						0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
-						1.0,
-					]),
-					&["state"],
-				)?,
-				registry,
-			)?,
-		};
-
-		Ok(Metrics(Some(metrics)))
-	}
-}
+/// How often to check for reconnect timeout.
+const RECONNECT_POLL: Duration = Duration::from_secs(1);
 
 /// Info about validators we are currently connected to.
 ///
@@ -269,8 +180,14 @@ struct WaitingCollationFetches {
 	waiting_peers: HashSet<PeerId>,
 }
 
+struct CollationSendResult {
+	relay_parent: Hash,
+	peer_id: PeerId,
+	timed_out: bool,
+}
+
 type ActiveCollationFetches =
-	FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, PeerId)> + Send + 'static>>>;
+	FuturesUnordered<Pin<Box<dyn Future<Output = CollationSendResult> + Send + 'static>>>;
 
 struct State {
 	/// Our network peer id.
@@ -308,6 +225,13 @@ struct State {
 	/// by `PeerConnected` events.
 	peer_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
 
+	/// Tracks which validators we want to stay connected to.
+	validator_groups_buf: ValidatorGroupsBuffer,
+
+	/// Timestamp of the last connection request to a non-empty list of validators,
+	/// `None` otherwise.
+	last_connected_at: Option<Instant>,
+
 	/// Metrics.
 	metrics: Metrics,
 
@@ -339,6 +263,8 @@ impl State {
 			collation_result_senders: Default::default(),
 			our_validators_groups: Default::default(),
 			peer_ids: Default::default(),
+			validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY),
+			last_connected_at: None,
 			waiting_collation_fetches: Default::default(),
 			active_collation_fetches: Default::default(),
 		}
@@ -373,6 +299,7 @@ async fn distribute_collation<Context>(
 	result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
 ) -> Result<()> {
 	let relay_parent = receipt.descriptor.relay_parent;
+	let candidate_hash = receipt.hash();
 
 	// This collation is not in the active-leaves set.
 	if !state.view.contains(&relay_parent) {
@@ -412,10 +339,10 @@ async fn distribute_collation<Context>(
 	};
 
 	// Determine the group on that core.
-	let current_validators =
+	let GroupValidators { validators, session_index, group_index } =
 		determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
 
-	if current_validators.validators.is_empty() {
+	if validators.is_empty() {
 		gum::warn!(
 			target: LOG_TARGET,
 			core = ?our_core,
@@ -425,24 +352,36 @@ async fn distribute_collation<Context>(
 		return Ok(())
 	}
 
+	// It's important to insert new collation bits **before**
+	// issuing a connection request.
+	//
+	// If a validator managed to fetch all the relevant collations
+	// but is still assigned to our core, we keep the connection alive.
+	state.validator_groups_buf.note_collation_advertised(
+		relay_parent,
+		session_index,
+		group_index,
+		&validators,
+	);
+
 	gum::debug!(
 		target: LOG_TARGET,
 		para_id = %id,
 		relay_parent = %relay_parent,
-		candidate_hash = ?receipt.hash(),
+		?candidate_hash,
 		pov_hash = ?pov.hash(),
 		core = ?our_core,
-		?current_validators,
+		current_validators = ?validators,
 		"Accepted collation, connecting to validators."
 	);
 
-	// Issue a discovery request for the validators of the current group:
-	connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;
+	// Update a set of connected validators if necessary.
+	state.last_connected_at = connect_to_validators(ctx, &state.validator_groups_buf).await;
 
 	state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());
 
 	if let Some(result_sender) = result_sender {
-		state.collation_result_senders.insert(receipt.hash(), result_sender);
+		state.collation_result_senders.insert(candidate_hash, result_sender);
 	}
 
 	state
@@ -483,6 +422,9 @@ async fn determine_core(
 struct GroupValidators {
 	/// The validators of above group (their discovery keys).
 	validators: Vec<AuthorityDiscoveryId>,
+
+	session_index: SessionIndex,
+	group_index: GroupIndex,
 }
 
 /// Figure out current group of validators assigned to the para being collated on.
@@ -516,7 +458,11 @@ async fn determine_our_validators<Context>(
 	let current_validators =
 		current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
 
-	let current_validators = GroupValidators { validators: current_validators };
+	let current_validators = GroupValidators {
+		validators: current_validators,
+		session_index,
+		group_index: current_group_index,
+	};
 
 	Ok(current_validators)
 }
@@ -541,13 +487,19 @@ async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: PeerId) {
 	}
 }
 
-/// Issue a connection request to a set of validators and
-/// revoke the previous connection request.
+/// Updates a set of connected validators based on their advertisement-bits
+/// in a validators buffer.
+///
+/// Returns current timestamp if the connection request was non-empty, `None`
+/// otherwise.
 #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
 async fn connect_to_validators<Context>(
 	ctx: &mut Context,
-	validator_ids: Vec<AuthorityDiscoveryId>,
-) {
+	validator_groups_buf: &ValidatorGroupsBuffer,
+) -> Option<Instant> {
+	let validator_ids = validator_groups_buf.validators_to_connect();
+	let is_disconnect = validator_ids.is_empty();
+
 	// ignore address resolution failure
 	// will reissue a new request on new collation
 	let (failed, _) = oneshot::channel();
@@ -557,6 +509,8 @@ async fn connect_to_validators<Context>(
 		failed,
 	})
 	.await;
+
+	(!is_disconnect).then_some(Instant::now())
 }
 
 /// Advertise collation to the given `peer`.
@@ -715,15 +669,9 @@ async fn send_collation(
 	state.active_collation_fetches.push(
 		async move {
 			let r = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await;
-			if r.is_none() {
-				gum::debug!(
-					target: LOG_TARGET,
-					?relay_parent,
-					?peer_id,
-					"Sending collation to validator timed out, carrying on with next validator."
-				);
-			}
-			(relay_parent, peer_id)
+			let timed_out = r.is_none();
+
+			CollationSendResult { relay_parent, peer_id, timed_out }
 		}
 		.boxed(),
 	);
@@ -986,6 +934,7 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()>
 		state.our_validators_groups.remove(removed);
 		state.span_per_relay_parent.remove(removed);
 		state.waiting_collation_fetches.remove(removed);
+		state.validator_groups_buf.remove_relay_parent(removed);
 	}
 
 	state.view = view;
@@ -1007,6 +956,9 @@ pub(crate) async fn run<Context>(
 	let mut state = State::new(local_peer_id, collator_pair, metrics);
 	let mut runtime = RuntimeInfo::new(None);
 
+	let reconnect_stream = super::tick_stream(RECONNECT_POLL);
+	pin_mut!(reconnect_stream);
+
 	loop {
 		let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
 		pin_mut!(recv_req);
@@ -1022,7 +974,25 @@ pub(crate) async fn run<Context>(
 				FromOrchestra::Signal(BlockFinalized(..)) => {}
 				FromOrchestra::Signal(Conclude) => return Ok(()),
 			},
-			(relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => {
+			CollationSendResult {
+				relay_parent,
+				peer_id,
+				timed_out,
+			} = state.active_collation_fetches.select_next_some() => {
+				if timed_out {
+					gum::debug!(
+						target: LOG_TARGET,
+						?relay_parent,
+						?peer_id,
+						"Sending collation to validator timed out, carrying on with next validator",
+					);
+				} else {
+					for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() {
+						// Timeout not hit, this peer is no longer interested in this relay parent.
+						state.validator_groups_buf.reset_validator_interest(relay_parent, authority_id);
+					}
+				}
+
 				let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
 					waiting.waiting_peers.remove(&peer_id);
 					if let Some(next) = waiting.waiting.pop_front() {
@@ -1042,7 +1012,29 @@ pub(crate) async fn run<Context>(
 
 					send_collation(&mut state, next, receipt, pov).await;
 				}
-			}
+			},
+			_ = reconnect_stream.next() => {
+				let now = Instant::now();
+				if state
+					.last_connected_at
+					.map_or(false, |timestamp| now - timestamp > RECONNECT_TIMEOUT)
+				{
+					// Remove all advertisements from the buffer if the timeout was hit.
+					// Usually, it shouldn't be necessary as leaves get deactivated, rather
+					// serves as a safeguard against finality lags.
+					state.validator_groups_buf.clear_advertisements();
+					// Returns `None` if connection request is empty.
+					state.last_connected_at =
+						connect_to_validators(&mut ctx, &state.validator_groups_buf).await;
+
+					gum::debug!(
+						target: LOG_TARGET,
+						timeout = ?RECONNECT_TIMEOUT,
+						"Timeout hit, sent a connection request. Disconnected from all validators = {}",
+						state.last_connected_at.is_none(),
+					);
+				}
+			},
 			in_req = recv_req => {
 				match in_req {
 					Ok(req) => {
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
index 2d2f2cf043d..c20a2d6c97a 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
@@ -56,7 +56,7 @@ struct TestState {
 	group_rotation_info: GroupRotationInfo,
 	validator_peer_id: Vec<PeerId>,
 	relay_parent: Hash,
-	availability_core: CoreState,
+	availability_cores: Vec<CoreState>,
 	local_peer_id: PeerId,
 	collator_pair: CollatorPair,
 	session_index: SessionIndex,
@@ -88,14 +88,15 @@ impl Default for TestState {
 		let validator_peer_id =
 			std::iter::repeat_with(|| PeerId::random()).take(discovery_keys.len()).collect();
 
-		let validator_groups = vec![vec![2, 0, 4], vec![3, 2, 4]]
+		let validator_groups = vec![vec![2, 0, 4], vec![1, 3]]
 			.into_iter()
 			.map(|g| g.into_iter().map(ValidatorIndex).collect())
 			.collect();
 		let group_rotation_info =
 			GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, now: 1 };
 
-		let availability_core = CoreState::Scheduled(ScheduledCore { para_id, collator: None });
+		let availability_cores =
+			vec![CoreState::Scheduled(ScheduledCore { para_id, collator: None }), CoreState::Free];
 
 		let relay_parent = Hash::random();
 
@@ -122,7 +123,7 @@ impl Default for TestState {
 			group_rotation_info,
 			validator_peer_id,
 			relay_parent,
-			availability_core,
+			availability_cores,
 			local_peer_id,
 			collator_pair,
 			session_index: 1,
@@ -132,7 +133,9 @@ impl Default for TestState {
 
 impl TestState {
 	fn current_group_validator_indices(&self) -> &[ValidatorIndex] {
-		&self.session_info.validator_groups[0]
+		let core_num = self.availability_cores.len();
+		let GroupIndex(group_idx) = self.group_rotation_info.group_for_core(CoreIndex(0), core_num);
+		&self.session_info.validator_groups[group_idx as usize]
 	}
 
 	fn current_session_index(&self) -> SessionIndex {
@@ -333,7 +336,7 @@ async fn distribute_collation(
 			RuntimeApiRequest::AvailabilityCores(tx)
 		)) => {
 			assert_eq!(relay_parent, test_state.relay_parent);
-			tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
+			tx.send(Ok(test_state.availability_cores.clone())).unwrap();
 		}
 	);
 
@@ -987,3 +990,104 @@ where
 		test_harness
 	});
 }
+
+#[test]
+fn connect_to_buffered_groups() {
+	let mut test_state = TestState::default();
+	let local_peer_id = test_state.local_peer_id.clone();
+	let collator_pair = test_state.collator_pair.clone();
+
+	test_harness(local_peer_id, collator_pair, |test_harness| async move {
+		let mut virtual_overseer = test_harness.virtual_overseer;
+		let mut req_cfg = test_harness.req_cfg;
+
+		setup_system(&mut virtual_overseer, &test_state).await;
+
+		let group_a = test_state.current_group_validator_authority_ids();
+		let peers_a = test_state.current_group_validator_peer_ids();
+		assert!(group_a.len() > 1);
+
+		distribute_collation(&mut virtual_overseer, &test_state, false).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::NetworkBridgeTx(
+				NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. }
+			) => {
+				assert_eq!(group_a, validator_ids);
+			}
+		);
+
+		let head_a = test_state.relay_parent;
+
+		for (val, peer) in group_a.iter().zip(&peers_a) {
+			connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
+		}
+
+		for peer_id in &peers_a {
+			expect_declare_msg(&mut virtual_overseer, &test_state, peer_id).await;
+		}
+
+		// Update views.
+		for peer_id in &peers_a {
+			send_peer_view_change(&mut virtual_overseer, peer_id, vec![head_a]).await;
+			expect_advertise_collation_msg(&mut virtual_overseer, peer_id, head_a).await;
+		}
+
+		let peer = peers_a[0];
+		// Peer from the group fetches the collation.
+		let (pending_response, rx) = oneshot::channel();
+		req_cfg
+			.inbound_queue
+			.as_mut()
+			.unwrap()
+			.send(RawIncomingRequest {
+				peer,
+				payload: CollationFetchingRequest {
+					relay_parent: head_a,
+					para_id: test_state.para_id,
+				}
+				.encode(),
+				pending_response,
+			})
+			.await
+			.unwrap();
+		assert_matches!(
+			rx.await,
+			Ok(full_response) => {
+				let CollationFetchingResponse::Collation(..): CollationFetchingResponse =
+					CollationFetchingResponse::decode(
+						&mut full_response.result.expect("We should have a proper answer").as_ref(),
+					)
+					.expect("Decoding should work");
+			}
+		);
+
+		test_state.advance_to_new_round(&mut virtual_overseer, true).await;
+		test_state.group_rotation_info = test_state.group_rotation_info.bump_rotation();
+
+		let head_b = test_state.relay_parent;
+		let group_b = test_state.current_group_validator_authority_ids();
+		assert_ne!(head_a, head_b);
+		assert_ne!(group_a, group_b);
+
+		distribute_collation(&mut virtual_overseer, &test_state, false).await;
+
+		// Should be connected to both groups except for the validator that fetched advertised
+		// collation.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::NetworkBridgeTx(
+				NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. }
+			) => {
+				assert!(!validator_ids.contains(&group_a[0]));
+
+				for validator in group_a[1..].iter().chain(&group_b) {
+					assert!(validator_ids.contains(validator));
+				}
+			}
+		);
+
+		TestHarness { virtual_overseer, req_cfg }
+	});
+}
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs
new file mode 100644
index 00000000000..5bb31c72d6c
--- /dev/null
+++ b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs
@@ -0,0 +1,317 @@
+// Copyright 2017-2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Validator groups buffer for connection management.
+//!
+//! Solves 2 problems:
+//! 	1. A collator may want to stay connected to multiple groups on rotation boundaries.
+//! 	2. It's important to disconnect from a validator when there are no collations to be fetched.
+//!
+//! We keep a simple FIFO buffer of N validator groups and a bitvec for each advertisement,
+//! 1 indicating we want to be connected to the i-th validator in the buffer, 0 otherwise.
+//!
+//! The bit is set to 1 for the whole **group** whenever it's inserted into the buffer. Given a relay
+//! parent, one can reset a bit back to 0 for particular **validator**. For example, if a collation
+//! was fetched or some timeout has been hit.
+//!
+//! The bitwise OR over known advertisements gives us the validator indices for the connection request.
+
+use std::{
+	collections::{HashMap, VecDeque},
+	num::NonZeroUsize,
+	ops::Range,
+};
+
+use bitvec::{bitvec, vec::BitVec};
+
+use polkadot_primitives::v2::{AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex};
+
+/// The ring buffer stores at most this many unique validator groups.
+///
+/// This value should be chosen in a way that all groups assigned to our para
+/// in the view can fit into the buffer.
+pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = match NonZeroUsize::new(3) {
+	Some(cap) => cap,
+	None => panic!("buffer capacity must be non-zero"),
+};
+
+/// Unique identifier of a validators group.
+#[derive(Debug)]
+struct ValidatorsGroupInfo {
+	/// Number of validators in the group.
+	len: usize,
+	session_index: SessionIndex,
+	group_index: GroupIndex,
+}
+
+/// Ring buffer of validator groups.
+///
+/// Tracks which peers we want to be connected to with respect to advertised collations.
+#[derive(Debug)]
+pub struct ValidatorGroupsBuffer {
+	/// Validator groups identifiers we **had** advertisements for.
+	group_infos: VecDeque<ValidatorsGroupInfo>,
+	/// Continuous buffer of validators discovery keys.
+	validators: VecDeque<AuthorityDiscoveryId>,
+	/// Mapping from relay-parent to bit-vectors with bits for all `validators`.
+	/// Invariants kept: All bit-vectors are guaranteed to have the same size.
+	should_be_connected: HashMap<Hash, BitVec>,
+	/// Buffer capacity, limits the number of **groups** tracked.
+	cap: NonZeroUsize,
+}
+
+impl ValidatorGroupsBuffer {
+	/// Creates a new buffer with a non-zero capacity.
+	pub fn with_capacity(cap: NonZeroUsize) -> Self {
+		Self {
+			group_infos: VecDeque::new(),
+			validators: VecDeque::new(),
+			should_be_connected: HashMap::new(),
+			cap,
+		}
+	}
+
+	/// Returns discovery ids of validators we have at least one advertised-but-not-fetched
+	/// collation for.
+	pub fn validators_to_connect(&self) -> Vec<AuthorityDiscoveryId> {
+		let validators_num = self.validators.len();
+		let bits = self
+			.should_be_connected
+			.values()
+			.fold(bitvec![0; validators_num], |acc, next| acc | next);
+
+		self.validators
+			.iter()
+			.enumerate()
+			.filter_map(|(idx, authority_id)| bits[idx].then_some(authority_id.clone()))
+			.collect()
+	}
+
+	/// Note a new advertisement, marking that we want to be connected to validators
+	/// from this group.
+	///
+	/// If max capacity is reached and the group is new, drops the oldest group
+	/// from the front of the buffer.
+	pub fn note_collation_advertised(
+		&mut self,
+		relay_parent: Hash,
+		session_index: SessionIndex,
+		group_index: GroupIndex,
+		validators: &[AuthorityDiscoveryId],
+	) {
+		if validators.is_empty() {
+			return
+		}
+
+		match self.group_infos.iter().enumerate().find(|(_, group)| {
+			group.session_index == session_index && group.group_index == group_index
+		}) {
+			Some((idx, group)) => {
+				let group_start_idx = self.group_lengths_iter().take(idx).sum();
+				self.set_bits(relay_parent, group_start_idx..(group_start_idx + group.len));
+			},
+			None => self.push(relay_parent, session_index, group_index, validators),
+		}
+	}
+
+	/// Note that a validator is no longer interested in a given relay parent.
+	pub fn reset_validator_interest(
+		&mut self,
+		relay_parent: Hash,
+		authority_id: &AuthorityDiscoveryId,
+	) {
+		let bits = match self.should_be_connected.get_mut(&relay_parent) {
+			Some(bits) => bits,
+			None => return,
+		};
+
+		for (idx, auth_id) in self.validators.iter().enumerate() {
+			if auth_id == authority_id {
+				bits.set(idx, false);
+			}
+		}
+	}
+
+	/// Remove relay parent from the buffer.
+	///
+	/// The buffer will no longer track which validators are interested in a corresponding
+	/// advertisement.
+	pub fn remove_relay_parent(&mut self, relay_parent: &Hash) {
+		self.should_be_connected.remove(relay_parent);
+	}
+
+	/// Removes all advertisements from the buffer.
+	pub fn clear_advertisements(&mut self) {
+		self.should_be_connected.clear();
+	}
+
+	/// Pushes a new group to the buffer along with advertisement, setting all validators
+	/// bits to 1.
+	///
+	/// If the buffer is full, drops the oldest group from the front.
+	fn push(
+		&mut self,
+		relay_parent: Hash,
+		session_index: SessionIndex,
+		group_index: GroupIndex,
+		validators: &[AuthorityDiscoveryId],
+	) {
+		let new_group_info =
+			ValidatorsGroupInfo { len: validators.len(), session_index, group_index };
+
+		let buf = &mut self.group_infos;
+		let cap = self.cap.get();
+
+		if buf.len() >= cap {
+			let pruned_group = buf.pop_front().expect("buf is not empty; qed");
+			self.validators.drain(..pruned_group.len);
+
+			self.should_be_connected.values_mut().for_each(|bits| {
+				bits.as_mut_bitslice().shift_left(pruned_group.len);
+			});
+		}
+
+		self.validators.extend(validators.iter().cloned());
+		buf.push_back(new_group_info);
+		let buf_len = buf.len();
+		let group_start_idx = self.group_lengths_iter().take(buf_len - 1).sum();
+
+		let new_len = self.validators.len();
+		self.should_be_connected
+			.values_mut()
+			.for_each(|bits| bits.resize(new_len, false));
+		self.set_bits(relay_parent, group_start_idx..(group_start_idx + validators.len()));
+	}
+
+	/// Sets advertisement bits to 1 in a given range (usually corresponding to some group).
+	/// If the relay parent is unknown, inserts 0-initialized bitvec first.
+	///
+	/// The caller must ensure that the range is within bounds.
+	fn set_bits(&mut self, relay_parent: Hash, range: Range<usize>) {
+		let bits = self
+			.should_be_connected
+			.entry(relay_parent)
+			.or_insert_with(|| bitvec![0; self.validators.len()]);
+
+		bits[range].fill(true);
+	}
+
+	/// Returns iterator over numbers of validators in groups.
+	///
+	/// Useful for getting the index of the first validator in the i-th group.
+	fn group_lengths_iter(&self) -> impl Iterator<Item = usize> + '_ {
+		self.group_infos.iter().map(|group| group.len)
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use sp_keyring::Sr25519Keyring;
+
+	#[test]
+	fn one_capacity_buffer() {
+		let cap = NonZeroUsize::new(1).unwrap();
+		let mut buf = ValidatorGroupsBuffer::with_capacity(cap);
+
+		let hash_a = Hash::repeat_byte(0x1);
+		let hash_b = Hash::repeat_byte(0x2);
+
+		let validators: Vec<_> = [
+			Sr25519Keyring::Alice,
+			Sr25519Keyring::Bob,
+			Sr25519Keyring::Charlie,
+			Sr25519Keyring::Dave,
+			Sr25519Keyring::Ferdie,
+		]
+		.into_iter()
+		.map(|key| AuthorityDiscoveryId::from(key.public()))
+		.collect();
+
+		assert!(buf.validators_to_connect().is_empty());
+
+		buf.note_collation_advertised(hash_a, 0, GroupIndex(0), &validators[..2]);
+		assert_eq!(buf.validators_to_connect(), validators[..2].to_vec());
+
+		buf.reset_validator_interest(hash_a, &validators[1]);
+		assert_eq!(buf.validators_to_connect(), vec![validators[0].clone()]);
+
+		buf.note_collation_advertised(hash_b, 0, GroupIndex(1), &validators[2..]);
+		assert_eq!(buf.validators_to_connect(), validators[2..].to_vec());
+
+		for validator in &validators[2..] {
+			buf.reset_validator_interest(hash_b, validator);
+		}
+		assert!(buf.validators_to_connect().is_empty());
+	}
+
+	#[test]
+	fn buffer_works() {
+		let cap = NonZeroUsize::new(3).unwrap();
+		let mut buf = ValidatorGroupsBuffer::with_capacity(cap);
+
+		let hashes: Vec<_> = (0..5).map(Hash::repeat_byte).collect();
+
+		let validators: Vec<_> = [
+			Sr25519Keyring::Alice,
+			Sr25519Keyring::Bob,
+			Sr25519Keyring::Charlie,
+			Sr25519Keyring::Dave,
+			Sr25519Keyring::Ferdie,
+		]
+		.into_iter()
+		.map(|key| AuthorityDiscoveryId::from(key.public()))
+		.collect();
+
+		buf.note_collation_advertised(hashes[0], 0, GroupIndex(0), &validators[..2]);
+		buf.note_collation_advertised(hashes[1], 0, GroupIndex(0), &validators[..2]);
+		buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);
+		buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);
+
+		assert_eq!(buf.validators_to_connect(), validators[..4].to_vec());
+
+		for validator in &validators[2..4] {
+			buf.reset_validator_interest(hashes[2], validator);
+		}
+
+		buf.reset_validator_interest(hashes[1], &validators[0]);
+		assert_eq!(buf.validators_to_connect(), validators[..2].to_vec());
+
+		buf.reset_validator_interest(hashes[0], &validators[0]);
+		assert_eq!(buf.validators_to_connect(), vec![validators[1].clone()]);
+
+		buf.note_collation_advertised(hashes[3], 0, GroupIndex(1), &validators[2..4]);
+		buf.note_collation_advertised(
+			hashes[4],
+			0,
+			GroupIndex(2),
+			std::slice::from_ref(&validators[4]),
+		);
+
+		buf.reset_validator_interest(hashes[3], &validators[2]);
+		buf.note_collation_advertised(
+			hashes[4],
+			0,
+			GroupIndex(3),
+			std::slice::from_ref(&validators[0]),
+		);
+
+		assert_eq!(
+			buf.validators_to_connect(),
+			vec![validators[3].clone(), validators[4].clone(), validators[0].clone()]
+		);
+	}
+}
diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs
index 66659e4b5be..b71acc127c8 100644
--- a/polkadot/node/network/collator-protocol/src/lib.rs
+++ b/polkadot/node/network/collator-protocol/src/lib.rs
@@ -21,9 +21,12 @@
 #![deny(unused_crate_dependencies)]
 #![recursion_limit = "256"]
 
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
-use futures::{FutureExt, TryFutureExt};
+use futures::{
+	stream::{FusedStream, StreamExt},
+	FutureExt, TryFutureExt,
+};
 
 use sp_keystore::SyncCryptoStorePtr;
 
@@ -134,3 +137,23 @@ async fn modify_reputation(
 
 	sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await;
 }
+
+/// Wait until the next tick and return the timestamp for the following one.
+async fn wait_until_next_tick(last_poll: Instant, period: Duration) -> Instant {
+	let now = Instant::now();
+	let next_poll = last_poll + period;
+
+	if next_poll > now {
+		futures_timer::Delay::new(next_poll - now).await
+	}
+
+	Instant::now()
+}
+
+/// Returns an infinite stream that yields with an interval of `period`.
+fn tick_stream(period: Duration) -> impl FusedStream<Item = ()> {
+	futures::stream::unfold(Instant::now(), move |next_check| async move {
+		Some(((), wait_until_next_tick(next_check, period).await))
+	})
+	.fuse()
+}
diff --git a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
index 47795aac0ce..b74c1d5b5a4 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
@@ -19,7 +19,7 @@ use futures::{
 	channel::oneshot,
 	future::{BoxFuture, Fuse, FusedFuture},
 	select,
-	stream::{FusedStream, FuturesUnordered},
+	stream::FuturesUnordered,
 	FutureExt, StreamExt,
 };
 use futures_timer::Delay;
@@ -57,7 +57,7 @@ use polkadot_primitives::v2::{CandidateReceipt, CollatorId, Hash, Id as ParaId};
 
 use crate::error::Result;
 
-use super::{modify_reputation, LOG_TARGET};
+use super::{modify_reputation, tick_stream, LOG_TARGET};
 
 #[cfg(test)]
 mod tests;
@@ -97,7 +97,7 @@ const ACTIVITY_POLL: Duration = Duration::from_millis(10);
 // How often to poll collation responses.
 // This is a hack that should be removed in a refactoring.
 // See https://github.com/paritytech/polkadot/issues/4182
-const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(5);
+const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(50);
 
 #[derive(Clone, Default)]
 pub struct Metrics(Option<MetricsInner>);
@@ -1167,25 +1167,6 @@ async fn process_msg<Context>(
 	}
 }
 
-// wait until next inactivity check. returns the instant for the following check.
-async fn wait_until_next_check(last_poll: Instant) -> Instant {
-	let now = Instant::now();
-	let next_poll = last_poll + ACTIVITY_POLL;
-
-	if next_poll > now {
-		Delay::new(next_poll - now).await
-	}
-
-	Instant::now()
-}
-
-fn infinite_stream(every: Duration) -> impl FusedStream<Item = ()> {
-	futures::stream::unfold(Instant::now() + every, |next_check| async move {
-		Some(((), wait_until_next_check(next_check).await))
-	})
-	.fuse()
-}
-
 /// The main run loop.
 #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
 pub(crate) async fn run<Context>(
@@ -1196,10 +1177,10 @@ pub(crate) async fn run<Context>(
 ) -> std::result::Result<(), crate::error::FatalError> {
 	let mut state = State { metrics, ..Default::default() };
 
-	let next_inactivity_stream = infinite_stream(ACTIVITY_POLL);
+	let next_inactivity_stream = tick_stream(ACTIVITY_POLL);
 	futures::pin_mut!(next_inactivity_stream);
 
-	let check_collations_stream = infinite_stream(CHECK_COLLATIONS_POLL);
+	let check_collations_stream = tick_stream(CHECK_COLLATIONS_POLL);
 	futures::pin_mut!(check_collations_stream);
 
 	loop {
-- 
GitLab