lib.rs 55.2 KiB
Newer Older
							&mut active_heads,
							&mut ctx,
							relay_parent,
							statement,
							&metrics,
					StatementDistributionMessage::NetworkBridgeUpdateV1(event) => {
						let _timer = metrics.time_network_bridge_update_v1();

						handle_network_update(
							&mut peers,
							&mut active_heads,
							&mut ctx,
							&mut our_view,
							event,
							&metrics,
					StatementDistributionMessage::RegisterStatementListener(tx) => {
						statement_listeners.push(tx);
					}
		Ok(())
	}
}

#[derive(Clone)]
struct MetricsInner {
	statements_distributed: prometheus::Counter<prometheus::U64>,
	active_leaves_update: prometheus::Histogram,
	share: prometheus::Histogram,
	network_bridge_update_v1: prometheus::Histogram,
}

/// Statement Distribution metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_statement_distributed(&self) {
		if let Some(metrics) = &self.0 {
			metrics.statements_distributed.inc();
		}
	}

	/// Provide a timer for `active_leaves_update` which observes on drop.
	fn time_active_leaves_update(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.active_leaves_update.start_timer())
	}

	/// Provide a timer for `share` which observes on drop.
	fn time_share(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.share.start_timer())
	}

	/// Provide a timer for `network_bridge_update_v1` which observes on drop.
	fn time_network_bridge_update_v1(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.network_bridge_update_v1.start_timer())
	}
}

impl metrics::Metrics for Metrics {
	fn try_register(registry: &prometheus::Registry) -> std::result::Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			statements_distributed: prometheus::register(
				prometheus::Counter::new(
					"parachain_statements_distributed_total",
					"Number of candidate validity statements distributed to other peers."
				)?,
				registry,
			)?,
			active_leaves_update: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_statement_distribution_active_leaves_update",
						"Time spent within `statement_distribution::active_leaves_update`",
					)
				)?,
				registry,
			)?,
			share: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_statement_distribution_share",
						"Time spent within `statement_distribution::share`",
					)
				)?,
				registry,
			)?,
			network_bridge_update_v1: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_statement_distribution_network_bridge_update_v1",
						"Time spent within `statement_distribution::network_bridge_update_v1`",
					)
				)?,
				registry,
			)?,
		};
		Ok(Metrics(Some(metrics)))
	use sp_keyring::Sr25519Keyring;
	use sp_application_crypto::AppKey;
	use node_primitives::Statement;
	use polkadot_primitives::v1::CommittedCandidateReceipt;
	use assert_matches::assert_matches;
	use futures::executor::{self, block_on};
	use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
	use sc_keystore::LocalKeystore;
	use polkadot_node_network_protocol::{view, ObservedRole};

	#[test]
	fn active_head_accepts_only_2_seconded_per_validator() {
		let validators = vec![
			Sr25519Keyring::Alice.public().into(),
			Sr25519Keyring::Bob.public().into(),
			Sr25519Keyring::Charlie.public().into(),
		];
		let parent_hash: Hash = [1; 32].into();

		let session_index = 1;
		let signing_context = SigningContext {
			parent_hash,
			session_index,
		};

		let candidate_a = {
			let mut c = CommittedCandidateReceipt::default();
			c.descriptor.relay_parent = parent_hash;
			c.descriptor.para_id = 1.into();
			let mut c = CommittedCandidateReceipt::default();
			c.descriptor.relay_parent = parent_hash;
			c.descriptor.para_id = 2.into();
			let mut c = CommittedCandidateReceipt::default();
			c.descriptor.relay_parent = parent_hash;
			c.descriptor.para_id = 3.into();
		let mut head_data = ActiveHeadData::new(validators, session_index, &parent_hash);
		let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
		let alice_public = SyncCryptoStore::sr25519_generate_new(
			&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
		).unwrap();
		let bob_public = SyncCryptoStore::sr25519_generate_new(
			&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Bob.to_seed())
		).unwrap();

		let a_seconded_val_0 = block_on(SignedFullStatement::sign(
			&keystore,
			Statement::Seconded(candidate_a.clone()),
			&signing_context,
			0,
			&alice_public.into(),
		)).expect("should be signed");
		let noted = head_data.note_statement(a_seconded_val_0.clone());

		assert_matches!(noted, NotedStatement::Fresh(_));

		// note A (duplicate)
		let noted = head_data.note_statement(a_seconded_val_0);

		assert_matches!(noted, NotedStatement::UsefulButKnown);

		// note B
		let noted = head_data.note_statement(block_on(SignedFullStatement::sign(
			&keystore,
			Statement::Seconded(candidate_b.clone()),
			&signing_context,
			0,
			&alice_public.into(),
		)).expect("should be signed"));

		assert_matches!(noted, NotedStatement::Fresh(_));

		// note C (beyond 2 - ignored)
		let noted = head_data.note_statement(block_on(SignedFullStatement::sign(
			&keystore,
			Statement::Seconded(candidate_c.clone()),
			&signing_context,
			0,
			&alice_public.into(),
		)).expect("should be signed"));

		assert_matches!(noted, NotedStatement::NotUseful);

		// note B (new validator)
		let noted = head_data.note_statement(block_on(SignedFullStatement::sign(
			&keystore,
			Statement::Seconded(candidate_b.clone()),
			&signing_context,
			1,
			&bob_public.into(),
		)).expect("should be signed"));

		assert_matches!(noted, NotedStatement::Fresh(_));

		// note C (new validator)
		let noted = head_data.note_statement(block_on(SignedFullStatement::sign(
			&keystore,
			Statement::Seconded(candidate_c.clone()),
			&signing_context,
			1,
			&bob_public.into(),
		)).expect("should be signed"));

		assert_matches!(noted, NotedStatement::Fresh(_));
	}

	#[test]
	fn note_local_works() {
		let hash_a = CandidateHash([1; 32].into());
		let hash_b = CandidateHash([2; 32].into());

		let mut per_peer_tracker = VcPerPeerTracker::default();
		per_peer_tracker.note_local(hash_a.clone());
		per_peer_tracker.note_local(hash_b.clone());

		assert!(per_peer_tracker.local_observed.contains(&hash_a));
		assert!(per_peer_tracker.local_observed.contains(&hash_b));

		assert!(!per_peer_tracker.remote_observed.contains(&hash_a));
		assert!(!per_peer_tracker.remote_observed.contains(&hash_b));
	}

	#[test]
	fn note_remote_works() {
		let hash_a = CandidateHash([1; 32].into());
		let hash_b = CandidateHash([2; 32].into());
		let hash_c = CandidateHash([3; 32].into());

		let mut per_peer_tracker = VcPerPeerTracker::default();
		assert!(per_peer_tracker.note_remote(hash_a.clone()));
		assert!(per_peer_tracker.note_remote(hash_b.clone()));
		assert!(!per_peer_tracker.note_remote(hash_c.clone()));

		assert!(per_peer_tracker.remote_observed.contains(&hash_a));
		assert!(per_peer_tracker.remote_observed.contains(&hash_b));
		assert!(!per_peer_tracker.remote_observed.contains(&hash_c));

		assert!(!per_peer_tracker.local_observed.contains(&hash_a));
		assert!(!per_peer_tracker.local_observed.contains(&hash_b));
		assert!(!per_peer_tracker.local_observed.contains(&hash_c));
	}

	#[test]
	fn per_peer_relay_parent_knowledge_send() {
		let mut knowledge = PeerRelayParentKnowledge::default();

		let hash_a = CandidateHash([1; 32].into());

		// Sending an un-pinned statement should not work and should have no effect.
		assert!(knowledge.send(&(CompactStatement::Valid(hash_a), 0)).is_none());
		assert!(!knowledge.known_candidates.contains(&hash_a));
		assert!(knowledge.sent_statements.is_empty());
		assert!(knowledge.received_statements.is_empty());
		assert!(knowledge.seconded_counts.is_empty());
		assert!(knowledge.received_message_count.is_empty());

		// Make the peer aware of the candidate.
		assert_eq!(knowledge.send(&(CompactStatement::Candidate(hash_a), 0)), Some(true));
		assert_eq!(knowledge.send(&(CompactStatement::Candidate(hash_a), 1)), Some(false));
		assert!(knowledge.known_candidates.contains(&hash_a));
		assert_eq!(knowledge.sent_statements.len(), 2);
		assert!(knowledge.received_statements.is_empty());
		assert_eq!(knowledge.seconded_counts.len(), 2);
		assert!(knowledge.received_message_count.get(&hash_a).is_none());

		// And now it should accept the dependent message.
		assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), 0)), Some(false));
		assert!(knowledge.known_candidates.contains(&hash_a));
		assert_eq!(knowledge.sent_statements.len(), 3);
		assert!(knowledge.received_statements.is_empty());
		assert_eq!(knowledge.seconded_counts.len(), 2);
		assert!(knowledge.received_message_count.get(&hash_a).is_none());
	}

	#[test]
	fn cant_send_after_receiving() {
		let mut knowledge = PeerRelayParentKnowledge::default();

		let hash_a = CandidateHash([1; 32].into());
		assert!(knowledge.receive(&(CompactStatement::Candidate(hash_a), 0), 3).unwrap());
		assert!(knowledge.send(&(CompactStatement::Candidate(hash_a), 0)).is_none());
	}

	#[test]
	fn per_peer_relay_parent_knowledge_receive() {
		let mut knowledge = PeerRelayParentKnowledge::default();

		let hash_a = CandidateHash([1; 32].into());

		assert_eq!(
			knowledge.receive(&(CompactStatement::Valid(hash_a), 0), 3),
			Err(COST_UNEXPECTED_STATEMENT),
		);

		assert_eq!(
			knowledge.receive(&(CompactStatement::Candidate(hash_a), 0), 3),
			Ok(true),
		);

		// Push statements up to the flood limit.
		assert_eq!(
			knowledge.receive(&(CompactStatement::Valid(hash_a), 1), 3),
			Ok(false),
		);

		assert!(knowledge.known_candidates.contains(&hash_a));
		assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 2);

		assert_eq!(
			knowledge.receive(&(CompactStatement::Valid(hash_a), 2), 3),
			Ok(false),
		);

		assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 3);

		assert_eq!(
			knowledge.receive(&(CompactStatement::Valid(hash_a), 7), 3),
			Err(COST_APPARENT_FLOOD),
		);

		assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 3);
		assert_eq!(knowledge.received_statements.len(), 3); // number of prior `Ok`s.

		// Now make sure that the seconding limit is respected.
		let hash_b = CandidateHash([2; 32].into());
		let hash_c = CandidateHash([3; 32].into());

		assert_eq!(
			knowledge.receive(&(CompactStatement::Candidate(hash_b), 0), 3),
			Ok(true),
		);

		assert_eq!(
			knowledge.receive(&(CompactStatement::Candidate(hash_c), 0), 3),
			Err(COST_UNEXPECTED_STATEMENT),
		);

		// Last, make sure that already-known statements are disregarded.
		assert_eq!(
			knowledge.receive(&(CompactStatement::Valid(hash_a), 2), 3),
			Err(COST_DUPLICATE_STATEMENT),
		);

		assert_eq!(
			knowledge.receive(&(CompactStatement::Candidate(hash_b), 0), 3),
			Err(COST_DUPLICATE_STATEMENT),
		);
	}

	#[test]
	fn peer_view_update_sends_messages() {
		let hash_a = Hash::repeat_byte(1);
		let hash_b = Hash::repeat_byte(2);
		let hash_c = Hash::repeat_byte(3);
			let mut c = CommittedCandidateReceipt::default();
			c.descriptor.relay_parent = hash_c;
			c.descriptor.para_id = 1.into();
			c
		};
		let candidate_hash = candidate.hash();

		let old_view = view![hash_a, hash_b];
		let new_view = view![hash_b, hash_c];

		let mut active_heads = HashMap::new();
		let validators = vec![
			Sr25519Keyring::Alice.public().into(),
			Sr25519Keyring::Bob.public().into(),
			Sr25519Keyring::Charlie.public().into(),
		];

		let session_index = 1;
		let signing_context = SigningContext {
			parent_hash: hash_c,
			session_index,
		};

		let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());

		let alice_public = SyncCryptoStore::sr25519_generate_new(
			&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
		).unwrap();
		let bob_public = SyncCryptoStore::sr25519_generate_new(
			&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Bob.to_seed())
		).unwrap();
		let charlie_public = SyncCryptoStore::sr25519_generate_new(
			&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Charlie.to_seed())
		).unwrap();

			let mut data = ActiveHeadData::new(validators, session_index, &hash_c);
			let noted = data.note_statement(block_on(SignedFullStatement::sign(
				&keystore,
				Statement::Seconded(candidate.clone()),
				&signing_context,
				0,
				&alice_public.into(),
			)).expect("should be signed"));

			assert_matches!(noted, NotedStatement::Fresh(_));

			let noted = data.note_statement(block_on(SignedFullStatement::sign(
				&keystore,
				Statement::Valid(candidate_hash),
				&signing_context,
				1,
				&bob_public.into(),
			)).expect("should be signed"));

			assert_matches!(noted, NotedStatement::Fresh(_));

			let noted = data.note_statement(block_on(SignedFullStatement::sign(
				&keystore,
				Statement::Valid(candidate_hash),
				&signing_context,
				2,
				&charlie_public.into(),
			)).expect("should be signed"));

			assert_matches!(noted, NotedStatement::Fresh(_));

			data
		};

		active_heads.insert(hash_c, new_head_data);

		let mut peer_data = PeerData {
			view: old_view,
			view_knowledge: {
				let mut k = HashMap::new();

				k.insert(hash_a, Default::default());
				k.insert(hash_b, Default::default());

				k
			},
		};

		let pool = sp_core::testing::TaskExecutor::new();
		let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
		let peer = PeerId::random();

		executor::block_on(async move {
			update_peer_view_and_send_unlocked(
				peer.clone(),
				&mut peer_data,
				&mut ctx,
				&active_heads,
				new_view.clone(),
				&Default::default(),

			assert_eq!(peer_data.view, new_view);
			assert!(!peer_data.view_knowledge.contains_key(&hash_a));
			assert!(peer_data.view_knowledge.contains_key(&hash_b));

			let c_knowledge = peer_data.view_knowledge.get(&hash_c).unwrap();

			assert!(c_knowledge.known_candidates.contains(&candidate_hash));
			assert!(c_knowledge.sent_statements.contains(
				&(CompactStatement::Candidate(candidate_hash), 0)
			));
			assert!(c_knowledge.sent_statements.contains(
				&(CompactStatement::Valid(candidate_hash), 1)
			));
			assert!(c_knowledge.sent_statements.contains(
				&(CompactStatement::Valid(candidate_hash), 2)
			));

			// now see if we got the 3 messages from the active head data.
			let active_head = active_heads.get(&hash_c).unwrap();

			// semi-fragile because hashmap iterator ordering is undefined, but in practice
			// it will not change between runs of the program.
			for statement in active_head.statements_about(candidate_hash) {
				let message = handle.recv().await;
				let expected_to = vec![peer.clone()];
				let expected_payload
					= statement_message(hash_c, statement.statement.clone());
					AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
						to,
						payload,
					)) => {
						assert_eq!(to, expected_to);
						assert_eq!(payload, expected_payload)
					}
				)
			}
		});
	}

	#[test]
	fn circulated_statement_goes_to_all_peers_with_view() {
		let hash_a = Hash::repeat_byte(1);
		let hash_b = Hash::repeat_byte(2);
		let hash_c = Hash::repeat_byte(3);
			let mut c = CommittedCandidateReceipt::default();
			c.descriptor.relay_parent = hash_b;
			c.descriptor.para_id = 1.into();
			c
		};

		let peer_a = PeerId::random();
		let peer_b = PeerId::random();
		let peer_c = PeerId::random();

		let peer_a_view = view![hash_a];
		let peer_b_view = view![hash_a, hash_b];
		let peer_c_view = view![hash_b, hash_c];

		let session_index = 1;

		let peer_data_from_view = |view: View| PeerData {
			view: view.clone(),
			view_knowledge: view.heads.iter().map(|v| (v.clone(), Default::default())).collect(),
		};

		let mut peer_data: HashMap<_, _> = vec![
			(peer_a.clone(), peer_data_from_view(peer_a_view)),
			(peer_b.clone(), peer_data_from_view(peer_b_view)),
			(peer_c.clone(), peer_data_from_view(peer_c_view)),
		].into_iter().collect();

		let pool = sp_core::testing::TaskExecutor::new();
		let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);

		executor::block_on(async move {
			let statement = {
				let signing_context = SigningContext {
					parent_hash: hash_b,
					session_index,
				};

				let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
				let alice_public = CryptoStore::sr25519_generate_new(
					&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
				).await.unwrap();

				let statement = SignedFullStatement::sign(
					Statement::Seconded(candidate),
					&signing_context,
					0,
					&alice_public.into(),
				).await.expect("should be signed");

				StoredStatement {
					comparator: StoredStatementComparator {
						compact: statement.payload().to_compact(),
						validator_index: 0,
						signature: statement.signature().clone()
					},
					statement,
				}
			};

			let needs_dependents = circulate_statement(
				&mut peer_data,
				&mut ctx,
				hash_b,
				&statement,

			{
				assert_eq!(needs_dependents.len(), 2);
				assert!(needs_dependents.contains(&peer_b));
				assert!(needs_dependents.contains(&peer_c));
			}

			let fingerprint = (statement.compact().clone(), 0);

			assert!(
				peer_data.get(&peer_b).unwrap()
				.view_knowledge.get(&hash_b).unwrap()
				.sent_statements.contains(&fingerprint),
			);

			assert!(
				peer_data.get(&peer_c).unwrap()
				.view_knowledge.get(&hash_b).unwrap()
				.sent_statements.contains(&fingerprint),
			);

			let message = handle.recv().await;
			assert_matches!(
				message,
				AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
					to,
					payload,
				)) => {
					assert_eq!(to.len(), 2);
					assert!(to.contains(&peer_b));
					assert!(to.contains(&peer_c));

					assert_eq!(
						payload,
						statement_message(hash_b, statement.statement.clone()),

	#[test]
	fn receiving_from_one_sends_to_another_and_to_candidate_backing() {
		let hash_a = Hash::repeat_byte(1);

		let candidate = {
			let mut c = CommittedCandidateReceipt::default();
			c.descriptor.relay_parent = hash_a;
			c.descriptor.para_id = 1.into();
			c
		};

		let peer_a = PeerId::random();
		let peer_b = PeerId::random();

		let validators = vec![
			Sr25519Keyring::Alice.public().into(),
			Sr25519Keyring::Bob.public().into(),
			Sr25519Keyring::Charlie.public().into(),
		];

		let session_index = 1;

		let pool = sp_core::testing::TaskExecutor::new();
		let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);

		let bg = async move {
			let s = StatementDistribution { metrics: Default::default() };
			s.run(ctx).await.unwrap();
		};

		let test_fut = async move {
			// register our active heads.
			handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
				activated: vec![hash_a].into(),
				deactivated: vec![].into(),
			}))).await;

			assert_matches!(
				handle.recv().await,
				AllMessages::RuntimeApi(
					RuntimeApiMessage::Request(r, RuntimeApiRequest::Validators(tx))
				)
					if r == hash_a
				=> {
					let _ = tx.send(Ok(validators));
				}
			);

			assert_matches!(
				handle.recv().await,
				AllMessages::RuntimeApi(
					RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
				)
					if r == hash_a
				=> {
					let _ = tx.send(Ok(session_index));
				}
			);

			// notify of peers and view
			handle.send(FromOverseer::Communication {
				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
					NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full)
				)
			}).await;

			handle.send(FromOverseer::Communication {
				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
					NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)
				)
			}).await;

			handle.send(FromOverseer::Communication {
				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
					NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a])
				)
			}).await;

			handle.send(FromOverseer::Communication {
				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
					NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a])
				)
			}).await;

			handle.send(FromOverseer::Communication {
				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
					NetworkBridgeEvent::OurViewChange(view![hash_a])
				)
			}).await;

			// receive a seconded statement from peer A. it should be propagated onwards to peer B and to
			// candidate backing.
			let statement = {
				let signing_context = SigningContext {
					parent_hash: hash_a,
					session_index,
				};

				let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
				let alice_public = CryptoStore::sr25519_generate_new(
					&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
				).await.unwrap();

				SignedFullStatement::sign(
					&keystore,
					Statement::Seconded(candidate),
					&signing_context,
					0,
					&alice_public.into(),
				).await.expect("should be signed")
			};

			handle.send(FromOverseer::Communication {
				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
					NetworkBridgeEvent::PeerMessage(
						peer_a.clone(),
						protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()),
					)
				)
			}).await;

			assert_matches!(
				handle.recv().await,
				AllMessages::NetworkBridge(
					NetworkBridgeMessage::ReportPeer(p, r)
				) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {}
			);

			assert_matches!(
				handle.recv().await,
				AllMessages::CandidateBacking(
					CandidateBackingMessage::Statement(r, s)
				) if r == hash_a && s == statement => {}
			);

			assert_matches!(
				handle.recv().await,
				AllMessages::NetworkBridge(
					NetworkBridgeMessage::SendValidationMessage(
						recipients,
						protocol_v1::ValidationProtocol::StatementDistribution(
							protocol_v1::StatementDistributionMessage::Statement(r, s)
						),
					)
				) => {
					assert_eq!(recipients, vec![peer_b.clone()]);
					assert_eq!(r, hash_a);
					assert_eq!(s, statement);
				}
			);
		};

		futures::pin_mut!(test_fut);
		futures::pin_mut!(bg);

		executor::block_on(future::select(test_fut, bg));
	}