From a02b53475b9fe2567ae05ad4e03c81f4f3d6b463 Mon Sep 17 00:00:00 2001
From: ordian <write@reusable.software>
Date: Mon, 8 Jan 2024 20:58:30 +0100
Subject: [PATCH] backport to master: Handling of disabled validators in
 backing subsystem (#1259) (#2764)

#1259 was merged into a feature branch, but we've decided to merge
node-side changes for disabling straight into master.
This is a dependency of #1841 and #2637.

---------

Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
---
 polkadot/node/core/backing/src/lib.rs         |  87 ++++-
 polkadot/node/core/backing/src/tests/mod.rs   | 329 +++++++++++++++++-
 .../src/tests/prospective_parachains.rs       |  20 ++
 polkadot/node/core/provisioner/src/lib.rs     |  59 +---
 polkadot/node/subsystem-util/src/lib.rs       |  80 ++++-
 polkadot/node/subsystem-util/src/vstaging.rs  |  56 +++
 prdoc/pr_2764.prdoc                           |  16 +
 7 files changed, 574 insertions(+), 73 deletions(-)
 create mode 100644 polkadot/node/subsystem-util/src/vstaging.rs
 create mode 100644 prdoc/pr_2764.prdoc

diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs
index 434051f1b00..ba25ebb192e 100644
--- a/polkadot/node/core/backing/src/lib.rs
+++ b/polkadot/node/core/backing/src/lib.rs
@@ -118,6 +118,7 @@ use statement_table::{
 	},
 	Config as TableConfig, Context as TableContextTrait, Table,
 };
+use util::vstaging::get_disabled_validators_with_fallback;
 
 mod error;
 
@@ -383,6 +384,21 @@ struct TableContext {
 	validator: Option<Validator>,
 	groups: HashMap<ParaId, Vec<ValidatorIndex>>,
 	validators: Vec<ValidatorId>,
+	disabled_validators: Vec<ValidatorIndex>,
+}
+
+impl TableContext {
+	// Returns `true` if the provided `ValidatorIndex` is in the disabled validators list
+	pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
+		self.disabled_validators
+			.iter()
+			.any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
+	}
+
+	// Returns `true` if the local validator is in the disabled validators list
+	pub fn local_validator_is_disabled(&self) -> Option<bool> {
+		self.validator.as_ref().map(|v| v.disabled())
+	}
 }
 
 impl TableContextTrait for TableContext {
@@ -1010,21 +1026,33 @@ async fn construct_per_relay_parent_state<Context>(
 	let minimum_backing_votes =
 		try_runtime_api!(request_min_backing_votes(parent, session_index, ctx.sender()).await);
 
+	// TODO: https://github.com/paritytech/polkadot-sdk/issues/1940
+	// Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this call to
+	// `get_disabled_validators_with_fallback`, add `request_disabled_validators` call to the
+	// `try_join!` above and use `try_runtime_api!` to get `disabled_validators`
+	let disabled_validators = get_disabled_validators_with_fallback(ctx.sender(), parent)
+		.await
+		.map_err(Error::UtilError)?;
+
 	let signing_context = SigningContext { parent_hash: parent, session_index };
-	let validator =
-		match Validator::construct(&validators, signing_context.clone(), keystore.clone()) {
-			Ok(v) => Some(v),
-			Err(util::Error::NotAValidator) => None,
-			Err(e) => {
-				gum::warn!(
-					target: LOG_TARGET,
-					err = ?e,
-					"Cannot participate in candidate backing",
-				);
+	let validator = match Validator::construct(
+		&validators,
+		&disabled_validators,
+		signing_context.clone(),
+		keystore.clone(),
+	) {
+		Ok(v) => Some(v),
+		Err(util::Error::NotAValidator) => None,
+		Err(e) => {
+			gum::warn!(
+				target: LOG_TARGET,
+				err = ?e,
+				"Cannot participate in candidate backing",
+			);
 
-				return Ok(None)
-			},
-		};
+			return Ok(None)
+		},
+	};
 
 	let mut groups = HashMap::new();
 	let n_cores = cores.len();
@@ -1054,7 +1082,7 @@ async fn construct_per_relay_parent_state<Context>(
 		}
 	}
 
-	let table_context = TableContext { groups, validators, validator };
+	let table_context = TableContext { validator, groups, validators, disabled_validators };
 	let table_config = TableConfig {
 		allow_multiple_seconded: match mode {
 			ProspectiveParachainsMode::Enabled { .. } => true,
@@ -1726,6 +1754,19 @@ async fn kick_off_validation_work<Context>(
 	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
 	attesting: AttestingData,
 ) -> Result<(), Error> {
+	// Do nothing if the local validator is disabled or not a validator at all
+	match rp_state.table_context.local_validator_is_disabled() {
+		Some(true) => {
+			gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
+			return Ok(())
+		},
+		Some(false) => {}, // we are not disabled - move on
+		None => {
+			gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
+			return Ok(())
+		},
+	}
+
 	let candidate_hash = attesting.candidate.hash();
 	if rp_state.issued_statements.contains(&candidate_hash) {
 		return Ok(())
@@ -1783,6 +1824,16 @@ async fn maybe_validate_and_import<Context>(
 		},
 	};
 
+	// Don't import statement if the sender is disabled
+	if rp_state.table_context.validator_is_disabled(&statement.validator_index()) {
+		gum::debug!(
+			target: LOG_TARGET,
+			sender_validator_idx = ?statement.validator_index(),
+			"Not importing statement because the sender is disabled"
+		);
+		return Ok(())
+	}
+
 	let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await;
 
 	// if we get an Error::RejectedByProspectiveParachains,
@@ -1944,6 +1995,13 @@ async fn handle_second_message<Context>(
 		Some(r) => r,
 	};
 
+	// Just return if the local validator is disabled. If we are here the local node should be a
+	// validator but defensively use `unwrap_or(false)` to continue processing in this case.
+	if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
+		gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
+		return Ok(())
+	}
+
 	// Sanity check that candidate is from our assignment.
 	if Some(candidate.descriptor().para_id) != rp_state.assignment {
 		gum::debug!(
@@ -1990,6 +2048,7 @@ async fn handle_statement_message<Context>(
 ) -> Result<(), Error> {
 	let _timer = metrics.time_process_statement();
 
+	// Validator disabling is handled in `maybe_validate_and_import`
 	match maybe_validate_and_import(ctx, state, relay_parent, statement).await {
 		Err(Error::ValidationFailed(_)) => Ok(()),
 		Err(e) => Err(e),
diff --git a/polkadot/node/core/backing/src/tests/mod.rs b/polkadot/node/core/backing/src/tests/mod.rs
index c12be72556e..1957f4e19c5 100644
--- a/polkadot/node/core/backing/src/tests/mod.rs
+++ b/polkadot/node/core/backing/src/tests/mod.rs
@@ -41,7 +41,7 @@ use sp_keyring::Sr25519Keyring;
 use sp_keystore::Keystore;
 use sp_tracing as _;
 use statement_table::v2::Misbehavior;
-use std::collections::HashMap;
+use std::{collections::HashMap, time::Duration};
 
 mod prospective_parachains;
 
@@ -77,6 +77,7 @@ struct TestState {
 	signing_context: SigningContext,
 	relay_parent: Hash,
 	minimum_backing_votes: u32,
+	disabled_validators: Vec<ValidatorIndex>,
 }
 
 impl TestState {
@@ -148,6 +149,7 @@ impl Default for TestState {
 			signing_context,
 			relay_parent,
 			minimum_backing_votes: LEGACY_MIN_BACKING_VOTES,
+			disabled_validators: Vec::new(),
 		}
 	}
 }
@@ -293,6 +295,26 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
 			tx.send(Ok(test_state.minimum_backing_votes)).unwrap();
 		}
 	);
+
+	// Check that subsystem job issues a request for the runtime version.
+	assert_matches!(
+		virtual_overseer.recv().await,
+		AllMessages::RuntimeApi(
+			RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx))
+		) if parent == test_state.relay_parent => {
+			tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)).unwrap();
+		}
+	);
+
+	// Check that subsystem job issues a request for the disabled validators.
+	assert_matches!(
+		virtual_overseer.recv().await,
+		AllMessages::RuntimeApi(
+			RuntimeApiMessage::Request(parent, RuntimeApiRequest::DisabledValidators(tx))
+		) if parent == test_state.relay_parent => {
+			tx.send(Ok(test_state.disabled_validators.clone())).unwrap();
+		}
+	);
 }
 
 async fn assert_validation_requests(
@@ -1420,6 +1442,7 @@ fn candidate_backing_reorders_votes() {
 
 	let table_context = TableContext {
 		validator: None,
+		disabled_validators: Vec::new(),
 		groups: validator_groups,
 		validators: validator_public.clone(),
 	};
@@ -1957,3 +1980,307 @@ fn new_leaf_view_doesnt_clobber_old() {
 		virtual_overseer
 	});
 }
+
+// Test that a disabled local validator doesn't do any work on `CandidateBackingMessage::Second`
+#[test]
+fn disabled_validator_doesnt_distribute_statement_on_receiving_second() {
+	let mut test_state = TestState::default();
+	test_state.disabled_validators.push(ValidatorIndex(0));
+
+	test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
+		test_startup(&mut virtual_overseer, &test_state).await;
+
+		let pov = PoV { block_data: BlockData(vec![42, 43, 44]) };
+		let pvd = dummy_pvd();
+		let validation_code = ValidationCode(vec![1, 2, 3]);
+
+		let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();
+
+		let pov_hash = pov.hash();
+		let candidate = TestCandidateBuilder {
+			para_id: test_state.chain_ids[0],
+			relay_parent: test_state.relay_parent,
+			pov_hash,
+			head_data: expected_head_data.clone(),
+			erasure_root: make_erasure_root(&test_state, pov.clone(), pvd.clone()),
+			persisted_validation_data_hash: pvd.hash(),
+			validation_code: validation_code.0.clone(),
+		}
+		.build();
+
+		let second = CandidateBackingMessage::Second(
+			test_state.relay_parent,
+			candidate.to_plain(),
+			pvd.clone(),
+			pov.clone(),
+		);
+
+		virtual_overseer.send(FromOrchestra::Communication { msg: second }).await;
+
+		// Ensure backing subsystem is not doing any work
+		assert_matches!(virtual_overseer.recv().timeout(Duration::from_secs(1)).await, None);
+
+		virtual_overseer
+			.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
+				ActiveLeavesUpdate::stop_work(test_state.relay_parent),
+			)))
+			.await;
+		virtual_overseer
+	});
+}
+
+// Test that a disabled local validator doesn't do any work on `CandidateBackingMessage::Statement`
+#[test]
+fn disabled_validator_doesnt_distribute_statement_on_receiving_statement() {
+	let mut test_state = TestState::default();
+	test_state.disabled_validators.push(ValidatorIndex(0));
+
+	test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
+		test_startup(&mut virtual_overseer, &test_state).await;
+
+		let pov = PoV { block_data: BlockData(vec![42, 43, 44]) };
+		let pvd = dummy_pvd();
+		let validation_code = ValidationCode(vec![1, 2, 3]);
+
+		let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();
+
+		let pov_hash = pov.hash();
+		let candidate = TestCandidateBuilder {
+			para_id: test_state.chain_ids[0],
+			relay_parent: test_state.relay_parent,
+			pov_hash,
+			head_data: expected_head_data.clone(),
+			erasure_root: make_erasure_root(&test_state, pov.clone(), pvd.clone()),
+			persisted_validation_data_hash: pvd.hash(),
+			validation_code: validation_code.0.clone(),
+		}
+		.build();
+
+		let public2 = Keystore::sr25519_generate_new(
+			&*test_state.keystore,
+			ValidatorId::ID,
+			Some(&test_state.validators[2].to_seed()),
+		)
+		.expect("Insert key into keystore");
+
+		let signed = SignedFullStatementWithPVD::sign(
+			&test_state.keystore,
+			StatementWithPVD::Seconded(candidate.clone(), pvd.clone()),
+			&test_state.signing_context,
+			ValidatorIndex(2),
+			&public2.into(),
+		)
+		.ok()
+		.flatten()
+		.expect("should be signed");
+
+		let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed.clone());
+
+		virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
+
+		// Ensure backing subsystem is not doing any work
+		assert_matches!(virtual_overseer.recv().timeout(Duration::from_secs(1)).await, None);
+
+		virtual_overseer
+			.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
+				ActiveLeavesUpdate::stop_work(test_state.relay_parent),
+			)))
+			.await;
+		virtual_overseer
+	});
+}
+
+// Test that a validator doesn't do any work on receiving a `CandidateBackingMessage::Statement`
+// from a disabled validator
+#[test]
+fn validator_ignores_statements_from_disabled_validators() {
+	let mut test_state = TestState::default();
+	test_state.disabled_validators.push(ValidatorIndex(2));
+
+	test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
+		test_startup(&mut virtual_overseer, &test_state).await;
+
+		let pov = PoV { block_data: BlockData(vec![42, 43, 44]) };
+		let pvd = dummy_pvd();
+		let validation_code = ValidationCode(vec![1, 2, 3]);
+
+		let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();
+
+		let pov_hash = pov.hash();
+		let candidate = TestCandidateBuilder {
+			para_id: test_state.chain_ids[0],
+			relay_parent: test_state.relay_parent,
+			pov_hash,
+			head_data: expected_head_data.clone(),
+			erasure_root: make_erasure_root(&test_state, pov.clone(), pvd.clone()),
+			persisted_validation_data_hash: pvd.hash(),
+			validation_code: validation_code.0.clone(),
+		}
+		.build();
+		let candidate_commitments_hash = candidate.commitments.hash();
+
+		let public2 = Keystore::sr25519_generate_new(
+			&*test_state.keystore,
+			ValidatorId::ID,
+			Some(&test_state.validators[2].to_seed()),
+		)
+		.expect("Insert key into keystore");
+
+		let signed_2 = SignedFullStatementWithPVD::sign(
+			&test_state.keystore,
+			StatementWithPVD::Seconded(candidate.clone(), pvd.clone()),
+			&test_state.signing_context,
+			ValidatorIndex(2),
+			&public2.into(),
+		)
+		.ok()
+		.flatten()
+		.expect("should be signed");
+
+		let statement_2 =
+			CandidateBackingMessage::Statement(test_state.relay_parent, signed_2.clone());
+
+		virtual_overseer.send(FromOrchestra::Communication { msg: statement_2 }).await;
+
+		// Ensure backing subsystem is not doing any work
+		assert_matches!(virtual_overseer.recv().timeout(Duration::from_secs(1)).await, None);
+
+		// Now send a statement from a honest validator and make sure it gets processed
+		let public3 = Keystore::sr25519_generate_new(
+			&*test_state.keystore,
+			ValidatorId::ID,
+			Some(&test_state.validators[3].to_seed()),
+		)
+		.expect("Insert key into keystore");
+
+		let signed_3 = SignedFullStatementWithPVD::sign(
+			&test_state.keystore,
+			StatementWithPVD::Seconded(candidate.clone(), pvd.clone()),
+			&test_state.signing_context,
+			ValidatorIndex(3),
+			&public3.into(),
+		)
+		.ok()
+		.flatten()
+		.expect("should be signed");
+
+		let statement_3 =
+			CandidateBackingMessage::Statement(test_state.relay_parent, signed_3.clone());
+
+		virtual_overseer.send(FromOrchestra::Communication { msg: statement_3 }).await;
+
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::RuntimeApi(
+				RuntimeApiMessage::Request(_, RuntimeApiRequest::ValidationCodeByHash(hash, tx))
+			) if hash == validation_code.hash() => {
+				tx.send(Ok(Some(validation_code.clone()))).unwrap();
+			}
+		);
+
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::RuntimeApi(
+				RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx))
+			) => {
+				tx.send(Ok(1u32.into())).unwrap();
+			}
+		);
+
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::RuntimeApi(
+				RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx))
+			) if sess_idx == 1 => {
+				tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
+			}
+		);
+
+		// Sending a `Statement::Seconded` for our assignment will start
+		// validation process. The first thing requested is the PoV.
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::AvailabilityDistribution(
+				AvailabilityDistributionMessage::FetchPoV {
+					relay_parent,
+					tx,
+					..
+				}
+			) if relay_parent == test_state.relay_parent => {
+				tx.send(pov.clone()).unwrap();
+			}
+		);
+
+		// The next step is the actual request to Validation subsystem
+		// to validate the `Seconded` candidate.
+		let expected_pov = pov;
+		let expected_validation_code = validation_code;
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::CandidateValidation(
+				CandidateValidationMessage::ValidateFromExhaustive {
+					validation_data,
+					validation_code,
+					candidate_receipt,
+					pov,
+					executor_params: _,
+					exec_kind,
+					response_sender,
+				}
+			) if validation_data == pvd &&
+				validation_code == expected_validation_code &&
+				*pov == expected_pov && &candidate_receipt.descriptor == candidate.descriptor() &&
+				exec_kind == PvfExecKind::Backing &&
+				candidate_commitments_hash == candidate_receipt.commitments_hash =>
+			{
+				response_sender.send(Ok(
+					ValidationResult::Valid(CandidateCommitments {
+						head_data: expected_head_data.clone(),
+						upward_messages: Default::default(),
+						horizontal_messages: Default::default(),
+						new_validation_code: None,
+						processed_downward_messages: 0,
+						hrmp_watermark: 0,
+					}, test_state.validation_data.clone()),
+				)).unwrap();
+			}
+		);
+
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::AvailabilityStore(
+				AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. }
+			) if candidate_hash == candidate.hash() => {
+				tx.send(Ok(())).unwrap();
+			}
+		);
+
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::StatementDistribution(
+				StatementDistributionMessage::Share(hash, _stmt)
+			) => {
+				assert_eq!(test_state.relay_parent, hash);
+			}
+		);
+
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::Provisioner(
+				ProvisionerMessage::ProvisionableData(
+					_,
+					ProvisionableData::BackedCandidate(candidate_receipt)
+				)
+			) => {
+				assert_eq!(candidate_receipt, candidate.to_plain());
+			}
+		);
+
+		virtual_overseer
+			.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
+				ActiveLeavesUpdate::stop_work(test_state.relay_parent),
+			)))
+			.await;
+		virtual_overseer
+	});
+}
diff --git a/polkadot/node/core/backing/src/tests/prospective_parachains.rs b/polkadot/node/core/backing/src/tests/prospective_parachains.rs
index e7c29e11bb4..578f21bef66 100644
--- a/polkadot/node/core/backing/src/tests/prospective_parachains.rs
+++ b/polkadot/node/core/backing/src/tests/prospective_parachains.rs
@@ -195,6 +195,26 @@ async fn activate_leaf(
 				tx.send(Ok(test_state.minimum_backing_votes)).unwrap();
 			}
 		);
+
+		// Check that subsystem job issues a request for the runtime version.
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::RuntimeApi(
+				RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx))
+			) if parent == hash => {
+				tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)).unwrap();
+			}
+		);
+
+		// Check that the subsystem job issues a request for the disabled validators.
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::RuntimeApi(
+				RuntimeApiMessage::Request(parent, RuntimeApiRequest::DisabledValidators(tx))
+			) if parent == hash => {
+				tx.send(Ok(Vec::new())).unwrap();
+			}
+		);
 	}
 }
 
diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs
index 8893bdc6549..3970b857261 100644
--- a/polkadot/node/core/provisioner/src/lib.rs
+++ b/polkadot/node/core/provisioner/src/lib.rs
@@ -29,13 +29,13 @@ use polkadot_node_subsystem::{
 	jaeger,
 	messages::{
 		CandidateBackingMessage, ChainApiMessage, ProspectiveParachainsMessage, ProvisionableData,
-		ProvisionerInherentData, ProvisionerMessage, RuntimeApiMessage, RuntimeApiRequest,
+		ProvisionerInherentData, ProvisionerMessage, RuntimeApiRequest,
 	},
 	overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, PerLeafSpan,
-	RuntimeApiError, SpawnedSubsystem, SubsystemError,
+	SpawnedSubsystem, SubsystemError,
 };
 use polkadot_node_subsystem_util::{
-	request_availability_cores, request_persisted_validation_data,
+	has_required_runtime, request_availability_cores, request_persisted_validation_data,
 	runtime::{prospective_parachains_mode, ProspectiveParachainsMode},
 	TimeoutExt,
 };
@@ -856,56 +856,3 @@ fn bitfields_indicate_availability(
 
 	3 * availability.count_ones() >= 2 * availability.len()
 }
-
-// If we have to be absolutely precise here, this method gets the version of the `ParachainHost`
-// api. For brevity we'll just call it 'runtime version'.
-async fn has_required_runtime(
-	sender: &mut impl overseer::ProvisionerSenderTrait,
-	relay_parent: Hash,
-	required_runtime_version: u32,
-) -> bool {
-	gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
-
-	let (tx, rx) = oneshot::channel();
-	sender
-		.send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
-		.await;
-
-	match rx.await {
-		Result::Ok(Ok(runtime_version)) => {
-			gum::trace!(
-				target: LOG_TARGET,
-				?relay_parent,
-				?runtime_version,
-				?required_runtime_version,
-				"Fetched  ParachainHost runtime api version"
-			);
-			runtime_version >= required_runtime_version
-		},
-		Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
-			gum::trace!(
-				target: LOG_TARGET,
-				?relay_parent,
-				?error,
-				"Execution error while fetching ParachainHost runtime api version"
-			);
-			false
-		},
-		Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
-			gum::trace!(
-				target: LOG_TARGET,
-				?relay_parent,
-				"NotSupported error while fetching ParachainHost runtime api version"
-			);
-			false
-		},
-		Result::Err(_) => {
-			gum::trace!(
-				target: LOG_TARGET,
-				?relay_parent,
-				"Cancelled error while fetching ParachainHost runtime api version"
-			);
-			false
-		},
-	}
-}
diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs
index a5f3e9d4a0c..1fe52767df6 100644
--- a/polkadot/node/subsystem-util/src/lib.rs
+++ b/polkadot/node/subsystem-util/src/lib.rs
@@ -55,6 +55,7 @@ use sp_core::ByteArray;
 use sp_keystore::{Error as KeystoreError, KeystorePtr};
 use std::time::Duration;
 use thiserror::Error;
+use vstaging::get_disabled_validators_with_fallback;
 
 pub use metered;
 pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
@@ -79,6 +80,9 @@ pub mod inclusion_emulator;
 /// Convenient and efficient runtime info access.
 pub mod runtime;
 
+/// Helpers for working with unreleased runtime calls
+pub mod vstaging;
+
 /// Nested message sending
 ///
 /// Useful for having mostly synchronous code, with submodules spawning short lived asynchronous
@@ -92,6 +96,8 @@ mod determine_new_blocks;
 #[cfg(test)]
 mod tests;
 
+const LOG_TARGET: &'static str = "parachain::subsystem-util";
+
 /// Duration a job will wait after sending a stop signal before hard-aborting.
 pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
 /// Capacity of channels to and from individual jobs
@@ -157,6 +163,62 @@ where
 	rx
 }
 
+/// Verifies if `ParachainHost` runtime api is at least at version `required_runtime_version`. This
+/// method is used to determine if a given runtime call is supported by the runtime.
+pub async fn has_required_runtime<Sender>(
+	sender: &mut Sender,
+	relay_parent: Hash,
+	required_runtime_version: u32,
+) -> bool
+where
+	Sender: SubsystemSender<RuntimeApiMessage>,
+{
+	gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
+
+	let (tx, rx) = oneshot::channel();
+	sender
+		.send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
+		.await;
+
+	match rx.await {
+		Result::Ok(Ok(runtime_version)) => {
+			gum::trace!(
+				target: LOG_TARGET,
+				?relay_parent,
+				?runtime_version,
+				?required_runtime_version,
+				"Fetched  ParachainHost runtime api version"
+			);
+			runtime_version >= required_runtime_version
+		},
+		Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
+			gum::trace!(
+				target: LOG_TARGET,
+				?relay_parent,
+				?error,
+				"Execution error while fetching ParachainHost runtime api version"
+			);
+			false
+		},
+		Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
+			gum::trace!(
+				target: LOG_TARGET,
+				?relay_parent,
+				"NotSupported error while fetching ParachainHost runtime api version"
+			);
+			false
+		},
+		Result::Err(_) => {
+			gum::trace!(
+				target: LOG_TARGET,
+				?relay_parent,
+				"Cancelled error while fetching ParachainHost runtime api version"
+			);
+			false
+		},
+	}
+}
+
 /// Construct specialized request functions for the runtime.
 ///
 /// These would otherwise get pretty repetitive.
@@ -378,6 +440,7 @@ pub struct Validator {
 	signing_context: SigningContext,
 	key: ValidatorId,
 	index: ValidatorIndex,
+	disabled: bool,
 }
 
 impl Validator {
@@ -399,7 +462,12 @@ impl Validator {
 
 		let validators = validators?;
 
-		Self::construct(&validators, signing_context, keystore)
+		// TODO: https://github.com/paritytech/polkadot-sdk/issues/1940
+		// When `DisabledValidators` is released remove this and add a
+		// `request_disabled_validators` call here
+		let disabled_validators = get_disabled_validators_with_fallback(sender, parent).await?;
+
+		Self::construct(&validators, &disabled_validators, signing_context, keystore)
 	}
 
 	/// Construct a validator instance without performing runtime fetches.
@@ -407,13 +475,16 @@ impl Validator {
 	/// This can be useful if external code also needs the same data.
 	pub fn construct(
 		validators: &[ValidatorId],
+		disabled_validators: &[ValidatorIndex],
 		signing_context: SigningContext,
 		keystore: KeystorePtr,
 	) -> Result<Self, Error> {
 		let (key, index) =
 			signing_key_and_index(validators, &keystore).ok_or(Error::NotAValidator)?;
 
-		Ok(Validator { signing_context, key, index })
+		let disabled = disabled_validators.iter().any(|d: &ValidatorIndex| *d == index);
+
+		Ok(Validator { signing_context, key, index, disabled })
 	}
 
 	/// Get this validator's id.
@@ -426,6 +497,11 @@ impl Validator {
 		self.index
 	}
 
+	/// Get the enabled/disabled state of this validator
+	pub fn disabled(&self) -> bool {
+		self.disabled
+	}
+
 	/// Get the current signing context.
 	pub fn signing_context(&self) -> &SigningContext {
 		&self.signing_context
diff --git a/polkadot/node/subsystem-util/src/vstaging.rs b/polkadot/node/subsystem-util/src/vstaging.rs
new file mode 100644
index 00000000000..3efd3b61f93
--- /dev/null
+++ b/polkadot/node/subsystem-util/src/vstaging.rs
@@ -0,0 +1,56 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Contains helpers for staging runtime calls.
+//!
+//! This module is intended to contain common boiler plate code handling unreleased runtime API
+//! calls.
+
+use polkadot_node_subsystem_types::messages::{RuntimeApiMessage, RuntimeApiRequest};
+use polkadot_overseer::SubsystemSender;
+use polkadot_primitives::{Hash, ValidatorIndex};
+
+use crate::{has_required_runtime, request_disabled_validators, Error};
+
+const LOG_TARGET: &'static str = "parachain::subsystem-util-vstaging";
+
+// TODO: https://github.com/paritytech/polkadot-sdk/issues/1940
+/// Returns disabled validators list if the runtime supports it. Otherwise logs a debug messages and
+/// returns an empty vec.
+/// Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this function and
+/// replace all usages with `request_disabled_validators`
+pub async fn get_disabled_validators_with_fallback<Sender: SubsystemSender<RuntimeApiMessage>>(
+	sender: &mut Sender,
+	relay_parent: Hash,
+) -> Result<Vec<ValidatorIndex>, Error> {
+	let disabled_validators = if has_required_runtime(
+		sender,
+		relay_parent,
+		RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT,
+	)
+	.await
+	{
+		request_disabled_validators(relay_parent, sender)
+			.await
+			.await
+			.map_err(Error::Oneshot)??
+	} else {
+		gum::debug!(target: LOG_TARGET, "Runtime doesn't support `DisabledValidators` - continuing with an empty disabled validators set");
+		vec![]
+	};
+
+	Ok(disabled_validators)
+}
diff --git a/prdoc/pr_2764.prdoc b/prdoc/pr_2764.prdoc
new file mode 100644
index 00000000000..adfa4f47c93
--- /dev/null
+++ b/prdoc/pr_2764.prdoc
@@ -0,0 +1,16 @@
+title: Validator disabling in Backing.
+
+doc:
+  - audience: Node Operator
+    description: |
+      Once a validator has been disabled for misbehavior, it will no longer
+      sign backing statements in the current era.
+
+migrations:
+  db: []
+  runtime: []
+
+crates:
+  - name: polkadot-node-core-backing
+
+host_functions: []
-- 
GitLab