From a7097681b76bdaef21dcde9aec8c33205f480e44 Mon Sep 17 00:00:00 2001
From: Andrei Eres <eresav@me.com>
Date: Mon, 27 May 2024 21:23:58 +0200
Subject: [PATCH] [subsystem-benchmarks] Add statement-distribution benchmarks
 (#3863)

Fixes https://github.com/paritytech/polkadot-sdk/issues/3748

Adds a subsystem benchmark for statements-distribution subsystem.

Results in CI (reference hw):
```
$ cargo bench -p polkadot-statement-distribution --bench statement-distribution-regression-bench --features subsystem-benchmarks

[Sent to peers] standart_deviation 0.07%
[Received from peers] standart_deviation 0.00%
[statement-distribution] standart_deviation 0.97%
[test-environment] standart_deviation 1.03%

Network usage, KiB                     total   per block
Received from peers                1088.0000    108.8000
Sent to peers                      1238.1800    123.8180

CPU usage, seconds                     total   per block
statement-distribution                0.3897      0.0390
test-environment                      0.4715      0.0472
```
---
 .gitlab/pipeline/publish.yml                  |   4 +
 .gitlab/pipeline/test.yml                     |   7 +
 Cargo.lock                                    |   2 +
 .../network/statement-distribution/Cargo.toml |  10 +
 ...statement-distribution-regression-bench.rs |  78 +++
 .../network/statement-distribution/src/lib.rs |   1 -
 polkadot/node/subsystem-bench/Cargo.toml      |   1 +
 .../examples/statement_distribution.yaml      |   5 +
 .../src/cli/subsystem-bench.rs                |  14 +-
 .../subsystem-bench/src/lib/approval/mod.rs   |  10 +-
 .../src/lib/availability/mod.rs               |   3 +-
 .../subsystem-bench/src/lib/configuration.rs  |  31 +-
 polkadot/node/subsystem-bench/src/lib/lib.rs  |   3 +-
 .../subsystem-bench/src/lib/mock/av_store.rs  |   3 +-
 .../src/lib/mock/candidate_backing.rs         | 171 +++++++
 .../node/subsystem-bench/src/lib/mock/mod.rs  |   2 +
 .../src/lib/mock/network_bridge.rs            |  49 +-
 .../src/lib/mock/prospective_parachains.rs    |  74 +++
 .../src/lib/mock/runtime_api.rs               |  83 +++-
 .../node/subsystem-bench/src/lib/network.rs   |  90 +++-
 .../subsystem-bench/src/lib/statement/mod.rs  | 450 ++++++++++++++++++
 .../src/lib/statement/test_state.rs           | 436 +++++++++++++++++
 22 files changed, 1480 insertions(+), 47 deletions(-)
 create mode 100644 polkadot/node/network/statement-distribution/benches/statement-distribution-regression-bench.rs
 create mode 100644 polkadot/node/subsystem-bench/examples/statement_distribution.yaml
 create mode 100644 polkadot/node/subsystem-bench/src/lib/mock/candidate_backing.rs
 create mode 100644 polkadot/node/subsystem-bench/src/lib/mock/prospective_parachains.rs
 create mode 100644 polkadot/node/subsystem-bench/src/lib/statement/mod.rs
 create mode 100644 polkadot/node/subsystem-bench/src/lib/statement/test_state.rs

diff --git a/.gitlab/pipeline/publish.yml b/.gitlab/pipeline/publish.yml
index 8b27c724748..44cd1933a9c 100644
--- a/.gitlab/pipeline/publish.yml
+++ b/.gitlab/pipeline/publish.yml
@@ -76,6 +76,8 @@ publish-subsystem-benchmarks:
       artifacts: true
     - job: subsystem-benchmark-approval-voting
       artifacts: true
+    - job: subsystem-benchmark-statement-distribution
+      artifacts: true
     - job: publish-rustdoc
       artifacts: false
   script:
@@ -119,6 +121,8 @@ trigger_workflow:
       artifacts: true
     - job: subsystem-benchmark-approval-voting
       artifacts: true
+    - job: subsystem-benchmark-statement-distribution
+      artifacts: true
   script:
     - echo "Triggering workflow"
     - >
diff --git a/.gitlab/pipeline/test.yml b/.gitlab/pipeline/test.yml
index 1851581746a..d171a8a1942 100644
--- a/.gitlab/pipeline/test.yml
+++ b/.gitlab/pipeline/test.yml
@@ -630,3 +630,10 @@ subsystem-benchmark-approval-voting:
   script:
     - cargo bench -p polkadot-node-core-approval-voting --bench approval-voting-regression-bench --features subsystem-benchmarks
   allow_failure: true
+
+subsystem-benchmark-statement-distribution:
+  extends:
+    - .subsystem-benchmark-template
+  script:
+    - cargo bench -p polkadot-statement-distribution --bench statement-distribution-regression-bench --features subsystem-benchmarks
+  allow_failure: true
diff --git a/Cargo.lock b/Cargo.lock
index 82dfd34c252..acbda4f0326 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -14474,6 +14474,7 @@ dependencies = [
  "polkadot-node-subsystem-util",
  "polkadot-primitives",
  "polkadot-primitives-test-helpers",
+ "polkadot-subsystem-bench",
  "rand_chacha 0.3.1",
  "sc-keystore",
  "sc-network",
@@ -14538,6 +14539,7 @@ dependencies = [
  "polkadot-overseer",
  "polkadot-primitives",
  "polkadot-primitives-test-helpers",
+ "polkadot-statement-distribution",
  "prometheus",
  "pyroscope",
  "pyroscope_pprofrs",
diff --git a/polkadot/node/network/statement-distribution/Cargo.toml b/polkadot/node/network/statement-distribution/Cargo.toml
index 1fe761bd0e3..65224f9e2be 100644
--- a/polkadot/node/network/statement-distribution/Cargo.toml
+++ b/polkadot/node/network/statement-distribution/Cargo.toml
@@ -42,3 +42,13 @@ sc-network = { path = "../../../../substrate/client/network" }
 futures-timer = "3.0.2"
 polkadot-primitives-test-helpers = { path = "../../../primitives/test-helpers" }
 rand_chacha = "0.3"
+polkadot-subsystem-bench = { path = "../../subsystem-bench" }
+
+[[bench]]
+name = "statement-distribution-regression-bench"
+path = "benches/statement-distribution-regression-bench.rs"
+harness = false
+required-features = ["subsystem-benchmarks"]
+
+[features]
+subsystem-benchmarks = []
diff --git a/polkadot/node/network/statement-distribution/benches/statement-distribution-regression-bench.rs b/polkadot/node/network/statement-distribution/benches/statement-distribution-regression-bench.rs
new file mode 100644
index 00000000000..abcb1e6783f
--- /dev/null
+++ b/polkadot/node/network/statement-distribution/benches/statement-distribution-regression-bench.rs
@@ -0,0 +1,78 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! statement-distribution regression tests
+//!
+//! Statement distribution benchmark based on Kusama parameters and scale.
+
+use polkadot_subsystem_bench::{
+	configuration::TestConfiguration,
+	statement::{benchmark_statement_distribution, prepare_test, TestState},
+	usage::BenchmarkUsage,
+	utils::save_to_file,
+};
+use std::io::Write;
+
+const BENCH_COUNT: usize = 50;
+
+fn main() -> Result<(), String> {
+	let mut messages = vec![];
+	let mut config = TestConfiguration::default();
+	config.n_cores = 100;
+	config.n_validators = 500;
+	config.num_blocks = 10;
+	config.connectivity = 100;
+	config.generate_pov_sizes();
+	let state = TestState::new(&config);
+
+	println!("Benchmarking...");
+	let usages: Vec<BenchmarkUsage> = (0..BENCH_COUNT)
+		.map(|n| {
+			print!("\r[{}{}]", "#".repeat(n), "_".repeat(BENCH_COUNT - n));
+			std::io::stdout().flush().unwrap();
+			let (mut env, _cfgs) = prepare_test(&state, false);
+			env.runtime().block_on(benchmark_statement_distribution(
+				"statement-distribution",
+				&mut env,
+				&state,
+			))
+		})
+		.collect();
+	println!("\rDone!{}", " ".repeat(BENCH_COUNT));
+
+	let average_usage = BenchmarkUsage::average(&usages);
+	save_to_file(
+		"charts/statement-distribution-regression-bench.json",
+		average_usage.to_chart_json().map_err(|e| e.to_string())?,
+	)
+	.map_err(|e| e.to_string())?;
+	println!("{}", average_usage);
+
+	// We expect no variance for received and sent
+	// but use 0.001 because we operate with floats
+	messages.extend(average_usage.check_network_usage(&[
+		("Received from peers", 106.4000, 0.001),
+		("Sent to peers", 127.9100, 0.001),
+	]));
+	messages.extend(average_usage.check_cpu_usage(&[("statement-distribution", 0.0390, 0.1)]));
+
+	if messages.is_empty() {
+		Ok(())
+	} else {
+		eprintln!("{}", messages.join("\n"));
+		Err("Regressions found".to_string())
+	}
+}
diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs
index 4ca199c3378..4d56c795f13 100644
--- a/polkadot/node/network/statement-distribution/src/lib.rs
+++ b/polkadot/node/network/statement-distribution/src/lib.rs
@@ -19,7 +19,6 @@
 //! This is responsible for distributing signed statements about candidate
 //! validity among validators.
 
-#![deny(unused_crate_dependencies)]
 #![warn(missing_docs)]
 
 use error::{log_error, FatalResult};
diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml
index 37c6681b273..21eaed832c4 100644
--- a/polkadot/node/subsystem-bench/Cargo.toml
+++ b/polkadot/node/subsystem-bench/Cargo.toml
@@ -28,6 +28,7 @@ polkadot-primitives = { path = "../../primitives" }
 polkadot-node-network-protocol = { path = "../network/protocol" }
 polkadot-availability-recovery = { path = "../network/availability-recovery", features = ["subsystem-benchmarks"] }
 polkadot-availability-distribution = { path = "../network/availability-distribution" }
+polkadot-statement-distribution = { path = "../network/statement-distribution" }
 polkadot-node-core-av-store = { path = "../core/av-store" }
 polkadot-node-core-chain-api = { path = "../core/chain-api" }
 polkadot-availability-bitfield-distribution = { path = "../network/bitfield-distribution" }
diff --git a/polkadot/node/subsystem-bench/examples/statement_distribution.yaml b/polkadot/node/subsystem-bench/examples/statement_distribution.yaml
new file mode 100644
index 00000000000..e86669ffefc
--- /dev/null
+++ b/polkadot/node/subsystem-bench/examples/statement_distribution.yaml
@@ -0,0 +1,5 @@
+TestConfiguration:
+- objective: StatementDistribution
+  num_blocks: 10
+  n_cores: 100
+  n_validators: 500
diff --git a/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs b/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs
index 10953b6c783..1e921500a4d 100644
--- a/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs
+++ b/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs
@@ -20,7 +20,7 @@
 use clap::Parser;
 use color_eyre::eyre;
 use colored::Colorize;
-use polkadot_subsystem_bench::{approval, availability, configuration};
+use polkadot_subsystem_bench::{approval, availability, configuration, statement};
 use pyroscope::PyroscopeAgent;
 use pyroscope_pprofrs::{pprof_backend, PprofConfig};
 use serde::{Deserialize, Serialize};
@@ -40,6 +40,8 @@ pub enum TestObjective {
 	DataAvailabilityWrite,
 	/// Benchmark the approval-voting and approval-distribution subsystems.
 	ApprovalVoting(approval::ApprovalsOptions),
+	// Benchmark the statement-distribution subsystem
+	StatementDistribution,
 }
 
 impl std::fmt::Display for TestObjective {
@@ -51,6 +53,7 @@ impl std::fmt::Display for TestObjective {
 				Self::DataAvailabilityRead(_) => "DataAvailabilityRead",
 				Self::DataAvailabilityWrite => "DataAvailabilityWrite",
 				Self::ApprovalVoting(_) => "ApprovalVoting",
+				Self::StatementDistribution => "StatementDistribution",
 			}
 		)
 	}
@@ -170,6 +173,15 @@ impl BenchCli {
 						state,
 					))
 				},
+				TestObjective::StatementDistribution => {
+					let state = statement::TestState::new(&test_config);
+					let (mut env, _protocol_config) = statement::prepare_test(&state, true);
+					env.runtime().block_on(statement::benchmark_statement_distribution(
+						&benchmark_name,
+						&mut env,
+						&state,
+					))
+				},
 			};
 			println!("{}", usage);
 		}
diff --git a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
index 6ac0776d2d3..4a479b6af29 100644
--- a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
+++ b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
@@ -30,7 +30,7 @@ use crate::{
 	mock::{
 		chain_api::{ChainApiState, MockChainApi},
 		network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx},
-		runtime_api::MockRuntimeApi,
+		runtime_api::{MockRuntimeApi, MockRuntimeApiCoreState},
 		AlwaysSupportsParachains, TestSyncOracle,
 	},
 	network::{
@@ -465,8 +465,9 @@ impl ApprovalTestState {
 	}
 }
 
+#[async_trait::async_trait]
 impl HandleNetworkMessage for ApprovalTestState {
-	fn handle(
+	async fn handle(
 		&self,
 		_message: crate::network::NetworkMessage,
 		_node_sender: &mut futures::channel::mpsc::UnboundedSender<crate::network::NetworkMessage>,
@@ -807,6 +808,7 @@ fn build_overseer(
 		state.candidate_events_by_block(),
 		Some(state.babe_epoch.clone()),
 		1,
+		MockRuntimeApiCoreState::Occupied,
 	);
 	let mock_tx_bridge = MockNetworkBridgeTx::new(
 		network.clone(),
@@ -915,7 +917,9 @@ pub async fn bench_approvals_run(
 
 	// First create the initialization messages that make sure that then node under
 	// tests receives notifications about the topology used and the connected peers.
-	let mut initialization_messages = env.network().generate_peer_connected();
+	let mut initialization_messages = env.network().generate_peer_connected(|e| {
+		AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(e))
+	});
 	initialization_messages.extend(generate_new_session_topology(
 		&state.test_authorities,
 		ValidatorIndex(NODE_UNDER_TEST),
diff --git a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs
index 5b93c3d862d..f7d65589565 100644
--- a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs
+++ b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs
@@ -22,7 +22,7 @@ use crate::{
 		av_store::{self, MockAvailabilityStore, NetworkAvailabilityState},
 		chain_api::{ChainApiState, MockChainApi},
 		network_bridge::{self, MockNetworkBridgeRx, MockNetworkBridgeTx},
-		runtime_api::{self, MockRuntimeApi},
+		runtime_api::{self, MockRuntimeApi, MockRuntimeApiCoreState},
 		AlwaysSupportsParachains,
 	},
 	network::new_network,
@@ -189,6 +189,7 @@ pub fn prepare_test(
 		Default::default(),
 		Default::default(),
 		0,
+		MockRuntimeApiCoreState::Occupied,
 	);
 
 	let (overseer, overseer_handle) = match &mode {
diff --git a/polkadot/node/subsystem-bench/src/lib/configuration.rs b/polkadot/node/subsystem-bench/src/lib/configuration.rs
index 1e0efb72a7d..f614a5e552a 100644
--- a/polkadot/node/subsystem-bench/src/lib/configuration.rs
+++ b/polkadot/node/subsystem-bench/src/lib/configuration.rs
@@ -18,12 +18,13 @@
 
 use crate::keyring::Keyring;
 use itertools::Itertools;
-use polkadot_primitives::{AssignmentId, AuthorityDiscoveryId, ValidatorId};
+use polkadot_primitives::{AssignmentId, AuthorityDiscoveryId, ValidatorId, ValidatorPair};
 use rand::thread_rng;
 use rand_distr::{Distribution, Normal, Uniform};
 use sc_network_types::PeerId;
 use serde::{Deserialize, Serialize};
 use sp_consensus_babe::AuthorityId;
+use sp_core::Pair;
 use std::collections::HashMap;
 
 /// Peer networking latency configuration.
@@ -89,6 +90,15 @@ fn default_n_delay_tranches() -> usize {
 fn default_no_show_slots() -> usize {
 	3
 }
+fn default_minimum_backing_votes() -> u32 {
+	2
+}
+fn default_max_candidate_depth() -> u32 {
+	3
+}
+fn default_allowed_ancestry_len() -> u32 {
+	2
+}
 
 /// The test input parameters
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -137,6 +147,15 @@ pub struct TestConfiguration {
 	pub connectivity: usize,
 	/// Number of blocks to run the test for
 	pub num_blocks: usize,
+	/// Number of minimum backing votes
+	#[serde(default = "default_minimum_backing_votes")]
+	pub minimum_backing_votes: u32,
+	/// Async Backing max_candidate_depth
+	#[serde(default = "default_max_candidate_depth")]
+	pub max_candidate_depth: u32,
+	/// Async Backing allowed_ancestry_len
+	#[serde(default = "default_allowed_ancestry_len")]
+	pub allowed_ancestry_len: u32,
 }
 
 impl Default for TestConfiguration {
@@ -158,6 +177,9 @@ impl Default for TestConfiguration {
 			latency: default_peer_latency(),
 			connectivity: default_connectivity(),
 			num_blocks: Default::default(),
+			minimum_backing_votes: default_minimum_backing_votes(),
+			max_candidate_depth: default_max_candidate_depth(),
+			allowed_ancestry_len: default_allowed_ancestry_len(),
 		}
 	}
 }
@@ -208,6 +230,11 @@ impl TestConfiguration {
 			.map(|(peer_id, authority_id)| (*peer_id, authority_id.clone()))
 			.collect();
 
+		let validator_pairs = key_seeds
+			.iter()
+			.map(|seed| ValidatorPair::from_string_with_seed(seed, None).unwrap().0)
+			.collect();
+
 		TestAuthorities {
 			keyring,
 			validator_public,
@@ -217,6 +244,7 @@ impl TestConfiguration {
 			validator_assignment_id,
 			key_seeds,
 			peer_id_to_authority,
+			validator_pairs,
 		}
 	}
 }
@@ -246,6 +274,7 @@ pub struct TestAuthorities {
 	pub key_seeds: Vec<String>,
 	pub peer_ids: Vec<PeerId>,
 	pub peer_id_to_authority: HashMap<PeerId, AuthorityDiscoveryId>,
+	pub validator_pairs: Vec<ValidatorPair>,
 }
 
 /// Sample latency (in milliseconds) from a normal distribution with parameters
diff --git a/polkadot/node/subsystem-bench/src/lib/lib.rs b/polkadot/node/subsystem-bench/src/lib/lib.rs
index ef2724abc98..e18227af8be 100644
--- a/polkadot/node/subsystem-bench/src/lib/lib.rs
+++ b/polkadot/node/subsystem-bench/src/lib/lib.rs
@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
-// The validator index that represent the node that is under test.
+// The validator index that represents the node that is under test.
 pub const NODE_UNDER_TEST: u32 = 0;
 
 pub mod approval;
@@ -25,5 +25,6 @@ pub(crate) mod environment;
 pub(crate) mod keyring;
 pub(crate) mod mock;
 pub(crate) mod network;
+pub mod statement;
 pub mod usage;
 pub mod utils;
diff --git a/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs b/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs
index fba33523be8..a035bf01897 100644
--- a/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs
+++ b/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs
@@ -49,8 +49,9 @@ pub struct NetworkAvailabilityState {
 }
 
 // Implement access to the state.
+#[async_trait::async_trait]
 impl HandleNetworkMessage for NetworkAvailabilityState {
-	fn handle(
+	async fn handle(
 		&self,
 		message: NetworkMessage,
 		_node_sender: &mut futures::channel::mpsc::UnboundedSender<NetworkMessage>,
diff --git a/polkadot/node/subsystem-bench/src/lib/mock/candidate_backing.rs b/polkadot/node/subsystem-bench/src/lib/mock/candidate_backing.rs
new file mode 100644
index 00000000000..51494016e18
--- /dev/null
+++ b/polkadot/node/subsystem-bench/src/lib/mock/candidate_backing.rs
@@ -0,0 +1,171 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! A generic candidate backing subsystem mockup suitable to be used in benchmarks.
+
+use crate::{configuration::TestConfiguration, NODE_UNDER_TEST};
+use futures::FutureExt;
+use polkadot_node_primitives::{SignedFullStatementWithPVD, Statement, StatementWithPVD};
+use polkadot_node_subsystem::{
+	messages::CandidateBackingMessage, overseer, SpawnedSubsystem, SubsystemError,
+};
+use polkadot_node_subsystem_types::OverseerSignal;
+use polkadot_primitives::{
+	CandidateHash, Hash, PersistedValidationData, SigningContext, ValidatorIndex, ValidatorPair,
+};
+use sp_core::Pair;
+use std::collections::HashMap;
+
+const LOG_TARGET: &str = "subsystem-bench::candidate-backing-mock";
+
+struct MockCandidateBackingState {
+	pair: ValidatorPair,
+	pvd: PersistedValidationData,
+	own_backing_group: Vec<ValidatorIndex>,
+}
+
+pub struct MockCandidateBacking {
+	config: TestConfiguration,
+	state: MockCandidateBackingState,
+}
+
+impl MockCandidateBacking {
+	pub fn new(
+		config: TestConfiguration,
+		pair: ValidatorPair,
+		pvd: PersistedValidationData,
+		own_backing_group: Vec<ValidatorIndex>,
+	) -> Self {
+		Self { config, state: MockCandidateBackingState { pair, pvd, own_backing_group } }
+	}
+
+	fn handle_statement(
+		&self,
+		relay_parent: Hash,
+		statement: SignedFullStatementWithPVD,
+		statements_tracker: &mut HashMap<CandidateHash, u32>,
+	) -> Vec<polkadot_node_subsystem::messages::StatementDistributionMessage> {
+		let mut messages = vec![];
+		let validator_id = statement.validator_index();
+		let is_own_backing_group = self.state.own_backing_group.contains(&validator_id);
+
+		match statement.payload() {
+			StatementWithPVD::Seconded(receipt, _pvd) => {
+				let candidate_hash = receipt.hash();
+				statements_tracker
+					.entry(candidate_hash)
+					.and_modify(|v| {
+						*v += 1;
+					})
+					.or_insert(1);
+
+				let statements_received_count = *statements_tracker.get(&candidate_hash).unwrap();
+				if statements_received_count == (self.config.minimum_backing_votes - 1) &&
+					is_own_backing_group
+				{
+					let statement = Statement::Valid(candidate_hash);
+					let context = SigningContext { parent_hash: relay_parent, session_index: 0 };
+					let payload = statement.to_compact().signing_payload(&context);
+					let message =
+						polkadot_node_subsystem::messages::StatementDistributionMessage::Share(
+							relay_parent,
+							SignedFullStatementWithPVD::new(
+								statement.supply_pvd(self.state.pvd.clone()),
+								ValidatorIndex(NODE_UNDER_TEST),
+								self.state.pair.sign(&payload[..]),
+								&context,
+								&self.state.pair.public(),
+							)
+							.unwrap(),
+						);
+					messages.push(message);
+				}
+
+				if statements_received_count == self.config.minimum_backing_votes {
+					let message =
+						polkadot_node_subsystem::messages::StatementDistributionMessage::Backed(
+							candidate_hash,
+						);
+					messages.push(message);
+				}
+			},
+			StatementWithPVD::Valid(candidate_hash) => {
+				statements_tracker
+					.entry(*candidate_hash)
+					.and_modify(|v| {
+						*v += 1;
+					})
+					.or_insert(1);
+
+				let statements_received_count = *statements_tracker.get(candidate_hash).unwrap();
+				if statements_received_count == self.config.minimum_backing_votes {
+					let message =
+						polkadot_node_subsystem::messages::StatementDistributionMessage::Backed(
+							*candidate_hash,
+						);
+					messages.push(message);
+				}
+			},
+		}
+
+		messages
+	}
+}
+
+#[overseer::subsystem(CandidateBacking, error=SubsystemError, prefix=self::overseer)]
+impl<Context> MockCandidateBacking {
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
+		let future = self.run(ctx).map(|_| Ok(())).boxed();
+
+		SpawnedSubsystem { name: "test-environment", future }
+	}
+}
+
+#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
+impl MockCandidateBacking {
+	async fn run<Context>(self, mut ctx: Context) {
+		let mut statements_tracker: HashMap<CandidateHash, u32> = Default::default();
+
+		loop {
+			let msg = ctx.recv().await.expect("Overseer never fails us");
+			match msg {
+				orchestra::FromOrchestra::Signal(signal) =>
+					if signal == OverseerSignal::Conclude {
+						return
+					},
+				orchestra::FromOrchestra::Communication { msg } => {
+					gum::trace!(target: LOG_TARGET, msg=?msg, "recv message");
+
+					match msg {
+						CandidateBackingMessage::Statement(relay_parent, statement) => {
+							let messages = self.handle_statement(
+								relay_parent,
+								statement,
+								&mut statements_tracker,
+							);
+							for message in messages {
+								ctx.send_message(message).await;
+							}
+						},
+						_ => {
+							unimplemented!("Unexpected candidate-backing message")
+						},
+					}
+				},
+			}
+		}
+	}
+}
diff --git a/polkadot/node/subsystem-bench/src/lib/mock/mod.rs b/polkadot/node/subsystem-bench/src/lib/mock/mod.rs
index 6dda9a47d39..12766374bfa 100644
--- a/polkadot/node/subsystem-bench/src/lib/mock/mod.rs
+++ b/polkadot/node/subsystem-bench/src/lib/mock/mod.rs
@@ -19,9 +19,11 @@ use polkadot_node_subsystem_types::Hash;
 use sp_consensus::SyncOracle;
 
 pub mod av_store;
+pub mod candidate_backing;
 pub mod chain_api;
 pub mod dummy;
 pub mod network_bridge;
+pub mod prospective_parachains;
 pub mod runtime_api;
 
 pub struct AlwaysSupportsParachains {}
diff --git a/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs b/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs
index ec66ad4e279..10508f456a4 100644
--- a/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs
+++ b/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs
@@ -27,14 +27,19 @@ use polkadot_node_subsystem::{
 	messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem, SubsystemError,
 };
 use polkadot_node_subsystem_types::{
-	messages::{ApprovalDistributionMessage, BitfieldDistributionMessage, NetworkBridgeEvent},
+	messages::{
+		ApprovalDistributionMessage, BitfieldDistributionMessage, NetworkBridgeEvent,
+		StatementDistributionMessage,
+	},
 	OverseerSignal,
 };
 use sc_network::{request_responses::ProtocolConfig, RequestFailure};
 
 const LOG_TARGET: &str = "subsystem-bench::network-bridge";
-const CHUNK_REQ_PROTOCOL_NAME_V1: &str =
-	"/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_chunk/1";
+const ALLOWED_PROTOCOLS: &[&str] = &[
+	"/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_chunk/1",
+	"/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_attested_candidate/2",
+];
 
 /// A mock of the network bridge tx subsystem.
 pub struct MockNetworkBridgeTx {
@@ -106,8 +111,15 @@ impl MockNetworkBridgeTx {
 					NetworkBridgeTxMessage::SendRequests(requests, _if_disconnected) => {
 						for request in requests {
 							gum::debug!(target: LOG_TARGET, request = ?request, "Processing request");
-							let peer_id =
-								request.authority_id().expect("all nodes are authorities").clone();
+							let peer_id = match request.authority_id() {
+								Some(v) => v.clone(),
+								None => self
+									.test_authorities
+									.peer_id_to_authority
+									.get(request.peer_id().expect("Should exist"))
+									.expect("Should exist")
+									.clone(),
+							};
 
 							if !self.network.is_peer_connected(&peer_id) {
 								// Attempting to send a request to a disconnected peer.
@@ -141,7 +153,23 @@ impl MockNetworkBridgeTx {
 								.expect("Should not fail");
 						}
 					},
-					_ => unimplemented!("Unexpected network bridge message"),
+					NetworkBridgeTxMessage::SendValidationMessages(messages) => {
+						for (peers, message) in messages {
+							for peer in peers {
+								self.to_network_interface
+									.unbounded_send(NetworkMessage::MessageFromNode(
+										self.test_authorities
+											.peer_id_to_authority
+											.get(&peer)
+											.unwrap()
+											.clone(),
+										message.clone(),
+									))
+									.expect("Should not fail");
+							}
+						}
+					},
+					message => unimplemented!("Unexpected network bridge message {:?}", message),
 				},
 			}
 		}
@@ -175,13 +203,20 @@ impl MockNetworkBridgeRx {
 										ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V3(msg)))
 									).await;
 								}
+								Versioned::V3(
+									polkadot_node_network_protocol::v3::ValidationProtocol::StatementDistribution(msg)
+								) => {
+									ctx.send_message(
+										StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V3(msg)))
+									).await;
+								}
 								_ => {
 									unimplemented!("We only talk v2 network protocol")
 								},
 							},
 							NetworkMessage::RequestFromPeer(request) => {
 								if let Some(protocol) = self.chunk_request_sender.as_mut() {
-									assert_eq!(&*protocol.name, CHUNK_REQ_PROTOCOL_NAME_V1);
+									assert!(ALLOWED_PROTOCOLS.contains(&&*protocol.name));
 									if let Some(inbound_queue) = protocol.inbound_queue.as_ref() {
 										inbound_queue
 											.send(request)
diff --git a/polkadot/node/subsystem-bench/src/lib/mock/prospective_parachains.rs b/polkadot/node/subsystem-bench/src/lib/mock/prospective_parachains.rs
new file mode 100644
index 00000000000..8a865af21a0
--- /dev/null
+++ b/polkadot/node/subsystem-bench/src/lib/mock/prospective_parachains.rs
@@ -0,0 +1,74 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! A generic prospective parachains subsystem mockup suitable to be used in benchmarks.
+
+use futures::FutureExt;
+use polkadot_node_subsystem::{
+	messages::ProspectiveParachainsMessage, overseer, SpawnedSubsystem, SubsystemError,
+};
+use polkadot_node_subsystem_types::OverseerSignal;
+use polkadot_primitives::Hash;
+
+pub struct MockProspectiveParachains {}
+
+impl MockProspectiveParachains {
+	pub fn new() -> Self {
+		Self {}
+	}
+}
+
+#[overseer::subsystem(ProspectiveParachains, error=SubsystemError, prefix=self::overseer)]
+impl<Context> MockProspectiveParachains {
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
+		let future = self.run(ctx).map(|_| Ok(())).boxed();
+
+		SpawnedSubsystem { name: "test-environment", future }
+	}
+}
+
+#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
+impl MockProspectiveParachains {
+	async fn run<Context>(self, mut ctx: Context) {
+		loop {
+			let msg = ctx.recv().await.expect("Overseer never fails us");
+			match msg {
+				orchestra::FromOrchestra::Signal(signal) =>
+					if signal == OverseerSignal::Conclude {
+						return
+					},
+				orchestra::FromOrchestra::Communication { msg } => match msg {
+					ProspectiveParachainsMessage::GetMinimumRelayParents(_relay_parent, tx) => {
+						tx.send(vec![]).unwrap();
+					},
+					ProspectiveParachainsMessage::GetHypotheticalMembership(req, tx) => {
+						tx.send(
+							req.candidates
+								.iter()
+								.cloned()
+								.map(|candidate| (candidate, vec![Hash::repeat_byte(0)]))
+								.collect(),
+						)
+						.unwrap();
+					},
+					_ => {
+						unimplemented!("Unexpected chain-api message")
+					},
+				},
+			}
+		}
+	}
+}
diff --git a/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs b/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs
index b73d61321cd..9788a1123ec 100644
--- a/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs
+++ b/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs
@@ -26,8 +26,9 @@ use polkadot_node_subsystem::{
 };
 use polkadot_node_subsystem_types::OverseerSignal;
 use polkadot_primitives::{
-	CandidateEvent, CandidateReceipt, CoreState, GroupIndex, IndexedVec, NodeFeatures,
-	OccupiedCore, SessionIndex, SessionInfo, ValidatorIndex,
+	AsyncBackingParams, CandidateEvent, CandidateReceipt, CoreState, GroupIndex, GroupRotationInfo,
+	IndexedVec, NodeFeatures, OccupiedCore, ScheduledCore, SessionIndex, SessionInfo,
+	ValidatorIndex,
 };
 use sp_consensus_babe::Epoch as BabeEpoch;
 use sp_core::H256;
@@ -49,11 +50,20 @@ pub struct RuntimeApiState {
 	session_index: SessionIndex,
 }
 
+#[derive(Clone)]
+pub enum MockRuntimeApiCoreState {
+	Occupied,
+	Scheduled,
+	#[allow(dead_code)]
+	Free,
+}
+
 /// A mocked `runtime-api` subsystem.
 #[derive(Clone)]
 pub struct MockRuntimeApi {
 	state: RuntimeApiState,
 	config: TestConfiguration,
+	core_state: MockRuntimeApiCoreState,
 }
 
 impl MockRuntimeApi {
@@ -64,6 +74,7 @@ impl MockRuntimeApi {
 		included_candidates: HashMap<H256, Vec<CandidateEvent>>,
 		babe_epoch: Option<BabeEpoch>,
 		session_index: SessionIndex,
+		core_state: MockRuntimeApiCoreState,
 	) -> MockRuntimeApi {
 		Self {
 			state: RuntimeApiState {
@@ -74,6 +85,7 @@ impl MockRuntimeApi {
 				session_index,
 			},
 			config,
+			core_state,
 		}
 	}
 
@@ -198,16 +210,26 @@ impl MockRuntimeApi {
 									// Ensure test breaks if badly configured.
 									assert!(index < validator_group_count);
 
-									CoreState::Occupied(OccupiedCore {
-										next_up_on_available: None,
-										occupied_since: 0,
-										time_out_at: 0,
-										next_up_on_time_out: None,
-										availability: BitVec::default(),
-										group_responsible: GroupIndex(index as u32),
-										candidate_hash: candidate_receipt.hash(),
-										candidate_descriptor: candidate_receipt.descriptor.clone(),
-									})
+									use MockRuntimeApiCoreState::*;
+									match self.core_state {
+										Occupied => CoreState::Occupied(OccupiedCore {
+											next_up_on_available: None,
+											occupied_since: 0,
+											time_out_at: 0,
+											next_up_on_time_out: None,
+											availability: BitVec::default(),
+											group_responsible: GroupIndex(index as u32),
+											candidate_hash: candidate_receipt.hash(),
+											candidate_descriptor: candidate_receipt
+												.descriptor
+												.clone(),
+										}),
+										Scheduled => CoreState::Scheduled(ScheduledCore {
+											para_id: (index + 1).into(),
+											collator: None,
+										}),
+										Free => todo!(),
+									}
 								})
 								.collect::<Vec<_>>();
 
@@ -223,6 +245,43 @@ impl MockRuntimeApi {
 								.clone()
 								.expect("Babe epoch unpopulated")));
 						},
+						RuntimeApiMessage::Request(
+							_block_hash,
+							RuntimeApiRequest::AsyncBackingParams(sender),
+						) => {
+							let _ = sender.send(Ok(AsyncBackingParams {
+								max_candidate_depth: self.config.max_candidate_depth,
+								allowed_ancestry_len: self.config.allowed_ancestry_len,
+							}));
+						},
+						RuntimeApiMessage::Request(_parent, RuntimeApiRequest::Version(tx)) => {
+							tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT))
+								.unwrap();
+						},
+						RuntimeApiMessage::Request(
+							_parent,
+							RuntimeApiRequest::DisabledValidators(tx),
+						) => {
+							tx.send(Ok(vec![])).unwrap();
+						},
+						RuntimeApiMessage::Request(
+							_parent,
+							RuntimeApiRequest::MinimumBackingVotes(_session_index, tx),
+						) => {
+							tx.send(Ok(self.config.minimum_backing_votes)).unwrap();
+						},
+						RuntimeApiMessage::Request(
+							_parent,
+							RuntimeApiRequest::ValidatorGroups(tx),
+						) => {
+							let groups = self.session_info().validator_groups.to_vec();
+							let group_rotation_info = GroupRotationInfo {
+								session_start_block: 1,
+								group_rotation_frequency: 12,
+								now: 1,
+							};
+							tx.send(Ok((groups, group_rotation_info))).unwrap();
+						},
 						// Long term TODO: implement more as needed.
 						message => {
 							unimplemented!("Unexpected runtime-api message: {:?}", message)
diff --git a/polkadot/node/subsystem-bench/src/lib/network.rs b/polkadot/node/subsystem-bench/src/lib/network.rs
index 9bf2415e5a8..9686f456b9e 100644
--- a/polkadot/node/subsystem-bench/src/lib/network.rs
+++ b/polkadot/node/subsystem-bench/src/lib/network.rs
@@ -51,13 +51,14 @@ use futures::{
 };
 use itertools::Itertools;
 use net_protocol::{
-	peer_set::{ProtocolVersion, ValidationVersion},
+	peer_set::ValidationVersion,
 	request_response::{Recipient, Requests, ResponseSender},
-	ObservedRole, VersionedValidationProtocol,
+	ObservedRole, VersionedValidationProtocol, View,
 };
 use parity_scale_codec::Encode;
 use polkadot_node_network_protocol::{self as net_protocol, Versioned};
-use polkadot_node_subsystem_types::messages::{ApprovalDistributionMessage, NetworkBridgeEvent};
+use polkadot_node_subsystem::messages::StatementDistributionMessage;
+use polkadot_node_subsystem_types::messages::NetworkBridgeEvent;
 use polkadot_node_subsystem_util::metrics::prometheus::{
 	self, CounterVec, Opts, PrometheusError, Registry,
 };
@@ -437,6 +438,7 @@ pub struct EmulatedPeerHandle {
 	/// Send actions to be performed by the peer.
 	actions_tx: UnboundedSender<NetworkMessage>,
 	peer_id: PeerId,
+	authority_id: AuthorityDiscoveryId,
 }
 
 impl EmulatedPeerHandle {
@@ -496,29 +498,31 @@ impl EmulatedPeer {
 }
 
 /// Interceptor pattern for handling messages.
+#[async_trait::async_trait]
 pub trait HandleNetworkMessage {
 	/// Returns `None` if the message was handled, or the `message`
 	/// otherwise.
 	///
 	/// `node_sender` allows sending of messages to the node in response
 	/// to the handled message.
-	fn handle(
+	async fn handle(
 		&self,
 		message: NetworkMessage,
 		node_sender: &mut UnboundedSender<NetworkMessage>,
 	) -> Option<NetworkMessage>;
 }
 
+#[async_trait::async_trait]
 impl<T> HandleNetworkMessage for Arc<T>
 where
-	T: HandleNetworkMessage,
+	T: HandleNetworkMessage + Sync + Send,
 {
-	fn handle(
+	async fn handle(
 		&self,
 		message: NetworkMessage,
 		node_sender: &mut UnboundedSender<NetworkMessage>,
 	) -> Option<NetworkMessage> {
-		self.as_ref().handle(message, node_sender)
+		T::handle(self, message, node_sender).await
 	}
 }
 
@@ -551,7 +555,7 @@ async fn emulated_peer_loop(
 					for handler in handlers.iter() {
 						// The check below guarantees that message is always `Some`: we are still
 						// inside the loop.
-						message = handler.handle(message.unwrap(), &mut to_network_interface);
+						message = handler.handle(message.unwrap(), &mut to_network_interface).await;
 						if message.is_none() {
 							break
 						}
@@ -613,6 +617,7 @@ async fn emulated_peer_loop(
 }
 
 /// Creates a new peer emulator task and returns a handle to it.
+#[allow(clippy::too_many_arguments)]
 pub fn new_peer(
 	bandwidth: usize,
 	spawn_task_handle: SpawnTaskHandle,
@@ -621,6 +626,7 @@ pub fn new_peer(
 	to_network_interface: UnboundedSender<NetworkMessage>,
 	latency_ms: usize,
 	peer_id: PeerId,
+	authority_id: AuthorityDiscoveryId,
 ) -> EmulatedPeerHandle {
 	let (messages_tx, messages_rx) = mpsc::unbounded::<NetworkMessage>();
 	let (actions_tx, actions_rx) = mpsc::unbounded::<NetworkMessage>();
@@ -649,7 +655,7 @@ pub fn new_peer(
 		.boxed(),
 	);
 
-	EmulatedPeerHandle { messages_tx, actions_tx, peer_id }
+	EmulatedPeerHandle { messages_tx, actions_tx, peer_id, authority_id }
 }
 
 /// Book keeping of sent and received bytes.
@@ -714,6 +720,18 @@ impl Peer {
 			Peer::Disconnected(ref emulator) => emulator,
 		}
 	}
+
+	pub fn authority_id(&self) -> AuthorityDiscoveryId {
+		match self {
+			Peer::Connected(handle) | Peer::Disconnected(handle) => handle.authority_id.clone(),
+		}
+	}
+
+	pub fn peer_id(&self) -> PeerId {
+		match self {
+			Peer::Connected(handle) | Peer::Disconnected(handle) => handle.peer_id,
+		}
+	}
 }
 
 /// A ha emulated network implementation.
@@ -728,21 +746,34 @@ pub struct NetworkEmulatorHandle {
 }
 
 impl NetworkEmulatorHandle {
+	pub fn generate_statement_distribution_peer_view_change(&self, view: View) -> Vec<AllMessages> {
+		self.peers
+			.iter()
+			.filter(|peer| peer.is_connected())
+			.map(|peer| {
+				AllMessages::StatementDistribution(
+					StatementDistributionMessage::NetworkBridgeUpdate(
+						NetworkBridgeEvent::PeerViewChange(peer.peer_id(), view.clone()),
+					),
+				)
+			})
+			.collect_vec()
+	}
+
 	/// Generates peer_connected messages for all peers in `test_authorities`
-	pub fn generate_peer_connected(&self) -> Vec<AllMessages> {
+	pub fn generate_peer_connected<F, T>(&self, mapper: F) -> Vec<AllMessages>
+	where
+		F: Fn(NetworkBridgeEvent<T>) -> AllMessages,
+	{
 		self.peers
 			.iter()
 			.filter(|peer| peer.is_connected())
 			.map(|peer| {
-				let network = NetworkBridgeEvent::PeerConnected(
+				mapper(NetworkBridgeEvent::PeerConnected(
 					peer.handle().peer_id,
-					ObservedRole::Full,
-					ProtocolVersion::from(ValidationVersion::V3),
-					None,
-				);
-
-				AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(
-					network,
+					ObservedRole::Authority,
+					ValidationVersion::V3.into(),
+					Some(vec![peer.authority_id()].into_iter().collect()),
 				))
 			})
 			.collect_vec()
@@ -772,7 +803,7 @@ pub fn new_network(
 	let (stats, mut peers): (_, Vec<_>) = (0..n_peers)
 		.zip(authorities.validator_authority_id.clone())
 		.map(|(peer_index, authority_id)| {
-			validator_authority_id_mapping.insert(authority_id, peer_index);
+			validator_authority_id_mapping.insert(authority_id.clone(), peer_index);
 			let stats = Arc::new(PeerEmulatorStats::new(peer_index, metrics.clone()));
 			(
 				stats.clone(),
@@ -784,6 +815,7 @@ pub fn new_network(
 					to_network_interface.clone(),
 					random_latency(config.latency.as_ref()),
 					*authorities.peer_ids.get(peer_index).unwrap(),
+					authority_id,
 				)),
 			)
 		})
@@ -971,6 +1003,8 @@ impl Metrics {
 pub trait RequestExt {
 	/// Get the authority id if any from the request.
 	fn authority_id(&self) -> Option<&AuthorityDiscoveryId>;
+	/// Get the peer id if any from the request.
+	fn peer_id(&self) -> Option<&PeerId>;
 	/// Consume self and return the response sender.
 	fn into_response_sender(self) -> ResponseSender;
 	/// Allows to change the `ResponseSender` in place.
@@ -996,12 +1030,26 @@ impl RequestExt for Requests {
 					None
 				}
 			},
+			// Requested by PeerId
+			Requests::AttestedCandidateV2(_) => None,
 			request => {
 				unimplemented!("RequestAuthority not implemented for {:?}", request)
 			},
 		}
 	}
 
+	fn peer_id(&self) -> Option<&PeerId> {
+		match self {
+			Requests::AttestedCandidateV2(request) => match &request.peer {
+				Recipient::Authority(_) => None,
+				Recipient::Peer(peer_id) => Some(peer_id),
+			},
+			request => {
+				unimplemented!("peer_id() is not implemented for {:?}", request)
+			},
+		}
+	}
+
 	fn into_response_sender(self) -> ResponseSender {
 		match self {
 			Requests::ChunkFetchingV1(outgoing_request) => outgoing_request.pending_response,
@@ -1018,6 +1066,8 @@ impl RequestExt for Requests {
 				std::mem::replace(&mut outgoing_request.pending_response, new_sender),
 			Requests::AvailableDataFetchingV1(outgoing_request) =>
 				std::mem::replace(&mut outgoing_request.pending_response, new_sender),
+			Requests::AttestedCandidateV2(outgoing_request) =>
+				std::mem::replace(&mut outgoing_request.pending_response, new_sender),
 			_ => unimplemented!("unsupported request type"),
 		}
 	}
@@ -1028,6 +1078,8 @@ impl RequestExt for Requests {
 			Requests::ChunkFetchingV1(outgoing_request) => outgoing_request.payload.encoded_size(),
 			Requests::AvailableDataFetchingV1(outgoing_request) =>
 				outgoing_request.payload.encoded_size(),
+			Requests::AttestedCandidateV2(outgoing_request) =>
+				outgoing_request.payload.encoded_size(),
 			_ => unimplemented!("received an unexpected request"),
 		}
 	}
diff --git a/polkadot/node/subsystem-bench/src/lib/statement/mod.rs b/polkadot/node/subsystem-bench/src/lib/statement/mod.rs
new file mode 100644
index 00000000000..508dd9179f7
--- /dev/null
+++ b/polkadot/node/subsystem-bench/src/lib/statement/mod.rs
@@ -0,0 +1,450 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use crate::{
+	configuration::TestAuthorities,
+	dummy_builder,
+	environment::{TestEnvironment, TestEnvironmentDependencies, GENESIS_HASH},
+	mock::{
+		candidate_backing::MockCandidateBacking,
+		chain_api::{ChainApiState, MockChainApi},
+		network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx},
+		prospective_parachains::MockProspectiveParachains,
+		runtime_api::{MockRuntimeApi, MockRuntimeApiCoreState},
+		AlwaysSupportsParachains,
+	},
+	network::{new_network, NetworkEmulatorHandle, NetworkInterface, NetworkInterfaceReceiver},
+	usage::BenchmarkUsage,
+	NODE_UNDER_TEST,
+};
+use bitvec::vec::BitVec;
+use colored::Colorize;
+use itertools::Itertools;
+use polkadot_node_metrics::metrics::Metrics;
+use polkadot_node_network_protocol::{
+	grid_topology::{SessionGridTopology, TopologyPeerInfo},
+	request_response::{IncomingRequest, ReqProtocolNames},
+	v3::{self, BackedCandidateManifest, StatementFilter},
+	view, Versioned, View,
+};
+use polkadot_node_subsystem::messages::{
+	network_bridge_event::NewGossipTopology, AllMessages, NetworkBridgeEvent,
+	StatementDistributionMessage,
+};
+use polkadot_overseer::{
+	Handle as OverseerHandle, Overseer, OverseerConnector, OverseerMetrics, SpawnGlue,
+};
+use polkadot_primitives::{
+	AuthorityDiscoveryId, Block, GroupIndex, Hash, Id, ValidatorId, ValidatorIndex,
+};
+use polkadot_statement_distribution::StatementDistributionSubsystem;
+use rand::SeedableRng;
+use sc_keystore::LocalKeystore;
+use sc_network::request_responses::ProtocolConfig;
+use sc_network_types::PeerId;
+use sc_service::SpawnTaskHandle;
+use sp_keystore::{Keystore, KeystorePtr};
+use sp_runtime::RuntimeAppPublic;
+use std::{
+	sync::{atomic::Ordering, Arc},
+	time::{Duration, Instant},
+};
+pub use test_state::TestState;
+
+mod test_state;
+
+const LOG_TARGET: &str = "subsystem-bench::statement";
+
+pub fn make_keystore() -> KeystorePtr {
+	let keystore: KeystorePtr = Arc::new(LocalKeystore::in_memory());
+	Keystore::sr25519_generate_new(&*keystore, ValidatorId::ID, Some("//Node0"))
+		.expect("Insert key into keystore");
+	Keystore::sr25519_generate_new(&*keystore, AuthorityDiscoveryId::ID, Some("//Node0"))
+		.expect("Insert key into keystore");
+	keystore
+}
+
+fn build_overseer(
+	state: &TestState,
+	network: NetworkEmulatorHandle,
+	network_interface: NetworkInterface,
+	network_receiver: NetworkInterfaceReceiver,
+	dependencies: &TestEnvironmentDependencies,
+) -> (
+	Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>,
+	OverseerHandle,
+	Vec<ProtocolConfig>,
+) {
+	let overseer_connector = OverseerConnector::with_event_capacity(64000);
+	let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
+	let spawn_task_handle = dependencies.task_manager.spawn_handle();
+	let mock_runtime_api = MockRuntimeApi::new(
+		state.config.clone(),
+		state.test_authorities.clone(),
+		state.candidate_receipts.clone(),
+		Default::default(),
+		Default::default(),
+		0,
+		MockRuntimeApiCoreState::Scheduled,
+	);
+	let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() };
+	let mock_chain_api = MockChainApi::new(chain_api_state);
+	let mock_prospective_parachains = MockProspectiveParachains::new();
+	let mock_candidate_backing = MockCandidateBacking::new(
+		state.config.clone(),
+		state
+			.test_authorities
+			.validator_pairs
+			.get(NODE_UNDER_TEST as usize)
+			.unwrap()
+			.clone(),
+		state.pvd.clone(),
+		state.own_backing_group.clone(),
+	);
+	let (statement_req_receiver, statement_req_cfg) = IncomingRequest::get_config_receiver::<
+		Block,
+		sc_network::NetworkWorker<Block, Hash>,
+	>(&ReqProtocolNames::new(GENESIS_HASH, None));
+	let (candidate_req_receiver, candidate_req_cfg) = IncomingRequest::get_config_receiver::<
+		Block,
+		sc_network::NetworkWorker<Block, Hash>,
+	>(&ReqProtocolNames::new(GENESIS_HASH, None));
+	let keystore = make_keystore();
+	let subsystem = StatementDistributionSubsystem::new(
+		keystore.clone(),
+		statement_req_receiver,
+		candidate_req_receiver,
+		Metrics::try_register(&dependencies.registry).unwrap(),
+		rand::rngs::StdRng::from_entropy(),
+	);
+	let network_bridge_tx = MockNetworkBridgeTx::new(
+		network,
+		network_interface.subsystem_sender(),
+		state.test_authorities.clone(),
+	);
+	let network_bridge_rx = MockNetworkBridgeRx::new(network_receiver, Some(candidate_req_cfg));
+
+	let dummy = dummy_builder!(spawn_task_handle, overseer_metrics)
+		.replace_runtime_api(|_| mock_runtime_api)
+		.replace_chain_api(|_| mock_chain_api)
+		.replace_prospective_parachains(|_| mock_prospective_parachains)
+		.replace_candidate_backing(|_| mock_candidate_backing)
+		.replace_statement_distribution(|_| subsystem)
+		.replace_network_bridge_tx(|_| network_bridge_tx)
+		.replace_network_bridge_rx(|_| network_bridge_rx);
+	let (overseer, raw_handle) = dummy.build_with_connector(overseer_connector).unwrap();
+	let overseer_handle = OverseerHandle::new(raw_handle);
+
+	(overseer, overseer_handle, vec![statement_req_cfg])
+}
+
+pub fn prepare_test(
+	state: &TestState,
+	with_prometheus_endpoint: bool,
+) -> (TestEnvironment, Vec<ProtocolConfig>) {
+	let dependencies = TestEnvironmentDependencies::default();
+	let (network, network_interface, network_receiver) = new_network(
+		&state.config,
+		&dependencies,
+		&state.test_authorities,
+		vec![Arc::new(state.clone())],
+	);
+	let (overseer, overseer_handle, cfg) =
+		build_overseer(state, network.clone(), network_interface, network_receiver, &dependencies);
+
+	(
+		TestEnvironment::new(
+			dependencies,
+			state.config.clone(),
+			network,
+			overseer,
+			overseer_handle,
+			state.test_authorities.clone(),
+			with_prometheus_endpoint,
+		),
+		cfg,
+	)
+}
+
+pub fn generate_peer_view_change(block_hash: Hash, peer_id: PeerId) -> AllMessages {
+	let network = NetworkBridgeEvent::PeerViewChange(peer_id, View::new([block_hash], 0));
+
+	AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(network))
+}
+
+pub fn generate_new_session_topology(
+	topology: &SessionGridTopology,
+	test_node: ValidatorIndex,
+) -> Vec<AllMessages> {
+	let event = NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
+		session: 0,
+		topology: topology.clone(),
+		local_index: Some(test_node),
+	});
+	vec![AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(
+		event,
+	))]
+}
+
+/// Generates a topology to be used for this benchmark.
+pub fn generate_topology(test_authorities: &TestAuthorities) -> SessionGridTopology {
+	let keyrings = test_authorities
+		.validator_authority_id
+		.clone()
+		.into_iter()
+		.zip(test_authorities.peer_ids.clone())
+		.collect_vec();
+
+	let topology = keyrings
+		.clone()
+		.into_iter()
+		.enumerate()
+		.map(|(index, (discovery_id, peer_id))| TopologyPeerInfo {
+			peer_ids: vec![peer_id],
+			validator_index: ValidatorIndex(index as u32),
+			discovery_id,
+		})
+		.collect_vec();
+	let shuffled = (0..keyrings.len()).collect_vec();
+
+	SessionGridTopology::new(shuffled, topology)
+}
+
+pub async fn benchmark_statement_distribution(
+	benchmark_name: &str,
+	env: &mut TestEnvironment,
+	state: &TestState,
+) -> BenchmarkUsage {
+	state.reset_trackers();
+
+	let connected_validators = state
+		.test_authorities
+		.validator_authority_id
+		.iter()
+		.enumerate()
+		.filter_map(|(i, id)| if env.network().is_peer_connected(id) { Some(i) } else { None })
+		.collect_vec();
+	let seconding_validator_in_own_backing_group = state
+		.own_backing_group
+		.iter()
+		.find(|v| connected_validators.contains(&(v.0 as usize)))
+		.unwrap()
+		.to_owned();
+
+	let config = env.config().clone();
+	let groups = state.session_info.validator_groups.clone();
+	let own_backing_group_index = groups
+		.iter()
+		.position(|group| group.iter().any(|v| v.0 == NODE_UNDER_TEST))
+		.unwrap();
+
+	env.metrics().set_n_validators(config.n_validators);
+	env.metrics().set_n_cores(config.n_cores);
+
+	let topology = generate_topology(&state.test_authorities);
+	let peer_connected_messages = env.network().generate_peer_connected(|e| {
+		AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(e))
+	});
+	let new_session_topology_messages =
+		generate_new_session_topology(&topology, ValidatorIndex(NODE_UNDER_TEST));
+	for message in peer_connected_messages.into_iter().chain(new_session_topology_messages) {
+		env.send_message(message).await;
+	}
+
+	let test_start = Instant::now();
+	let mut candidates_advertised = 0;
+	for block_info in state.block_infos.iter() {
+		let block_num = block_info.number as usize;
+		gum::info!(target: LOG_TARGET, "Current block {}/{} {:?}", block_num, config.num_blocks, block_info.hash);
+		env.metrics().set_current_block(block_num);
+		env.import_block(block_info.clone()).await;
+
+		for peer_view_change in env
+			.network()
+			.generate_statement_distribution_peer_view_change(view![block_info.hash])
+		{
+			env.send_message(peer_view_change).await;
+		}
+
+		let seconding_peer_id = *state
+			.test_authorities
+			.peer_ids
+			.get(seconding_validator_in_own_backing_group.0 as usize)
+			.unwrap();
+		let candidate = state.candidate_receipts.get(&block_info.hash).unwrap().first().unwrap();
+		let candidate_hash = candidate.hash();
+		let statement = state
+			.statements
+			.get(&candidate_hash)
+			.unwrap()
+			.get(seconding_validator_in_own_backing_group.0 as usize)
+			.unwrap()
+			.clone();
+		let message = AllMessages::StatementDistribution(
+			StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(
+				seconding_peer_id,
+				Versioned::V3(v3::StatementDistributionMessage::Statement(
+					block_info.hash,
+					statement,
+				)),
+			)),
+		);
+		env.send_message(message).await;
+
+		let max_messages_per_candidate = state.config.max_candidate_depth + 1;
+		// One was just sent for the own backing group
+		let mut messages_tracker = (0..groups.len())
+			.map(|i| if i == own_backing_group_index { max_messages_per_candidate } else { 0 })
+			.collect_vec();
+
+		let neighbors =
+			topology.compute_grid_neighbors_for(ValidatorIndex(NODE_UNDER_TEST)).unwrap();
+		let connected_neighbors_x = neighbors
+			.validator_indices_x
+			.iter()
+			.filter(|&v| connected_validators.contains(&(v.0 as usize)))
+			.cloned()
+			.collect_vec();
+		let connected_neighbors_y = neighbors
+			.validator_indices_y
+			.iter()
+			.filter(|&v| connected_validators.contains(&(v.0 as usize)))
+			.cloned()
+			.collect_vec();
+		let one_hop_peers_and_groups = connected_neighbors_x
+			.iter()
+			.chain(connected_neighbors_y.iter())
+			.map(|validator_index| {
+				let peer_id =
+					*state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap();
+				let group_index =
+					groups.iter().position(|group| group.contains(validator_index)).unwrap();
+				(peer_id, group_index)
+			})
+			.collect_vec();
+		let two_hop_x_peers_and_groups = connected_neighbors_x
+			.iter()
+			.flat_map(|validator_index| {
+				let peer_id =
+					*state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap();
+				topology
+					.compute_grid_neighbors_for(*validator_index)
+					.unwrap()
+					.validator_indices_y
+					.iter()
+					.map(|validator_neighbor| {
+						let group_index = groups
+							.iter()
+							.position(|group| group.contains(validator_neighbor))
+							.unwrap();
+						(peer_id, group_index)
+					})
+					.collect_vec()
+			})
+			.collect_vec();
+		let two_hop_y_peers_and_groups = connected_neighbors_y
+			.iter()
+			.flat_map(|validator_index| {
+				let peer_id =
+					*state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap();
+				topology
+					.compute_grid_neighbors_for(*validator_index)
+					.unwrap()
+					.validator_indices_x
+					.iter()
+					.map(|validator_neighbor| {
+						let group_index = groups
+							.iter()
+							.position(|group| group.contains(validator_neighbor))
+							.unwrap();
+						(peer_id, group_index)
+					})
+					.collect_vec()
+			})
+			.collect_vec();
+
+		for (seconding_peer_id, group_index) in one_hop_peers_and_groups
+			.into_iter()
+			.chain(two_hop_x_peers_and_groups)
+			.chain(two_hop_y_peers_and_groups)
+		{
+			let messages_sent_count = messages_tracker.get_mut(group_index).unwrap();
+			if *messages_sent_count == max_messages_per_candidate {
+				continue
+			}
+			*messages_sent_count += 1;
+
+			let candidate_hash = state
+				.candidate_receipts
+				.get(&block_info.hash)
+				.unwrap()
+				.get(group_index)
+				.unwrap()
+				.hash();
+			let manifest = BackedCandidateManifest {
+				relay_parent: block_info.hash,
+				candidate_hash,
+				group_index: GroupIndex(group_index as u32),
+				para_id: Id::new(group_index as u32 + 1),
+				parent_head_data_hash: state.pvd.parent_head.hash(),
+				statement_knowledge: StatementFilter {
+					seconded_in_group: BitVec::from_iter(
+						groups.get(GroupIndex(group_index as u32)).unwrap().iter().map(|_| true),
+					),
+					validated_in_group: BitVec::from_iter(
+						groups.get(GroupIndex(group_index as u32)).unwrap().iter().map(|_| false),
+					),
+				},
+			};
+			let message = AllMessages::StatementDistribution(
+				StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(
+					seconding_peer_id,
+					Versioned::V3(v3::StatementDistributionMessage::BackedCandidateManifest(
+						manifest,
+					)),
+				)),
+			);
+			env.send_message(message).await;
+		}
+
+		candidates_advertised += messages_tracker.iter().filter(|&&v| v > 0).collect_vec().len();
+
+		loop {
+			let manifests_count = state
+				.manifests_tracker
+				.values()
+				.filter(|v| v.load(Ordering::SeqCst))
+				.collect::<Vec<_>>()
+				.len();
+			gum::debug!(target: LOG_TARGET, "{}/{} manifest exchanges", manifests_count, candidates_advertised);
+
+			if manifests_count == candidates_advertised {
+				break;
+			}
+			tokio::time::sleep(Duration::from_millis(50)).await;
+		}
+	}
+
+	let duration: u128 = test_start.elapsed().as_millis();
+	gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{:?}ms", duration).cyan());
+	gum::info!(target: LOG_TARGET,
+		"Avg block time: {}",
+		format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
+	);
+
+	env.stop().await;
+	env.collect_resource_usage(benchmark_name, &["statement-distribution"])
+}
diff --git a/polkadot/node/subsystem-bench/src/lib/statement/test_state.rs b/polkadot/node/subsystem-bench/src/lib/statement/test_state.rs
new file mode 100644
index 00000000000..b8ea64c7e33
--- /dev/null
+++ b/polkadot/node/subsystem-bench/src/lib/statement/test_state.rs
@@ -0,0 +1,436 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use crate::{
+	configuration::{TestAuthorities, TestConfiguration},
+	mock::runtime_api::session_info_for_peers,
+	network::{HandleNetworkMessage, NetworkMessage},
+	NODE_UNDER_TEST,
+};
+use bitvec::vec::BitVec;
+use futures::channel::oneshot;
+use itertools::Itertools;
+use parity_scale_codec::{Decode, Encode};
+use polkadot_node_network_protocol::{
+	request_response::{
+		v2::{AttestedCandidateRequest, AttestedCandidateResponse},
+		Requests,
+	},
+	v3::{
+		BackedCandidateAcknowledgement, StatementDistributionMessage, StatementFilter,
+		ValidationProtocol,
+	},
+	Versioned,
+};
+use polkadot_node_primitives::{AvailableData, BlockData, PoV};
+use polkadot_node_subsystem_test_helpers::{
+	derive_erasure_chunks_with_proofs_and_root, mock::new_block_import_info,
+};
+use polkadot_overseer::BlockInfo;
+use polkadot_primitives::{
+	BlockNumber, CandidateHash, CandidateReceipt, CommittedCandidateReceipt, CompactStatement,
+	Hash, Header, Id, PersistedValidationData, SessionInfo, SignedStatement, SigningContext,
+	UncheckedSigned, ValidatorIndex, ValidatorPair,
+};
+use polkadot_primitives_test_helpers::{
+	dummy_committed_candidate_receipt, dummy_hash, dummy_head_data, dummy_pvd,
+};
+use sc_network::{config::IncomingRequest, ProtocolName};
+use sp_core::{Pair, H256};
+use std::{
+	collections::HashMap,
+	sync::{
+		atomic::{AtomicBool, Ordering},
+		Arc,
+	},
+};
+
+#[derive(Clone)]
+pub struct TestState {
+	// Full test config
+	pub config: TestConfiguration,
+	// Authority keys for the network emulation.
+	pub test_authorities: TestAuthorities,
+	// Relay chain block infos
+	pub block_infos: Vec<BlockInfo>,
+	// Map from generated candidate receipts
+	pub candidate_receipts: HashMap<H256, Vec<CandidateReceipt>>,
+	// Map from generated commited candidate receipts
+	pub commited_candidate_receipts: HashMap<H256, Vec<CommittedCandidateReceipt>>,
+	// PersistedValidationData, we use one for all candidates
+	pub pvd: PersistedValidationData,
+	// Relay chain block headers
+	pub block_headers: HashMap<H256, Header>,
+	// Session info
+	pub session_info: SessionInfo,
+	// Pregenerated statements
+	pub statements: HashMap<CandidateHash, Vec<UncheckedSigned<CompactStatement>>>,
+	// Indices in the backing group where the node under test is
+	pub own_backing_group: Vec<ValidatorIndex>,
+	// Tracks how many statements we received for a candidates
+	pub statements_tracker: HashMap<CandidateHash, Vec<Arc<AtomicBool>>>,
+	// Tracks if manifest exchange happened
+	pub manifests_tracker: HashMap<CandidateHash, Arc<AtomicBool>>,
+}
+
+impl TestState {
+	pub fn new(config: &TestConfiguration) -> Self {
+		let test_authorities = config.generate_authorities();
+		let session_info = session_info_for_peers(config, &test_authorities);
+		let own_backing_group = session_info
+			.validator_groups
+			.iter()
+			.find(|g| g.contains(&ValidatorIndex(NODE_UNDER_TEST)))
+			.unwrap()
+			.clone();
+		let mut state = Self {
+			config: config.clone(),
+			test_authorities,
+			block_infos: (1..=config.num_blocks).map(generate_block_info).collect(),
+			candidate_receipts: Default::default(),
+			commited_candidate_receipts: Default::default(),
+			pvd: dummy_pvd(dummy_head_data(), 0),
+			block_headers: Default::default(),
+			statements_tracker: Default::default(),
+			manifests_tracker: Default::default(),
+			session_info,
+			own_backing_group,
+			statements: Default::default(),
+		};
+
+		state.block_headers = state.block_infos.iter().map(generate_block_header).collect();
+
+		// For each unique pov we create a candidate receipt.
+		let pov_sizes = Vec::from(config.pov_sizes()); // For n_cores
+		let pov_size_to_candidate = generate_pov_size_to_candidate(&pov_sizes);
+		let receipt_templates =
+			generate_receipt_templates(&pov_size_to_candidate, config.n_validators, &state.pvd);
+
+		for block_info in state.block_infos.iter() {
+			for core_idx in 0..config.n_cores {
+				let pov_size = pov_sizes.get(core_idx).expect("This is a cycle; qed");
+				let candidate_index =
+					*pov_size_to_candidate.get(pov_size).expect("pov_size always exists; qed");
+				let mut receipt = receipt_templates[candidate_index].clone();
+				receipt.descriptor.para_id = Id::new(core_idx as u32 + 1);
+				receipt.descriptor.relay_parent = block_info.hash;
+
+				state.candidate_receipts.entry(block_info.hash).or_default().push(
+					CandidateReceipt {
+						descriptor: receipt.descriptor.clone(),
+						commitments_hash: receipt.commitments.hash(),
+					},
+				);
+				state.statements_tracker.entry(receipt.hash()).or_default().extend(
+					(0..config.n_validators)
+						.map(|_| Arc::new(AtomicBool::new(false)))
+						.collect_vec(),
+				);
+				state.manifests_tracker.insert(receipt.hash(), Arc::new(AtomicBool::new(false)));
+				state
+					.commited_candidate_receipts
+					.entry(block_info.hash)
+					.or_default()
+					.push(receipt);
+			}
+		}
+
+		let groups = state.session_info.validator_groups.clone();
+
+		for block_info in state.block_infos.iter() {
+			for (index, group) in groups.iter().enumerate() {
+				let candidate =
+					state.candidate_receipts.get(&block_info.hash).unwrap().get(index).unwrap();
+				let statements = group
+					.iter()
+					.map(|&v| {
+						sign_statement(
+							CompactStatement::Seconded(candidate.hash()),
+							block_info.hash,
+							v,
+							state.test_authorities.validator_pairs.get(v.0 as usize).unwrap(),
+						)
+					})
+					.collect_vec();
+				state.statements.insert(candidate.hash(), statements);
+			}
+		}
+
+		state
+	}
+
+	pub fn reset_trackers(&self) {
+		self.statements_tracker.values().for_each(|v| {
+			v.iter()
+				.enumerate()
+				.for_each(|(index, v)| v.as_ref().store(index <= 1, Ordering::SeqCst))
+		});
+		self.manifests_tracker
+			.values()
+			.for_each(|v| v.as_ref().store(false, Ordering::SeqCst));
+	}
+}
+
+fn sign_statement(
+	statement: CompactStatement,
+	relay_parent: H256,
+	validator_index: ValidatorIndex,
+	pair: &ValidatorPair,
+) -> UncheckedSigned<CompactStatement> {
+	let context = SigningContext { parent_hash: relay_parent, session_index: 0 };
+	let payload = statement.signing_payload(&context);
+
+	SignedStatement::new(
+		statement,
+		validator_index,
+		pair.sign(&payload[..]),
+		&context,
+		&pair.public(),
+	)
+	.unwrap()
+	.as_unchecked()
+	.to_owned()
+}
+
+fn generate_block_info(block_num: usize) -> BlockInfo {
+	new_block_import_info(Hash::repeat_byte(block_num as u8), block_num as BlockNumber)
+}
+
+fn generate_block_header(info: &BlockInfo) -> (H256, Header) {
+	(
+		info.hash,
+		Header {
+			digest: Default::default(),
+			number: info.number,
+			parent_hash: info.parent_hash,
+			extrinsics_root: Default::default(),
+			state_root: Default::default(),
+		},
+	)
+}
+
+fn generate_pov_size_to_candidate(pov_sizes: &[usize]) -> HashMap<usize, usize> {
+	pov_sizes
+		.iter()
+		.cloned()
+		.unique()
+		.enumerate()
+		.map(|(index, pov_size)| (pov_size, index))
+		.collect()
+}
+
+fn generate_receipt_templates(
+	pov_size_to_candidate: &HashMap<usize, usize>,
+	n_validators: usize,
+	pvd: &PersistedValidationData,
+) -> Vec<CommittedCandidateReceipt> {
+	pov_size_to_candidate
+		.iter()
+		.map(|(&pov_size, &index)| {
+			let mut receipt = dummy_committed_candidate_receipt(dummy_hash());
+			let (_, erasure_root) = derive_erasure_chunks_with_proofs_and_root(
+				n_validators,
+				&AvailableData {
+					validation_data: pvd.clone(),
+					pov: Arc::new(PoV { block_data: BlockData(vec![index as u8; pov_size]) }),
+				},
+				|_, _| {},
+			);
+			receipt.descriptor.persisted_validation_data_hash = pvd.hash();
+			receipt.descriptor.erasure_root = erasure_root;
+			receipt
+		})
+		.collect()
+}
+
+#[async_trait::async_trait]
+impl HandleNetworkMessage for TestState {
+	async fn handle(
+		&self,
+		message: NetworkMessage,
+		node_sender: &mut futures::channel::mpsc::UnboundedSender<NetworkMessage>,
+	) -> Option<NetworkMessage> {
+		match message {
+			NetworkMessage::RequestFromNode(_authority_id, Requests::AttestedCandidateV2(req)) => {
+				let payload = req.payload;
+				let candidate_receipt = self
+					.commited_candidate_receipts
+					.values()
+					.flatten()
+					.find(|v| v.hash() == payload.candidate_hash)
+					.unwrap()
+					.clone();
+				let persisted_validation_data = self.pvd.clone();
+				let statements = self.statements.get(&payload.candidate_hash).unwrap().clone();
+				let res = AttestedCandidateResponse {
+					candidate_receipt,
+					persisted_validation_data,
+					statements,
+				};
+				let _ = req.pending_response.send(Ok((res.encode(), ProtocolName::from(""))));
+				None
+			},
+			NetworkMessage::MessageFromNode(
+				authority_id,
+				Versioned::V3(ValidationProtocol::StatementDistribution(
+					StatementDistributionMessage::Statement(relay_parent, statement),
+				)),
+			) => {
+				let index = self
+					.test_authorities
+					.validator_authority_id
+					.iter()
+					.position(|v| v == &authority_id)
+					.unwrap();
+				let candidate_hash = *statement.unchecked_payload().candidate_hash();
+
+				let statements_sent_count = self
+					.statements_tracker
+					.get(&candidate_hash)
+					.unwrap()
+					.get(index)
+					.unwrap()
+					.as_ref();
+				if statements_sent_count.load(Ordering::SeqCst) {
+					return None
+				} else {
+					statements_sent_count.store(true, Ordering::SeqCst);
+				}
+
+				let group_statements = self.statements.get(&candidate_hash).unwrap();
+				if !group_statements.iter().any(|s| s.unchecked_validator_index().0 == index as u32)
+				{
+					return None
+				}
+
+				let statement = CompactStatement::Valid(candidate_hash);
+				let context = SigningContext { parent_hash: relay_parent, session_index: 0 };
+				let payload = statement.signing_payload(&context);
+				let pair = self.test_authorities.validator_pairs.get(index).unwrap();
+				let signature = pair.sign(&payload[..]);
+				let statement = SignedStatement::new(
+					statement,
+					ValidatorIndex(index as u32),
+					signature,
+					&context,
+					&pair.public(),
+				)
+				.unwrap()
+				.as_unchecked()
+				.to_owned();
+
+				node_sender
+					.start_send(NetworkMessage::MessageFromPeer(
+						*self.test_authorities.peer_ids.get(index).unwrap(),
+						Versioned::V3(ValidationProtocol::StatementDistribution(
+							StatementDistributionMessage::Statement(relay_parent, statement),
+						)),
+					))
+					.unwrap();
+				None
+			},
+			NetworkMessage::MessageFromNode(
+				authority_id,
+				Versioned::V3(ValidationProtocol::StatementDistribution(
+					StatementDistributionMessage::BackedCandidateManifest(manifest),
+				)),
+			) => {
+				let index = self
+					.test_authorities
+					.validator_authority_id
+					.iter()
+					.position(|v| v == &authority_id)
+					.unwrap();
+				let backing_group =
+					self.session_info.validator_groups.get(manifest.group_index).unwrap();
+				let group_size = backing_group.len();
+				let is_own_backing_group = backing_group.contains(&ValidatorIndex(NODE_UNDER_TEST));
+				let mut seconded_in_group =
+					BitVec::from_iter((0..group_size).map(|_| !is_own_backing_group));
+				let mut validated_in_group = BitVec::from_iter((0..group_size).map(|_| false));
+
+				if is_own_backing_group {
+					let (pending_response, response_receiver) = oneshot::channel();
+					let peer_id = self.test_authorities.peer_ids.get(index).unwrap().to_owned();
+					node_sender
+						.start_send(NetworkMessage::RequestFromPeer(IncomingRequest {
+							peer: peer_id,
+							payload: AttestedCandidateRequest {
+								candidate_hash: manifest.candidate_hash,
+								mask: StatementFilter::blank(self.own_backing_group.len()),
+							}
+							.encode(),
+							pending_response,
+						}))
+						.unwrap();
+
+					let response = response_receiver.await.unwrap();
+					let response =
+						AttestedCandidateResponse::decode(&mut response.result.unwrap().as_ref())
+							.unwrap();
+
+					for statement in response.statements {
+						let validator_index = statement.unchecked_validator_index();
+						let position_in_group =
+							backing_group.iter().position(|v| *v == validator_index).unwrap();
+						match statement.unchecked_payload() {
+							CompactStatement::Seconded(_) =>
+								seconded_in_group.set(position_in_group, true),
+							CompactStatement::Valid(_) =>
+								validated_in_group.set(position_in_group, true),
+						}
+					}
+				}
+
+				let ack = BackedCandidateAcknowledgement {
+					candidate_hash: manifest.candidate_hash,
+					statement_knowledge: StatementFilter { seconded_in_group, validated_in_group },
+				};
+				node_sender
+					.start_send(NetworkMessage::MessageFromPeer(
+						*self.test_authorities.peer_ids.get(index).unwrap(),
+						Versioned::V3(ValidationProtocol::StatementDistribution(
+							StatementDistributionMessage::BackedCandidateKnown(ack),
+						)),
+					))
+					.unwrap();
+
+				self.manifests_tracker
+					.get(&manifest.candidate_hash)
+					.unwrap()
+					.as_ref()
+					.store(true, Ordering::SeqCst);
+
+				None
+			},
+			NetworkMessage::MessageFromNode(
+				_authority_id,
+				Versioned::V3(ValidationProtocol::StatementDistribution(
+					StatementDistributionMessage::BackedCandidateKnown(ack),
+				)),
+			) => {
+				self.manifests_tracker
+					.get(&ack.candidate_hash)
+					.unwrap()
+					.as_ref()
+					.store(true, Ordering::SeqCst);
+
+				None
+			},
+			_ => Some(message),
+		}
+	}
+}
-- 
GitLab