From d1d33abdf8d0c74608987a0249082b465f7699f6 Mon Sep 17 00:00:00 2001
From: Robert Klotzner <eskimor@users.noreply.github.com>
Date: Mon, 12 Apr 2021 16:15:25 +0200
Subject: [PATCH] More tests for new request based statement distribution
 (#2875)

* More test coverage.

* Preserve peer order.

* Better test coverage.

* Even more test coverage.

* Add doc comment to `IndexMap`.

* Fix flaky test.

* Review remarks.

* Review remarks.
---
 polkadot/Cargo.lock                           |   2 +
 .../network/statement-distribution/Cargo.toml |   2 +
 .../network/statement-distribution/src/lib.rs | 225 ++++++++++++++++--
 .../statement-distribution/src/requester.rs   |   5 +
 4 files changed, 213 insertions(+), 21 deletions(-)

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index b37207105ee..4d0f4712b6c 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -6507,6 +6507,7 @@ dependencies = [
  "arrayvec 0.5.2",
  "assert_matches",
  "futures 0.3.13",
+ "futures-timer 3.0.2",
  "indexmap",
  "parity-scale-codec",
  "polkadot-node-network-protocol",
@@ -6522,6 +6523,7 @@ dependencies = [
  "sp-keyring",
  "sp-keystore",
  "sp-staking",
+ "sp-tracing",
  "tracing",
 ]
 
diff --git a/polkadot/node/network/statement-distribution/Cargo.toml b/polkadot/node/network/statement-distribution/Cargo.toml
index ead19ecdb4b..b4648e1ac2d 100644
--- a/polkadot/node/network/statement-distribution/Cargo.toml
+++ b/polkadot/node/network/statement-distribution/Cargo.toml
@@ -26,4 +26,6 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master
 sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
+futures-timer = "3.0.2"
diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs
index 3d8c428e8ef..f794f0f23ed 100644
--- a/polkadot/node/network/statement-distribution/src/lib.rs
+++ b/polkadot/node/network/statement-distribution/src/lib.rs
@@ -53,7 +53,7 @@ use polkadot_node_network_protocol::{
 
 use futures::{channel::mpsc, future::RemoteHandle, prelude::*};
 use futures::channel::oneshot;
-use indexmap::IndexSet;
+use indexmap::{IndexSet, IndexMap, map::Entry as IEntry};
 
 use std::collections::{HashMap, HashSet, hash_map::Entry};
 
@@ -508,7 +508,10 @@ enum LargeStatementStatus {
 struct FetchingInfo {
 	/// All peers that send us a `LargeStatement` or a `Valid` statement for the given
 	/// `CandidateHash`, together with their originally sent messages.
-	available_peers: HashMap<PeerId, Vec<protocol_v1::StatementDistributionMessage>>,
+	///
+	/// We use an `IndexMap` here to preserve the ordering of peers sending us messages. This is
+	/// desirable because we reward first sending peers with reputation.
+	available_peers: IndexMap<PeerId, Vec<protocol_v1::StatementDistributionMessage>>,
 	/// Peers left to try in case the background task needs it.
 	peers_to_try: Vec<PeerId>,
 	/// Sender for sending fresh peers to the fetching task in case of failure.
@@ -1057,22 +1060,24 @@ async fn retrieve_statement_from_message<'a>(
 			match occupied.get_mut() {
 				LargeStatementStatus::Fetching(info) => {
 
-					let is_new_peer = !info.available_peers.contains_key(&peer);
 					let is_large_statement = message.is_large_statement();
 
-					match info.available_peers.entry(peer) {
-						Entry::Occupied(mut occupied) => {
-							occupied.get_mut().push(message);
-						}
-						Entry::Vacant(vacant) => {
-							vacant.insert(vec![message]);
-						}
-					}
+					let is_new_peer =
+						match info.available_peers.entry(peer) {
+							IEntry::Occupied(mut occupied) => {
+								occupied.get_mut().push(message);
+								false
+							}
+							IEntry::Vacant(vacant) => {
+								vacant.insert(vec![message]);
+								true
+							}
+					};
 
 					if is_new_peer & is_large_statement {
 						info.peers_to_try.push(peer);
 						// Answer any pending request for more peers:
-						if let Some(sender) = std::mem::take(&mut info.peer_sender) {
+						if let Some(sender) = info.peer_sender.take() {
 							let to_send = std::mem::take(&mut info.peers_to_try);
 							if let Err(peers) = sender.send(to_send) {
 								// Requester no longer interested for now, might want them
@@ -1181,7 +1186,7 @@ async fn launch_request(
 		return None
 	}
 	let available_peers = {
-		let mut m = HashMap::new();
+		let mut m = IndexMap::new();
 		m.insert(peer, vec![protocol_v1::StatementDistributionMessage::LargeStatement(meta)]);
 		m
 	};
@@ -1950,6 +1955,7 @@ impl metrics::Metrics for Metrics {
 
 #[cfg(test)]
 mod tests {
+	use std::time::Duration;
 	use parity_scale_codec::{Decode, Encode};
 	use super::*;
 	use std::sync::Arc;
@@ -1959,9 +1965,10 @@ mod tests {
 	use polkadot_primitives::v1::{CommittedCandidateReceipt, ValidationCode};
 	use assert_matches::assert_matches;
 	use futures::executor::{self, block_on};
+	use futures_timer::Delay;
 	use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
 	use sc_keystore::LocalKeystore;
-	use polkadot_node_network_protocol::{view, ObservedRole};
+	use polkadot_node_network_protocol::{view, ObservedRole, request_response::Recipient};
 	use polkadot_subsystem::{jaeger, ActivatedLeaf};
 	use polkadot_node_network_protocol::request_response::{
 		Requests,
@@ -2699,6 +2706,7 @@ mod tests {
 
 	#[test]
 	fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing() {
+		sp_tracing::try_init_simple();
 		let hash_a = Hash::repeat_byte(1);
 		let hash_b = Hash::repeat_byte(2);
 
@@ -2712,6 +2720,8 @@ mod tests {
 
 		let peer_a = PeerId::random();
 		let peer_b = PeerId::random();
+		let peer_c = PeerId::random();
+		let peer_bad = PeerId::random();
 
 		let validators = vec![
 			Sr25519Keyring::Alice.public().into(),
@@ -2780,6 +2790,16 @@ mod tests {
 					NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)
 				)
 			}).await;
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full)
+				)
+			}).await;
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full)
+				)
+			}).await;
 
 			handle.send(FromOverseer::Communication {
 				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
@@ -2792,8 +2812,19 @@ mod tests {
 					NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a])
 				)
 			}).await;
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerViewChange(peer_c.clone(), view![hash_a])
+				)
+			}).await;
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerViewChange(peer_bad.clone(), view![hash_a])
+				)
+			}).await;
 
-			// receive a seconded statement from peer A. it should be propagated onwards to peer B and to
+			// receive a seconded statement from peer A, which does not provide the request data,
+			// then get that data from peer C. It should be propagated onwards to peer B and to
 			// candidate backing.
 			let statement = {
 				let signing_context = SigningContext {
@@ -2815,7 +2846,7 @@ mod tests {
 				).await.ok().flatten().expect("should be signed")
 			};
 
-			let metadata = 
+			let metadata =
 				protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()).get_metadata();
 
 			handle.send(FromOverseer::Communication {
@@ -2842,6 +2873,150 @@ mod tests {
 					let req = outgoing.payload;
 					assert_eq!(req.relay_parent, metadata.relay_parent);
 					assert_eq!(req.candidate_hash, metadata.candidate_hash);
+					assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
+					// Just drop request - should trigger error.
+				}
+			);
+
+			// There is a race between request handler asking for more peers and processing of the
+			// coming `PeerMessage`s, we want the request handler to ask first here for better test
+			// coverage:
+			Delay::new(Duration::from_millis(20)).await;
+
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerMessage(
+						peer_c.clone(),
+						protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
+					)
+				)
+			}).await;
+
+			// Malicious peer:
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerMessage(
+						peer_bad.clone(),
+						protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
+					)
+				)
+			}).await;
+
+			// Let c fail once too:
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::NetworkBridge(
+					NetworkBridgeMessage::SendRequests(
+						mut reqs, IfDisconnected::ImmediateError
+					)
+				) => {
+					let reqs = reqs.pop().unwrap();
+					let outgoing = match reqs {
+						Requests::StatementFetching(outgoing) => outgoing,
+						_ => panic!("Unexpected request"),
+					};
+					let req = outgoing.payload;
+					assert_eq!(req.relay_parent, metadata.relay_parent);
+					assert_eq!(req.candidate_hash, metadata.candidate_hash);
+					assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
+				}
+			);
+
+			// a fails again:
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::NetworkBridge(
+					NetworkBridgeMessage::SendRequests(
+						mut reqs, IfDisconnected::ImmediateError
+					)
+				) => {
+					let reqs = reqs.pop().unwrap();
+					let outgoing = match reqs {
+						Requests::StatementFetching(outgoing) => outgoing,
+						_ => panic!("Unexpected request"),
+					};
+					let req = outgoing.payload;
+					assert_eq!(req.relay_parent, metadata.relay_parent);
+					assert_eq!(req.candidate_hash, metadata.candidate_hash);
+					// On retry, we should have reverse order:
+					assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
+				}
+			);
+
+			// Send invalid response (all other peers have been tried now):
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::NetworkBridge(
+					NetworkBridgeMessage::SendRequests(
+						mut reqs, IfDisconnected::ImmediateError
+					)
+				) => {
+					let reqs = reqs.pop().unwrap();
+					let outgoing = match reqs {
+						Requests::StatementFetching(outgoing) => outgoing,
+						_ => panic!("Unexpected request"),
+					};
+					let req = outgoing.payload;
+					assert_eq!(req.relay_parent, metadata.relay_parent);
+					assert_eq!(req.candidate_hash, metadata.candidate_hash);
+					assert_eq!(outgoing.peer, Recipient::Peer(peer_bad));
+					let bad_candidate = {
+						let mut bad = candidate.clone();
+						bad.descriptor.para_id = 0xeadbeaf.into();
+						bad
+					};
+					let response = StatementFetchingResponse::Statement(bad_candidate);
+					outgoing.pending_response.send(Ok(response.encode())).unwrap();
+				}
+			);
+
+			// Should get punished and never tried again:
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::NetworkBridge(
+					NetworkBridgeMessage::ReportPeer(p, r)
+				) if p == peer_bad && r == COST_WRONG_HASH => {}
+			);
+
+			// a is tried again (retried in reverse order):
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::NetworkBridge(
+					NetworkBridgeMessage::SendRequests(
+						mut reqs, IfDisconnected::ImmediateError
+					)
+				) => {
+					let reqs = reqs.pop().unwrap();
+					let outgoing = match reqs {
+						Requests::StatementFetching(outgoing) => outgoing,
+						_ => panic!("Unexpected request"),
+					};
+					let req = outgoing.payload;
+					assert_eq!(req.relay_parent, metadata.relay_parent);
+					assert_eq!(req.candidate_hash, metadata.candidate_hash);
+					// On retry, we should have reverse order:
+					assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
+				}
+			);
+
+			// c succeeds now:
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::NetworkBridge(
+					NetworkBridgeMessage::SendRequests(
+						mut reqs, IfDisconnected::ImmediateError
+					)
+				) => {
+					let reqs = reqs.pop().unwrap();
+					let outgoing = match reqs {
+						Requests::StatementFetching(outgoing) => outgoing,
+						_ => panic!("Unexpected request"),
+					};
+					let req = outgoing.payload;
+					assert_eq!(req.relay_parent, metadata.relay_parent);
+					assert_eq!(req.candidate_hash, metadata.candidate_hash);
+					// On retry, we should have reverse order:
+					assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
 					let response = StatementFetchingResponse::Statement(candidate.clone());
 					outgoing.pending_response.send(Ok(response.encode())).unwrap();
 				}
@@ -2851,7 +3026,14 @@ mod tests {
 				handle.recv().await,
 				AllMessages::NetworkBridge(
 					NetworkBridgeMessage::ReportPeer(p, r)
-				) if p == peer_a && r == BENEFIT_VALID_RESPONSE => {}
+				) if p == peer_a && r == COST_FETCH_FAIL => {}
+			);
+
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::NetworkBridge(
+					NetworkBridgeMessage::ReportPeer(p, r)
+				) if p == peer_c && r == BENEFIT_VALID_RESPONSE => {}
 			);
 
 			assert_matches!(
@@ -2869,17 +3051,18 @@ mod tests {
 			);
 
 
+			// Now messages should go out:
 			assert_matches!(
 				handle.recv().await,
 				AllMessages::NetworkBridge(
 					NetworkBridgeMessage::SendValidationMessage(
-						recipients,
+						mut recipients,
 						protocol_v1::ValidationProtocol::StatementDistribution(
 							protocol_v1::StatementDistributionMessage::LargeStatement(meta)
 						),
 					)
 				) => {
-					assert_eq!(recipients, vec![peer_b.clone()]);
+					assert_eq!(recipients.sort(), vec![peer_b.clone(), peer_c.clone()].sort());
 					assert_eq!(meta.relay_parent, hash_a);
 					assert_eq!(meta.candidate_hash, statement.payload().candidate_hash());
 					assert_eq!(meta.signed_by, statement.validator_index());
@@ -2889,7 +3072,7 @@ mod tests {
 
 			// Now that it has the candidate it should answer requests accordingly (even after a
 			// failed request):
-			
+
 			// Failing request first:
 			let (pending_response, response_rx) = oneshot::channel();
 			let inner_req = StatementFetchingRequest {
@@ -2906,7 +3089,7 @@ mod tests {
 				response_rx.await.unwrap().result,
 				Err(()) => {}
 			);
-		
+
 			// Now the working one:
 			let (pending_response, response_rx) = oneshot::channel();
 			let inner_req = StatementFetchingRequest {
diff --git a/polkadot/node/network/statement-distribution/src/requester.rs b/polkadot/node/network/statement-distribution/src/requester.rs
index 5d786d52acf..66a7979eda2 100644
--- a/polkadot/node/network/statement-distribution/src/requester.rs
+++ b/polkadot/node/network/statement-distribution/src/requester.rs
@@ -178,6 +178,11 @@ pub async fn fetch(
 		// All our peers failed us - try getting new ones before trying again:
 		match try_get_new_peers(relay_parent, candidate_hash, &mut sender, &span).await {
 			Ok(Some(mut peers)) => {
+				tracing::trace!(
+					target: LOG_TARGET,
+					?peers,
+					"Received new peers."
+				);
 				// New arrivals will be tried first:
 				new_peers.append(&mut peers);
 			}
-- 
GitLab