From a5742b9ec1b9d356f6bd6bd1568583b79ab36c45 Mon Sep 17 00:00:00 2001
From: Vsevolod Stakhov <vsevolod.stakhov@parity.io>
Date: Fri, 22 Apr 2022 11:19:36 +0100
Subject: [PATCH] Extract peers grid topology related code to a separate unit
 (#5365)

* Initial attempt to extract grid topology related code

* Use shared code in the approval distribution subsystem

* Fix spellcheck issues

* Moe Aggression stuff back to the approval-distribution subsystem

* Cargo fmt
---
 polkadot/Cargo.lock                           |   1 +
 .../network/approval-distribution/src/lib.rs  | 212 +++++-------------
 polkadot/node/network/protocol/Cargo.toml     |   3 +-
 .../network/protocol/src/grid_topology.rs     | 195 ++++++++++++++++
 polkadot/node/network/protocol/src/lib.rs     |   2 +
 polkadot/scripts/gitlab/lingua.dic            |   2 +
 6 files changed, 256 insertions(+), 159 deletions(-)
 create mode 100644 polkadot/node/network/protocol/src/grid_topology.rs

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 26f93724ad6..1cc70186503 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -6993,6 +6993,7 @@ dependencies = [
  "polkadot-node-jaeger",
  "polkadot-node-primitives",
  "polkadot-primitives",
+ "rand 0.8.5",
  "sc-authority-discovery",
  "sc-network",
  "strum 0.24.0",
diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs
index c8e8741cba8..03705bb173d 100644
--- a/polkadot/node/network/approval-distribution/src/lib.rs
+++ b/polkadot/node/network/approval-distribution/src/lib.rs
@@ -22,8 +22,9 @@
 
 use futures::{channel::oneshot, FutureExt as _};
 use polkadot_node_network_protocol::{
-	self as net_protocol, v1 as protocol_v1, PeerId, UnifiedReputationChange as Rep, Versioned,
-	View,
+	self as net_protocol,
+	grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
+	v1 as protocol_v1, PeerId, UnifiedReputationChange as Rep, Versioned, View,
 };
 use polkadot_node_primitives::approval::{
 	AssignmentCert, BlockApprovalMeta, IndirectAssignmentCert, IndirectSignedApprovalVote,
@@ -62,14 +63,6 @@ const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Peer sent a valid message"
 const BENEFIT_VALID_MESSAGE_FIRST: Rep =
 	Rep::BenefitMinorFirst("Valid message with new information");
 
-/// The number of peers to randomly propagate messages to.
-const RANDOM_CIRCULATION: usize = 4;
-/// The sample rate for randomly propagating messages. This
-/// reduces the left tail of the binomial distribution but also
-/// introduces a bias towards peers who we sample before others
-/// (i.e. those who get a block before others).
-const RANDOM_SAMPLE_RATE: usize = polkadot_node_subsystem_util::MIN_GOSSIP_PEERS;
-
 /// The Approval Distribution subsystem.
 pub struct ApprovalDistribution {
 	metrics: Metrics,
@@ -98,99 +91,26 @@ impl RecentlyOutdated {
 	}
 }
 
-struct SessionTopology {
-	peers_x: HashSet<PeerId>,
-	validator_indices_x: HashSet<ValidatorIndex>,
-	peers_y: HashSet<PeerId>,
-	validator_indices_y: HashSet<ValidatorIndex>,
-}
-
-impl SessionTopology {
-	// Given the originator of a message, indicates the part of the topology
-	// we're meant to send the message to.
-	fn required_routing_for(&self, originator: ValidatorIndex, local: bool) -> RequiredRouting {
-		if local {
-			return RequiredRouting::GridXY
-		}
-
-		let grid_x = self.validator_indices_x.contains(&originator);
-		let grid_y = self.validator_indices_y.contains(&originator);
-
-		match (grid_x, grid_y) {
-			(false, false) => RequiredRouting::None,
-			(true, false) => RequiredRouting::GridY, // messages from X go to Y
-			(false, true) => RequiredRouting::GridX, // messages from Y go to X
-			(true, true) => RequiredRouting::GridXY, // if the grid works as expected, this shouldn't happen.
-		}
-	}
-
-	// Get a filter function based on this topology and the required routing
-	// which returns `true` for peers that are within the required routing set
-	// and false otherwise.
-	fn route_to_peer(&self, required_routing: RequiredRouting, peer: &PeerId) -> bool {
-		match required_routing {
-			RequiredRouting::All => true,
-			RequiredRouting::GridX => self.peers_x.contains(peer),
-			RequiredRouting::GridY => self.peers_y.contains(peer),
-			RequiredRouting::GridXY => self.peers_x.contains(peer) || self.peers_y.contains(peer),
-			RequiredRouting::None | RequiredRouting::PendingTopology => false,
-		}
-	}
-}
-
-impl From<network_bridge_event::NewGossipTopology> for SessionTopology {
-	fn from(topology: network_bridge_event::NewGossipTopology) -> Self {
-		let peers_x =
-			topology.our_neighbors_x.values().flat_map(|p| &p.peer_ids).cloned().collect();
-		let peers_y =
-			topology.our_neighbors_y.values().flat_map(|p| &p.peer_ids).cloned().collect();
-
-		let validator_indices_x =
-			topology.our_neighbors_x.values().map(|p| p.validator_index.clone()).collect();
-		let validator_indices_y =
-			topology.our_neighbors_y.values().map(|p| p.validator_index.clone()).collect();
-
-		SessionTopology { peers_x, peers_y, validator_indices_x, validator_indices_y }
-	}
-}
-
-#[derive(Default)]
-struct SessionTopologies {
-	inner: HashMap<SessionIndex, (Option<SessionTopology>, usize)>,
-}
-
-impl SessionTopologies {
-	fn get_topology(&self, session: SessionIndex) -> Option<&SessionTopology> {
-		self.inner.get(&session).and_then(|val| val.0.as_ref())
-	}
-
-	fn inc_session_refs(&mut self, session: SessionIndex) {
-		self.inner.entry(session).or_insert((None, 0)).1 += 1;
-	}
-
-	fn dec_session_refs(&mut self, session: SessionIndex) {
-		if let hash_map::Entry::Occupied(mut occupied) = self.inner.entry(session) {
-			occupied.get_mut().1 = occupied.get().1.saturating_sub(1);
-			if occupied.get().1 == 0 {
-				let _ = occupied.remove();
-			}
-		}
-	}
-
-	// No-op if already present.
-	fn insert_topology(&mut self, session: SessionIndex, topology: SessionTopology) {
-		let entry = self.inner.entry(session).or_insert((None, 0));
-		if entry.0.is_none() {
-			entry.0 = Some(topology);
-		}
-	}
-}
-
+// In case the original gtid topology mechanisms don't work on their own, we need to trade bandwidth
+// for protocol liveliness by introducing aggression.
+//
+// Aggression has 3 levels:
+//
+//  * Aggression Level 0: The basic behaviors described above.
+//  * Aggression Level 1: The originator of a message sends to all peers. Other peers follow the rules above.
+//  * Aggression Level 2: All peers send all messages to all their row and column neighbors.
+//    This means that each validator will, on average, receive each message approximately `2*sqrt(n)` times.
+// The aggression level of messages pertaining to a block increases when that block is unfinalized and
+// is a child of the finalized block.
+// This means that only one block at a time has its messages propagated with aggression > 0.
+//
 // A note on aggression thresholds: changes in propagation apply only to blocks which are the
 // _direct descendants_ of the finalized block which are older than the given threshold,
 // not to all blocks older than the threshold. Most likely, a few assignments struggle to
 // be propagated in a single block and this holds up all of its descendants blocks.
 // Accordingly, we only step on the gas for the block which is most obviously holding up finality.
+
+/// Aggression configuration representation
 #[derive(Clone)]
 struct AggressionConfig {
 	/// Aggression level 1: all validators send all their own messages to all peers.
@@ -203,6 +123,7 @@ struct AggressionConfig {
 }
 
 impl AggressionConfig {
+	/// Returns `true` if block is not too old depending on the aggression level
 	fn is_age_relevant(&self, block_age: BlockNumber) -> bool {
 		if let Some(t) = self.l1_threshold {
 			block_age >= t
@@ -224,6 +145,29 @@ impl Default for AggressionConfig {
 	}
 }
 
+struct ApprovalGridTopology(SessionGridTopology);
+
+impl From<network_bridge_event::NewGossipTopology> for ApprovalGridTopology {
+	fn from(topology: network_bridge_event::NewGossipTopology) -> Self {
+		let peers_x =
+			topology.our_neighbors_x.values().flat_map(|p| &p.peer_ids).cloned().collect();
+		let peers_y =
+			topology.our_neighbors_y.values().flat_map(|p| &p.peer_ids).cloned().collect();
+
+		let validator_indices_x =
+			topology.our_neighbors_x.values().map(|p| p.validator_index.clone()).collect();
+		let validator_indices_y =
+			topology.our_neighbors_y.values().map(|p| p.validator_index.clone()).collect();
+
+		ApprovalGridTopology(SessionGridTopology {
+			peers_x,
+			peers_y,
+			validator_indices_x,
+			validator_indices_y,
+		})
+	}
+}
+
 #[derive(PartialEq)]
 enum Resend {
 	Yes,
@@ -252,7 +196,7 @@ struct State {
 	peer_views: HashMap<PeerId, View>,
 
 	/// Keeps a topology for various different sessions.
-	topologies: SessionTopologies,
+	topologies: SessionGridTopologies,
 
 	/// Tracks recently finalized blocks.
 	recent_outdated_blocks: RecentlyOutdated,
@@ -361,58 +305,6 @@ impl ApprovalState {
 	}
 }
 
-#[derive(Debug, Clone, Copy, PartialEq)]
-enum RequiredRouting {
-	/// We don't know yet, because we're waiting for topology info
-	/// (race condition between learning about the first blocks in a new session
-	/// and getting the topology for that session)
-	PendingTopology,
-	/// Propagate to all peers of any kind.
-	All,
-	/// Propagate to all peers sharing either the X or Y dimension of the grid.
-	GridXY,
-	/// Propagate to all peers sharing the X dimension of the grid.
-	GridX,
-	/// Propagate to all peers sharing the Y dimension of the grid.
-	GridY,
-	/// No required propagation.
-	None,
-}
-
-impl RequiredRouting {
-	// Whether the required routing set is definitely empty.
-	fn is_empty(self) -> bool {
-		match self {
-			RequiredRouting::PendingTopology | RequiredRouting::None => true,
-			_ => false,
-		}
-	}
-}
-
-#[derive(Debug, Default, Clone, Copy)]
-struct RandomRouting {
-	// The number of peers to target.
-	target: usize,
-	// The number of peers this has been sent to.
-	sent: usize,
-}
-
-impl RandomRouting {
-	fn sample(&self, n_peers_total: usize, rng: &mut (impl CryptoRng + Rng)) -> bool {
-		if n_peers_total == 0 || self.sent >= self.target {
-			false
-		} else if RANDOM_SAMPLE_RATE > n_peers_total {
-			true
-		} else {
-			rng.gen_ratio(RANDOM_SAMPLE_RATE as _, n_peers_total as _)
-		}
-	}
-
-	fn inc_sent(&mut self) {
-		self.sent += 1
-	}
-}
-
 // routing state bundled with messages for the candidate. Corresponding assignments
 // and approvals are stored together and should be routed in the same way, with
 // assignments preceding approvals in all cases.
@@ -476,8 +368,12 @@ impl State {
 			},
 			NetworkBridgeEvent::NewGossipTopology(topology) => {
 				let session = topology.session;
-				self.handle_new_session_topology(ctx, session, SessionTopology::from(topology))
-					.await;
+				self.handle_new_session_topology(
+					ctx,
+					session,
+					ApprovalGridTopology::from(topology),
+				)
+				.await;
 			},
 			NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
 				self.handle_peer_view_change(ctx, metrics, peer_id, view, rng).await;
@@ -632,9 +528,9 @@ impl State {
 		ctx: &mut (impl SubsystemContext<Message = ApprovalDistributionMessage>
 		          + overseer::SubsystemContext<Message = ApprovalDistributionMessage>),
 		session: SessionIndex,
-		topology: SessionTopology,
+		topology: ApprovalGridTopology,
 	) {
-		self.topologies.insert_topology(session, topology);
+		self.topologies.insert_topology(session, topology.0);
 		let topology = self.topologies.get_topology(session).expect("just inserted above; qed");
 
 		adjust_required_routing_and_propagate(
@@ -1000,7 +896,7 @@ impl State {
 				candidate_entry.messages.entry(validator_index).or_insert_with(|| MessageState {
 					required_routing,
 					local,
-					random_routing: RandomRouting { target: RANDOM_CIRCULATION, sent: 0 },
+					random_routing: Default::default(),
 					approval_state: ApprovalState::Assigned(assignment.cert.clone()),
 				})
 			},
@@ -1344,7 +1240,7 @@ impl State {
 		          + overseer::SubsystemContext<Message = ApprovalDistributionMessage>),
 		metrics: &Metrics,
 		entries: &mut HashMap<Hash, BlockEntry>,
-		topologies: &SessionTopologies,
+		topologies: &SessionGridTopologies,
 		total_peers: usize,
 		peer_id: PeerId,
 		view: View,
@@ -1592,7 +1488,7 @@ async fn adjust_required_routing_and_propagate(
 	ctx: &mut (impl SubsystemContext<Message = ApprovalDistributionMessage>
 	          + overseer::SubsystemContext<Message = ApprovalDistributionMessage>),
 	blocks: &mut HashMap<Hash, BlockEntry>,
-	topologies: &SessionTopologies,
+	topologies: &SessionGridTopologies,
 	block_filter: impl Fn(&mut BlockEntry) -> bool,
 	routing_modifier: impl Fn(&mut RequiredRouting, bool, &ValidatorIndex),
 ) {
diff --git a/polkadot/node/network/protocol/Cargo.toml b/polkadot/node/network/protocol/Cargo.toml
index ee8c0237240..08bd8bc3390 100644
--- a/polkadot/node/network/protocol/Cargo.toml
+++ b/polkadot/node/network/protocol/Cargo.toml
@@ -17,4 +17,5 @@ strum = { version = "0.24", features = ["derive"] }
 futures = "0.3.21"
 thiserror = "1.0.30"
 fatality = "0.0.6"
-derive_more = "0.99"
+rand = "0.8"
+derive_more = "0.99"
\ No newline at end of file
diff --git a/polkadot/node/network/protocol/src/grid_topology.rs b/polkadot/node/network/protocol/src/grid_topology.rs
new file mode 100644
index 00000000000..f1823b345b5
--- /dev/null
+++ b/polkadot/node/network/protocol/src/grid_topology.rs
@@ -0,0 +1,195 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Grid topology support implementation
+//! The basic operation of the 2D grid topology is that:
+//!   * A validator producing a message sends it to its row-neighbors and its column-neighbors
+//!   * A validator receiving a message originating from one of its row-neighbors sends it to its column-neighbors
+//!   * A validator receiving a message originating from one of its column-neighbors sends it to its row-neighbors
+//!
+//! This grid approach defines 2 unique paths for every validator to reach every other validator in at most 2 hops.
+//!
+//! However, we also supplement this with some degree of random propagation:
+//! every validator, upon seeing a message for the first time, propagates it to 8 random peers.
+//! This inserts some redundancy in case the grid topology isn't working or is being attacked -
+//! an adversary doesn't know which peers a validator will send to.
+//! This is combined with the property that the adversary doesn't know which validators will elect to check a block.
+//!
+
+use crate::PeerId;
+use polkadot_primitives::v2::{SessionIndex, ValidatorIndex};
+use rand::{CryptoRng, Rng};
+use std::{
+	collections::{hash_map, HashMap, HashSet},
+	fmt::Debug,
+};
+
+/// The sample rate for randomly propagating messages. This
+/// reduces the left tail of the binomial distribution but also
+/// introduces a bias towards peers who we sample before others
+/// (i.e. those who get a block before others).
+pub const DEFAULT_RANDOM_SAMPLE_RATE: usize = crate::MIN_GOSSIP_PEERS;
+
+/// The number of peers to randomly propagate messages to.
+pub const DEFAULT_RANDOM_CIRCULATION: usize = 4;
+
+/// Topology representation
+pub struct SessionGridTopology {
+	/// Represent peers in the X axis
+	pub peers_x: HashSet<PeerId>,
+	/// Represent validators in the X axis
+	pub validator_indices_x: HashSet<ValidatorIndex>,
+	/// Represent peers in the Y axis
+	pub peers_y: HashSet<PeerId>,
+	/// Represent validators in the Y axis
+	pub validator_indices_y: HashSet<ValidatorIndex>,
+}
+
+impl SessionGridTopology {
+	/// Given the originator of a message, indicates the part of the topology
+	/// we're meant to send the message to.
+	pub fn required_routing_for(&self, originator: ValidatorIndex, local: bool) -> RequiredRouting {
+		if local {
+			return RequiredRouting::GridXY
+		}
+
+		let grid_x = self.validator_indices_x.contains(&originator);
+		let grid_y = self.validator_indices_y.contains(&originator);
+
+		match (grid_x, grid_y) {
+			(false, false) => RequiredRouting::None,
+			(true, false) => RequiredRouting::GridY, // messages from X go to Y
+			(false, true) => RequiredRouting::GridX, // messages from Y go to X
+			(true, true) => RequiredRouting::GridXY, // if the grid works as expected, this shouldn't happen.
+		}
+	}
+
+	/// Get a filter function based on this topology and the required routing
+	/// which returns `true` for peers that are within the required routing set
+	/// and false otherwise.
+	pub fn route_to_peer(&self, required_routing: RequiredRouting, peer: &PeerId) -> bool {
+		match required_routing {
+			RequiredRouting::All => true,
+			RequiredRouting::GridX => self.peers_x.contains(peer),
+			RequiredRouting::GridY => self.peers_y.contains(peer),
+			RequiredRouting::GridXY => self.peers_x.contains(peer) || self.peers_y.contains(peer),
+			RequiredRouting::None | RequiredRouting::PendingTopology => false,
+		}
+	}
+}
+/// A set of topologies indexed by session
+#[derive(Default)]
+pub struct SessionGridTopologies {
+	inner: HashMap<SessionIndex, (Option<SessionGridTopology>, usize)>,
+}
+
+impl SessionGridTopologies {
+	/// Returns a topology for the specific session index
+	pub fn get_topology(&self, session: SessionIndex) -> Option<&SessionGridTopology> {
+		self.inner.get(&session).and_then(|val| val.0.as_ref())
+	}
+
+	/// Increase references counter for a specific topology
+	pub fn inc_session_refs(&mut self, session: SessionIndex) {
+		self.inner.entry(session).or_insert((None, 0)).1 += 1;
+	}
+
+	/// Decrease references counter for a specific topology
+	pub fn dec_session_refs(&mut self, session: SessionIndex) {
+		if let hash_map::Entry::Occupied(mut occupied) = self.inner.entry(session) {
+			occupied.get_mut().1 = occupied.get().1.saturating_sub(1);
+			if occupied.get().1 == 0 {
+				let _ = occupied.remove();
+			}
+		}
+	}
+
+	/// Insert a new topology, no-op if already present.
+	pub fn insert_topology(&mut self, session: SessionIndex, topology: SessionGridTopology) {
+		let entry = self.inner.entry(session).or_insert((None, 0));
+		if entry.0.is_none() {
+			entry.0 = Some(topology);
+		}
+	}
+}
+/// A representation of routing based on sample
+#[derive(Debug, Clone, Copy)]
+pub struct RandomRouting {
+	/// The number of peers to target.
+	target: usize,
+	/// The number of peers this has been sent to.
+	sent: usize,
+	/// Sampling rate
+	sample_rate: usize,
+}
+
+impl Default for RandomRouting {
+	fn default() -> Self {
+		RandomRouting {
+			target: DEFAULT_RANDOM_CIRCULATION,
+			sent: 0_usize,
+			sample_rate: DEFAULT_RANDOM_SAMPLE_RATE,
+		}
+	}
+}
+
+impl RandomRouting {
+	/// Perform random sampling for a specific peer
+	/// Returns `true` for a lucky peer
+	pub fn sample(&self, n_peers_total: usize, rng: &mut (impl CryptoRng + Rng)) -> bool {
+		if n_peers_total == 0 || self.sent >= self.target {
+			false
+		} else if self.sample_rate > n_peers_total {
+			true
+		} else {
+			rng.gen_ratio(self.sample_rate as _, n_peers_total as _)
+		}
+	}
+
+	/// Increase number of messages being sent
+	pub fn inc_sent(&mut self) {
+		self.sent += 1
+	}
+}
+
+/// Routing mode
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum RequiredRouting {
+	/// We don't know yet, because we're waiting for topology info
+	/// (race condition between learning about the first blocks in a new session
+	/// and getting the topology for that session)
+	PendingTopology,
+	/// Propagate to all peers of any kind.
+	All,
+	/// Propagate to all peers sharing either the X or Y dimension of the grid.
+	GridXY,
+	/// Propagate to all peers sharing the X dimension of the grid.
+	GridX,
+	/// Propagate to all peers sharing the Y dimension of the grid.
+	GridY,
+	/// No required propagation.
+	None,
+}
+
+impl RequiredRouting {
+	/// Whether the required routing set is definitely empty.
+	pub fn is_empty(self) -> bool {
+		match self {
+			RequiredRouting::PendingTopology | RequiredRouting::None => true,
+			_ => false,
+		}
+	}
+}
diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs
index cd659ad090b..78727ae67e8 100644
--- a/polkadot/node/network/protocol/src/lib.rs
+++ b/polkadot/node/network/protocol/src/lib.rs
@@ -40,6 +40,8 @@ pub mod request_response;
 
 /// Accessing authority discovery service
 pub mod authority_discovery;
+/// Grid topology support module
+pub mod grid_topology;
 
 /// A version of the protocol.
 pub type ProtocolVersion = u32;
diff --git a/polkadot/scripts/gitlab/lingua.dic b/polkadot/scripts/gitlab/lingua.dic
index bf70b0512e4..89f944315f0 100644
--- a/polkadot/scripts/gitlab/lingua.dic
+++ b/polkadot/scripts/gitlab/lingua.dic
@@ -1,4 +1,5 @@
 150
+2D
 A&V
 accessor/MS
 AccountId
@@ -270,6 +271,7 @@ tera/M
 teleports
 timeframe
 timestamp/MS
+topologies
 tradeoff
 transitionary
 trie/MS
-- 
GitLab