diff --git a/polkadot/node/network/statement-distribution/src/v2/cluster.rs b/polkadot/node/network/statement-distribution/src/v2/cluster.rs
index c3f45314b2464e1125a797686c51827e022b0403..87b25c785d83cfc30feb6af8f2ec8a9299952200 100644
--- a/polkadot/node/network/statement-distribution/src/v2/cluster.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/cluster.rs
@@ -60,13 +60,6 @@ use polkadot_primitives::{CandidateHash, CompactStatement, Hash, ValidatorIndex}
 use crate::LOG_TARGET;
 use std::collections::{HashMap, HashSet};
 
-#[derive(Hash, PartialEq, Eq)]
-struct ValidStatementManifest {
-	remote: ValidatorIndex,
-	originator: ValidatorIndex,
-	candidate_hash: CandidateHash,
-}
-
 // A piece of knowledge about a candidate
 #[derive(Hash, Clone, PartialEq, Eq)]
 enum Knowledge {
diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs
index 961ec45bdada037885fb5ea43858e4e1522fac2c..73416b193bbec518fa24e608bdfc17c9de436b50 100644
--- a/polkadot/node/network/statement-distribution/src/v2/mod.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs
@@ -68,7 +68,7 @@ use futures::{
 use std::{
 	collections::{
 		hash_map::{Entry, HashMap},
-		BTreeSet, HashSet,
+		HashSet,
 	},
 	time::{Duration, Instant},
 };
@@ -156,6 +156,7 @@ struct PerRelayParentState {
 	seconding_limit: usize,
 	session: SessionIndex,
 	groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
+	disabled_validators: HashSet<ValidatorIndex>,
 }
 
 impl PerRelayParentState {
@@ -166,6 +167,17 @@ impl PerRelayParentState {
 	fn active_validator_state_mut(&mut self) -> Option<&mut ActiveValidatorState> {
 		self.local_validator.as_mut().and_then(|local| local.active.as_mut())
 	}
+
+	/// Returns `true` if the given validator is disabled in the context of the relay parent.
+	pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool {
+		self.disabled_validators.contains(validator_index)
+	}
+
+	/// A convenience function to generate a disabled bitmask for the given backing group.
+	/// The output bits are set to `true` for validators that are disabled.
+	pub fn disabled_bitmask(&self, group: &[ValidatorIndex]) -> BitVec<u8, Lsb0> {
+		BitVec::from_iter(group.iter().map(|v| self.is_disabled(v)))
+	}
 }
 
 // per-relay-parent local validator state.
@@ -206,8 +218,6 @@ struct PerSessionState {
 	// getting the topology from the gossip-support subsystem
 	grid_view: Option<grid::SessionTopologyView>,
 	local_validator: Option<LocalValidatorIndex>,
-	// We store the latest state here based on union of leaves.
-	disabled_validators: BTreeSet<ValidatorIndex>,
 }
 
 impl PerSessionState {
@@ -224,16 +234,7 @@ impl PerSessionState {
 		)
 		.map(|(_, index)| LocalValidatorIndex::Active(index));
 
-		let disabled_validators = BTreeSet::new();
-
-		PerSessionState {
-			session_info,
-			groups,
-			authority_lookup,
-			grid_view: None,
-			local_validator,
-			disabled_validators,
-		}
+		PerSessionState { session_info, groups, authority_lookup, grid_view: None, local_validator }
 	}
 
 	fn supply_topology(
@@ -269,33 +270,6 @@ impl PerSessionState {
 	fn is_not_validator(&self) -> bool {
 		self.grid_view.is_some() && self.local_validator.is_none()
 	}
-
-	/// A convenience function to generate a disabled bitmask for the given backing group.
-	/// The output bits are set to `true` for validators that are disabled.
-	/// Returns `None` if the group index is out of bounds.
-	pub fn disabled_bitmask(&self, group: GroupIndex) -> Option<BitVec<u8, Lsb0>> {
-		let group = self.groups.get(group)?;
-		let mask = BitVec::from_iter(group.iter().map(|v| self.is_disabled(v)));
-		Some(mask)
-	}
-
-	/// Returns `true` if the given validator is disabled in the current session.
-	pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool {
-		self.disabled_validators.contains(validator_index)
-	}
-
-	/// Extend the list of disabled validators.
-	pub fn extend_disabled_validators(
-		&mut self,
-		disabled: impl IntoIterator<Item = ValidatorIndex>,
-	) {
-		self.disabled_validators.extend(disabled);
-	}
-
-	/// Clear the list of disabled validators.
-	pub fn clear_disabled_validators(&mut self) {
-		self.disabled_validators.clear();
-	}
 }
 
 pub(crate) struct State {
@@ -582,19 +556,16 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 	let new_relay_parents =
 		state.implicit_view.all_allowed_relay_parents().cloned().collect::<Vec<_>>();
 
-	// We clear the list of disabled validators to reset it properly based on union of leaves.
-	let mut cleared_disabled_validators: BTreeSet<SessionIndex> = BTreeSet::new();
-
 	for new_relay_parent in new_relay_parents.iter().cloned() {
-		// Even if we processed this relay parent before, we need to fetch the list of disabled
-		// validators based on union of active leaves.
-		let disabled_validators =
+		let disabled_validators: HashSet<_> =
 			polkadot_node_subsystem_util::vstaging::get_disabled_validators_with_fallback(
 				ctx.sender(),
 				new_relay_parent,
 			)
 			.await
-			.map_err(JfyiError::FetchDisabledValidators)?;
+			.map_err(JfyiError::FetchDisabledValidators)?
+			.into_iter()
+			.collect();
 
 		let session_index = polkadot_node_subsystem_util::request_session_index_for_child(
 			new_relay_parent,
@@ -644,10 +615,6 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 			.get_mut(&session_index)
 			.expect("either existed or just inserted; qed");
 
-		if cleared_disabled_validators.insert(session_index) {
-			per_session.clear_disabled_validators();
-		}
-
 		if !disabled_validators.is_empty() {
 			gum::debug!(
 				target: LOG_TARGET,
@@ -656,8 +623,6 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 				?disabled_validators,
 				"Disabled validators detected"
 			);
-
-			per_session.extend_disabled_validators(disabled_validators);
 		}
 
 		if state.per_relay_parent.contains_key(&new_relay_parent) {
@@ -723,6 +688,7 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 				seconding_limit,
 				session: session_index,
 				groups_per_para,
+				disabled_validators,
 			},
 		);
 	}
@@ -1581,6 +1547,17 @@ async fn handle_incoming_statement<Context>(
 	};
 	let session_info = &per_session.session_info;
 
+	if per_relay_parent.is_disabled(&statement.unchecked_validator_index()) {
+		gum::debug!(
+			target: LOG_TARGET,
+			?relay_parent,
+			validator_index = ?statement.unchecked_validator_index(),
+			"Ignoring a statement from disabled validator."
+		);
+		modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await;
+		return
+	}
+
 	let local_validator = match per_relay_parent.local_validator.as_mut() {
 		None => {
 			// we shouldn't be receiving statements unless we're a validator
@@ -1614,17 +1591,6 @@ async fn handle_incoming_statement<Context>(
 			},
 		};
 
-	if per_session.is_disabled(&statement.unchecked_validator_index()) {
-		gum::debug!(
-			target: LOG_TARGET,
-			?relay_parent,
-			validator_index = ?statement.unchecked_validator_index(),
-			"Ignoring a statement from disabled validator."
-		);
-		modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await;
-		return
-	}
-
 	let (active, cluster_sender_index) = {
 		// This block of code only returns `Some` when both the originator and
 		// the sending peer are in the cluster.
@@ -2379,21 +2345,18 @@ async fn handle_incoming_manifest_common<'a, Context>(
 		Some(s) => s,
 	};
 
-	let local_validator = match relay_parent_state.local_validator.as_mut() {
-		None => {
-			if per_session.is_not_validator() {
-				modify_reputation(
-					reputation,
-					ctx.sender(),
-					peer,
-					COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
-				)
-				.await;
-			}
-			return None
-		},
-		Some(x) => x,
-	};
+	if relay_parent_state.local_validator.is_none() {
+		if per_session.is_not_validator() {
+			modify_reputation(
+				reputation,
+				ctx.sender(),
+				peer,
+				COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
+			)
+			.await;
+		}
+		return None
+	}
 
 	let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para_id) else {
 		modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
@@ -2436,10 +2399,13 @@ async fn handle_incoming_manifest_common<'a, Context>(
 	let claimed_parent_hash = manifest_summary.claimed_parent_hash;
 
 	// Ignore votes from disabled validators when counting towards the threshold.
-	let disabled_mask = per_session.disabled_bitmask(group_index).unwrap_or_default();
+	let group = per_session.groups.get(group_index).unwrap_or(&[]);
+	let disabled_mask = relay_parent_state.disabled_bitmask(group);
 	manifest_summary.statement_knowledge.mask_seconded(&disabled_mask);
 	manifest_summary.statement_knowledge.mask_valid(&disabled_mask);
 
+	let local_validator = relay_parent_state.local_validator.as_mut().expect("checked above; qed");
+
 	let acknowledge = match local_validator.grid_tracker.import_manifest(
 		grid_topology,
 		&per_session.groups,
@@ -3018,9 +2984,7 @@ pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut St
 		}
 
 		// Add disabled validators to the unwanted mask.
-		let disabled_mask = per_session
-			.disabled_bitmask(group_index)
-			.expect("group existence checked above; qed");
+		let disabled_mask = relay_parent_state.disabled_bitmask(group);
 		unwanted_mask.seconded_in_group |= &disabled_mask;
 		unwanted_mask.validated_in_group |= &disabled_mask;
 
@@ -3111,9 +3075,7 @@ pub(crate) async fn handle_response<Context>(
 			Some(g) => g,
 		};
 
-		let disabled_mask = per_session
-			.disabled_bitmask(group_index)
-			.expect("group_index checked above; qed");
+		let disabled_mask = relay_parent_state.disabled_bitmask(group);
 
 		let res = response.validate_response(
 			&mut state.request_manager,
@@ -3258,7 +3220,7 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
 		Some(s) => s,
 	};
 
-	let local_validator = match relay_parent_state.local_validator.as_mut() {
+	let local_validator = match relay_parent_state.local_validator.as_ref() {
 		None => return,
 		Some(s) => s,
 	};
@@ -3332,16 +3294,15 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
 
 	// Transform mask with 'OR' semantics into one with 'AND' semantics for the API used
 	// below.
-	let mut and_mask = StatementFilter {
+	let and_mask = StatementFilter {
 		seconded_in_group: !mask.seconded_in_group.clone(),
 		validated_in_group: !mask.validated_in_group.clone(),
 	};
 
-	// Ignore disabled validators from the latest state when sending the response.
-	let disabled_mask =
-		per_session.disabled_bitmask(group_index).expect("group existence checked; qed");
-	and_mask.mask_seconded(&disabled_mask);
-	and_mask.mask_valid(&disabled_mask);
+	let local_validator = match relay_parent_state.local_validator.as_mut() {
+		None => return,
+		Some(s) => s,
+	};
 
 	let mut sent_filter = StatementFilter::blank(group_size);
 	let statements: Vec<_> = relay_parent_state
diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs
index f9a484f47a94c37fb909cdb640b5886ec12e2b2b..078d556391a397d9e4c6a9f22754272d0f85922a 100644
--- a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs
@@ -422,7 +422,7 @@ struct TestLeaf {
 	parent_hash: Hash,
 	session: SessionIndex,
 	availability_cores: Vec<CoreState>,
-	disabled_validators: Vec<ValidatorIndex>,
+	pub disabled_validators: Vec<ValidatorIndex>,
 	para_data: Vec<(ParaId, PerParaData)>,
 	minimum_backing_votes: u32,
 }
diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
index 38d7a10b86527c153f4a369beb8bbe86da05d582..4fdfda0dba247d9c058adeef66fde0d1d90298cb 100644
--- a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
@@ -22,9 +22,8 @@ use polkadot_node_network_protocol::{
 	request_response::v2 as request_v2, v2::BackedCandidateManifest,
 };
 use polkadot_primitives_test_helpers::make_candidate;
-use sc_network::{
-	config::{IncomingRequest as RawIncomingRequest, OutgoingResponse as RawOutgoingResponse},
-	ProtocolName,
+use sc_network::config::{
+	IncomingRequest as RawIncomingRequest, OutgoingResponse as RawOutgoingResponse,
 };
 
 #[test]
@@ -1222,392 +1221,8 @@ fn disabled_validators_added_to_unwanted_mask() {
 	});
 }
 
-// We send a request to a peer and after receiving the response
-// we learn about a validator being disabled. We should filter out
-// the statement from the disabled validator when receiving it.
 #[test]
-fn when_validator_disabled_after_sending_the_request() {
-	let group_size = 3;
-	let config = TestConfig {
-		validator_count: 20,
-		group_size,
-		local_validator: LocalRole::Validator,
-		async_backing_params: None,
-	};
-
-	let relay_parent = Hash::repeat_byte(1);
-	let another_relay_parent = Hash::repeat_byte(2);
-	let peer_disabled_later = PeerId::random();
-	let peer_b = PeerId::random();
-
-	test_harness(config, |state, mut overseer| async move {
-		let local_validator = state.local.clone().unwrap();
-		let local_group_index = local_validator.group_index.unwrap();
-		let local_para = ParaId::from(local_group_index.0);
-		let other_group_validators = state.group_validators(local_group_index, true);
-		let index_disabled = other_group_validators[0];
-		let index_b = other_group_validators[1];
-
-		let test_leaf = state.make_dummy_leaf_with_disabled_validators(relay_parent, vec![]);
-		let test_leaf_disabled = state
-			.make_dummy_leaf_with_disabled_validators(another_relay_parent, vec![index_disabled]);
-
-		let (candidate, pvd) = make_candidate(
-			relay_parent,
-			1,
-			local_para,
-			test_leaf.para_data(local_para).head_data.clone(),
-			vec![4, 5, 6].into(),
-			Hash::repeat_byte(42).into(),
-		);
-		let candidate_hash = candidate.hash();
-
-		// peer A is in group, has relay parent in view and disabled later.
-		// peer B is in group, has relay parent in view.
-		{
-			connect_peer(
-				&mut overseer,
-				peer_disabled_later.clone(),
-				Some(vec![state.discovery_id(index_disabled)].into_iter().collect()),
-			)
-			.await;
-			connect_peer(
-				&mut overseer,
-				peer_b.clone(),
-				Some(vec![state.discovery_id(index_b)].into_iter().collect()),
-			)
-			.await;
-			send_peer_view_change(&mut overseer, peer_disabled_later.clone(), view![relay_parent])
-				.await;
-			send_peer_view_change(&mut overseer, peer_b.clone(), view![relay_parent]).await;
-		}
-
-		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
-
-		let seconded_disabled = state
-			.sign_statement(
-				index_disabled,
-				CompactStatement::Seconded(candidate_hash),
-				&SigningContext { parent_hash: relay_parent, session_index: 1 },
-			)
-			.as_unchecked()
-			.clone();
-
-		let seconded_b = state
-			.sign_statement(
-				index_b,
-				CompactStatement::Seconded(candidate_hash),
-				&SigningContext { parent_hash: relay_parent, session_index: 1 },
-			)
-			.as_unchecked()
-			.clone();
-		{
-			send_peer_message(
-				&mut overseer,
-				peer_b.clone(),
-				protocol_v2::StatementDistributionMessage::Statement(
-					relay_parent,
-					seconded_b.clone(),
-				),
-			)
-			.await;
-
-			assert_matches!(
-				overseer.recv().await,
-				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
-					if p == peer_b && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { }
-			);
-		}
-
-		// Send a request to peer and activate leaf when a validator is disabled;
-		// mock the response with a statement from disabled validator.
-		{
-			let statements = vec![seconded_disabled];
-			let mask = StatementFilter::blank(group_size);
-
-			assert_matches!(
-				overseer.recv().await,
-				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(mut requests, IfDisconnected::ImmediateError)) => {
-					assert_eq!(requests.len(), 1);
-					assert_matches!(
-						requests.pop().unwrap(),
-						Requests::AttestedCandidateV2(outgoing) => {
-							assert_eq!(outgoing.peer, Recipient::Peer(peer_b));
-							assert_eq!(outgoing.payload.candidate_hash, candidate_hash);
-							assert_eq!(outgoing.payload.mask, mask);
-
-							activate_leaf(&mut overseer, &test_leaf_disabled, &state, false, vec![]).await;
-
-							let res = AttestedCandidateResponse {
-								candidate_receipt: candidate,
-								persisted_validation_data: pvd,
-								statements,
-							};
-							outgoing.pending_response.send(Ok((res.encode(), ProtocolName::from("")))).unwrap();
-						}
-					);
-				}
-			);
-
-			assert_matches!(
-				overseer.recv().await,
-				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
-					if p == peer_b && r == BENEFIT_VALID_RESPONSE.into() => { }
-			);
-
-			assert_matches!(
-				overseer.recv().await,
-				AllMessages:: NetworkBridgeTx(
-					NetworkBridgeTxMessage::SendValidationMessage(
-						peers,
-						Versioned::V2(
-							protocol_v2::ValidationProtocol::StatementDistribution(
-								protocol_v2::StatementDistributionMessage::Statement(hash, statement),
-							),
-						),
-					)
-				) => {
-					assert_eq!(peers, vec![peer_disabled_later]);
-					assert_eq!(hash, relay_parent);
-					assert_eq!(statement, seconded_b);
-				}
-			);
-			answer_expected_hypothetical_membership_request(&mut overseer, vec![]).await;
-		}
-
-		overseer
-	});
-}
-
-#[test]
-fn no_response_for_grid_request_not_meeting_quorum() {
-	let validator_count = 6;
-	let group_size = 3;
-	let config = TestConfig {
-		validator_count,
-		group_size,
-		local_validator: LocalRole::Validator,
-		async_backing_params: None,
-	};
-
-	let relay_parent = Hash::repeat_byte(1);
-	let peer_a = PeerId::random();
-	let peer_b = PeerId::random();
-	let peer_c = PeerId::random();
-
-	test_harness(config, |mut state, mut overseer| async move {
-		let local_validator = state.local.clone().unwrap();
-		let local_group_index = local_validator.group_index.unwrap();
-		let local_para = ParaId::from(local_group_index.0);
-
-		let test_leaf = state.make_dummy_leaf_with_min_backing_votes(relay_parent, 2);
-
-		let (candidate, pvd) = make_candidate(
-			relay_parent,
-			1,
-			local_para,
-			test_leaf.para_data(local_para).head_data.clone(),
-			vec![4, 5, 6].into(),
-			Hash::repeat_byte(42).into(),
-		);
-		let candidate_hash = candidate.hash();
-
-		let other_group_validators = state.group_validators(local_group_index, true);
-		let target_group_validators =
-			state.group_validators((local_group_index.0 + 1).into(), true);
-		let v_a = other_group_validators[0];
-		let v_b = other_group_validators[1];
-		let v_c = target_group_validators[0];
-
-		// peer A is in group, has relay parent in view.
-		// peer B is in group, has no relay parent in view.
-		// peer C is not in group, has relay parent in view.
-		{
-			connect_peer(
-				&mut overseer,
-				peer_a.clone(),
-				Some(vec![state.discovery_id(v_a)].into_iter().collect()),
-			)
-			.await;
-
-			connect_peer(
-				&mut overseer,
-				peer_b.clone(),
-				Some(vec![state.discovery_id(v_b)].into_iter().collect()),
-			)
-			.await;
-
-			connect_peer(
-				&mut overseer,
-				peer_c.clone(),
-				Some(vec![state.discovery_id(v_c)].into_iter().collect()),
-			)
-			.await;
-
-			send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
-			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
-		}
-
-		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
-
-		// Send gossip topology.
-		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
-
-		// Confirm the candidate locally so that we don't send out requests.
-		{
-			let statement = state
-				.sign_full_statement(
-					local_validator.validator_index,
-					Statement::Seconded(candidate.clone()),
-					&SigningContext { parent_hash: relay_parent, session_index: 1 },
-					pvd.clone(),
-				)
-				.clone();
-
-			overseer
-				.send(FromOrchestra::Communication {
-					msg: StatementDistributionMessage::Share(relay_parent, statement),
-				})
-				.await;
-
-			assert_matches!(
-				overseer.recv().await,
-				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a]
-			);
-
-			answer_expected_hypothetical_membership_request(&mut overseer, vec![]).await;
-		}
-
-		// Send enough statements to make candidate backable, make sure announcements are sent.
-
-		// Send statement from peer A.
-		{
-			let statement = state
-				.sign_statement(
-					v_a,
-					CompactStatement::Seconded(candidate_hash),
-					&SigningContext { parent_hash: relay_parent, session_index: 1 },
-				)
-				.as_unchecked()
-				.clone();
-
-			send_peer_message(
-				&mut overseer,
-				peer_a.clone(),
-				protocol_v2::StatementDistributionMessage::Statement(relay_parent, statement),
-			)
-			.await;
-
-			assert_matches!(
-				overseer.recv().await,
-				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
-					if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { }
-			);
-		}
-
-		// Send statement from peer B.
-		let statement_b = state
-			.sign_statement(
-				v_b,
-				CompactStatement::Seconded(candidate_hash),
-				&SigningContext { parent_hash: relay_parent, session_index: 1 },
-			)
-			.as_unchecked()
-			.clone();
-		{
-			send_peer_message(
-				&mut overseer,
-				peer_b.clone(),
-				protocol_v2::StatementDistributionMessage::Statement(
-					relay_parent,
-					statement_b.clone(),
-				),
-			)
-			.await;
-
-			assert_matches!(
-				overseer.recv().await,
-				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
-					if p == peer_b && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { }
-			);
-
-			assert_matches!(
-				overseer.recv().await,
-				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(peers, _)) if peers == vec![peer_a]
-			);
-		}
-
-		// Send Backed notification.
-		{
-			overseer
-				.send(FromOrchestra::Communication {
-					msg: StatementDistributionMessage::Backed(candidate_hash),
-				})
-				.await;
-
-			assert_matches!(
-				overseer.recv().await,
-				AllMessages:: NetworkBridgeTx(
-					NetworkBridgeTxMessage::SendValidationMessage(
-						peers,
-						Versioned::V2(
-							protocol_v2::ValidationProtocol::StatementDistribution(
-								protocol_v2::StatementDistributionMessage::BackedCandidateManifest(manifest),
-							),
-						),
-					)
-				) => {
-					assert_eq!(peers, vec![peer_c]);
-					assert_eq!(manifest, BackedCandidateManifest {
-						relay_parent,
-						candidate_hash,
-						group_index: local_validator.group_index.unwrap(),
-						para_id: local_para,
-						parent_head_data_hash: pvd.parent_head.hash(),
-						statement_knowledge: StatementFilter {
-							seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 1],
-							validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
-						},
-					});
-				}
-			);
-
-			answer_expected_hypothetical_membership_request(&mut overseer, vec![]).await;
-		}
-
-		let mask = StatementFilter {
-			seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1],
-			validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
-		};
-
-		let relay_2 = Hash::repeat_byte(2);
-		let disabled_validators = vec![v_a];
-		let leaf_2 = state.make_dummy_leaf_with_disabled_validators(relay_2, disabled_validators);
-		activate_leaf(&mut overseer, &leaf_2, &state, false, vec![]).await;
-
-		// Incoming request to local node. Local node should not send the response as v_a is
-		// disabled and hence the quorum is not reached.
-		{
-			let response = state
-				.send_request(
-					peer_c,
-					request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask },
-				)
-				.await
-				.await;
-
-			assert!(
-				response.is_none(),
-				"We should not send a response as the quorum is not reached yet"
-			);
-		}
-
-		overseer
-	});
-}
-
-#[test]
-fn disabling_works_from_the_latest_state_not_relay_parent() {
+fn disabling_works_from_relay_parent_not_the_latest_state() {
 	let group_size = 3;
 	let config = TestConfig {
 		validator_count: 20,
@@ -1642,7 +1257,7 @@ fn disabling_works_from_the_latest_state_not_relay_parent() {
 		);
 		let candidate_1_hash = candidate_1.hash();
 
-		let (candidate_2, _) = make_candidate(
+		let (candidate_2, pvd_2) = make_candidate(
 			relay_1,
 			1,
 			local_para,
@@ -1652,6 +1267,16 @@ fn disabling_works_from_the_latest_state_not_relay_parent() {
 		);
 		let candidate_2_hash = candidate_2.hash();
 
+		let (candidate_3, _) = make_candidate(
+			relay_2,
+			1,
+			local_para,
+			leaf_1.para_data(local_para).head_data.clone(),
+			vec![4, 5, 6, 7].into(),
+			Hash::repeat_byte(42).into(),
+		);
+		let candidate_3_hash = candidate_3.hash();
+
 		{
 			connect_peer(
 				&mut overseer,
@@ -1681,6 +1306,16 @@ fn disabling_works_from_the_latest_state_not_relay_parent() {
 			)
 			.as_unchecked()
 			.clone();
+
+		let seconded_3 = state
+			.sign_statement(
+				index_disabled,
+				CompactStatement::Seconded(candidate_3_hash),
+				&SigningContext { parent_hash: relay_2, session_index: 1 },
+			)
+			.as_unchecked()
+			.clone();
+
 		{
 			send_peer_message(
 				&mut overseer,
@@ -1733,6 +1368,48 @@ fn disabling_works_from_the_latest_state_not_relay_parent() {
 			)
 			.await;
 
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_disabled && r == BENEFIT_VALID_STATEMENT_FIRST.into() => { }
+			);
+		}
+
+		{
+			handle_sent_request(
+				&mut overseer,
+				peer_disabled,
+				candidate_2_hash,
+				StatementFilter::blank(group_size),
+				candidate_2.clone(),
+				pvd_2.clone(),
+				vec![seconded_2.clone()],
+			)
+			.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_disabled && r == BENEFIT_VALID_STATEMENT.into() => { }
+			);
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+					if p == peer_disabled && r == BENEFIT_VALID_RESPONSE.into() => { }
+			);
+
+			answer_expected_hypothetical_membership_request(&mut overseer, vec![]).await;
+		}
+
+		{
+			send_peer_message(
+				&mut overseer,
+				peer_disabled.clone(),
+				protocol_v2::StatementDistributionMessage::Statement(relay_2, seconded_3.clone()),
+			)
+			.await;
+
 			assert_matches!(
 				overseer.recv().await,
 				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
diff --git a/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md b/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md
index e5eb9bd7642c1108c45e73134a00ee22b2f6475c..ce2ff3ca9139dc67ffd8aafa613a5fbde9f40448 100644
--- a/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md
+++ b/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md
@@ -130,23 +130,9 @@ accept statements from it. Filtering out of statements from disabled validators
 on the node side is purely an optimization, as it will be done in the runtime
 as well.
 
-Because we use the state of the active leaves to
-check whether a validator is disabled instead of the relay parent, the notion
-of being disabled is inherently racy:
-- the responder has learned about the disabled validator before the requester
-- the receiver has witnessed the disabled validator after sending the request
-
-We could have sent a manifest to a peer, then received information about
-disabling, and then receive a request. This can break an invariant of the grid
-mode:
-- the response is required to indicate quorum
-
-Due to the above, there should be no response at all for grid requests when
-the backing threshold is no longer met as a result of disabled validators.
-In addition to that, we add disabled validators to the request's unwanted
-mask. This ensures that the sender will not send statements from disabled
-validators (at least from the perspective of the receiver at the moment of the
-request). This doesn't fully avoid race conditions, but tries to minimize them.
+We use the state of the relay parent to check whether a validator is disabled
+to avoid race conditions and ensure that disabling works well in the presense
+of re-enabling.
 
 ## Messages
 
@@ -211,9 +197,9 @@ We also have a request/response protocol because validators do not eagerly send
    - Requests are queued up with `RequestManager::get_or_insert`.
      - Done as needed, when handling incoming manifests/statements.
    - `RequestManager::dispatch_requests` sends any queued-up requests.
-      - Calls `RequestManager::next_request` to completion.
-        - Creates the `OutgoingRequest`, saves the receiver in `RequestManager::pending_responses`.
-      - Does nothing if we have more responses pending than the limit of parallel requests.
+     - Calls `RequestManager::next_request` to completion.
+       - Creates the `OutgoingRequest`, saves the receiver in `RequestManager::pending_responses`.
+     - Does nothing if we have more responses pending than the limit of parallel requests.
 
 2. Peer
 
diff --git a/prdoc/pr_4431.prdoc b/prdoc/pr_4431.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..993a7326b9aad4aa713f47dffeb9622493db1cf2
--- /dev/null
+++ b/prdoc/pr_4431.prdoc
@@ -0,0 +1,17 @@
+title: "Statement-Distribution validator disabling changes"
+
+doc:
+  - audience: Node Dev
+    description: |
+      In preparation for launching re-enabling (#2418), we need to adjust the
+      disabling strategy of statement-distribution to use the relay parent's
+      state instead of the latest state (union of active leaves). This will also
+      ensure no raciness of getting the latest state vs accepting statements from
+      disabling validators at the cost of being more lenient/potentially accepting
+      more statements from disabled validators.
+
+crates:
+  - name: polkadot-statement-distribution
+    bump: patch
+  - name: polkadot
+    bump: none