diff --git a/Cargo.lock b/Cargo.lock
index 663d8ccdd39374b7945b0da71a86a189fcf87f1d..540bcc327a3f6fef57fec31b216c17fed1226bae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13467,6 +13467,7 @@ version = "1.0.0"
 dependencies = [
  "assert_matches",
  "async-trait",
+ "bincode",
  "bitvec",
  "clap 4.4.18",
  "clap-num",
@@ -13475,16 +13476,19 @@ dependencies = [
  "env_logger 0.9.3",
  "futures",
  "futures-timer",
+ "hex",
  "itertools 0.11.0",
  "kvdb-memorydb",
  "log",
  "orchestra",
  "parity-scale-codec",
  "paste",
+ "polkadot-approval-distribution",
  "polkadot-availability-bitfield-distribution",
  "polkadot-availability-distribution",
  "polkadot-availability-recovery",
  "polkadot-erasure-coding",
+ "polkadot-node-core-approval-voting",
  "polkadot-node-core-av-store",
  "polkadot-node-core-chain-api",
  "polkadot-node-metrics",
@@ -13501,17 +13505,24 @@ dependencies = [
  "pyroscope",
  "pyroscope_pprofrs",
  "rand",
+ "rand_chacha 0.3.1",
+ "rand_core 0.6.4",
  "rand_distr",
  "sc-keystore",
  "sc-network",
  "sc-service",
+ "schnorrkel 0.9.1",
  "serde",
  "serde_yaml",
+ "sha1",
  "sp-application-crypto",
  "sp-consensus",
+ "sp-consensus-babe",
  "sp-core",
  "sp-keyring",
  "sp-keystore",
+ "sp-runtime",
+ "sp-timestamp",
  "substrate-prometheus-endpoint",
  "tokio",
  "tracing-gum",
@@ -17208,9 +17219,9 @@ dependencies = [
 
 [[package]]
 name = "sha1"
-version = "0.10.5"
+version = "0.10.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3"
+checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
 dependencies = [
  "cfg-if",
  "cpufeatures",
diff --git a/polkadot/node/core/approval-voting/src/criteria.rs b/polkadot/node/core/approval-voting/src/criteria.rs
index 1af61e72d7affa81744d8fcdb48ed5b1e80ae4d6..1ebea2641b62198c291951448c8c552d9cfb34cf 100644
--- a/polkadot/node/core/approval-voting/src/criteria.rs
+++ b/polkadot/node/core/approval-voting/src/criteria.rs
@@ -55,11 +55,11 @@ pub struct OurAssignment {
 }
 
 impl OurAssignment {
-	pub(crate) fn cert(&self) -> &AssignmentCertV2 {
+	pub fn cert(&self) -> &AssignmentCertV2 {
 		&self.cert
 	}
 
-	pub(crate) fn tranche(&self) -> DelayTranche {
+	pub fn tranche(&self) -> DelayTranche {
 		self.tranche
 	}
 
@@ -225,7 +225,7 @@ fn assigned_core_transcript(core_index: CoreIndex) -> Transcript {
 
 /// Information about the world assignments are being produced in.
 #[derive(Clone, Debug)]
-pub(crate) struct Config {
+pub struct Config {
 	/// The assignment public keys for validators.
 	assignment_keys: Vec<AssignmentId>,
 	/// The groups of validators assigned to each core.
@@ -321,7 +321,7 @@ impl AssignmentCriteria for RealAssignmentCriteria {
 /// different times. The idea is that most assignments are never triggered and fall by the wayside.
 ///
 /// This will not assign to anything the local validator was part of the backing group for.
-pub(crate) fn compute_assignments(
+pub fn compute_assignments(
 	keystore: &LocalKeystore,
 	relay_vrf_story: RelayVRFStory,
 	config: &Config,
diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs
index 3161d6186a1e64192951b5357ff22ef78a9d284f..456ae319787b07740bb0817dcfbe260500eb9dd8 100644
--- a/polkadot/node/core/approval-voting/src/lib.rs
+++ b/polkadot/node/core/approval-voting/src/lib.rs
@@ -92,11 +92,11 @@ use time::{slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClo
 mod approval_checking;
 pub mod approval_db;
 mod backend;
-mod criteria;
+pub mod criteria;
 mod import;
 mod ops;
 mod persisted_entries;
-mod time;
+pub mod time;
 
 use crate::{
 	approval_checking::{Check, TranchesToApproveResult},
@@ -159,6 +159,7 @@ pub struct ApprovalVotingSubsystem {
 	db: Arc<dyn Database>,
 	mode: Mode,
 	metrics: Metrics,
+	clock: Box<dyn Clock + Send + Sync>,
 }
 
 #[derive(Clone)]
@@ -444,6 +445,25 @@ impl ApprovalVotingSubsystem {
 		keystore: Arc<LocalKeystore>,
 		sync_oracle: Box<dyn SyncOracle + Send>,
 		metrics: Metrics,
+	) -> Self {
+		ApprovalVotingSubsystem::with_config_and_clock(
+			config,
+			db,
+			keystore,
+			sync_oracle,
+			metrics,
+			Box::new(SystemClock {}),
+		)
+	}
+
+	/// Create a new approval voting subsystem with the given keystore, config, and database.
+	pub fn with_config_and_clock(
+		config: Config,
+		db: Arc<dyn Database>,
+		keystore: Arc<LocalKeystore>,
+		sync_oracle: Box<dyn SyncOracle + Send>,
+		metrics: Metrics,
+		clock: Box<dyn Clock + Send + Sync>,
 	) -> Self {
 		ApprovalVotingSubsystem {
 			keystore,
@@ -452,6 +472,7 @@ impl ApprovalVotingSubsystem {
 			db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
 			mode: Mode::Syncing(sync_oracle),
 			metrics,
+			clock,
 		}
 	}
 
@@ -493,15 +514,10 @@ fn db_sanity_check(db: Arc<dyn Database>, config: DatabaseConfig) -> SubsystemRe
 impl<Context: Send> ApprovalVotingSubsystem {
 	fn start(self, ctx: Context) -> SpawnedSubsystem {
 		let backend = DbBackend::new(self.db.clone(), self.db_config);
-		let future = run::<DbBackend, Context>(
-			ctx,
-			self,
-			Box::new(SystemClock),
-			Box::new(RealAssignmentCriteria),
-			backend,
-		)
-		.map_err(|e| SubsystemError::with_origin("approval-voting", e))
-		.boxed();
+		let future =
+			run::<DbBackend, Context>(ctx, self, Box::new(RealAssignmentCriteria), backend)
+				.map_err(|e| SubsystemError::with_origin("approval-voting", e))
+				.boxed();
 
 		SpawnedSubsystem { name: "approval-voting-subsystem", future }
 	}
@@ -909,7 +925,6 @@ enum Action {
 async fn run<B, Context>(
 	mut ctx: Context,
 	mut subsystem: ApprovalVotingSubsystem,
-	clock: Box<dyn Clock + Send + Sync>,
 	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
 	mut backend: B,
 ) -> SubsystemResult<()>
@@ -923,7 +938,7 @@ where
 	let mut state = State {
 		keystore: subsystem.keystore,
 		slot_duration_millis: subsystem.slot_duration_millis,
-		clock,
+		clock: subsystem.clock,
 		assignment_criteria,
 		spans: HashMap::new(),
 	};
diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs
index 7a0bde6a55e28135036dbc9b8ee8137a0485aa28..9220e84a2554a48c14468567e87e74319ce94151 100644
--- a/polkadot/node/core/approval-voting/src/tests.rs
+++ b/polkadot/node/core/approval-voting/src/tests.rs
@@ -549,7 +549,7 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
 
 	let subsystem = run(
 		context,
-		ApprovalVotingSubsystem::with_config(
+		ApprovalVotingSubsystem::with_config_and_clock(
 			Config {
 				col_approval_data: test_constants::TEST_CONFIG.col_approval_data,
 				slot_duration_millis: SLOT_DURATION_MILLIS,
@@ -558,8 +558,8 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
 			Arc::new(keystore),
 			sync_oracle,
 			Metrics::default(),
+			clock.clone(),
 		),
-		clock.clone(),
 		assignment_criteria,
 		backend,
 	);
diff --git a/polkadot/node/core/approval-voting/src/time.rs b/polkadot/node/core/approval-voting/src/time.rs
index 61091f3c34cdab4aed00c24bcbc8a40d6a77a116..99dfbe07678f2f0956565e92ffaf196b6f482af9 100644
--- a/polkadot/node/core/approval-voting/src/time.rs
+++ b/polkadot/node/core/approval-voting/src/time.rs
@@ -33,14 +33,14 @@ use std::{
 };
 
 use polkadot_primitives::{Hash, ValidatorIndex};
-const TICK_DURATION_MILLIS: u64 = 500;
+pub const TICK_DURATION_MILLIS: u64 = 500;
 
 /// A base unit of time, starting from the Unix epoch, split into half-second intervals.
-pub(crate) type Tick = u64;
+pub type Tick = u64;
 
 /// A clock which allows querying of the current tick as well as
 /// waiting for a tick to be reached.
-pub(crate) trait Clock {
+pub trait Clock {
 	/// Yields the current tick.
 	fn tick_now(&self) -> Tick;
 
@@ -49,7 +49,7 @@ pub(crate) trait Clock {
 }
 
 /// Extension methods for clocks.
-pub(crate) trait ClockExt {
+pub trait ClockExt {
 	fn tranche_now(&self, slot_duration_millis: u64, base_slot: Slot) -> DelayTranche;
 }
 
@@ -61,7 +61,8 @@ impl<C: Clock + ?Sized> ClockExt for C {
 }
 
 /// A clock which uses the actual underlying system clock.
-pub(crate) struct SystemClock;
+#[derive(Clone)]
+pub struct SystemClock;
 
 impl Clock for SystemClock {
 	/// Yields the current tick.
@@ -93,11 +94,22 @@ fn tick_to_time(tick: Tick) -> SystemTime {
 }
 
 /// assumes `slot_duration_millis` evenly divided by tick duration.
-pub(crate) fn slot_number_to_tick(slot_duration_millis: u64, slot: Slot) -> Tick {
+pub fn slot_number_to_tick(slot_duration_millis: u64, slot: Slot) -> Tick {
 	let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS;
 	u64::from(slot) * ticks_per_slot
 }
 
+/// Converts a tick to the slot number.
+pub fn tick_to_slot_number(slot_duration_millis: u64, tick: Tick) -> Slot {
+	let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS;
+	(tick / ticks_per_slot).into()
+}
+
+/// Converts a tranche from a slot to the tick number.
+pub fn tranche_to_tick(slot_duration_millis: u64, slot: Slot, tranche: u32) -> Tick {
+	slot_number_to_tick(slot_duration_millis, slot) + tranche as u64
+}
+
 /// A list of delayed futures that gets triggered when the waiting time has expired and it is
 /// time to sign the candidate.
 /// We have a timer per relay-chain block.
diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml
index f7866f99363125d9f905a7a18056bed52e5d33e9..136eccbf685482f05b34b8de20007c7ef548320d 100644
--- a/polkadot/node/subsystem-bench/Cargo.toml
+++ b/polkadot/node/subsystem-bench/Cargo.toml
@@ -38,6 +38,9 @@ sp-core = { path = "../../../substrate/primitives/core" }
 clap = { version = "4.4.18", features = ["derive"] }
 futures = "0.3.21"
 futures-timer = "3.0.2"
+bincode = "1.3.3"
+sha1 = "0.10.6"
+hex = "0.4.3"
 gum = { package = "tracing-gum", path = "../gum" }
 polkadot-erasure-coding = { package = "polkadot-erasure-coding", path = "../../erasure-coding" }
 log = "0.4.17"
@@ -64,6 +67,16 @@ prometheus_endpoint = { package = "substrate-prometheus-endpoint", path = "../..
 prometheus = { version = "0.13.0", default-features = false }
 serde = "1.0.195"
 serde_yaml = "0.9"
+
+polkadot-node-core-approval-voting = { path = "../core/approval-voting" }
+polkadot-approval-distribution = { path = "../network/approval-distribution" }
+sp-consensus-babe = { path = "../../../substrate/primitives/consensus/babe" }
+sp-runtime = { path = "../../../substrate/primitives/runtime", default-features = false }
+sp-timestamp = { path = "../../../substrate/primitives/timestamp" }
+
+schnorrkel = { version = "0.9.1", default-features = false }
+rand_core = "0.6.2"                                                                         # should match schnorrkel
+rand_chacha = { version = "0.3.1" }
 paste = "1.0.14"
 orchestra = { version = "0.3.5", default-features = false, features = ["futures_channel"] }
 pyroscope = "0.5.7"
diff --git a/polkadot/node/subsystem-bench/examples/approvals_no_shows.yaml b/polkadot/node/subsystem-bench/examples/approvals_no_shows.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..758c7fbbf1121a249ff3094bc74967e3ad2420da
--- /dev/null
+++ b/polkadot/node/subsystem-bench/examples/approvals_no_shows.yaml
@@ -0,0 +1,18 @@
+TestConfiguration:
+# Test 1
+- objective: !ApprovalVoting
+    last_considered_tranche: 89
+    coalesce_mean: 3.0
+    coalesce_std_dev: 1.0
+    stop_when_approved: true
+    coalesce_tranche_diff: 12
+    workdir_prefix: "/tmp/"
+    enable_assignments_v2: true
+    num_no_shows_per_candidate: 10
+  n_validators: 500
+  n_cores: 100
+  min_pov_size: 1120
+  max_pov_size: 5120
+  peer_bandwidth: 524288000000
+  bandwidth: 524288000000
+  num_blocks: 10
diff --git a/polkadot/node/subsystem-bench/examples/approvals_throughput.yaml b/polkadot/node/subsystem-bench/examples/approvals_throughput.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..9eeeefc53a4277aae51e44ee5940a2eb65111cd3
--- /dev/null
+++ b/polkadot/node/subsystem-bench/examples/approvals_throughput.yaml
@@ -0,0 +1,19 @@
+TestConfiguration:
+# Test 1
+- objective: !ApprovalVoting
+    coalesce_mean: 3.0
+    coalesce_std_dev: 1.0
+    enable_assignments_v2: true
+    last_considered_tranche: 89
+    stop_when_approved: false
+    coalesce_tranche_diff: 12
+    workdir_prefix: "/tmp"
+    num_no_shows_per_candidate: 0
+  n_validators: 500
+  n_cores: 100
+  n_included_candidates: 100
+  min_pov_size: 1120
+  max_pov_size: 5120
+  peer_bandwidth: 524288000000
+  bandwidth: 524288000000
+  num_blocks: 10
diff --git a/polkadot/node/subsystem-bench/examples/approvals_throughput_best_case.yaml b/polkadot/node/subsystem-bench/examples/approvals_throughput_best_case.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..370bb31a5c4c102174c29382ab6de70ca336278e
--- /dev/null
+++ b/polkadot/node/subsystem-bench/examples/approvals_throughput_best_case.yaml
@@ -0,0 +1,18 @@
+TestConfiguration:
+# Test 1
+- objective: !ApprovalVoting
+    coalesce_mean: 3.0
+    coalesce_std_dev: 1.0
+    enable_assignments_v2: true
+    last_considered_tranche: 89
+    stop_when_approved: true
+    coalesce_tranche_diff: 12
+    workdir_prefix: "/tmp/"
+    num_no_shows_per_candidate: 0
+  n_validators: 500
+  n_cores: 100
+  min_pov_size: 1120
+  max_pov_size: 5120
+  peer_bandwidth: 524288000000
+  bandwidth: 524288000000
+  num_blocks: 10
diff --git a/polkadot/node/subsystem-bench/examples/approvals_throughput_no_optimisations_enabled.yaml b/polkadot/node/subsystem-bench/examples/approvals_throughput_no_optimisations_enabled.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..30b9ac8dc50fec25bc4952ef3706f9878aa2bbd7
--- /dev/null
+++ b/polkadot/node/subsystem-bench/examples/approvals_throughput_no_optimisations_enabled.yaml
@@ -0,0 +1,18 @@
+TestConfiguration:
+# Test 1
+- objective: !ApprovalVoting
+    coalesce_mean: 1.0
+    coalesce_std_dev: 0.0
+    enable_assignments_v2: false
+    last_considered_tranche: 89
+    stop_when_approved: false
+    coalesce_tranche_diff: 12
+    workdir_prefix: "/tmp/"
+    num_no_shows_per_candidate: 0
+  n_validators: 500
+  n_cores: 100
+  min_pov_size: 1120
+  max_pov_size: 5120
+  peer_bandwidth: 524288000000
+  bandwidth: 524288000000
+  num_blocks: 10
diff --git a/polkadot/node/subsystem-bench/src/approval/helpers.rs b/polkadot/node/subsystem-bench/src/approval/helpers.rs
new file mode 100644
index 0000000000000000000000000000000000000000..7cbe3ba75949339646b9157452f358f479bca657
--- /dev/null
+++ b/polkadot/node/subsystem-bench/src/approval/helpers.rs
@@ -0,0 +1,207 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+use crate::core::configuration::TestAuthorities;
+use itertools::Itertools;
+use polkadot_node_core_approval_voting::time::{Clock, SystemClock, Tick};
+use polkadot_node_network_protocol::{
+	grid_topology::{SessionGridTopology, TopologyPeerInfo},
+	View,
+};
+use polkadot_node_subsystem_types::messages::{
+	network_bridge_event::NewGossipTopology, ApprovalDistributionMessage, NetworkBridgeEvent,
+};
+use polkadot_overseer::AllMessages;
+use polkadot_primitives::{
+	BlockNumber, CandidateEvent, CandidateReceipt, CoreIndex, GroupIndex, Hash, Header,
+	Id as ParaId, Slot, ValidatorIndex,
+};
+use polkadot_primitives_test_helpers::dummy_candidate_receipt_bad_sig;
+use rand::{seq::SliceRandom, SeedableRng};
+use rand_chacha::ChaCha20Rng;
+use sc_network::PeerId;
+use sp_consensus_babe::{
+	digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest},
+	AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch, VrfSignature, VrfTranscript,
+};
+use sp_core::crypto::VrfSecret;
+use sp_keyring::sr25519::Keyring as Sr25519Keyring;
+use sp_runtime::{Digest, DigestItem};
+use std::sync::{atomic::AtomicU64, Arc};
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+/// A fake system clock used for driving the approval voting and make
+/// it process blocks, assignments and approvals from the past.
+#[derive(Clone)]
+pub struct PastSystemClock {
+	/// The real system clock
+	real_system_clock: SystemClock,
+	/// The difference in ticks between the real system clock and the current clock.
+	delta_ticks: Arc<AtomicU64>,
+}
+
+impl PastSystemClock {
+	/// Creates a new fake system clock  with `delta_ticks` between the real time and the fake one.
+	pub fn new(real_system_clock: SystemClock, delta_ticks: Arc<AtomicU64>) -> Self {
+		PastSystemClock { real_system_clock, delta_ticks }
+	}
+}
+
+impl Clock for PastSystemClock {
+	fn tick_now(&self) -> Tick {
+		self.real_system_clock.tick_now() -
+			self.delta_ticks.load(std::sync::atomic::Ordering::SeqCst)
+	}
+
+	fn wait(
+		&self,
+		tick: Tick,
+	) -> std::pin::Pin<Box<dyn futures::prelude::Future<Output = ()> + Send + 'static>> {
+		self.real_system_clock
+			.wait(tick + self.delta_ticks.load(std::sync::atomic::Ordering::SeqCst))
+	}
+}
+
+/// Helper function to generate a  babe epoch for this benchmark.
+/// It does not change for the duration of the test.
+pub fn generate_babe_epoch(current_slot: Slot, authorities: TestAuthorities) -> BabeEpoch {
+	let authorities = authorities
+		.validator_babe_id
+		.into_iter()
+		.enumerate()
+		.map(|(index, public)| (public, index as u64))
+		.collect_vec();
+	BabeEpoch {
+		epoch_index: 1,
+		start_slot: current_slot.saturating_sub(1u64),
+		duration: 200,
+		authorities,
+		randomness: [0xde; 32],
+		config: BabeEpochConfiguration { c: (1, 4), allowed_slots: AllowedSlots::PrimarySlots },
+	}
+}
+
+/// Generates a topology to be used for this benchmark.
+pub fn generate_topology(test_authorities: &TestAuthorities) -> SessionGridTopology {
+	let keyrings = test_authorities
+		.validator_authority_id
+		.clone()
+		.into_iter()
+		.zip(test_authorities.peer_ids.clone())
+		.collect_vec();
+
+	let topology = keyrings
+		.clone()
+		.into_iter()
+		.enumerate()
+		.map(|(index, (discovery_id, peer_id))| TopologyPeerInfo {
+			peer_ids: vec![peer_id],
+			validator_index: ValidatorIndex(index as u32),
+			discovery_id,
+		})
+		.collect_vec();
+	let shuffled = (0..keyrings.len()).collect_vec();
+
+	SessionGridTopology::new(shuffled, topology)
+}
+
+/// Generates new session topology message.
+pub fn generate_new_session_topology(
+	test_authorities: &TestAuthorities,
+	test_node: ValidatorIndex,
+) -> Vec<AllMessages> {
+	let topology = generate_topology(test_authorities);
+
+	let event = NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
+		session: 1,
+		topology,
+		local_index: Some(test_node),
+	});
+	vec![AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(event))]
+}
+
+/// Generates a peer view change for the passed `block_hash`
+pub fn generate_peer_view_change_for(block_hash: Hash, peer_id: PeerId) -> AllMessages {
+	let network = NetworkBridgeEvent::PeerViewChange(peer_id, View::new([block_hash], 0));
+
+	AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(network))
+}
+
+/// Helper function to create a a signature for the block header.
+fn garbage_vrf_signature() -> VrfSignature {
+	let transcript = VrfTranscript::new(b"test-garbage", &[]);
+	Sr25519Keyring::Alice.pair().vrf_sign(&transcript.into())
+}
+
+/// Helper function to create a block header.
+pub fn make_header(parent_hash: Hash, slot: Slot, number: u32) -> Header {
+	let digest =
+		{
+			let mut digest = Digest::default();
+			let vrf_signature = garbage_vrf_signature();
+			digest.push(DigestItem::babe_pre_digest(PreDigest::SecondaryVRF(
+				SecondaryVRFPreDigest { authority_index: 0, slot, vrf_signature },
+			)));
+			digest
+		};
+
+	Header {
+		digest,
+		extrinsics_root: Default::default(),
+		number,
+		state_root: Default::default(),
+		parent_hash,
+	}
+}
+
+/// Helper function to create a candidate receipt.
+fn make_candidate(para_id: ParaId, hash: &Hash) -> CandidateReceipt {
+	let mut r = dummy_candidate_receipt_bad_sig(*hash, Some(Default::default()));
+	r.descriptor.para_id = para_id;
+	r
+}
+
+/// Helper function to create a list of candidates that are included in the block
+pub fn make_candidates(
+	block_hash: Hash,
+	block_number: BlockNumber,
+	num_cores: u32,
+	num_candidates: u32,
+) -> Vec<CandidateEvent> {
+	let seed = [block_number as u8; 32];
+	let mut rand_chacha = ChaCha20Rng::from_seed(seed);
+	let mut candidates = (0..num_cores)
+		.map(|core| {
+			CandidateEvent::CandidateIncluded(
+				make_candidate(ParaId::from(core), &block_hash),
+				Vec::new().into(),
+				CoreIndex(core),
+				GroupIndex(core),
+			)
+		})
+		.collect_vec();
+	let (candidates, _) = candidates.partial_shuffle(&mut rand_chacha, num_candidates as usize);
+	candidates
+		.iter_mut()
+		.map(|val| val.clone())
+		.sorted_by(|a, b| match (a, b) {
+			(
+				CandidateEvent::CandidateIncluded(_, _, core_a, _),
+				CandidateEvent::CandidateIncluded(_, _, core_b, _),
+			) => core_a.0.cmp(&core_b.0),
+			(_, _) => todo!("Should not happen"),
+		})
+		.collect_vec()
+}
diff --git a/polkadot/node/subsystem-bench/src/approval/message_generator.rs b/polkadot/node/subsystem-bench/src/approval/message_generator.rs
new file mode 100644
index 0000000000000000000000000000000000000000..4318dcdf8902a6672a73ac208e43efbf34da3e66
--- /dev/null
+++ b/polkadot/node/subsystem-bench/src/approval/message_generator.rs
@@ -0,0 +1,686 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use std::{
+	cmp::max,
+	collections::{BTreeMap, HashSet},
+	fs,
+	io::Write,
+	path::{Path, PathBuf},
+	time::Duration,
+};
+
+use futures::SinkExt;
+use itertools::Itertools;
+use parity_scale_codec::Encode;
+use polkadot_node_core_approval_voting::{
+	criteria::{compute_assignments, Config},
+	time::tranche_to_tick,
+};
+use polkadot_node_network_protocol::grid_topology::{
+	GridNeighbors, RandomRouting, RequiredRouting, SessionGridTopology,
+};
+use polkadot_node_primitives::approval::{
+	self,
+	v2::{CoreBitfield, IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2},
+};
+use polkadot_primitives::{
+	vstaging::ApprovalVoteMultipleCandidates, CandidateEvent, CandidateHash, CandidateIndex,
+	CoreIndex, SessionInfo, Slot, ValidatorId, ValidatorIndex, ASSIGNMENT_KEY_TYPE_ID,
+};
+use rand::{seq::SliceRandom, RngCore, SeedableRng};
+use rand_chacha::ChaCha20Rng;
+use rand_distr::{Distribution, Normal};
+use sc_keystore::LocalKeystore;
+use sc_network::PeerId;
+use sha1::Digest;
+use sp_application_crypto::AppCrypto;
+use sp_consensus_babe::SlotDuration;
+use sp_keystore::Keystore;
+use sp_timestamp::Timestamp;
+
+use super::{
+	test_message::{MessagesBundle, TestMessageInfo},
+	ApprovalTestState, ApprovalsOptions, BlockTestData,
+};
+use crate::{
+	approval::{
+		helpers::{generate_babe_epoch, generate_topology},
+		GeneratedState, BUFFER_FOR_GENERATION_MILLIS, LOG_TARGET, SLOT_DURATION_MILLIS,
+	},
+	core::{
+		configuration::{TestAuthorities, TestConfiguration, TestObjective},
+		mock::session_info_for_peers,
+		NODE_UNDER_TEST,
+	},
+};
+use polkadot_node_network_protocol::v3 as protocol_v3;
+use polkadot_primitives::Hash;
+use sc_service::SpawnTaskHandle;
+/// A generator of messages coming from a given Peer/Validator
+pub struct PeerMessagesGenerator {
+	/// The grid neighbors of the node under test.
+	pub topology_node_under_test: GridNeighbors,
+	/// The topology of the network for the epoch under test.
+	pub topology: SessionGridTopology,
+	/// The validator index for this object generates the messages.
+	pub validator_index: ValidatorIndex,
+	/// An array of pre-generated random samplings, that is used to determine, which nodes would
+	/// send a given assignment, to the node under test because of the random samplings.
+	/// As an optimization we generate this sampling at the begining of the test and just pick
+	/// one randomly, because always taking the samples would be too expensive for benchamrk.
+	pub random_samplings: Vec<Vec<ValidatorIndex>>,
+	/// Channel for sending the generated messages to the aggregator
+	pub tx_messages: futures::channel::mpsc::UnboundedSender<(Hash, Vec<MessagesBundle>)>,
+	/// The list of test authorities
+	pub test_authorities: TestAuthorities,
+	//// The session info used for the test.
+	pub session_info: SessionInfo,
+	/// The blocks used for testing
+	pub blocks: Vec<BlockTestData>,
+	/// Approval options params.
+	pub options: ApprovalsOptions,
+}
+
+impl PeerMessagesGenerator {
+	/// Generates messages by spawning a blocking task in the background which begins creating
+	/// the assignments/approvals and peer view changes at the begining of each block.
+	pub fn generate_messages(mut self, spawn_task_handle: &SpawnTaskHandle) {
+		spawn_task_handle.spawn("generate-messages", "generate-messages", async move {
+			for block_info in &self.blocks {
+				let assignments = self.generate_assignments(block_info);
+
+				let bytes = self.validator_index.0.to_be_bytes();
+				let seed = [
+					bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+					0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+				];
+
+				let mut rand_chacha = ChaCha20Rng::from_seed(seed);
+				let approvals = issue_approvals(
+					assignments,
+					block_info.hash,
+					&self.test_authorities.validator_public,
+					block_info.candidates.clone(),
+					&self.options,
+					&mut rand_chacha,
+					self.test_authorities.keyring.keystore_ref(),
+				);
+
+				self.tx_messages
+					.send((block_info.hash, approvals))
+					.await
+					.expect("Should not fail");
+			}
+		})
+	}
+
+	// Builds the messages finger print corresponding to this configuration.
+	// When the finger print exists already on disk the messages are not re-generated.
+	fn messages_fingerprint(
+		configuration: &TestConfiguration,
+		options: &ApprovalsOptions,
+	) -> String {
+		let mut fingerprint = options.fingerprint();
+		let mut exclude_objective = configuration.clone();
+		// The objective contains the full content of `ApprovalOptions`, we don't want to put all of
+		// that in fingerprint, so execlute it because we add it manually see above.
+		exclude_objective.objective = TestObjective::Unimplemented;
+		let configuration_bytes = bincode::serialize(&exclude_objective).unwrap();
+		fingerprint.extend(configuration_bytes);
+		let mut sha1 = sha1::Sha1::new();
+		sha1.update(fingerprint);
+		let result = sha1.finalize();
+		hex::encode(result)
+	}
+
+	/// Generate all messages(Assignments & Approvals) needed for approving `blocks``.
+	pub fn generate_messages_if_needed(
+		configuration: &TestConfiguration,
+		test_authorities: &TestAuthorities,
+		options: &ApprovalsOptions,
+		spawn_task_handle: &SpawnTaskHandle,
+	) -> PathBuf {
+		let path_name = format!(
+			"{}/{}",
+			options.workdir_prefix,
+			Self::messages_fingerprint(configuration, options)
+		);
+
+		let path = Path::new(&path_name);
+		if path.exists() {
+			return path.to_path_buf();
+		}
+
+		gum::info!("Generate message because file does not exist");
+		let delta_to_first_slot_under_test = Timestamp::new(BUFFER_FOR_GENERATION_MILLIS);
+		let initial_slot = Slot::from_timestamp(
+			(*Timestamp::current() - *delta_to_first_slot_under_test).into(),
+			SlotDuration::from_millis(SLOT_DURATION_MILLIS),
+		);
+
+		let babe_epoch = generate_babe_epoch(initial_slot, test_authorities.clone());
+		let session_info = session_info_for_peers(configuration, test_authorities);
+		let blocks = ApprovalTestState::generate_blocks_information(
+			configuration,
+			&babe_epoch,
+			initial_slot,
+		);
+
+		gum::info!(target: LOG_TARGET, "Generate messages");
+		let topology = generate_topology(test_authorities);
+
+		let random_samplings = random_samplings_to_node(
+			ValidatorIndex(NODE_UNDER_TEST),
+			test_authorities.validator_public.len(),
+			test_authorities.validator_public.len() * 2,
+		);
+
+		let topology_node_under_test =
+			topology.compute_grid_neighbors_for(ValidatorIndex(NODE_UNDER_TEST)).unwrap();
+
+		let (tx, mut rx) = futures::channel::mpsc::unbounded();
+
+		// Spawn a thread to generate the messages for each validator, so that we speed up the
+		// generation.
+		for current_validator_index in 1..test_authorities.validator_public.len() {
+			let peer_message_source = PeerMessagesGenerator {
+				topology_node_under_test: topology_node_under_test.clone(),
+				topology: topology.clone(),
+				validator_index: ValidatorIndex(current_validator_index as u32),
+				test_authorities: test_authorities.clone(),
+				session_info: session_info.clone(),
+				blocks: blocks.clone(),
+				tx_messages: tx.clone(),
+				random_samplings: random_samplings.clone(),
+				options: options.clone(),
+			};
+
+			peer_message_source.generate_messages(spawn_task_handle);
+		}
+
+		std::mem::drop(tx);
+
+		let seed = [0x32; 32];
+		let mut rand_chacha = ChaCha20Rng::from_seed(seed);
+
+		let mut all_messages: BTreeMap<u64, Vec<MessagesBundle>> = BTreeMap::new();
+		// Receive all messages and sort them by Tick they have to be sent.
+		loop {
+			match rx.try_next() {
+				Ok(Some((block_hash, messages))) =>
+					for message in messages {
+						let block_info = blocks
+							.iter()
+							.find(|val| val.hash == block_hash)
+							.expect("Should find blocks");
+						let tick_to_send = tranche_to_tick(
+							SLOT_DURATION_MILLIS,
+							block_info.slot,
+							message.tranche_to_send(),
+						);
+						let to_add = all_messages.entry(tick_to_send).or_default();
+						to_add.push(message);
+					},
+				Ok(None) => break,
+				Err(_) => {
+					std::thread::sleep(Duration::from_millis(50));
+				},
+			}
+		}
+		let all_messages = all_messages
+			.into_iter()
+			.flat_map(|(_, mut messages)| {
+				// Shuffle the messages inside the same tick, so that we don't priorites messages
+				// for older nodes. we try to simulate the same behaviour as in real world.
+				messages.shuffle(&mut rand_chacha);
+				messages
+			})
+			.collect_vec();
+
+		gum::info!("Generated a number of {:} unique messages", all_messages.len());
+
+		let generated_state = GeneratedState { all_messages: Some(all_messages), initial_slot };
+
+		let mut messages_file = fs::OpenOptions::new()
+			.write(true)
+			.create(true)
+			.truncate(true)
+			.open(path)
+			.unwrap();
+
+		messages_file
+			.write_all(&generated_state.encode())
+			.expect("Could not update message file");
+		path.to_path_buf()
+	}
+
+	/// Generates assignments for the given `current_validator_index`
+	/// Returns a list of assignments to be sent sorted by tranche.
+	fn generate_assignments(&self, block_info: &BlockTestData) -> Vec<TestMessageInfo> {
+		let config = Config::from(&self.session_info);
+
+		let leaving_cores = block_info
+			.candidates
+			.clone()
+			.into_iter()
+			.map(|candidate_event| {
+				if let CandidateEvent::CandidateIncluded(candidate, _, core_index, group_index) =
+					candidate_event
+				{
+					(candidate.hash(), core_index, group_index)
+				} else {
+					todo!("Variant is never created in this benchmark")
+				}
+			})
+			.collect_vec();
+
+		let mut assignments_by_tranche = BTreeMap::new();
+
+		let bytes = self.validator_index.0.to_be_bytes();
+		let seed = [
+			bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+			0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+		];
+		let mut rand_chacha = ChaCha20Rng::from_seed(seed);
+
+		let to_be_sent_by = neighbours_that_would_sent_message(
+			&self.test_authorities.peer_ids,
+			self.validator_index.0,
+			&self.topology_node_under_test,
+			&self.topology,
+		);
+
+		let leaving_cores = leaving_cores
+			.clone()
+			.into_iter()
+			.filter(|(_, core_index, _group_index)| core_index.0 != self.validator_index.0)
+			.collect_vec();
+
+		let store = LocalKeystore::in_memory();
+		let _public = store
+			.sr25519_generate_new(
+				ASSIGNMENT_KEY_TYPE_ID,
+				Some(self.test_authorities.key_seeds[self.validator_index.0 as usize].as_str()),
+			)
+			.expect("should not fail");
+		let assignments = compute_assignments(
+			&store,
+			block_info.relay_vrf_story.clone(),
+			&config,
+			leaving_cores.clone(),
+			self.options.enable_assignments_v2,
+		);
+
+		let random_sending_nodes = self
+			.random_samplings
+			.get(rand_chacha.next_u32() as usize % self.random_samplings.len())
+			.unwrap();
+		let random_sending_peer_ids = random_sending_nodes
+			.iter()
+			.map(|validator| (*validator, self.test_authorities.peer_ids[validator.0 as usize]))
+			.collect_vec();
+
+		let mut unique_assignments = HashSet::new();
+		for (core_index, assignment) in assignments {
+			let assigned_cores = match &assignment.cert().kind {
+				approval::v2::AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
+					core_bitfield.iter_ones().map(|val| CoreIndex::from(val as u32)).collect_vec(),
+				approval::v2::AssignmentCertKindV2::RelayVRFDelay { core_index } =>
+					vec![*core_index],
+				approval::v2::AssignmentCertKindV2::RelayVRFModulo { sample: _ } =>
+					vec![core_index],
+			};
+
+			let bitfiled: CoreBitfield = assigned_cores.clone().try_into().unwrap();
+
+			// For the cases where tranch0 assignments are in a single certificate we need to make
+			// sure we create a single message.
+			if unique_assignments.insert(bitfiled) {
+				let this_tranche_assignments =
+					assignments_by_tranche.entry(assignment.tranche()).or_insert_with(Vec::new);
+
+				this_tranche_assignments.push((
+					IndirectAssignmentCertV2 {
+						block_hash: block_info.hash,
+						validator: self.validator_index,
+						cert: assignment.cert().clone(),
+					},
+					block_info
+						.candidates
+						.iter()
+						.enumerate()
+						.filter(|(_index, candidate)| {
+							if let CandidateEvent::CandidateIncluded(_, _, core, _) = candidate {
+								assigned_cores.contains(core)
+							} else {
+								panic!("Should not happen");
+							}
+						})
+						.map(|(index, _)| index as u32)
+						.collect_vec()
+						.try_into()
+						.unwrap(),
+					to_be_sent_by
+						.iter()
+						.chain(random_sending_peer_ids.iter())
+						.copied()
+						.collect::<HashSet<(ValidatorIndex, PeerId)>>(),
+					assignment.tranche(),
+				));
+			}
+		}
+
+		assignments_by_tranche
+			.into_values()
+			.flat_map(|assignments| assignments.into_iter())
+			.map(|assignment| {
+				let msg = protocol_v3::ApprovalDistributionMessage::Assignments(vec![(
+					assignment.0,
+					assignment.1,
+				)]);
+				TestMessageInfo {
+					msg,
+					sent_by: assignment
+						.2
+						.into_iter()
+						.map(|(validator_index, _)| validator_index)
+						.collect_vec(),
+					tranche: assignment.3,
+					block_hash: block_info.hash,
+				}
+			})
+			.collect_vec()
+	}
+}
+
+/// A list of random samplings that we use to determine which nodes should send a given message to
+/// the node under test.
+/// We can not sample every time for all the messages because that would be too expensive to
+/// perform, so pre-generate a list of samples for a given network size.
+/// - result[i] give us as a list of random nodes that would send a given message to the node under
+/// test.
+fn random_samplings_to_node(
+	node_under_test: ValidatorIndex,
+	num_validators: usize,
+	num_samplings: usize,
+) -> Vec<Vec<ValidatorIndex>> {
+	let seed = [7u8; 32];
+	let mut rand_chacha = ChaCha20Rng::from_seed(seed);
+
+	(0..num_samplings)
+		.map(|_| {
+			(0..num_validators)
+				.filter(|sending_validator_index| {
+					*sending_validator_index != NODE_UNDER_TEST as usize
+				})
+				.flat_map(|sending_validator_index| {
+					let mut validators = (0..num_validators).collect_vec();
+					validators.shuffle(&mut rand_chacha);
+
+					let mut random_routing = RandomRouting::default();
+					validators
+						.into_iter()
+						.flat_map(|validator_to_send| {
+							if random_routing.sample(num_validators, &mut rand_chacha) {
+								random_routing.inc_sent();
+								if validator_to_send == node_under_test.0 as usize {
+									Some(ValidatorIndex(sending_validator_index as u32))
+								} else {
+									None
+								}
+							} else {
+								None
+							}
+						})
+						.collect_vec()
+				})
+				.collect_vec()
+		})
+		.collect_vec()
+}
+
+/// Helper function to randomly determine how many approvals we coalesce together in a single
+/// message.
+fn coalesce_approvals_len(
+	coalesce_mean: f32,
+	coalesce_std_dev: f32,
+	rand_chacha: &mut ChaCha20Rng,
+) -> usize {
+	max(
+		1,
+		Normal::new(coalesce_mean, coalesce_std_dev)
+			.expect("normal distribution parameters are good")
+			.sample(rand_chacha)
+			.round() as i32,
+	) as usize
+}
+
+/// Helper function to create approvals signatures for all assignments passed as arguments.
+/// Returns a list of Approvals messages that need to be sent.
+fn issue_approvals(
+	assignments: Vec<TestMessageInfo>,
+	block_hash: Hash,
+	validator_ids: &[ValidatorId],
+	candidates: Vec<CandidateEvent>,
+	options: &ApprovalsOptions,
+	rand_chacha: &mut ChaCha20Rng,
+	store: &LocalKeystore,
+) -> Vec<MessagesBundle> {
+	let mut queued_to_sign: Vec<TestSignInfo> = Vec::new();
+	let mut num_coalesce =
+		coalesce_approvals_len(options.coalesce_mean, options.coalesce_std_dev, rand_chacha);
+	let result = assignments
+		.iter()
+		.enumerate()
+		.map(|(_index, message)| match &message.msg {
+			protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => {
+				let mut approvals_to_create = Vec::new();
+
+				let current_validator_index = queued_to_sign
+					.first()
+					.map(|msg| msg.validator_index)
+					.unwrap_or(ValidatorIndex(99999));
+
+				// Invariant for this benchmark.
+				assert_eq!(assignments.len(), 1);
+
+				let assignment = assignments.first().unwrap();
+
+				let earliest_tranche = queued_to_sign
+					.first()
+					.map(|val| val.assignment.tranche)
+					.unwrap_or(message.tranche);
+
+				if queued_to_sign.len() >= num_coalesce ||
+					(!queued_to_sign.is_empty() &&
+						current_validator_index != assignment.0.validator) ||
+					message.tranche - earliest_tranche >= options.coalesce_tranche_diff
+				{
+					approvals_to_create.push(TestSignInfo::sign_candidates(
+						&mut queued_to_sign,
+						validator_ids,
+						block_hash,
+						num_coalesce,
+						store,
+					));
+					num_coalesce = coalesce_approvals_len(
+						options.coalesce_mean,
+						options.coalesce_std_dev,
+						rand_chacha,
+					);
+				}
+
+				// If more that one candidate was in the assignment queue all of them for issuing
+				// approvals
+				for candidate_index in assignment.1.iter_ones() {
+					let candidate = candidates.get(candidate_index).unwrap();
+					if let CandidateEvent::CandidateIncluded(candidate, _, _, _) = candidate {
+						queued_to_sign.push(TestSignInfo {
+							candidate_hash: candidate.hash(),
+							candidate_index: candidate_index as CandidateIndex,
+							validator_index: assignment.0.validator,
+							assignment: message.clone(),
+						});
+					} else {
+						todo!("Other enum variants are not used in this benchmark");
+					}
+				}
+				approvals_to_create
+			},
+			_ => {
+				todo!("Other enum variants are not used in this benchmark");
+			},
+		})
+		.collect_vec();
+
+	let mut messages = result.into_iter().flatten().collect_vec();
+
+	if !queued_to_sign.is_empty() {
+		messages.push(TestSignInfo::sign_candidates(
+			&mut queued_to_sign,
+			validator_ids,
+			block_hash,
+			num_coalesce,
+			store,
+		));
+	}
+	messages
+}
+
+/// Helper struct to gather information about more than one candidate an sign it in a single
+/// approval message.
+struct TestSignInfo {
+	/// The candidate hash
+	candidate_hash: CandidateHash,
+	/// The candidate index
+	candidate_index: CandidateIndex,
+	/// The validator sending the assignments
+	validator_index: ValidatorIndex,
+	/// The assignments convering this candidate
+	assignment: TestMessageInfo,
+}
+
+impl TestSignInfo {
+	/// Helper function to create a signture for all candidates in `to_sign` parameter.
+	/// Returns a TestMessage
+	fn sign_candidates(
+		to_sign: &mut Vec<TestSignInfo>,
+		validator_ids: &[ValidatorId],
+		block_hash: Hash,
+		num_coalesce: usize,
+		store: &LocalKeystore,
+	) -> MessagesBundle {
+		let current_validator_index = to_sign.first().map(|val| val.validator_index).unwrap();
+		let tranche_approval_can_be_sent =
+			to_sign.iter().map(|val| val.assignment.tranche).max().unwrap();
+		let validator_id = validator_ids.get(current_validator_index.0 as usize).unwrap().clone();
+
+		let unique_assignments: HashSet<TestMessageInfo> =
+			to_sign.iter().map(|info| info.assignment.clone()).collect();
+
+		let mut to_sign = to_sign
+			.drain(..)
+			.sorted_by(|val1, val2| val1.candidate_index.cmp(&val2.candidate_index))
+			.peekable();
+
+		let mut bundle = MessagesBundle {
+			assignments: unique_assignments.into_iter().collect_vec(),
+			approvals: Vec::new(),
+		};
+
+		while to_sign.peek().is_some() {
+			let to_sign = to_sign.by_ref().take(num_coalesce).collect_vec();
+
+			let hashes = to_sign.iter().map(|val| val.candidate_hash).collect_vec();
+			let candidate_indices = to_sign.iter().map(|val| val.candidate_index).collect_vec();
+
+			let sent_by = to_sign
+				.iter()
+				.flat_map(|val| val.assignment.sent_by.iter())
+				.copied()
+				.collect::<HashSet<ValidatorIndex>>();
+
+			let payload = ApprovalVoteMultipleCandidates(&hashes).signing_payload(1);
+
+			let signature = store
+				.sr25519_sign(ValidatorId::ID, &validator_id.clone().into(), &payload[..])
+				.unwrap()
+				.unwrap()
+				.into();
+			let indirect = IndirectSignedApprovalVoteV2 {
+				block_hash,
+				candidate_indices: candidate_indices.try_into().unwrap(),
+				validator: current_validator_index,
+				signature,
+			};
+			let msg = protocol_v3::ApprovalDistributionMessage::Approvals(vec![indirect]);
+
+			bundle.approvals.push(TestMessageInfo {
+				msg,
+				sent_by: sent_by.into_iter().collect_vec(),
+				tranche: tranche_approval_can_be_sent,
+				block_hash,
+			});
+		}
+		bundle
+	}
+}
+
+/// Determine what neighbours would send a given message to the node under test.
+fn neighbours_that_would_sent_message(
+	peer_ids: &[PeerId],
+	current_validator_index: u32,
+	topology_node_under_test: &GridNeighbors,
+	topology: &SessionGridTopology,
+) -> Vec<(ValidatorIndex, PeerId)> {
+	let topology_originator = topology
+		.compute_grid_neighbors_for(ValidatorIndex(current_validator_index))
+		.unwrap();
+
+	let originator_y = topology_originator.validator_indices_y.iter().find(|validator| {
+		topology_node_under_test.required_routing_by_index(**validator, false) ==
+			RequiredRouting::GridY
+	});
+
+	assert!(originator_y != Some(&ValidatorIndex(NODE_UNDER_TEST)));
+
+	let originator_x = topology_originator.validator_indices_x.iter().find(|validator| {
+		topology_node_under_test.required_routing_by_index(**validator, false) ==
+			RequiredRouting::GridX
+	});
+
+	assert!(originator_x != Some(&ValidatorIndex(NODE_UNDER_TEST)));
+
+	let is_neighbour = topology_originator
+		.validator_indices_x
+		.contains(&ValidatorIndex(NODE_UNDER_TEST)) ||
+		topology_originator
+			.validator_indices_y
+			.contains(&ValidatorIndex(NODE_UNDER_TEST));
+
+	let mut to_be_sent_by = originator_y
+		.into_iter()
+		.chain(originator_x)
+		.map(|val| (*val, peer_ids[val.0 as usize]))
+		.collect_vec();
+
+	if is_neighbour {
+		to_be_sent_by.push((ValidatorIndex(current_validator_index), peer_ids[0]));
+	}
+
+	to_be_sent_by
+}
diff --git a/polkadot/node/subsystem-bench/src/approval/mock_chain_selection.rs b/polkadot/node/subsystem-bench/src/approval/mock_chain_selection.rs
new file mode 100644
index 0000000000000000000000000000000000000000..ab23a8ced484eb7584eaa3d90c5a0e967ba897f7
--- /dev/null
+++ b/polkadot/node/subsystem-bench/src/approval/mock_chain_selection.rs
@@ -0,0 +1,66 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use crate::approval::{LOG_TARGET, SLOT_DURATION_MILLIS};
+
+use super::{ApprovalTestState, PastSystemClock};
+use futures::FutureExt;
+use polkadot_node_core_approval_voting::time::{slot_number_to_tick, Clock, TICK_DURATION_MILLIS};
+use polkadot_node_subsystem::{overseer, SpawnedSubsystem, SubsystemError};
+use polkadot_node_subsystem_types::messages::ChainSelectionMessage;
+
+/// Mock ChainSelection subsystem used to answer request made by the approval-voting subsystem,
+/// during benchmark. All the necessary information to answer the requests is stored in the `state`
+pub struct MockChainSelection {
+	pub state: ApprovalTestState,
+	pub clock: PastSystemClock,
+}
+
+#[overseer::subsystem(ChainSelection, error=SubsystemError, prefix=self::overseer)]
+impl<Context> MockChainSelection {
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
+		let future = self.run(ctx).map(|_| Ok(())).boxed();
+
+		SpawnedSubsystem { name: "mock-chain-subsystem", future }
+	}
+}
+
+#[overseer::contextbounds(ChainSelection, prefix = self::overseer)]
+impl MockChainSelection {
+	async fn run<Context>(self, mut ctx: Context) {
+		loop {
+			let msg = ctx.recv().await.expect("Should not fail");
+			match msg {
+				orchestra::FromOrchestra::Signal(_) => {},
+				orchestra::FromOrchestra::Communication { msg } =>
+					if let ChainSelectionMessage::Approved(hash) = msg {
+						let block_info = self.state.get_info_by_hash(hash);
+						let approved_number = block_info.block_number;
+
+						block_info.approved.store(true, std::sync::atomic::Ordering::SeqCst);
+						self.state
+							.last_approved_block
+							.store(approved_number, std::sync::atomic::Ordering::SeqCst);
+
+						let approved_in_tick = self.clock.tick_now() -
+							slot_number_to_tick(SLOT_DURATION_MILLIS, block_info.slot);
+
+						gum::info!(target: LOG_TARGET, ?hash, "Chain selection approved  after {:} ms", approved_in_tick * TICK_DURATION_MILLIS);
+					},
+			}
+		}
+	}
+}
diff --git a/polkadot/node/subsystem-bench/src/approval/mod.rs b/polkadot/node/subsystem-bench/src/approval/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..3544ce74711e6df4f5e5b4f92299ef54f6757733
--- /dev/null
+++ b/polkadot/node/subsystem-bench/src/approval/mod.rs
@@ -0,0 +1,1059 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use self::{
+	helpers::{make_candidates, make_header},
+	test_message::{MessagesBundle, TestMessageInfo},
+};
+use crate::{
+	approval::{
+		helpers::{
+			generate_babe_epoch, generate_new_session_topology, generate_peer_view_change_for,
+			PastSystemClock,
+		},
+		message_generator::PeerMessagesGenerator,
+		mock_chain_selection::MockChainSelection,
+	},
+	core::{
+		configuration::{TestAuthorities, TestConfiguration},
+		environment::{TestEnvironment, TestEnvironmentDependencies, MAX_TIME_OF_FLIGHT},
+		mock::{
+			dummy_builder,
+			network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx},
+			AlwaysSupportsParachains, ChainApiState, MockChainApi, MockRuntimeApi, TestSyncOracle,
+		},
+		network::{
+			new_network, HandleNetworkMessage, NetworkEmulatorHandle, NetworkInterface,
+			NetworkInterfaceReceiver,
+		},
+		NODE_UNDER_TEST,
+	},
+};
+use colored::Colorize;
+use futures::channel::oneshot;
+use itertools::Itertools;
+use orchestra::TimeoutExt;
+use overseer::{metrics::Metrics as OverseerMetrics, MetricsTrait};
+use parity_scale_codec::{Decode, Encode};
+use polkadot_approval_distribution::ApprovalDistribution;
+use polkadot_node_core_approval_voting::{
+	time::{slot_number_to_tick, tick_to_slot_number, Clock, ClockExt, SystemClock},
+	ApprovalVotingSubsystem, Config as ApprovalVotingConfig, Metrics as ApprovalVotingMetrics,
+};
+use polkadot_node_network_protocol::v3 as protocol_v3;
+use polkadot_node_primitives::approval::{self, v1::RelayVRFStory};
+use polkadot_node_subsystem::{overseer, AllMessages, Overseer, OverseerConnector, SpawnGlue};
+use polkadot_node_subsystem_test_helpers::mock::new_block_import_info;
+use polkadot_node_subsystem_types::messages::{ApprovalDistributionMessage, ApprovalVotingMessage};
+use polkadot_node_subsystem_util::metrics::Metrics;
+use polkadot_overseer::Handle as OverseerHandleReal;
+use polkadot_primitives::{
+	BlockNumber, CandidateEvent, CandidateIndex, CandidateReceipt, Hash, Header, Slot,
+	ValidatorIndex,
+};
+use prometheus::Registry;
+use sc_keystore::LocalKeystore;
+use sc_service::SpawnTaskHandle;
+use serde::{Deserialize, Serialize};
+use sp_consensus_babe::Epoch as BabeEpoch;
+use sp_core::H256;
+use std::{
+	cmp::max,
+	collections::{HashMap, HashSet},
+	fs,
+	io::Read,
+	ops::Sub,
+	sync::{
+		atomic::{AtomicBool, AtomicU32, AtomicU64},
+		Arc,
+	},
+	time::{Duration, Instant},
+};
+use tokio::time::sleep;
+
+mod helpers;
+mod message_generator;
+mod mock_chain_selection;
+mod test_message;
+
+pub const LOG_TARGET: &str = "subsystem-bench::approval";
+const DATA_COL: u32 = 0;
+pub(crate) const NUM_COLUMNS: u32 = 1;
+pub(crate) const SLOT_DURATION_MILLIS: u64 = 6000;
+pub(crate) const TEST_CONFIG: ApprovalVotingConfig = ApprovalVotingConfig {
+	col_approval_data: DATA_COL,
+	slot_duration_millis: SLOT_DURATION_MILLIS,
+};
+
+/// Start generating messages for a slot into the future, so that the
+/// generation nevers falls behind the current slot.
+const BUFFER_FOR_GENERATION_MILLIS: u64 = 30_000;
+
+/// Parameters specific to the approvals benchmark
+#[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)]
+#[clap(rename_all = "kebab-case")]
+#[allow(missing_docs)]
+pub struct ApprovalsOptions {
+	#[clap(short, long, default_value_t = 89)]
+	/// The last considered tranche for which we send the message.
+	pub last_considered_tranche: u32,
+	#[clap(short, long, default_value_t = 1.0)]
+	/// Min candidates to be signed in a single approval.
+	pub coalesce_mean: f32,
+	#[clap(short, long, default_value_t = 1.0)]
+	/// Max candidate to be signed in a single approval.
+	pub coalesce_std_dev: f32,
+	/// The maximum tranche diff between approvals coalesced toghther.
+	pub coalesce_tranche_diff: u32,
+	#[clap(short, long, default_value_t = false)]
+	/// Enable assignments v2.
+	pub enable_assignments_v2: bool,
+	#[clap(short, long, default_value_t = true)]
+	/// Sends messages only till block is approved.
+	pub stop_when_approved: bool,
+	#[clap(short, long)]
+	/// Work directory.
+	#[clap(short, long, default_value_t = format!("/tmp"))]
+	pub workdir_prefix: String,
+	/// The number of no shows per candidate
+	#[clap(short, long, default_value_t = 0)]
+	pub num_no_shows_per_candidate: u32,
+}
+
+impl ApprovalsOptions {
+	// Generates a fingerprint use to determine if messages need to be re-generated.
+	fn fingerprint(&self) -> Vec<u8> {
+		let mut bytes = Vec::new();
+		bytes.extend(self.coalesce_mean.to_be_bytes());
+		bytes.extend(self.coalesce_std_dev.to_be_bytes());
+		bytes.extend(self.coalesce_tranche_diff.to_be_bytes());
+		bytes.extend((self.enable_assignments_v2 as i32).to_be_bytes());
+		bytes
+	}
+}
+
+/// Information about a block. It is part of test state and it is used by the mock
+/// subsystems to be able to answer the calls approval-voting and approval-distribution
+/// do into the outside world.
+#[derive(Clone, Debug)]
+struct BlockTestData {
+	/// The slot this block occupies, see implementer's guide to understand what a slot
+	/// is in the context of polkadot.
+	slot: Slot,
+	/// The hash of the block.
+	hash: Hash,
+	/// The block number.
+	block_number: BlockNumber,
+	/// The list of candidates included in this block.
+	candidates: Vec<CandidateEvent>,
+	/// The block header.
+	header: Header,
+	/// The vrf story for the given block.
+	relay_vrf_story: RelayVRFStory,
+	/// If the block has been approved by the approval-voting subsystem.
+	/// This set on `true` when ChainSelectionMessage::Approved is received inside the chain
+	/// selection mock subsystem.
+	approved: Arc<AtomicBool>,
+	/// The total number of candidates before this block.
+	total_candidates_before: u64,
+	/// The votes we sent.
+	/// votes[validator_index][candidate_index] tells if validator sent vote for candidate.
+	/// We use this to mark the test as succesfull if GetApprovalSignatures returns all the votes
+	/// from here.
+	votes: Arc<Vec<Vec<AtomicBool>>>,
+}
+
+/// Candidate information used during the test to decide if more messages are needed.
+#[derive(Debug)]
+struct CandidateTestData {
+	/// The configured maximum number of no-shows for this candidate.
+	max_no_shows: u32,
+	/// The last tranche where we had a no-show.
+	last_tranche_with_no_show: u32,
+	/// The number of sent assignments.
+	sent_assignment: u32,
+	/// The number of no-shows.
+	num_no_shows: u32,
+	/// The maximum tranche were we covered the needed approvals
+	max_tranche: u32,
+	/// Minimum needed votes to approve candidate.
+	needed_approvals: u32,
+}
+
+impl CandidateTestData {
+	/// If message in this tranche needs to be sent.
+	fn should_send_tranche(&self, tranche: u32) -> bool {
+		self.sent_assignment <= self.needed_approvals ||
+			tranche <= self.max_tranche + self.num_no_shows
+	}
+
+	/// Sets max tranche
+	fn set_max_tranche(&mut self, tranche: u32) {
+		self.max_tranche = max(tranche, self.max_tranche);
+	}
+
+	/// Records no-show for candidate.
+	fn record_no_show(&mut self, tranche: u32) {
+		self.num_no_shows += 1;
+		self.last_tranche_with_no_show = max(tranche, self.last_tranche_with_no_show);
+	}
+
+	/// Marks an assignment sent.
+	fn mark_sent_assignment(&mut self, tranche: u32) {
+		if self.sent_assignment < self.needed_approvals {
+			self.set_max_tranche(tranche);
+		}
+
+		self.sent_assignment += 1;
+	}
+
+	/// Tells if a message in this tranche should be a no-show.
+	fn should_no_show(&self, tranche: u32) -> bool {
+		(self.num_no_shows < self.max_no_shows && self.last_tranche_with_no_show < tranche) ||
+			(tranche == 0 && self.num_no_shows == 0 && self.max_no_shows > 0)
+	}
+}
+
+/// Test state that is pre-generated and loaded from a file that matches the fingerprint
+/// of the TestConfiguration.
+#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
+struct GeneratedState {
+	/// All assignments and approvals
+	all_messages: Option<Vec<MessagesBundle>>,
+	/// The first slot in the test.
+	initial_slot: Slot,
+}
+
+/// Approval test state used by all mock subsystems to be able to answer messages emitted
+/// by the approval-voting and approval-distribution-subystems.
+///
+/// This gets cloned across all mock subsystems, so if there is any information that gets
+/// updated between subsystems, they would have to be wrapped in Arc's.
+#[derive(Clone)]
+pub struct ApprovalTestState {
+	/// The main test configuration
+	configuration: TestConfiguration,
+	/// The specific test configurations passed when starting the benchmark.
+	options: ApprovalsOptions,
+	/// The list of blocks used for testing.
+	blocks: Vec<BlockTestData>,
+	/// The babe epoch used during testing.
+	babe_epoch: BabeEpoch,
+	/// The pre-generated state.
+	generated_state: GeneratedState,
+	/// The test authorities
+	test_authorities: TestAuthorities,
+	/// Last approved block number.
+	last_approved_block: Arc<AtomicU32>,
+	/// Total sent messages from peers to node
+	total_sent_messages_to_node: Arc<AtomicU64>,
+	/// Total sent messages from test node to other peers
+	total_sent_messages_from_node: Arc<AtomicU64>,
+	/// Total unique sent messages.
+	total_unique_messages: Arc<AtomicU64>,
+	/// Approval voting metrics.
+	approval_voting_metrics: ApprovalVotingMetrics,
+	/// The delta ticks from the tick the messages were generated to the the time we start this
+	/// message.
+	delta_tick_from_generated: Arc<AtomicU64>,
+}
+
+impl ApprovalTestState {
+	/// Build a new `ApprovalTestState` object out of the configurations passed when the benchmark
+	/// was tested.
+	fn new(
+		configuration: &TestConfiguration,
+		options: ApprovalsOptions,
+		dependencies: &TestEnvironmentDependencies,
+	) -> Self {
+		let test_authorities = configuration.generate_authorities();
+		let start = Instant::now();
+
+		let messages_path = PeerMessagesGenerator::generate_messages_if_needed(
+			configuration,
+			&test_authorities,
+			&options,
+			&dependencies.task_manager.spawn_handle(),
+		);
+
+		let mut messages_file =
+			fs::OpenOptions::new().read(true).open(messages_path.as_path()).unwrap();
+		let mut messages_bytes = Vec::<u8>::with_capacity(2000000);
+
+		messages_file
+			.read_to_end(&mut messages_bytes)
+			.expect("Could not initialize list of messages");
+		let generated_state: GeneratedState =
+			Decode::decode(&mut messages_bytes.as_slice()).expect("Could not decode messages");
+
+		gum::info!(
+			"It took {:?} ms to load {:?} unique messages",
+			start.elapsed().as_millis(),
+			generated_state.all_messages.as_ref().map(|val| val.len()).unwrap_or_default()
+		);
+
+		let babe_epoch =
+			generate_babe_epoch(generated_state.initial_slot, test_authorities.clone());
+		let blocks = Self::generate_blocks_information(
+			configuration,
+			&babe_epoch,
+			generated_state.initial_slot,
+		);
+
+		let state = ApprovalTestState {
+			blocks,
+			babe_epoch: babe_epoch.clone(),
+			generated_state,
+			test_authorities,
+			last_approved_block: Arc::new(AtomicU32::new(0)),
+			total_sent_messages_to_node: Arc::new(AtomicU64::new(0)),
+			total_sent_messages_from_node: Arc::new(AtomicU64::new(0)),
+			total_unique_messages: Arc::new(AtomicU64::new(0)),
+			options,
+			approval_voting_metrics: ApprovalVotingMetrics::try_register(&dependencies.registry)
+				.unwrap(),
+			delta_tick_from_generated: Arc::new(AtomicU64::new(630720000)),
+			configuration: configuration.clone(),
+		};
+
+		gum::info!("Built testing state");
+
+		state
+	}
+
+	/// Generates the blocks and the information about the blocks that will be used
+	/// to drive this test.
+	fn generate_blocks_information(
+		configuration: &TestConfiguration,
+		babe_epoch: &BabeEpoch,
+		initial_slot: Slot,
+	) -> Vec<BlockTestData> {
+		let mut per_block_heads: Vec<BlockTestData> = Vec::new();
+		let mut prev_candidates = 0;
+		for block_number in 1..=configuration.num_blocks {
+			let block_hash = Hash::repeat_byte(block_number as u8);
+			let parent_hash =
+				per_block_heads.last().map(|val| val.hash).unwrap_or(Hash::repeat_byte(0xde));
+			let slot_for_block = initial_slot + (block_number as u64 - 1);
+
+			let header = make_header(parent_hash, slot_for_block, block_number as u32);
+
+			let unsafe_vrf = approval::v1::babe_unsafe_vrf_info(&header)
+				.expect("Can not continue without vrf generator");
+			let relay_vrf_story = unsafe_vrf
+				.compute_randomness(
+					&babe_epoch.authorities,
+					&babe_epoch.randomness,
+					babe_epoch.epoch_index,
+				)
+				.expect("Can not continue without vrf story");
+			let block_info = BlockTestData {
+				slot: slot_for_block,
+				block_number: block_number as BlockNumber,
+				hash: block_hash,
+				header,
+				candidates: make_candidates(
+					block_hash,
+					block_number as BlockNumber,
+					configuration.n_cores as u32,
+					configuration.n_cores as u32,
+				),
+				relay_vrf_story,
+				approved: Arc::new(AtomicBool::new(false)),
+				total_candidates_before: prev_candidates,
+				votes: Arc::new(
+					(0..configuration.n_validators)
+						.map(|_| {
+							(0..configuration.n_cores).map(|_| AtomicBool::new(false)).collect_vec()
+						})
+						.collect_vec(),
+				),
+			};
+			prev_candidates += block_info.candidates.len() as u64;
+			per_block_heads.push(block_info)
+		}
+		per_block_heads
+	}
+
+	/// Starts the generation of messages(Assignments & Approvals) needed for approving blocks.
+	async fn start_message_production(
+		&mut self,
+		network_emulator: &NetworkEmulatorHandle,
+		overseer_handle: OverseerHandleReal,
+		env: &TestEnvironment,
+		registry: Registry,
+	) -> oneshot::Receiver<()> {
+		gum::info!(target: LOG_TARGET, "Start assignments/approvals production");
+
+		let (producer_tx, producer_rx) = oneshot::channel();
+		let peer_message_source = PeerMessageProducer {
+			network: network_emulator.clone(),
+			overseer_handle: overseer_handle.clone(),
+			state: self.clone(),
+			options: self.options.clone(),
+			notify_done: producer_tx,
+			registry,
+		};
+
+		peer_message_source
+			.produce_messages(env, self.generated_state.all_messages.take().unwrap());
+		producer_rx
+	}
+
+	// Generates a ChainApiState used for driving MockChainApi
+	fn build_chain_api_state(&self) -> ChainApiState {
+		ChainApiState {
+			block_headers: self
+				.blocks
+				.iter()
+				.map(|block| (block.hash, block.header.clone()))
+				.collect(),
+		}
+	}
+
+	// Builds a map  with the list of candidate events per-block.
+	fn candidate_events_by_block(&self) -> HashMap<H256, Vec<CandidateEvent>> {
+		self.blocks.iter().map(|block| (block.hash, block.candidates.clone())).collect()
+	}
+
+	// Builds a map  with the list of candidate hashes per-block.
+	fn candidate_hashes_by_block(&self) -> HashMap<H256, Vec<CandidateReceipt>> {
+		self.blocks
+			.iter()
+			.map(|block| {
+				(
+					block.hash,
+					block
+						.candidates
+						.iter()
+						.map(|candidate_event| match candidate_event {
+							CandidateEvent::CandidateBacked(_, _, _, _) => todo!(),
+							CandidateEvent::CandidateIncluded(receipt, _, _, _) => receipt.clone(),
+							CandidateEvent::CandidateTimedOut(_, _, _) => todo!(),
+						})
+						.collect_vec(),
+				)
+			})
+			.collect()
+	}
+}
+
+impl ApprovalTestState {
+	/// Returns test data for the given hash
+	fn get_info_by_hash(&self, requested_hash: Hash) -> &BlockTestData {
+		self.blocks
+			.iter()
+			.find(|block| block.hash == requested_hash)
+			.expect("Mocks should not use unknown hashes")
+	}
+
+	/// Returns test data for the given slot
+	fn get_info_by_slot(&self, slot: Slot) -> Option<&BlockTestData> {
+		self.blocks.iter().find(|block| block.slot == slot)
+	}
+}
+
+impl HandleNetworkMessage for ApprovalTestState {
+	fn handle(
+		&self,
+		_message: crate::core::network::NetworkMessage,
+		_node_sender: &mut futures::channel::mpsc::UnboundedSender<
+			crate::core::network::NetworkMessage,
+		>,
+	) -> Option<crate::core::network::NetworkMessage> {
+		self.total_sent_messages_from_node
+			.as_ref()
+			.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+		None
+	}
+}
+
+/// A generator of messages coming from a given Peer/Validator
+struct PeerMessageProducer {
+	/// The state state used to know what messages to generate.
+	state: ApprovalTestState,
+	/// Configuration options, passed at the beginning of the test.
+	options: ApprovalsOptions,
+	/// A reference to the network emulator
+	network: NetworkEmulatorHandle,
+	/// A handle to the overseer, used for sending messages to the node
+	/// under test.
+	overseer_handle: OverseerHandleReal,
+	/// Channel for producer to notify main loop it finished sending
+	/// all messages and they have been processed.
+	notify_done: oneshot::Sender<()>,
+	/// The metrics registry.
+	registry: Registry,
+}
+
+impl PeerMessageProducer {
+	/// Generates messages by spawning a blocking task in the background which begins creating
+	/// the assignments/approvals and peer view changes at the begining of each block.
+	fn produce_messages(mut self, env: &TestEnvironment, all_messages: Vec<MessagesBundle>) {
+		env.spawn_blocking("produce-messages", async move {
+			let mut initialized_blocks = HashSet::new();
+			let mut per_candidate_data: HashMap<(Hash, CandidateIndex), CandidateTestData> =
+				self.initialize_candidates_test_data();
+			let mut skipped_messages: Vec<MessagesBundle> = Vec::new();
+			let mut re_process_skipped = false;
+
+			let system_clock =
+				PastSystemClock::new(SystemClock {}, self.state.delta_tick_from_generated.clone());
+			let mut all_messages = all_messages.into_iter().peekable();
+
+			while all_messages.peek().is_some() {
+				let current_slot =
+					tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now());
+				let block_to_initialize = self
+					.state
+					.blocks
+					.iter()
+					.filter(|block_info| {
+						block_info.slot <= current_slot &&
+							!initialized_blocks.contains(&block_info.hash)
+					})
+					.cloned()
+					.collect_vec();
+				for block_info in block_to_initialize {
+					if !TestEnvironment::metric_lower_than(
+						&self.registry,
+						"polkadot_parachain_imported_candidates_total",
+						(block_info.total_candidates_before + block_info.candidates.len() as u64 -
+							1) as f64,
+					) {
+						initialized_blocks.insert(block_info.hash);
+						self.initialize_block(&block_info).await;
+					}
+				}
+
+				let mut maybe_need_skip = if re_process_skipped {
+					skipped_messages.clone().into_iter().peekable()
+				} else {
+					vec![].into_iter().peekable()
+				};
+
+				let progressing_iterator = if !re_process_skipped {
+					&mut all_messages
+				} else {
+					re_process_skipped = false;
+					skipped_messages.clear();
+					&mut maybe_need_skip
+				};
+
+				while progressing_iterator
+					.peek()
+					.map(|bundle| {
+						self.time_to_process_message(
+							bundle,
+							current_slot,
+							&initialized_blocks,
+							&system_clock,
+							&per_candidate_data,
+						)
+					})
+					.unwrap_or_default()
+				{
+					let bundle = progressing_iterator.next().unwrap();
+					re_process_skipped = self.process_message(
+						bundle,
+						&mut per_candidate_data,
+						&mut skipped_messages,
+					) || re_process_skipped;
+				}
+				// Sleep, so that we don't busy wait in this loop when don't have anything to send.
+				sleep(Duration::from_millis(50)).await;
+			}
+
+			gum::info!(
+				"All messages sent max_tranche {:?} last_tranche_with_no_show {:?}",
+				per_candidate_data.values().map(|data| data.max_tranche).max(),
+				per_candidate_data.values().map(|data| data.last_tranche_with_no_show).max()
+			);
+			sleep(Duration::from_secs(6)).await;
+			// Send an empty GetApprovalSignatures as the last message
+			// so when the approval-distribution answered to it, we know it doesn't have anything
+			// else to process.
+			let (tx, rx) = oneshot::channel();
+			let msg = ApprovalDistributionMessage::GetApprovalSignatures(HashSet::new(), tx);
+			self.send_overseer_message(
+				AllMessages::ApprovalDistribution(msg),
+				ValidatorIndex(0),
+				None,
+			)
+			.await;
+			rx.await.expect("Failed to get signatures");
+			self.notify_done.send(()).expect("Failed to notify main loop");
+			gum::info!("All messages processed ");
+		});
+	}
+
+	// Processes a single message bundle and queue the messages to be sent by the peers that would
+	// send the message in our simulation.
+	pub fn process_message(
+		&mut self,
+		bundle: MessagesBundle,
+		per_candidate_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
+		skipped_messages: &mut Vec<MessagesBundle>,
+	) -> bool {
+		let mut reprocess_skipped = false;
+		let block_info = self
+			.state
+			.get_info_by_hash(bundle.assignments.first().unwrap().block_hash)
+			.clone();
+
+		if bundle.should_send(per_candidate_data, &self.options) {
+			bundle.record_sent_assignment(per_candidate_data);
+
+			let assignments = bundle.assignments.clone();
+
+			for message in bundle.assignments.into_iter().chain(bundle.approvals.into_iter()) {
+				if message.no_show_if_required(&assignments, per_candidate_data) {
+					reprocess_skipped = true;
+					continue;
+				} else {
+					message.record_vote(&block_info);
+				}
+				self.state
+					.total_unique_messages
+					.as_ref()
+					.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+				for (peer, messages) in
+					message.clone().split_by_peer_id(&self.state.test_authorities)
+				{
+					for message in messages {
+						self.state
+							.total_sent_messages_to_node
+							.as_ref()
+							.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+						self.queue_message_from_peer(message, peer.0)
+					}
+				}
+			}
+		} else if !block_info.approved.load(std::sync::atomic::Ordering::SeqCst) &&
+			self.options.num_no_shows_per_candidate > 0
+		{
+			skipped_messages.push(bundle);
+		}
+		reprocess_skipped
+	}
+
+	// Tells if it is the time to process a message.
+	pub fn time_to_process_message(
+		&self,
+		bundle: &MessagesBundle,
+		current_slot: Slot,
+		initialized_blocks: &HashSet<Hash>,
+		system_clock: &PastSystemClock,
+		per_candidate_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
+	) -> bool {
+		let block_info =
+			self.state.get_info_by_hash(bundle.assignments.first().unwrap().block_hash);
+		let tranche_now = system_clock.tranche_now(SLOT_DURATION_MILLIS, block_info.slot);
+
+		Self::is_past_tranche(
+			bundle,
+			tranche_now,
+			current_slot,
+			block_info,
+			initialized_blocks.contains(&block_info.hash),
+		) || !bundle.should_send(per_candidate_data, &self.options)
+	}
+
+	// Tells if the tranche where the bundle should be sent has passed.
+	pub fn is_past_tranche(
+		bundle: &MessagesBundle,
+		tranche_now: u32,
+		current_slot: Slot,
+		block_info: &BlockTestData,
+		block_initialized: bool,
+	) -> bool {
+		bundle.tranche_to_send() <= tranche_now &&
+			current_slot >= block_info.slot &&
+			block_initialized
+	}
+
+	// Queue message to be sent by validator `sent_by`
+	fn queue_message_from_peer(&mut self, message: TestMessageInfo, sent_by: ValidatorIndex) {
+		let peer_authority_id = self
+			.state
+			.test_authorities
+			.validator_authority_id
+			.get(sent_by.0 as usize)
+			.expect("We can't handle unknown peers")
+			.clone();
+
+		self.network
+			.send_message_from_peer(
+				&peer_authority_id,
+				protocol_v3::ValidationProtocol::ApprovalDistribution(message.msg).into(),
+			)
+			.unwrap_or_else(|_| panic!("Network should be up and running {:?}", sent_by));
+	}
+
+	// Queues a message to be sent by the peer identified by the `sent_by` value.
+	async fn send_overseer_message(
+		&mut self,
+		message: AllMessages,
+		_sent_by: ValidatorIndex,
+		_latency: Option<Duration>,
+	) {
+		self.overseer_handle
+			.send_msg(message, LOG_TARGET)
+			.timeout(MAX_TIME_OF_FLIGHT)
+			.await
+			.unwrap_or_else(|| {
+				panic!("{} ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis())
+			});
+	}
+
+	// Sends the messages needed by approval-distribution and approval-voting for processing a
+	// message. E.g: PeerViewChange.
+	async fn initialize_block(&mut self, block_info: &BlockTestData) {
+		gum::info!("Initialize block {:?}", block_info.hash);
+		let (tx, rx) = oneshot::channel();
+		self.overseer_handle.wait_for_activation(block_info.hash, tx).await;
+
+		rx.await
+			.expect("We should not fail waiting for block to be activated")
+			.expect("We should not fail waiting for block to be activated");
+
+		for validator in 1..self.state.test_authorities.validator_authority_id.len() as u32 {
+			let peer_id = self.state.test_authorities.peer_ids.get(validator as usize).unwrap();
+			let validator = ValidatorIndex(validator);
+			let view_update = generate_peer_view_change_for(block_info.hash, *peer_id);
+
+			self.send_overseer_message(view_update, validator, None).await;
+		}
+	}
+
+	// Initializes the candidates test data. This is used for bookeeping if more assignments and
+	// approvals would be needed.
+	fn initialize_candidates_test_data(
+		&self,
+	) -> HashMap<(Hash, CandidateIndex), CandidateTestData> {
+		let mut per_candidate_data: HashMap<(Hash, CandidateIndex), CandidateTestData> =
+			HashMap::new();
+		for block_info in self.state.blocks.iter() {
+			for (candidate_index, _) in block_info.candidates.iter().enumerate() {
+				per_candidate_data.insert(
+					(block_info.hash, candidate_index as CandidateIndex),
+					CandidateTestData {
+						max_no_shows: self.options.num_no_shows_per_candidate,
+						last_tranche_with_no_show: 0,
+						sent_assignment: 0,
+						num_no_shows: 0,
+						max_tranche: 0,
+						needed_approvals: self.state.configuration.needed_approvals as u32,
+					},
+				);
+			}
+		}
+		per_candidate_data
+	}
+}
+
+/// Helper function to build an overseer with the real implementation for `ApprovalDistribution` and
+/// `ApprovalVoting` subystems and mock subsytems for all others.
+fn build_overseer(
+	state: &ApprovalTestState,
+	network: &NetworkEmulatorHandle,
+	config: &TestConfiguration,
+	dependencies: &TestEnvironmentDependencies,
+	network_interface: &NetworkInterface,
+	network_receiver: NetworkInterfaceReceiver,
+) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandleReal) {
+	let overseer_connector = OverseerConnector::with_event_capacity(6400000);
+
+	let spawn_task_handle = dependencies.task_manager.spawn_handle();
+
+	let db = kvdb_memorydb::create(NUM_COLUMNS);
+	let db: polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter<kvdb_memorydb::InMemory> =
+		polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
+	let keystore = LocalKeystore::in_memory();
+
+	let system_clock =
+		PastSystemClock::new(SystemClock {}, state.delta_tick_from_generated.clone());
+	let approval_voting = ApprovalVotingSubsystem::with_config_and_clock(
+		TEST_CONFIG,
+		Arc::new(db),
+		Arc::new(keystore),
+		Box::new(TestSyncOracle {}),
+		state.approval_voting_metrics.clone(),
+		Box::new(system_clock.clone()),
+	);
+
+	let approval_distribution =
+		ApprovalDistribution::new(Metrics::register(Some(&dependencies.registry)).unwrap());
+	let mock_chain_api = MockChainApi::new(state.build_chain_api_state());
+	let mock_chain_selection = MockChainSelection { state: state.clone(), clock: system_clock };
+	let mock_runtime_api = MockRuntimeApi::new(
+		config.clone(),
+		state.test_authorities.clone(),
+		state.candidate_hashes_by_block(),
+		state.candidate_events_by_block(),
+		Some(state.babe_epoch.clone()),
+		1,
+	);
+	let mock_tx_bridge = MockNetworkBridgeTx::new(
+		network.clone(),
+		network_interface.subsystem_sender(),
+		state.test_authorities.clone(),
+	);
+	let mock_rx_bridge = MockNetworkBridgeRx::new(network_receiver, None);
+	let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
+	let dummy = dummy_builder!(spawn_task_handle, overseer_metrics)
+		.replace_approval_distribution(|_| approval_distribution)
+		.replace_approval_voting(|_| approval_voting)
+		.replace_chain_api(|_| mock_chain_api)
+		.replace_chain_selection(|_| mock_chain_selection)
+		.replace_runtime_api(|_| mock_runtime_api)
+		.replace_network_bridge_tx(|_| mock_tx_bridge)
+		.replace_network_bridge_rx(|_| mock_rx_bridge);
+
+	let (overseer, raw_handle) =
+		dummy.build_with_connector(overseer_connector).expect("Should not fail");
+
+	let overseer_handle = OverseerHandleReal::new(raw_handle);
+	(overseer, overseer_handle)
+}
+
+/// Takes a test configuration and uses it to creates the `TestEnvironment`.
+pub fn prepare_test(
+	config: TestConfiguration,
+	options: ApprovalsOptions,
+) -> (TestEnvironment, ApprovalTestState) {
+	prepare_test_inner(config, TestEnvironmentDependencies::default(), options)
+}
+
+/// Build the test environment for an Approval benchmark.
+fn prepare_test_inner(
+	config: TestConfiguration,
+	dependencies: TestEnvironmentDependencies,
+	options: ApprovalsOptions,
+) -> (TestEnvironment, ApprovalTestState) {
+	gum::info!("Prepare test state");
+	let state = ApprovalTestState::new(&config, options, &dependencies);
+
+	gum::info!("Build network emulator");
+
+	let (network, network_interface, network_receiver) =
+		new_network(&config, &dependencies, &state.test_authorities, vec![Arc::new(state.clone())]);
+
+	gum::info!("Build overseer");
+
+	let (overseer, overseer_handle) = build_overseer(
+		&state,
+		&network,
+		&config,
+		&dependencies,
+		&network_interface,
+		network_receiver,
+	);
+
+	(
+		TestEnvironment::new(
+			dependencies,
+			config,
+			network,
+			overseer,
+			overseer_handle,
+			state.test_authorities.clone(),
+		),
+		state,
+	)
+}
+
+pub async fn bench_approvals(env: &mut TestEnvironment, mut state: ApprovalTestState) {
+	let producer_rx = state
+		.start_message_production(
+			env.network(),
+			env.overseer_handle().clone(),
+			env,
+			env.registry().clone(),
+		)
+		.await;
+	bench_approvals_run(env, state, producer_rx).await
+}
+
+/// Runs the approval benchmark.
+pub async fn bench_approvals_run(
+	env: &mut TestEnvironment,
+	state: ApprovalTestState,
+	producer_rx: oneshot::Receiver<()>,
+) {
+	let config = env.config().clone();
+
+	env.metrics().set_n_validators(config.n_validators);
+	env.metrics().set_n_cores(config.n_cores);
+
+	// First create the initialization messages that make sure that then node under
+	// tests receives notifications about the topology used and the connected peers.
+	let mut initialization_messages = env.network().generate_peer_connected();
+	initialization_messages.extend(generate_new_session_topology(
+		&state.test_authorities,
+		ValidatorIndex(NODE_UNDER_TEST),
+	));
+	for message in initialization_messages {
+		env.send_message(message).await;
+	}
+
+	let start_marker = Instant::now();
+	let real_clock = SystemClock {};
+	state.delta_tick_from_generated.store(
+		real_clock.tick_now() -
+			slot_number_to_tick(SLOT_DURATION_MILLIS, state.generated_state.initial_slot),
+		std::sync::atomic::Ordering::SeqCst,
+	);
+	let system_clock = PastSystemClock::new(real_clock, state.delta_tick_from_generated.clone());
+
+	for block_num in 0..env.config().num_blocks {
+		let mut current_slot = tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now());
+
+		// Wait untill the time arrieves at the first slot under test.
+		while current_slot < state.generated_state.initial_slot {
+			sleep(Duration::from_millis(5)).await;
+			current_slot = tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now());
+		}
+
+		gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num + 1, env.config().num_blocks);
+		env.metrics().set_current_block(block_num);
+		let block_start_ts = Instant::now();
+
+		if let Some(block_info) = state.get_info_by_slot(current_slot) {
+			env.import_block(new_block_import_info(block_info.hash, block_info.block_number))
+				.await;
+		}
+
+		let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
+		env.metrics().set_block_time(block_time);
+		gum::info!("Block time {}", format!("{:?}ms", block_time).cyan());
+
+		system_clock
+			.wait(slot_number_to_tick(SLOT_DURATION_MILLIS, current_slot + 1))
+			.await;
+	}
+
+	// Wait for all blocks to be approved before exiting.
+	// This is an invariant of the benchmark, if this does not happen something went teribbly wrong.
+	while state.last_approved_block.load(std::sync::atomic::Ordering::SeqCst) <
+		env.config().num_blocks as u32
+	{
+		gum::info!(
+			"Waiting for all blocks to be approved current approved {:} num_sent {:} num_unique {:}",
+			state.last_approved_block.load(std::sync::atomic::Ordering::SeqCst),
+			state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst),
+			state.total_unique_messages.load(std::sync::atomic::Ordering::SeqCst)
+		);
+		tokio::time::sleep(Duration::from_secs(6)).await;
+	}
+
+	gum::info!("Awaiting producer to signal done");
+
+	producer_rx.await.expect("Failed to receive done from message producer");
+
+	gum::info!("Awaiting polkadot_parachain_subsystem_bounded_received to tells us the messages have been processed");
+	let at_least_messages =
+		state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst) as usize;
+	env.wait_until_metric(
+		"polkadot_parachain_subsystem_bounded_received",
+		Some(("subsystem_name", "approval-distribution-subsystem")),
+		|value| {
+			gum::info!(target: LOG_TARGET, ?value, ?at_least_messages, "Waiting metric");
+			value >= at_least_messages as f64
+		},
+	)
+	.await;
+	gum::info!("Requesting approval votes ms");
+
+	for info in &state.blocks {
+		for (index, candidates) in info.candidates.iter().enumerate() {
+			match candidates {
+				CandidateEvent::CandidateBacked(_, _, _, _) => todo!(),
+				CandidateEvent::CandidateIncluded(receipt_fetch, _head, _, _) => {
+					let (tx, rx) = oneshot::channel();
+
+					let msg = ApprovalVotingMessage::GetApprovalSignaturesForCandidate(
+						receipt_fetch.hash(),
+						tx,
+					);
+					env.send_message(AllMessages::ApprovalVoting(msg)).await;
+
+					let result = rx.await.unwrap();
+
+					for (validator, _) in result.iter() {
+						info.votes
+							.get(validator.0 as usize)
+							.unwrap()
+							.get(index)
+							.unwrap()
+							.store(false, std::sync::atomic::Ordering::SeqCst);
+					}
+				},
+
+				CandidateEvent::CandidateTimedOut(_, _, _) => todo!(),
+			};
+		}
+	}
+
+	gum::info!("Awaiting polkadot_parachain_subsystem_bounded_received to tells us the messages have been processed");
+	let at_least_messages =
+		state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst) as usize;
+	env.wait_until_metric(
+		"polkadot_parachain_subsystem_bounded_received",
+		Some(("subsystem_name", "approval-distribution-subsystem")),
+		|value| {
+			gum::info!(target: LOG_TARGET, ?value, ?at_least_messages, "Waiting metric");
+			value >= at_least_messages as f64
+		},
+	)
+	.await;
+
+	for state in &state.blocks {
+		for (validator, votes) in state
+			.votes
+			.as_ref()
+			.iter()
+			.enumerate()
+			.filter(|(validator, _)| *validator != NODE_UNDER_TEST as usize)
+		{
+			for (index, candidate) in votes.iter().enumerate() {
+				assert_eq!(
+					(
+						validator,
+						index,
+						candidate.load(std::sync::atomic::Ordering::SeqCst),
+						state.hash
+					),
+					(validator, index, false, state.hash)
+				);
+			}
+		}
+	}
+
+	env.stop().await;
+
+	let duration: u128 = start_marker.elapsed().as_millis();
+	gum::info!(
+		"All blocks processed in {} total_sent_messages_to_node {} total_sent_messages_from_node {} num_unique_messages {}",
+		format!("{:?}ms", duration).cyan(),
+		state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst),
+		state.total_sent_messages_from_node.load(std::sync::atomic::Ordering::SeqCst),
+		state.total_unique_messages.load(std::sync::atomic::Ordering::SeqCst)
+	);
+
+	env.display_network_usage();
+	env.display_cpu_usage(&["approval-distribution", "approval-voting"]);
+}
diff --git a/polkadot/node/subsystem-bench/src/approval/test_message.rs b/polkadot/node/subsystem-bench/src/approval/test_message.rs
new file mode 100644
index 0000000000000000000000000000000000000000..ea578776f261d07e7124f8abc12d5e1c03f00987
--- /dev/null
+++ b/polkadot/node/subsystem-bench/src/approval/test_message.rs
@@ -0,0 +1,304 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use super::{ApprovalsOptions, BlockTestData, CandidateTestData};
+use crate::core::configuration::TestAuthorities;
+use itertools::Itertools;
+use parity_scale_codec::{Decode, Encode};
+use polkadot_node_network_protocol::v3 as protocol_v3;
+
+use polkadot_primitives::{CandidateIndex, Hash, ValidatorIndex};
+use sc_network::PeerId;
+use std::collections::{HashMap, HashSet};
+
+#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
+pub struct TestMessageInfo {
+	/// The actual message
+	pub msg: protocol_v3::ApprovalDistributionMessage,
+	/// The list of peers that would sends this message in a real topology.
+	/// It includes both the peers that would send the message because of the topology
+	/// or because of randomly chosing so.
+	pub sent_by: Vec<ValidatorIndex>,
+	/// The tranche at which this message should be sent.
+	pub tranche: u32,
+	/// The block hash this message refers to.
+	pub block_hash: Hash,
+}
+
+impl std::hash::Hash for TestMessageInfo {
+	fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+		match &self.msg {
+			protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => {
+				for (assignment, candidates) in assignments {
+					(assignment.block_hash, assignment.validator).hash(state);
+					candidates.hash(state);
+				}
+			},
+			protocol_v3::ApprovalDistributionMessage::Approvals(approvals) => {
+				for approval in approvals {
+					(approval.block_hash, approval.validator).hash(state);
+					approval.candidate_indices.hash(state);
+				}
+			},
+		};
+	}
+}
+
+#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
+/// A list of messages that depend of each-other, approvals cover one of the assignments and
+/// vice-versa.
+pub struct MessagesBundle {
+	pub assignments: Vec<TestMessageInfo>,
+	pub approvals: Vec<TestMessageInfo>,
+}
+
+impl MessagesBundle {
+	/// The tranche when this bundle can be sent correctly, so no assignments or approvals will be
+	/// from the future.
+	pub fn tranche_to_send(&self) -> u32 {
+		self.assignments
+			.iter()
+			.chain(self.approvals.iter())
+			.max_by(|a, b| a.tranche.cmp(&b.tranche))
+			.unwrap()
+			.tranche
+	}
+
+	/// The min tranche in the bundle.
+	pub fn min_tranche(&self) -> u32 {
+		self.assignments
+			.iter()
+			.chain(self.approvals.iter())
+			.min_by(|a, b| a.tranche.cmp(&b.tranche))
+			.unwrap()
+			.tranche
+	}
+
+	/// Tells if the bundle is needed for sending.
+	/// We either send it because we need more assignments and approvals to approve the candidates
+	/// or because we configured the test to send messages untill a given tranche.
+	pub fn should_send(
+		&self,
+		candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
+		options: &ApprovalsOptions,
+	) -> bool {
+		self.needed_for_approval(candidates_test_data) ||
+			(!options.stop_when_approved &&
+				self.min_tranche() <= options.last_considered_tranche)
+	}
+
+	/// Tells if the bundle is needed because we need more messages to approve the candidates.
+	pub fn needed_for_approval(
+		&self,
+		candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
+	) -> bool {
+		self.assignments
+			.iter()
+			.any(|message| message.needed_for_approval(candidates_test_data))
+	}
+
+	/// Mark the assignments in the bundle as sent.
+	pub fn record_sent_assignment(
+		&self,
+		candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
+	) {
+		self.assignments
+			.iter()
+			.for_each(|assignment| assignment.record_sent_assignment(candidates_test_data));
+	}
+}
+
+impl TestMessageInfo {
+	/// Tells if the message is an approval.
+	fn is_approval(&self) -> bool {
+		match self.msg {
+			protocol_v3::ApprovalDistributionMessage::Assignments(_) => false,
+			protocol_v3::ApprovalDistributionMessage::Approvals(_) => true,
+		}
+	}
+
+	/// Records an approval.
+	/// We use this to check after all messages have been processed that we didn't loose any
+	/// message.
+	pub fn record_vote(&self, state: &BlockTestData) {
+		if self.is_approval() {
+			match &self.msg {
+				protocol_v3::ApprovalDistributionMessage::Assignments(_) => todo!(),
+				protocol_v3::ApprovalDistributionMessage::Approvals(approvals) =>
+					for approval in approvals {
+						for candidate_index in approval.candidate_indices.iter_ones() {
+							state
+								.votes
+								.get(approval.validator.0 as usize)
+								.unwrap()
+								.get(candidate_index)
+								.unwrap()
+								.store(true, std::sync::atomic::Ordering::SeqCst);
+						}
+					},
+			}
+		}
+	}
+
+	/// Mark the assignments in the message as sent.
+	pub fn record_sent_assignment(
+		&self,
+		candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
+	) {
+		match &self.msg {
+			protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => {
+				for (assignment, candidate_indices) in assignments {
+					for candidate_index in candidate_indices.iter_ones() {
+						let candidate_test_data = candidates_test_data
+							.get_mut(&(assignment.block_hash, candidate_index as CandidateIndex))
+							.unwrap();
+						candidate_test_data.mark_sent_assignment(self.tranche)
+					}
+				}
+			},
+			protocol_v3::ApprovalDistributionMessage::Approvals(_approvals) => todo!(),
+		}
+	}
+
+	/// Returns a list of candidates indicies in this message
+	pub fn candidate_indices(&self) -> HashSet<usize> {
+		let mut unique_candidate_indicies = HashSet::new();
+		match &self.msg {
+			protocol_v3::ApprovalDistributionMessage::Assignments(assignments) =>
+				for (_assignment, candidate_indices) in assignments {
+					for candidate_index in candidate_indices.iter_ones() {
+						unique_candidate_indicies.insert(candidate_index);
+					}
+				},
+			protocol_v3::ApprovalDistributionMessage::Approvals(approvals) =>
+				for approval in approvals {
+					for candidate_index in approval.candidate_indices.iter_ones() {
+						unique_candidate_indicies.insert(candidate_index);
+					}
+				},
+		}
+		unique_candidate_indicies
+	}
+
+	/// Marks this message as no-shows if the number of configured no-shows is above the registered
+	/// no-shows.
+	/// Returns true if the message is a no-show.
+	pub fn no_show_if_required(
+		&self,
+		assignments: &[TestMessageInfo],
+		candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
+	) -> bool {
+		let mut should_no_show = false;
+		if self.is_approval() {
+			let covered_candidates = assignments
+				.iter()
+				.map(|assignment| (assignment, assignment.candidate_indices()))
+				.collect_vec();
+
+			match &self.msg {
+				protocol_v3::ApprovalDistributionMessage::Assignments(_) => todo!(),
+				protocol_v3::ApprovalDistributionMessage::Approvals(approvals) => {
+					assert_eq!(approvals.len(), 1);
+
+					for approval in approvals {
+						should_no_show = should_no_show ||
+							approval.candidate_indices.iter_ones().all(|candidate_index| {
+								let candidate_test_data = candidates_test_data
+									.get_mut(&(
+										approval.block_hash,
+										candidate_index as CandidateIndex,
+									))
+									.unwrap();
+								let assignment = covered_candidates
+									.iter()
+									.find(|(_assignment, candidates)| {
+										candidates.contains(&candidate_index)
+									})
+									.unwrap();
+								candidate_test_data.should_no_show(assignment.0.tranche)
+							});
+
+						if should_no_show {
+							for candidate_index in approval.candidate_indices.iter_ones() {
+								let candidate_test_data = candidates_test_data
+									.get_mut(&(
+										approval.block_hash,
+										candidate_index as CandidateIndex,
+									))
+									.unwrap();
+								let assignment = covered_candidates
+									.iter()
+									.find(|(_assignment, candidates)| {
+										candidates.contains(&candidate_index)
+									})
+									.unwrap();
+								candidate_test_data.record_no_show(assignment.0.tranche)
+							}
+						}
+					}
+				},
+			}
+		}
+		should_no_show
+	}
+
+	/// Tells if a message is needed for approval
+	pub fn needed_for_approval(
+		&self,
+		candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
+	) -> bool {
+		match &self.msg {
+			protocol_v3::ApprovalDistributionMessage::Assignments(assignments) =>
+				assignments.iter().any(|(assignment, candidate_indices)| {
+					candidate_indices.iter_ones().any(|candidate_index| {
+						candidates_test_data
+							.get(&(assignment.block_hash, candidate_index as CandidateIndex))
+							.map(|data| data.should_send_tranche(self.tranche))
+							.unwrap_or_default()
+					})
+				}),
+			protocol_v3::ApprovalDistributionMessage::Approvals(approvals) =>
+				approvals.iter().any(|approval| {
+					approval.candidate_indices.iter_ones().any(|candidate_index| {
+						candidates_test_data
+							.get(&(approval.block_hash, candidate_index as CandidateIndex))
+							.map(|data| data.should_send_tranche(self.tranche))
+							.unwrap_or_default()
+					})
+				}),
+		}
+	}
+
+	/// Splits a message into multiple messages based on what peers should send this message.
+	/// It build a HashMap of messages that should be sent by each peer.
+	pub fn split_by_peer_id(
+		self,
+		authorities: &TestAuthorities,
+	) -> HashMap<(ValidatorIndex, PeerId), Vec<TestMessageInfo>> {
+		let mut result: HashMap<(ValidatorIndex, PeerId), Vec<TestMessageInfo>> = HashMap::new();
+
+		for validator_index in &self.sent_by {
+			let peer = authorities.peer_ids.get(validator_index.0 as usize).unwrap();
+			result.entry((*validator_index, *peer)).or_default().push(TestMessageInfo {
+				msg: self.msg.clone(),
+				sent_by: Default::default(),
+				tranche: self.tranche,
+				block_hash: self.block_hash,
+			});
+		}
+		result
+	}
+}
diff --git a/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs b/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs
index 18ea2f72891fd7f9839449bb2e4495dc5b60621b..e6827f1d8aeaa63678561fb295a4a5166eb50b28 100644
--- a/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs
+++ b/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs
@@ -14,6 +14,8 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
+use crate::core::mock::TestSyncOracle;
+
 use super::*;
 
 use polkadot_node_metrics::metrics::Metrics;
@@ -31,22 +33,10 @@ mod columns {
 
 const TEST_CONFIG: Config = Config { col_data: columns::DATA, col_meta: columns::META };
 
-struct DumbOracle;
-
-impl sp_consensus::SyncOracle for DumbOracle {
-	fn is_major_syncing(&self) -> bool {
-		false
-	}
-
-	fn is_offline(&self) -> bool {
-		unimplemented!("oh no!")
-	}
-}
-
 pub fn new_av_store(dependencies: &TestEnvironmentDependencies) -> AvailabilityStoreSubsystem {
 	let metrics = Metrics::try_register(&dependencies.registry).unwrap();
 
-	AvailabilityStoreSubsystem::new(test_store(), TEST_CONFIG, Box::new(DumbOracle), metrics)
+	AvailabilityStoreSubsystem::new(test_store(), TEST_CONFIG, Box::new(TestSyncOracle {}), metrics)
 }
 
 fn test_store() -> Arc<dyn Database> {
diff --git a/polkadot/node/subsystem-bench/src/availability/mod.rs b/polkadot/node/subsystem-bench/src/availability/mod.rs
index f9892efb3c68ce31763f7caa1e4e4bb76a2b938a..f7f1184448b357edab23ec2e8d28f51c8291d8ff 100644
--- a/polkadot/node/subsystem-bench/src/availability/mod.rs
+++ b/polkadot/node/subsystem-bench/src/availability/mod.rs
@@ -25,7 +25,7 @@ use polkadot_node_subsystem_types::{
 	messages::{AvailabilityStoreMessage, NetworkBridgeEvent},
 	Span,
 };
-use polkadot_overseer::Handle as OverseerHandle;
+use polkadot_overseer::{metrics::Metrics as OverseerMetrics, Handle as OverseerHandle};
 use sc_network::{request_responses::ProtocolConfig, PeerId};
 use sp_core::H256;
 use std::{collections::HashMap, iter::Cycle, ops::Sub, sync::Arc, time::Instant};
@@ -85,9 +85,12 @@ fn build_overseer_for_availability_read(
 	av_store: MockAvailabilityStore,
 	network_bridge: (MockNetworkBridgeTx, MockNetworkBridgeRx),
 	availability_recovery: AvailabilityRecoverySubsystem,
+	dependencies: &TestEnvironmentDependencies,
 ) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
 	let overseer_connector = OverseerConnector::with_event_capacity(64000);
-	let dummy = dummy_builder!(spawn_task_handle);
+	let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
+
+	let dummy = dummy_builder!(spawn_task_handle, overseer_metrics);
 	let builder = dummy
 		.replace_runtime_api(|_| runtime_api)
 		.replace_availability_store(|_| av_store)
@@ -101,6 +104,7 @@ fn build_overseer_for_availability_read(
 	(overseer, OverseerHandle::new(raw_handle))
 }
 
+#[allow(clippy::too_many_arguments)]
 fn build_overseer_for_availability_write(
 	spawn_task_handle: SpawnTaskHandle,
 	runtime_api: MockRuntimeApi,
@@ -109,9 +113,12 @@ fn build_overseer_for_availability_write(
 	chain_api: MockChainApi,
 	availability_store: AvailabilityStoreSubsystem,
 	bitfield_distribution: BitfieldDistribution,
+	dependencies: &TestEnvironmentDependencies,
 ) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
 	let overseer_connector = OverseerConnector::with_event_capacity(64000);
-	let dummy = dummy_builder!(spawn_task_handle);
+	let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
+
+	let dummy = dummy_builder!(spawn_task_handle, overseer_metrics);
 	let builder = dummy
 		.replace_runtime_api(|_| runtime_api)
 		.replace_availability_store(|_| availability_store)
@@ -171,6 +178,9 @@ fn prepare_test_inner(
 		config.clone(),
 		test_authorities.clone(),
 		candidate_hashes,
+		Default::default(),
+		Default::default(),
+		0,
 	);
 
 	let availability_state = NetworkAvailabilityState {
@@ -198,6 +208,7 @@ fn prepare_test_inner(
 	let network_bridge_tx = network_bridge::MockNetworkBridgeTx::new(
 		network.clone(),
 		network_interface.subsystem_sender(),
+		test_authorities.clone(),
 	);
 
 	let network_bridge_rx =
@@ -231,6 +242,7 @@ fn prepare_test_inner(
 				av_store,
 				(network_bridge_tx, network_bridge_rx),
 				subsystem,
+				&dependencies,
 			)
 		},
 		TestObjective::DataAvailabilityWrite => {
@@ -240,7 +252,7 @@ fn prepare_test_inner(
 				Metrics::try_register(&dependencies.registry).unwrap(),
 			);
 
-			let block_headers = (0..=config.num_blocks)
+			let block_headers = (1..=config.num_blocks)
 				.map(|block_number| {
 					(
 						Hash::repeat_byte(block_number as u8),
@@ -267,6 +279,7 @@ fn prepare_test_inner(
 				chain_api,
 				new_av_store(&dependencies),
 				bitfield_distribution,
+				&dependencies,
 			)
 		},
 		_ => {
@@ -614,9 +627,10 @@ pub async fn benchmark_availability_write(env: &mut TestEnvironment, mut state:
 		);
 
 		// Wait for all bitfields to be processed.
-		env.wait_until_metric_eq(
+		env.wait_until_metric(
 			"polkadot_parachain_received_availabilty_bitfields_total",
-			config.connected_count() * block_num,
+			None,
+			|value| value == (config.connected_count() * block_num) as f64,
 		)
 		.await;
 
diff --git a/polkadot/node/subsystem-bench/src/cli.rs b/polkadot/node/subsystem-bench/src/cli.rs
index 7213713eb6baa38537bf32b1c1b7867c0e0ad846..bfce8cc183a9b06ae3a91c7fb3600835dcf72885 100644
--- a/polkadot/node/subsystem-bench/src/cli.rs
+++ b/polkadot/node/subsystem-bench/src/cli.rs
@@ -14,6 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 use super::availability::DataAvailabilityReadOptions;
+use crate::approval::ApprovalsOptions;
 use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)]
@@ -34,6 +35,9 @@ pub enum TestObjective {
 	DataAvailabilityWrite,
 	/// Run a test sequence specified in a file
 	TestSequence(TestSequenceOptions),
+	/// Benchmark the approval-voting and approval-distribution subsystems.
+	ApprovalVoting(ApprovalsOptions),
+	Unimplemented,
 }
 
 #[derive(Debug, clap::Parser)]
diff --git a/polkadot/node/subsystem-bench/src/core/configuration.rs b/polkadot/node/subsystem-bench/src/core/configuration.rs
index 66da8a1db45d7bcd008c3bf5c0f3693594ac35b5..0c8a78c504c8c22ca83eb0d7362aca3e2bcda916 100644
--- a/polkadot/node/subsystem-bench/src/core/configuration.rs
+++ b/polkadot/node/subsystem-bench/src/core/configuration.rs
@@ -16,11 +16,14 @@
 //
 //! Test configuration definition and helpers.
 use super::*;
+use itertools::Itertools;
 use keyring::Keyring;
-use std::path::Path;
+use sc_network::PeerId;
+use sp_consensus_babe::AuthorityId;
+use std::{collections::HashMap, path::Path};
 
 pub use crate::cli::TestObjective;
-use polkadot_primitives::{AuthorityDiscoveryId, ValidatorId};
+use polkadot_primitives::{AssignmentId, AuthorityDiscoveryId, ValidatorId};
 use rand::thread_rng;
 use rand_distr::{Distribution, Normal, Uniform};
 
@@ -65,6 +68,25 @@ fn default_backing_group_size() -> usize {
 	5
 }
 
+// Default needed approvals
+fn default_needed_approvals() -> usize {
+	30
+}
+
+fn default_zeroth_delay_tranche_width() -> usize {
+	0
+}
+fn default_relay_vrf_modulo_samples() -> usize {
+	6
+}
+
+fn default_n_delay_tranches() -> usize {
+	89
+}
+fn default_no_show_slots() -> usize {
+	3
+}
+
 /// The test input parameters
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct TestConfiguration {
@@ -74,6 +96,17 @@ pub struct TestConfiguration {
 	pub n_validators: usize,
 	/// Number of cores
 	pub n_cores: usize,
+	/// The number of needed votes to approve a candidate.
+	#[serde(default = "default_needed_approvals")]
+	pub needed_approvals: usize,
+	#[serde(default = "default_zeroth_delay_tranche_width")]
+	pub zeroth_delay_tranche_width: usize,
+	#[serde(default = "default_relay_vrf_modulo_samples")]
+	pub relay_vrf_modulo_samples: usize,
+	#[serde(default = "default_n_delay_tranches")]
+	pub n_delay_tranches: usize,
+	#[serde(default = "default_no_show_slots")]
+	pub no_show_slots: usize,
 	/// Maximum backing group size
 	#[serde(default = "default_backing_group_size")]
 	pub max_validators_per_core: usize,
@@ -139,6 +172,11 @@ pub struct TestAuthorities {
 	pub keyring: Keyring,
 	pub validator_public: Vec<ValidatorId>,
 	pub validator_authority_id: Vec<AuthorityDiscoveryId>,
+	pub validator_babe_id: Vec<AuthorityId>,
+	pub validator_assignment_id: Vec<AssignmentId>,
+	pub key_seeds: Vec<String>,
+	pub peer_ids: Vec<PeerId>,
+	pub peer_id_to_authority: HashMap<PeerId, AuthorityDiscoveryId>,
 }
 
 impl TestConfiguration {
@@ -162,18 +200,45 @@ impl TestConfiguration {
 	pub fn generate_authorities(&self) -> TestAuthorities {
 		let keyring = Keyring::default();
 
-		let keys = (0..self.n_validators)
-			.map(|peer_index| keyring.sr25519_new(format!("Node{}", peer_index)))
+		let key_seeds = (0..self.n_validators)
+			.map(|peer_index| format!("//Node{}", peer_index))
+			.collect_vec();
+
+		let keys = key_seeds
+			.iter()
+			.map(|seed| keyring.sr25519_new(seed.as_str()))
 			.collect::<Vec<_>>();
 
-		// Generate `AuthorityDiscoveryId`` for each peer
+		// Generate keys and peers ids in each of the format needed by the tests.
 		let validator_public: Vec<ValidatorId> =
 			keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
 
 		let validator_authority_id: Vec<AuthorityDiscoveryId> =
 			keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
 
-		TestAuthorities { keyring, validator_public, validator_authority_id }
+		let validator_babe_id: Vec<AuthorityId> =
+			keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
+
+		let validator_assignment_id: Vec<AssignmentId> =
+			keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
+		let peer_ids: Vec<PeerId> = keys.iter().map(|_| PeerId::random()).collect::<Vec<_>>();
+
+		let peer_id_to_authority = peer_ids
+			.iter()
+			.zip(validator_authority_id.iter())
+			.map(|(peer_id, authorithy_id)| (*peer_id, authorithy_id.clone()))
+			.collect();
+
+		TestAuthorities {
+			keyring,
+			validator_public,
+			validator_authority_id,
+			peer_ids,
+			validator_babe_id,
+			validator_assignment_id,
+			key_seeds,
+			peer_id_to_authority,
+		}
 	}
 
 	/// An unconstrained standard configuration matching Polkadot/Kusama
@@ -199,6 +264,11 @@ impl TestConfiguration {
 			min_pov_size,
 			max_pov_size,
 			connectivity: 100,
+			needed_approvals: default_needed_approvals(),
+			n_delay_tranches: default_n_delay_tranches(),
+			no_show_slots: default_no_show_slots(),
+			relay_vrf_modulo_samples: default_relay_vrf_modulo_samples(),
+			zeroth_delay_tranche_width: default_zeroth_delay_tranche_width(),
 		}
 	}
 
@@ -223,6 +293,11 @@ impl TestConfiguration {
 			min_pov_size,
 			max_pov_size,
 			connectivity: 95,
+			needed_approvals: default_needed_approvals(),
+			n_delay_tranches: default_n_delay_tranches(),
+			no_show_slots: default_no_show_slots(),
+			relay_vrf_modulo_samples: default_relay_vrf_modulo_samples(),
+			zeroth_delay_tranche_width: default_zeroth_delay_tranche_width(),
 		}
 	}
 
@@ -247,6 +322,11 @@ impl TestConfiguration {
 			min_pov_size,
 			max_pov_size,
 			connectivity: 67,
+			needed_approvals: default_needed_approvals(),
+			n_delay_tranches: default_n_delay_tranches(),
+			no_show_slots: default_no_show_slots(),
+			relay_vrf_modulo_samples: default_relay_vrf_modulo_samples(),
+			zeroth_delay_tranche_width: default_zeroth_delay_tranche_width(),
 		}
 	}
 }
diff --git a/polkadot/node/subsystem-bench/src/core/display.rs b/polkadot/node/subsystem-bench/src/core/display.rs
index bca82d7b90ae9290b5bd969233f56f2df854f116..b130afdcfad578e2565d4556f67b76280df7de11 100644
--- a/polkadot/node/subsystem-bench/src/core/display.rs
+++ b/polkadot/node/subsystem-bench/src/core/display.rs
@@ -26,7 +26,7 @@ use prometheus::{
 };
 use std::fmt::Display;
 
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct MetricCollection(Vec<TestMetric>);
 
 impl From<Vec<TestMetric>> for MetricCollection {
@@ -49,6 +49,11 @@ impl MetricCollection {
 			.sum()
 	}
 
+	/// Tells if entries in bucket metric is lower than `value`
+	pub fn metric_lower_than(&self, metric_name: &str, value: f64) -> bool {
+		self.sum_by(metric_name) < value
+	}
+
 	pub fn subset_with_label_value(&self, label_name: &str, label_value: &str) -> MetricCollection {
 		self.0
 			.iter()
@@ -163,7 +168,7 @@ pub fn parse_metrics(registry: &Registry) -> MetricCollection {
 						name: h_name,
 						label_names,
 						label_values,
-						value: h.get_sample_sum(),
+						value: h.get_sample_count() as f64,
 					});
 				},
 				MetricType::SUMMARY => {
diff --git a/polkadot/node/subsystem-bench/src/core/environment.rs b/polkadot/node/subsystem-bench/src/core/environment.rs
index b6846316430b0da0fcc7d2c21cb93fc6f2e582e2..59bfed7f112005ad9002afaddd2cd51200ddcb3f 100644
--- a/polkadot/node/subsystem-bench/src/core/environment.rs
+++ b/polkadot/node/subsystem-bench/src/core/environment.rs
@@ -243,6 +243,11 @@ impl TestEnvironment {
 		&self.network
 	}
 
+	/// Returns a reference to the overseer handle.
+	pub fn overseer_handle(&self) -> &OverseerHandle {
+		&self.overseer_handle
+	}
+
 	/// Returns the Prometheus registry.
 	pub fn registry(&self) -> &Registry {
 		&self.dependencies.registry
@@ -311,18 +316,32 @@ impl TestEnvironment {
 		self.overseer_handle.stop().await;
 	}
 
-	/// Blocks until `metric_name` == `value`
-	pub async fn wait_until_metric_eq(&self, metric_name: &str, value: usize) {
-		let value = value as f64;
+	/// Tells if entries in bucket metric is lower than `value`
+	pub fn metric_lower_than(registry: &Registry, metric_name: &str, value: f64) -> bool {
+		let test_metrics = super::display::parse_metrics(registry);
+		test_metrics.metric_lower_than(metric_name, value)
+	}
+
+	/// Blocks until `metric_name` >= `value`
+	pub async fn wait_until_metric(
+		&self,
+		metric_name: &str,
+		label: Option<(&str, &str)>,
+		condition: impl Fn(f64) -> bool,
+	) {
 		loop {
-			let test_metrics = super::display::parse_metrics(self.registry());
+			let test_metrics = if let Some((label_name, label_value)) = label {
+				super::display::parse_metrics(self.registry())
+					.subset_with_label_value(label_name, label_value)
+			} else {
+				super::display::parse_metrics(self.registry())
+			};
 			let current_value = test_metrics.sum_by(metric_name);
 
-			gum::debug!(target: LOG_TARGET, metric_name, current_value, value, "Waiting for metric");
-			if current_value == value {
+			gum::debug!(target: LOG_TARGET, metric_name, current_value, "Waiting for metric");
+			if condition(current_value) {
 				break
 			}
-
 			// Check value every 50ms.
 			tokio::time::sleep(std::time::Duration::from_millis(50)).await;
 		}
diff --git a/polkadot/node/subsystem-bench/src/core/keyring.rs b/polkadot/node/subsystem-bench/src/core/keyring.rs
index 66c7229847c3a34b8ecfda6339ced5ee8b086d28..c290d30b46fbe1a2a0db04155d8460c288fd98ff 100644
--- a/polkadot/node/subsystem-bench/src/core/keyring.rs
+++ b/polkadot/node/subsystem-bench/src/core/keyring.rs
@@ -34,13 +34,17 @@ impl Default for Keyring {
 }
 
 impl Keyring {
-	pub fn sr25519_new(&self, name: String) -> Public {
+	pub fn sr25519_new(&self, seed: &str) -> Public {
 		self.keystore
-			.sr25519_generate_new(ValidatorId::ID, Some(&format!("//{}", name)))
+			.sr25519_generate_new(ValidatorId::ID, Some(seed))
 			.expect("Insert key into keystore")
 	}
 
 	pub fn keystore(&self) -> Arc<dyn Keystore> {
 		self.keystore.clone()
 	}
+
+	pub fn keystore_ref(&self) -> &LocalKeystore {
+		self.keystore.as_ref()
+	}
 }
diff --git a/polkadot/node/subsystem-bench/src/core/mock/chain_api.rs b/polkadot/node/subsystem-bench/src/core/mock/chain_api.rs
index 008d8eef106a0f0b4ebe1f266bd850956e698e23..7a5ee80de8007fcd9d4c362676ebb391f0b268d9 100644
--- a/polkadot/node/subsystem-bench/src/core/mock/chain_api.rs
+++ b/polkadot/node/subsystem-bench/src/core/mock/chain_api.rs
@@ -16,6 +16,7 @@
 //!
 //! A generic runtime api subsystem mockup suitable to be used in benchmarks.
 
+use itertools::Itertools;
 use polkadot_primitives::Header;
 
 use polkadot_node_subsystem::{
@@ -38,6 +39,12 @@ pub struct MockChainApi {
 	state: ChainApiState,
 }
 
+impl ChainApiState {
+	fn get_header_by_number(&self, requested_number: u32) -> Option<&Header> {
+		self.block_headers.values().find(|header| header.number == requested_number)
+	}
+}
+
 impl MockChainApi {
 	pub fn new(state: ChainApiState) -> MockChainApi {
 		Self { state }
@@ -77,9 +84,44 @@ impl MockChainApi {
 									.expect("Relay chain block hashes are known"),
 							)));
 						},
-						ChainApiMessage::Ancestors { hash: _hash, k: _k, response_channel } => {
-							// For our purposes, no ancestors is fine.
-							let _ = response_channel.send(Ok(Vec::new()));
+						ChainApiMessage::FinalizedBlockNumber(val) => {
+							val.send(Ok(0)).unwrap();
+						},
+						ChainApiMessage::FinalizedBlockHash(requested_number, sender) => {
+							let hash = self
+								.state
+								.get_header_by_number(requested_number)
+								.expect("Unknow block number")
+								.hash();
+							sender.send(Ok(Some(hash))).unwrap();
+						},
+						ChainApiMessage::BlockNumber(requested_hash, sender) => {
+							sender
+								.send(Ok(Some(
+									self.state
+										.block_headers
+										.get(&requested_hash)
+										.expect("Unknown block hash")
+										.number,
+								)))
+								.unwrap();
+						},
+						ChainApiMessage::Ancestors { hash, k: _, response_channel } => {
+							let block_number = self
+								.state
+								.block_headers
+								.get(&hash)
+								.expect("Unknown block hash")
+								.number;
+							let ancestors = self
+								.state
+								.block_headers
+								.iter()
+								.filter(|(_, header)| header.number < block_number)
+								.sorted_by(|a, b| a.1.number.cmp(&b.1.number))
+								.map(|(hash, _)| *hash)
+								.collect_vec();
+							response_channel.send(Ok(ancestors)).unwrap();
 						},
 						_ => {
 							unimplemented!("Unexpected chain-api message")
diff --git a/polkadot/node/subsystem-bench/src/core/mock/mod.rs b/polkadot/node/subsystem-bench/src/core/mock/mod.rs
index b67c6611e8cd34ba3afe058c8a8a28774d291036..e766b07e8b16479f9ce654e8a56f430b60136432 100644
--- a/polkadot/node/subsystem-bench/src/core/mock/mod.rs
+++ b/polkadot/node/subsystem-bench/src/core/mock/mod.rs
@@ -37,7 +37,7 @@ impl HeadSupportsParachains for AlwaysSupportsParachains {
 
 // An orchestra with dummy subsystems
 macro_rules! dummy_builder {
-	($spawn_task_handle: ident) => {{
+	($spawn_task_handle: ident, $metrics: ident) => {{
 		use super::core::mock::dummy::*;
 
 		// Initialize a mock overseer.
@@ -69,10 +69,24 @@ macro_rules! dummy_builder {
 			.activation_external_listeners(Default::default())
 			.span_per_active_leaf(Default::default())
 			.active_leaves(Default::default())
-			.metrics(Default::default())
+			.metrics($metrics)
 			.supports_parachains(AlwaysSupportsParachains {})
 			.spawner(SpawnGlue($spawn_task_handle))
 	}};
 }
 
 pub(crate) use dummy_builder;
+use sp_consensus::SyncOracle;
+
+#[derive(Clone)]
+pub struct TestSyncOracle {}
+
+impl SyncOracle for TestSyncOracle {
+	fn is_major_syncing(&self) -> bool {
+		false
+	}
+
+	fn is_offline(&self) -> bool {
+		unimplemented!("not used by subsystem benchmarks")
+	}
+}
diff --git a/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs b/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs
index a2be853ef8d51a9d6a34164d7be4392ae2214472..a171deb2e715d09093b7b0df1f0d9836551e9f84 100644
--- a/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs
+++ b/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs
@@ -18,11 +18,11 @@
 //! the emulated network.
 use futures::{channel::mpsc::UnboundedSender, FutureExt, StreamExt};
 use polkadot_node_subsystem_types::{
-	messages::{BitfieldDistributionMessage, NetworkBridgeEvent},
+	messages::{ApprovalDistributionMessage, BitfieldDistributionMessage, NetworkBridgeEvent},
 	OverseerSignal,
 };
 
-use sc_network::{request_responses::ProtocolConfig, PeerId, RequestFailure};
+use sc_network::{request_responses::ProtocolConfig, RequestFailure};
 
 use polkadot_node_subsystem::{
 	messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem, SubsystemError,
@@ -30,8 +30,9 @@ use polkadot_node_subsystem::{
 
 use polkadot_node_network_protocol::Versioned;
 
-use crate::core::network::{
-	NetworkEmulatorHandle, NetworkInterfaceReceiver, NetworkMessage, RequestExt,
+use crate::core::{
+	configuration::TestAuthorities,
+	network::{NetworkEmulatorHandle, NetworkInterfaceReceiver, NetworkMessage, RequestExt},
 };
 
 const LOG_TARGET: &str = "subsystem-bench::network-bridge";
@@ -44,6 +45,8 @@ pub struct MockNetworkBridgeTx {
 	network: NetworkEmulatorHandle,
 	/// A channel to the network interface,
 	to_network_interface: UnboundedSender<NetworkMessage>,
+	/// Test authorithies
+	test_authorithies: TestAuthorities,
 }
 
 /// A mock of the network bridge tx subsystem.
@@ -58,8 +61,9 @@ impl MockNetworkBridgeTx {
 	pub fn new(
 		network: NetworkEmulatorHandle,
 		to_network_interface: UnboundedSender<NetworkMessage>,
+		test_authorithies: TestAuthorities,
 	) -> MockNetworkBridgeTx {
-		Self { network, to_network_interface }
+		Self { network, to_network_interface, test_authorithies }
 	}
 }
 
@@ -126,9 +130,21 @@ impl MockNetworkBridgeTx {
 					NetworkBridgeTxMessage::ReportPeer(_) => {
 						// ingore rep changes
 					},
-					_ => {
-						unimplemented!("Unexpected network bridge message")
+					NetworkBridgeTxMessage::SendValidationMessage(peers, message) => {
+						for peer in peers {
+							self.to_network_interface
+								.unbounded_send(NetworkMessage::MessageFromNode(
+									self.test_authorithies
+										.peer_id_to_authority
+										.get(&peer)
+										.unwrap()
+										.clone(),
+									message.clone(),
+								))
+								.expect("Should not fail");
+						}
 					},
+					_ => unimplemented!("Unexpected network bridge message"),
 				},
 			}
 		}
@@ -145,16 +161,23 @@ impl MockNetworkBridgeRx {
 				maybe_peer_message = from_network_interface.next() => {
 					if let Some(message) = maybe_peer_message {
 						match message {
-							NetworkMessage::MessageFromPeer(message) => match message {
+							NetworkMessage::MessageFromPeer(peer_id, message) => match message {
 								Versioned::V2(
 									polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution(
 										bitfield,
 									),
 								) => {
 									ctx.send_message(
-										BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(PeerId::random(), polkadot_node_network_protocol::Versioned::V2(bitfield)))
+										BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V2(bitfield)))
 									).await;
 								},
+								Versioned::V3(
+									polkadot_node_network_protocol::v3::ValidationProtocol::ApprovalDistribution(msg)
+								) => {
+									ctx.send_message(
+										ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V3(msg)))
+									).await;
+								}
 								_ => {
 									unimplemented!("We only talk v2 network protocol")
 								},
diff --git a/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs b/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs
index caefe068efff50bc24ae27e1bad86dc1e1da1c42..ca6896dbb29d50fbf25a040e5e0981debed93e35 100644
--- a/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs
+++ b/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs
@@ -16,8 +16,10 @@
 //!
 //! A generic runtime api subsystem mockup suitable to be used in benchmarks.
 
+use itertools::Itertools;
 use polkadot_primitives::{
-	CandidateReceipt, CoreState, GroupIndex, IndexedVec, OccupiedCore, SessionInfo, ValidatorIndex,
+	vstaging::NodeFeatures, CandidateEvent, CandidateReceipt, CoreState, GroupIndex, IndexedVec,
+	OccupiedCore, SessionIndex, SessionInfo, ValidatorIndex,
 };
 
 use bitvec::prelude::BitVec;
@@ -26,6 +28,7 @@ use polkadot_node_subsystem::{
 	overseer, SpawnedSubsystem, SubsystemError,
 };
 use polkadot_node_subsystem_types::OverseerSignal;
+use sp_consensus_babe::Epoch as BabeEpoch;
 use sp_core::H256;
 use std::collections::HashMap;
 
@@ -38,8 +41,13 @@ const LOG_TARGET: &str = "subsystem-bench::runtime-api-mock";
 pub struct RuntimeApiState {
 	// All authorities in the test,
 	authorities: TestAuthorities,
-	// Candidate
+	// Candidate hashes per block
 	candidate_hashes: HashMap<H256, Vec<CandidateReceipt>>,
+	// Included candidates per bock
+	included_candidates: HashMap<H256, Vec<CandidateEvent>>,
+	babe_epoch: Option<BabeEpoch>,
+	// The session child index,
+	session_index: SessionIndex,
 }
 
 /// A mocked `runtime-api` subsystem.
@@ -53,34 +61,57 @@ impl MockRuntimeApi {
 		config: TestConfiguration,
 		authorities: TestAuthorities,
 		candidate_hashes: HashMap<H256, Vec<CandidateReceipt>>,
+		included_candidates: HashMap<H256, Vec<CandidateEvent>>,
+		babe_epoch: Option<BabeEpoch>,
+		session_index: SessionIndex,
 	) -> MockRuntimeApi {
-		Self { state: RuntimeApiState { authorities, candidate_hashes }, config }
+		Self {
+			state: RuntimeApiState {
+				authorities,
+				candidate_hashes,
+				included_candidates,
+				babe_epoch,
+				session_index,
+			},
+			config,
+		}
 	}
 
 	fn session_info(&self) -> SessionInfo {
-		let all_validators = (0..self.config.n_validators)
-			.map(|i| ValidatorIndex(i as _))
-			.collect::<Vec<_>>();
-
-		let validator_groups = all_validators
-			.chunks(self.config.max_validators_per_core)
-			.map(Vec::from)
-			.collect::<Vec<_>>();
-		SessionInfo {
-			validators: self.state.authorities.validator_public.clone().into(),
-			discovery_keys: self.state.authorities.validator_authority_id.clone(),
-			validator_groups: IndexedVec::<GroupIndex, Vec<ValidatorIndex>>::from(validator_groups),
-			assignment_keys: vec![],
-			n_cores: self.config.n_cores as u32,
-			zeroth_delay_tranche_width: 0,
-			relay_vrf_modulo_samples: 0,
-			n_delay_tranches: 0,
-			no_show_slots: 0,
-			needed_approvals: 0,
-			active_validator_indices: vec![],
-			dispute_period: 6,
-			random_seed: [0u8; 32],
-		}
+		session_info_for_peers(&self.config, &self.state.authorities)
+	}
+}
+
+/// Generates a test session info with all passed authorities as consensus validators.
+pub fn session_info_for_peers(
+	configuration: &TestConfiguration,
+	authorities: &TestAuthorities,
+) -> SessionInfo {
+	let all_validators = (0..configuration.n_validators)
+		.map(|i| ValidatorIndex(i as _))
+		.collect::<Vec<_>>();
+
+	let validator_groups = all_validators
+		.chunks(configuration.max_validators_per_core)
+		.map(Vec::from)
+		.collect::<Vec<_>>();
+
+	SessionInfo {
+		validators: authorities.validator_public.iter().cloned().collect(),
+		discovery_keys: authorities.validator_authority_id.to_vec(),
+		assignment_keys: authorities.validator_assignment_id.to_vec(),
+		validator_groups: IndexedVec::<GroupIndex, Vec<ValidatorIndex>>::from(validator_groups),
+		n_cores: configuration.n_cores as u32,
+		needed_approvals: configuration.needed_approvals as u32,
+		zeroth_delay_tranche_width: configuration.zeroth_delay_tranche_width as u32,
+		relay_vrf_modulo_samples: configuration.relay_vrf_modulo_samples as u32,
+		n_delay_tranches: configuration.n_delay_tranches as u32,
+		no_show_slots: configuration.no_show_slots as u32,
+		active_validator_indices: (0..authorities.validator_authority_id.len())
+			.map(|index| ValidatorIndex(index as u32))
+			.collect_vec(),
+		dispute_period: 6,
+		random_seed: [0u8; 32],
 	}
 }
 
@@ -110,6 +141,13 @@ impl MockRuntimeApi {
 					gum::debug!(target: LOG_TARGET, msg=?msg, "recv message");
 
 					match msg {
+						RuntimeApiMessage::Request(
+							request,
+							RuntimeApiRequest::CandidateEvents(sender),
+						) => {
+							let candidate_events = self.state.included_candidates.get(&request);
+							let _ = sender.send(Ok(candidate_events.cloned().unwrap_or_default()));
+						},
 						RuntimeApiMessage::Request(
 							_block_hash,
 							RuntimeApiRequest::SessionInfo(_session_index, sender),
@@ -123,24 +161,24 @@ impl MockRuntimeApi {
 							let _ = sender.send(Ok(Some(Default::default())));
 						},
 						RuntimeApiMessage::Request(
-							_block_hash,
-							RuntimeApiRequest::Validators(sender),
+							_request,
+							RuntimeApiRequest::NodeFeatures(_session_index, sender),
 						) => {
-							let _ =
-								sender.send(Ok(self.state.authorities.validator_public.clone()));
+							let _ = sender.send(Ok(NodeFeatures::EMPTY));
 						},
 						RuntimeApiMessage::Request(
 							_block_hash,
-							RuntimeApiRequest::CandidateEvents(sender),
+							RuntimeApiRequest::Validators(sender),
 						) => {
-							let _ = sender.send(Ok(Default::default()));
+							let _ =
+								sender.send(Ok(self.state.authorities.validator_public.clone()));
 						},
 						RuntimeApiMessage::Request(
 							_block_hash,
 							RuntimeApiRequest::SessionIndexForChild(sender),
 						) => {
 							// Session is always the same.
-							let _ = sender.send(Ok(0));
+							let _ = sender.send(Ok(self.state.session_index));
 						},
 						RuntimeApiMessage::Request(
 							block_hash,
@@ -176,10 +214,14 @@ impl MockRuntimeApi {
 							let _ = sender.send(Ok(cores));
 						},
 						RuntimeApiMessage::Request(
-							_block_hash,
-							RuntimeApiRequest::NodeFeatures(_session_index, sender),
+							_request,
+							RuntimeApiRequest::CurrentBabeEpoch(sender),
 						) => {
-							let _ = sender.send(Ok(Default::default()));
+							let _ = sender.send(Ok(self
+								.state
+								.babe_epoch
+								.clone()
+								.expect("Babe epoch unpopulated")));
 						},
 						// Long term TODO: implement more as needed.
 						message => {
diff --git a/polkadot/node/subsystem-bench/src/core/mod.rs b/polkadot/node/subsystem-bench/src/core/mod.rs
index 282788d143b44a9a2444533f1eda756e0385c0a2..507dd1aa83f6634583182c0faf77872ce556d52a 100644
--- a/polkadot/node/subsystem-bench/src/core/mod.rs
+++ b/polkadot/node/subsystem-bench/src/core/mod.rs
@@ -15,6 +15,8 @@
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
 const LOG_TARGET: &str = "subsystem-bench::core";
+// The validator index that represent the node that is under test.
+pub const NODE_UNDER_TEST: u32 = 0;
 
 pub mod configuration;
 pub mod display;
diff --git a/polkadot/node/subsystem-bench/src/core/network.rs b/polkadot/node/subsystem-bench/src/core/network.rs
index e2932bf0f51b662213b0cdf041e8409d9d590694..8e7c28140635377785ff1a2646128cda741d17de 100644
--- a/polkadot/node/subsystem-bench/src/core/network.rs
+++ b/polkadot/node/subsystem-bench/src/core/network.rs
@@ -10,7 +10,6 @@
 // but WITHOUT ANY WARRANTY; without even the implied warranty of
 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 // GNU General Public License for more details.
-
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 //!
@@ -48,17 +47,21 @@ use futures::{
 	stream::FuturesUnordered,
 };
 
+use itertools::Itertools;
 use net_protocol::{
+	peer_set::{ProtocolVersion, ValidationVersion},
 	request_response::{Recipient, Requests, ResponseSender},
-	VersionedValidationProtocol,
+	ObservedRole, VersionedValidationProtocol,
 };
 use parity_scale_codec::Encode;
+use polkadot_node_subsystem_types::messages::{ApprovalDistributionMessage, NetworkBridgeEvent};
+use polkadot_overseer::AllMessages;
 use polkadot_primitives::AuthorityDiscoveryId;
 use prometheus_endpoint::U64;
 use rand::{seq::SliceRandom, thread_rng};
 use sc_network::{
 	request_responses::{IncomingRequest, OutgoingResponse},
-	RequestFailure,
+	PeerId, RequestFailure,
 };
 use sc_service::SpawnTaskHandle;
 use std::{
@@ -142,7 +145,7 @@ impl RateLimit {
 /// peer(`AuthorityDiscoveryId``).
 pub enum NetworkMessage {
 	/// A gossip message from peer to node.
-	MessageFromPeer(VersionedValidationProtocol),
+	MessageFromPeer(PeerId, VersionedValidationProtocol),
 	/// A gossip message from node to a peer.
 	MessageFromNode(AuthorityDiscoveryId, VersionedValidationProtocol),
 	/// A request originating from our node
@@ -155,9 +158,9 @@ impl NetworkMessage {
 	/// Returns the size of the encoded message or request
 	pub fn size(&self) -> usize {
 		match &self {
-			NetworkMessage::MessageFromPeer(Versioned::V2(message)) => message.encoded_size(),
-			NetworkMessage::MessageFromPeer(Versioned::V1(message)) => message.encoded_size(),
-			NetworkMessage::MessageFromPeer(Versioned::V3(message)) => message.encoded_size(),
+			NetworkMessage::MessageFromPeer(_, Versioned::V2(message)) => message.encoded_size(),
+			NetworkMessage::MessageFromPeer(_, Versioned::V1(message)) => message.encoded_size(),
+			NetworkMessage::MessageFromPeer(_, Versioned::V3(message)) => message.encoded_size(),
 			NetworkMessage::MessageFromNode(_peer_id, Versioned::V2(message)) =>
 				message.encoded_size(),
 			NetworkMessage::MessageFromNode(_peer_id, Versioned::V1(message)) =>
@@ -430,6 +433,7 @@ pub struct EmulatedPeerHandle {
 	messages_tx: UnboundedSender<NetworkMessage>,
 	/// Send actions to be performed by the peer.
 	actions_tx: UnboundedSender<NetworkMessage>,
+	peer_id: PeerId,
 }
 
 impl EmulatedPeerHandle {
@@ -441,7 +445,7 @@ impl EmulatedPeerHandle {
 	/// Send a message to the node.
 	pub fn send_message(&self, message: VersionedValidationProtocol) {
 		self.actions_tx
-			.unbounded_send(NetworkMessage::MessageFromPeer(message))
+			.unbounded_send(NetworkMessage::MessageFromPeer(self.peer_id, message))
 			.expect("Peer action channel hangup");
 	}
 
@@ -613,6 +617,7 @@ pub fn new_peer(
 	stats: Arc<PeerEmulatorStats>,
 	to_network_interface: UnboundedSender<NetworkMessage>,
 	latency_ms: usize,
+	peer_id: PeerId,
 ) -> EmulatedPeerHandle {
 	let (messages_tx, messages_rx) = mpsc::unbounded::<NetworkMessage>();
 	let (actions_tx, actions_rx) = mpsc::unbounded::<NetworkMessage>();
@@ -641,7 +646,7 @@ pub fn new_peer(
 		.boxed(),
 	);
 
-	EmulatedPeerHandle { messages_tx, actions_tx }
+	EmulatedPeerHandle { messages_tx, actions_tx, peer_id }
 }
 
 /// Book keeping of sent and received bytes.
@@ -719,6 +724,28 @@ pub struct NetworkEmulatorHandle {
 	validator_authority_ids: HashMap<AuthorityDiscoveryId, usize>,
 }
 
+impl NetworkEmulatorHandle {
+	/// Generates peer_connected messages for all peers in `test_authorities`
+	pub fn generate_peer_connected(&self) -> Vec<AllMessages> {
+		self.peers
+			.iter()
+			.filter(|peer| peer.is_connected())
+			.map(|peer| {
+				let network = NetworkBridgeEvent::PeerConnected(
+					peer.handle().peer_id,
+					ObservedRole::Full,
+					ProtocolVersion::from(ValidationVersion::V3),
+					None,
+				);
+
+				AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(
+					network,
+				))
+			})
+			.collect_vec()
+	}
+}
+
 /// Create a new emulated network based on `config`.
 /// Each emulated peer will run the specified `handlers` to process incoming messages.
 pub fn new_network(
@@ -753,6 +780,7 @@ pub fn new_network(
 					stats,
 					to_network_interface.clone(),
 					random_latency(config.latency.as_ref()),
+					*authorities.peer_ids.get(peer_index).unwrap(),
 				)),
 			)
 		})
@@ -760,10 +788,14 @@ pub fn new_network(
 
 	let connected_count = config.connected_count();
 
-	let (_connected, to_disconnect) = peers.partial_shuffle(&mut thread_rng(), connected_count);
+	let mut peers_indicies = (0..n_peers).collect_vec();
+	let (_connected, to_disconnect) =
+		peers_indicies.partial_shuffle(&mut thread_rng(), connected_count);
 
-	for peer in to_disconnect {
-		peer.disconnect();
+	// Node under test is always mark as disconnected.
+	peers[NODE_UNDER_TEST as usize].disconnect();
+	for peer in to_disconnect.iter().skip(1) {
+		peers[*peer].disconnect();
 	}
 
 	gum::info!(target: LOG_TARGET, "{}",format!("Network created, connected validator count {}", connected_count).bright_black());
@@ -786,6 +818,7 @@ pub fn new_network(
 }
 
 /// Errors that can happen when sending data to emulated peers.
+#[derive(Clone, Debug)]
 pub enum EmulatedPeerError {
 	NotConnected,
 }
diff --git a/polkadot/node/subsystem-bench/src/subsystem-bench.rs b/polkadot/node/subsystem-bench/src/subsystem-bench.rs
index 8633ebb703aa6eda949965d6640a0ad2cf261cdf..6f45214bc7359287bae873ce95d8306e99e799a8 100644
--- a/polkadot/node/subsystem-bench/src/subsystem-bench.rs
+++ b/polkadot/node/subsystem-bench/src/subsystem-bench.rs
@@ -26,6 +26,7 @@ use pyroscope_pprofrs::{pprof_backend, PprofConfig};
 
 use std::path::Path;
 
+pub(crate) mod approval;
 pub(crate) mod availability;
 pub(crate) mod cli;
 pub(crate) mod core;
@@ -43,7 +44,7 @@ use core::{
 
 use clap_num::number_range;
 
-use crate::core::display::display_configuration;
+use crate::{approval::bench_approvals, core::display::display_configuration};
 
 fn le_100(s: &str) -> Result<usize, String> {
 	number_range(s, 0, 100)
@@ -174,6 +175,12 @@ impl BenchCli {
 								&mut env, state,
 							));
 						},
+						TestObjective::ApprovalVoting(ref options) => {
+							let (mut env, state) =
+								approval::prepare_test(test_config.clone(), options.clone());
+
+							env.runtime().block_on(bench_approvals(&mut env, state));
+						},
 						TestObjective::DataAvailabilityWrite => {
 							let mut state = TestState::new(&test_config);
 							let (mut env, _protocol_config) = prepare_test(test_config, &mut state);
@@ -181,13 +188,16 @@ impl BenchCli {
 								&mut env, state,
 							));
 						},
-						_ => gum::error!("Invalid test objective in sequence"),
+						TestObjective::TestSequence(_) => todo!(),
+						TestObjective::Unimplemented => todo!(),
 					}
 				}
 				return Ok(())
 			},
 			TestObjective::DataAvailabilityRead(ref _options) => self.create_test_configuration(),
 			TestObjective::DataAvailabilityWrite => self.create_test_configuration(),
+			TestObjective::ApprovalVoting(_) => todo!(),
+			TestObjective::Unimplemented => todo!(),
 		};
 
 		let mut latency_config = test_config.latency.clone().unwrap_or_default();
@@ -232,6 +242,8 @@ impl BenchCli {
 					.block_on(availability::benchmark_availability_write(&mut env, state));
 			},
 			TestObjective::TestSequence(_options) => {},
+			TestObjective::ApprovalVoting(_) => todo!(),
+			TestObjective::Unimplemented => todo!(),
 		}
 
 		if let Some(agent_running) = agent_running {