diff --git a/Cargo.lock b/Cargo.lock
index 54a01f12f35c1a39f26acd8e53884250f6cd3d02..4db38311fe665da6079543701c556309aa1334e9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13589,8 +13589,10 @@ dependencies = [
  "nix 0.28.0",
  "parity-scale-codec",
  "polkadot-node-core-pvf-common",
+ "polkadot-node-primitives",
  "polkadot-parachain-primitives",
  "polkadot-primitives",
+ "sp-maybe-compressed-blob",
  "tracing-gum",
 ]
 
@@ -13605,6 +13607,7 @@ dependencies = [
  "nix 0.28.0",
  "parity-scale-codec",
  "polkadot-node-core-pvf-common",
+ "polkadot-node-primitives",
  "polkadot-primitives",
  "rayon",
  "rococo-runtime",
diff --git a/polkadot/node/core/candidate-validation/Cargo.toml b/polkadot/node/core/candidate-validation/Cargo.toml
index 13ab3e3fba50af4db1dc13497729f6c932ab7348..fcacc38cae65cb6b6bd8fe56a247c38a06fb05a1 100644
--- a/polkadot/node/core/candidate-validation/Cargo.toml
+++ b/polkadot/node/core/candidate-validation/Cargo.toml
@@ -17,7 +17,6 @@ gum = { workspace = true, default-features = true }
 
 sp-keystore = { workspace = true }
 sp-application-crypto = { workspace = true }
-sp-maybe-compressed-blob = { workspace = true, default-features = true }
 codec = { features = ["bit-vec", "derive"], workspace = true }
 
 polkadot-primitives = { workspace = true, default-features = true }
@@ -36,5 +35,6 @@ sp-keyring = { workspace = true, default-features = true }
 futures = { features = ["thread-pool"], workspace = true }
 assert_matches = { workspace = true }
 polkadot-node-subsystem-test-helpers = { workspace = true }
+sp-maybe-compressed-blob = { workspace = true, default-features = true }
 sp-core = { workspace = true, default-features = true }
 polkadot-primitives-test-helpers = { workspace = true }
diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs
index 1985964ebc512ea8bdb53631896643f9dcb85735..103d29e8d269429be3dc94fa3f0e4d85d4c98a67 100644
--- a/polkadot/node/core/candidate-validation/src/lib.rs
+++ b/polkadot/node/core/candidate-validation/src/lib.rs
@@ -27,9 +27,7 @@ use polkadot_node_core_pvf::{
 	InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError,
 	PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
 };
-use polkadot_node_primitives::{
-	BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
-};
+use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult};
 use polkadot_node_subsystem::{
 	errors::RuntimeApiError,
 	messages::{
@@ -41,9 +39,7 @@ use polkadot_node_subsystem::{
 };
 use polkadot_node_subsystem_util as util;
 use polkadot_overseer::ActiveLeavesUpdate;
-use polkadot_parachain_primitives::primitives::{
-	ValidationParams, ValidationResult as WasmValidationResult,
-};
+use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult;
 use polkadot_primitives::{
 	executor_params::{
 		DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT,
@@ -504,21 +500,12 @@ where
 			continue;
 		};
 
-		let pvf = match sp_maybe_compressed_blob::decompress(
-			&validation_code.0,
-			VALIDATION_CODE_BOMB_LIMIT,
-		) {
-			Ok(code) => PvfPrepData::from_code(
-				code.into_owned(),
-				executor_params.clone(),
-				timeout,
-				PrepareJobKind::Prechecking,
-			),
-			Err(e) => {
-				gum::debug!(target: LOG_TARGET, err=?e, "cannot decompress validation code");
-				continue
-			},
-		};
+		let pvf = PvfPrepData::from_code(
+			validation_code.0,
+			executor_params.clone(),
+			timeout,
+			PrepareJobKind::Prechecking,
+		);
 
 		active_pvfs.push(pvf);
 		processed_code_hashes.push(code_hash);
@@ -651,21 +638,12 @@ where
 
 	let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Precheck);
 
-	let pvf = match sp_maybe_compressed_blob::decompress(
-		&validation_code.0,
-		VALIDATION_CODE_BOMB_LIMIT,
-	) {
-		Ok(code) => PvfPrepData::from_code(
-			code.into_owned(),
-			executor_params,
-			timeout,
-			PrepareJobKind::Prechecking,
-		),
-		Err(e) => {
-			gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code");
-			return PreCheckOutcome::Invalid
-		},
-	};
+	let pvf = PvfPrepData::from_code(
+		validation_code.0,
+		executor_params,
+		timeout,
+		PrepareJobKind::Prechecking,
+	);
 
 	match validation_backend.precheck_pvf(pvf).await {
 		Ok(_) => PreCheckOutcome::Valid,
@@ -873,41 +851,7 @@ async fn validate_candidate_exhaustive(
 		return Ok(ValidationResult::Invalid(e))
 	}
 
-	let raw_validation_code = match sp_maybe_compressed_blob::decompress(
-		&validation_code.0,
-		VALIDATION_CODE_BOMB_LIMIT,
-	) {
-		Ok(code) => code,
-		Err(e) => {
-			gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (validation code)");
-
-			// Code already passed pre-checking, if decompression fails now this most likely means
-			// some local corruption happened.
-			return Err(ValidationFailed("Code decompression failed".to_string()))
-		},
-	};
-	metrics.observe_code_size(raw_validation_code.len());
-
-	metrics.observe_pov_size(pov.block_data.0.len(), true);
-	let raw_block_data =
-		match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
-			Ok(block_data) => BlockData(block_data.to_vec()),
-			Err(e) => {
-				gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (PoV code)");
-
-				// If the PoV is invalid, the candidate certainly is.
-				return Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure))
-			},
-		};
-	metrics.observe_pov_size(raw_block_data.0.len(), false);
-
-	let params = ValidationParams {
-		parent_head: persisted_validation_data.parent_head.clone(),
-		block_data: raw_block_data,
-		relay_parent_number: persisted_validation_data.relay_parent_number,
-		relay_parent_storage_root: persisted_validation_data.relay_parent_storage_root,
-	};
-
+	let persisted_validation_data = Arc::new(persisted_validation_data);
 	let result = match exec_kind {
 		// Retry is disabled to reduce the chance of nondeterministic blocks getting backed and
 		// honest backers getting slashed.
@@ -915,7 +859,7 @@ async fn validate_candidate_exhaustive(
 			let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
 			let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind);
 			let pvf = PvfPrepData::from_code(
-				raw_validation_code.to_vec(),
+				validation_code.0,
 				executor_params,
 				prep_timeout,
 				PrepareJobKind::Compilation,
@@ -925,7 +869,8 @@ async fn validate_candidate_exhaustive(
 				.validate_candidate(
 					pvf,
 					exec_timeout,
-					params.encode(),
+					persisted_validation_data.clone(),
+					pov,
 					polkadot_node_core_pvf::Priority::Normal,
 				)
 				.await
@@ -933,9 +878,10 @@ async fn validate_candidate_exhaustive(
 		PvfExecKind::Approval =>
 			validation_backend
 				.validate_candidate_with_retry(
-					raw_validation_code.to_vec(),
+					validation_code.0,
 					pvf_exec_timeout(&executor_params, exec_kind),
-					params,
+					persisted_validation_data.clone(),
+					pov,
 					executor_params,
 					PVF_APPROVAL_EXECUTION_RETRY_DELAY,
 					polkadot_node_core_pvf::Priority::Critical,
@@ -961,6 +907,8 @@ async fn validate_candidate_exhaustive(
 			Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
 		Err(ValidationError::Invalid(WasmInvalidCandidate::WorkerReportedInvalid(e))) =>
 			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))),
+		Err(ValidationError::Invalid(WasmInvalidCandidate::PoVDecompressionFailure)) =>
+			Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)),
 		Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)) =>
 			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
 				"ambiguous worker death".to_string(),
@@ -1007,7 +955,7 @@ async fn validate_candidate_exhaustive(
 					// invalid.
 					Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))
 				} else {
-					Ok(ValidationResult::Valid(outputs, persisted_validation_data))
+					Ok(ValidationResult::Valid(outputs, (*persisted_validation_data).clone()))
 				}
 			},
 	}
@@ -1020,7 +968,8 @@ trait ValidationBackend {
 		&mut self,
 		pvf: PvfPrepData,
 		exec_timeout: Duration,
-		encoded_params: Vec<u8>,
+		pvd: Arc<PersistedValidationData>,
+		pov: Arc<PoV>,
 		// The priority for the preparation job.
 		prepare_priority: polkadot_node_core_pvf::Priority,
 	) -> Result<WasmValidationResult, ValidationError>;
@@ -1035,9 +984,10 @@ trait ValidationBackend {
 	/// preparation.
 	async fn validate_candidate_with_retry(
 		&mut self,
-		raw_validation_code: Vec<u8>,
+		code: Vec<u8>,
 		exec_timeout: Duration,
-		params: ValidationParams,
+		pvd: Arc<PersistedValidationData>,
+		pov: Arc<PoV>,
 		executor_params: ExecutorParams,
 		retry_delay: Duration,
 		// The priority for the preparation job.
@@ -1046,7 +996,7 @@ trait ValidationBackend {
 		let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
 		// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
 		let pvf = PvfPrepData::from_code(
-			raw_validation_code,
+			code,
 			executor_params,
 			prep_timeout,
 			PrepareJobKind::Compilation,
@@ -1057,7 +1007,13 @@ trait ValidationBackend {
 
 		// Use `Priority::Critical` as finality trumps parachain liveliness.
 		let mut validation_result = self
-			.validate_candidate(pvf.clone(), exec_timeout, params.encode(), prepare_priority)
+			.validate_candidate(
+				pvf.clone(),
+				exec_timeout,
+				pvd.clone(),
+				pov.clone(),
+				prepare_priority,
+			)
 			.await;
 		if validation_result.is_ok() {
 			return validation_result
@@ -1130,10 +1086,14 @@ trait ValidationBackend {
 					validation_result
 				);
 
-				// Encode the params again when re-trying. We expect the retry case to be relatively
-				// rare, and we want to avoid unconditionally cloning data.
 				validation_result = self
-					.validate_candidate(pvf.clone(), new_timeout, params.encode(), prepare_priority)
+					.validate_candidate(
+						pvf.clone(),
+						new_timeout,
+						pvd.clone(),
+						pov.clone(),
+						prepare_priority,
+					)
 					.await;
 			}
 		}
@@ -1153,13 +1113,13 @@ impl ValidationBackend for ValidationHost {
 		&mut self,
 		pvf: PvfPrepData,
 		exec_timeout: Duration,
-		encoded_params: Vec<u8>,
+		pvd: Arc<PersistedValidationData>,
+		pov: Arc<PoV>,
 		// The priority for the preparation job.
 		prepare_priority: polkadot_node_core_pvf::Priority,
 	) -> Result<WasmValidationResult, ValidationError> {
 		let (tx, rx) = oneshot::channel();
-		if let Err(err) =
-			self.execute_pvf(pvf, exec_timeout, encoded_params, prepare_priority, tx).await
+		if let Err(err) = self.execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, tx).await
 		{
 			return Err(InternalValidationError::HostCommunication(format!(
 				"cannot send pvf to the validation host, it might have shut down: {:?}",
diff --git a/polkadot/node/core/candidate-validation/src/metrics.rs b/polkadot/node/core/candidate-validation/src/metrics.rs
index 28fc957ddb1a7acd0b5c67f4a52e70bf5e56f734..1459907aa5999980dd82ec0e08a5839959167cef 100644
--- a/polkadot/node/core/candidate-validation/src/metrics.rs
+++ b/polkadot/node/core/candidate-validation/src/metrics.rs
@@ -23,8 +23,6 @@ pub(crate) struct MetricsInner {
 	pub(crate) validate_from_chain_state: prometheus::Histogram,
 	pub(crate) validate_from_exhaustive: prometheus::Histogram,
 	pub(crate) validate_candidate_exhaustive: prometheus::Histogram,
-	pub(crate) pov_size: prometheus::HistogramVec,
-	pub(crate) code_size: prometheus::Histogram,
 }
 
 /// Candidate validation metrics.
@@ -70,21 +68,6 @@ impl Metrics {
 			.as_ref()
 			.map(|metrics| metrics.validate_candidate_exhaustive.start_timer())
 	}
-
-	pub fn observe_code_size(&self, code_size: usize) {
-		if let Some(metrics) = &self.0 {
-			metrics.code_size.observe(code_size as f64);
-		}
-	}
-
-	pub fn observe_pov_size(&self, pov_size: usize, compressed: bool) {
-		if let Some(metrics) = &self.0 {
-			metrics
-				.pov_size
-				.with_label_values(&[if compressed { "true" } else { "false" }])
-				.observe(pov_size as f64);
-		}
-	}
 }
 
 impl metrics::Metrics for Metrics {
@@ -121,33 +104,6 @@ impl metrics::Metrics for Metrics {
 				))?,
 				registry,
 			)?,
-			pov_size: prometheus::register(
-				prometheus::HistogramVec::new(
-					prometheus::HistogramOpts::new(
-						"polkadot_parachain_candidate_validation_pov_size",
-						"The compressed and decompressed size of the proof of validity of a candidate",
-					)
-					.buckets(
-						prometheus::exponential_buckets(16384.0, 2.0, 10)
-							.expect("arguments are always valid; qed"),
-					),
-					&["compressed"],
-				)?,
-				registry,
-			)?,
-			code_size: prometheus::register(
-				prometheus::Histogram::with_opts(
-					prometheus::HistogramOpts::new(
-						"polkadot_parachain_candidate_validation_code_size",
-						"The size of the decompressed WASM validation blob used for checking a candidate",
-					)
-					.buckets(
-						prometheus::exponential_buckets(16384.0, 2.0, 10)
-							.expect("arguments are always valid; qed"),
-					),
-				)?,
-				registry,
-			)?,
 		};
 		Ok(Metrics(Some(metrics)))
 	}
diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs
index 86d855f78b454dcea2e8ac50fb77ea6ed2afe727..55282fdf4ee1da540f8aa95f272ccee0352a7390 100644
--- a/polkadot/node/core/candidate-validation/src/tests.rs
+++ b/polkadot/node/core/candidate-validation/src/tests.rs
@@ -20,6 +20,7 @@ use super::*;
 use assert_matches::assert_matches;
 use futures::executor;
 use polkadot_node_core_pvf::PrepareError;
+use polkadot_node_primitives::{BlockData, VALIDATION_CODE_BOMB_LIMIT};
 use polkadot_node_subsystem::messages::AllMessages;
 use polkadot_node_subsystem_util::reexports::SubsystemContext;
 use polkadot_overseer::ActivatedLeaf;
@@ -385,7 +386,8 @@ impl ValidationBackend for MockValidateCandidateBackend {
 		&mut self,
 		_pvf: PvfPrepData,
 		_timeout: Duration,
-		_encoded_params: Vec<u8>,
+		_pvd: Arc<PersistedValidationData>,
+		_pov: Arc<PoV>,
 		_prepare_priority: polkadot_node_core_pvf::Priority,
 	) -> Result<WasmValidationResult, ValidationError> {
 		// This is expected to panic if called more times than expected, indicating an error in the
@@ -950,115 +952,6 @@ fn compressed_code_works() {
 	assert_matches!(v, Ok(ValidationResult::Valid(_, _)));
 }
 
-#[test]
-fn code_decompression_failure_is_error() {
-	let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
-	let pov = PoV { block_data: BlockData(vec![1; 32]) };
-	let head_data = HeadData(vec![1, 1, 1]);
-
-	let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1];
-	let validation_code =
-		sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1)
-			.map(ValidationCode)
-			.unwrap();
-
-	let descriptor = make_valid_candidate_descriptor(
-		ParaId::from(1_u32),
-		dummy_hash(),
-		validation_data.hash(),
-		pov.hash(),
-		validation_code.hash(),
-		head_data.hash(),
-		dummy_hash(),
-		Sr25519Keyring::Alice,
-	);
-
-	let validation_result = WasmValidationResult {
-		head_data,
-		new_validation_code: None,
-		upward_messages: Default::default(),
-		horizontal_messages: Default::default(),
-		processed_downward_messages: 0,
-		hrmp_watermark: 0,
-	};
-
-	let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
-
-	let pool = TaskExecutor::new();
-	let (_ctx, _ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::<
-		AllMessages,
-		_,
-	>(pool.clone());
-
-	let v = executor::block_on(validate_candidate_exhaustive(
-		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
-		validation_data,
-		validation_code,
-		candidate_receipt,
-		Arc::new(pov),
-		ExecutorParams::default(),
-		PvfExecKind::Backing,
-		&Default::default(),
-	));
-
-	assert_matches!(v, Err(_));
-}
-
-#[test]
-fn pov_decompression_failure_is_invalid() {
-	let validation_data =
-		PersistedValidationData { max_pov_size: POV_BOMB_LIMIT as u32, ..Default::default() };
-	let head_data = HeadData(vec![1, 1, 1]);
-
-	let raw_block_data = vec![2u8; POV_BOMB_LIMIT + 1];
-	let pov = sp_maybe_compressed_blob::compress(&raw_block_data, POV_BOMB_LIMIT + 1)
-		.map(|raw| PoV { block_data: BlockData(raw) })
-		.unwrap();
-
-	let validation_code = ValidationCode(vec![2; 16]);
-
-	let descriptor = make_valid_candidate_descriptor(
-		ParaId::from(1_u32),
-		dummy_hash(),
-		validation_data.hash(),
-		pov.hash(),
-		validation_code.hash(),
-		head_data.hash(),
-		dummy_hash(),
-		Sr25519Keyring::Alice,
-	);
-
-	let validation_result = WasmValidationResult {
-		head_data,
-		new_validation_code: None,
-		upward_messages: Default::default(),
-		horizontal_messages: Default::default(),
-		processed_downward_messages: 0,
-		hrmp_watermark: 0,
-	};
-
-	let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
-
-	let pool = TaskExecutor::new();
-	let (_ctx, _ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::<
-		AllMessages,
-		_,
-	>(pool.clone());
-
-	let v = executor::block_on(validate_candidate_exhaustive(
-		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
-		validation_data,
-		validation_code,
-		candidate_receipt,
-		Arc::new(pov),
-		ExecutorParams::default(),
-		PvfExecKind::Backing,
-		&Default::default(),
-	));
-
-	assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)));
-}
-
 struct MockPreCheckBackend {
 	result: Result<(), PrepareError>,
 }
@@ -1075,7 +968,8 @@ impl ValidationBackend for MockPreCheckBackend {
 		&mut self,
 		_pvf: PvfPrepData,
 		_timeout: Duration,
-		_encoded_params: Vec<u8>,
+		_pvd: Arc<PersistedValidationData>,
+		_pov: Arc<PoV>,
 		_prepare_priority: polkadot_node_core_pvf::Priority,
 	) -> Result<WasmValidationResult, ValidationError> {
 		unreachable!()
@@ -1149,70 +1043,6 @@ fn precheck_works() {
 	executor::block_on(test_fut);
 }
 
-#[test]
-fn precheck_invalid_pvf_blob_compression() {
-	let relay_parent = [3; 32].into();
-
-	let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1];
-	let validation_code =
-		sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1)
-			.map(ValidationCode)
-			.unwrap();
-	let validation_code_hash = validation_code.hash();
-
-	let pool = TaskExecutor::new();
-	let (mut ctx, mut ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::<
-		AllMessages,
-		_,
-	>(pool.clone());
-
-	let (check_fut, check_result) = precheck_pvf(
-		ctx.sender(),
-		MockPreCheckBackend::with_hardcoded_result(Ok(())),
-		relay_parent,
-		validation_code_hash,
-	)
-	.remote_handle();
-
-	let test_fut = async move {
-		assert_matches!(
-			ctx_handle.recv().await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				rp,
-				RuntimeApiRequest::ValidationCodeByHash(
-					vch,
-					tx
-				),
-			)) => {
-				assert_eq!(vch, validation_code_hash);
-				assert_eq!(rp, relay_parent);
-
-				let _ = tx.send(Ok(Some(validation_code.clone())));
-			}
-		);
-		assert_matches!(
-			ctx_handle.recv().await,
-			AllMessages::RuntimeApi(
-				RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx))
-			) => {
-				tx.send(Ok(1u32.into())).unwrap();
-			}
-		);
-		assert_matches!(
-			ctx_handle.recv().await,
-			AllMessages::RuntimeApi(
-				RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx))
-			) => {
-				tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
-			}
-		);
-		assert_matches!(check_result.await, PreCheckOutcome::Invalid);
-	};
-
-	let test_fut = future::join(test_fut, check_fut);
-	executor::block_on(test_fut);
-}
-
 #[test]
 fn precheck_properly_classifies_outcomes() {
 	let inner = |prepare_result, precheck_outcome| {
@@ -1292,7 +1122,8 @@ impl ValidationBackend for MockHeadsUp {
 		&mut self,
 		_pvf: PvfPrepData,
 		_timeout: Duration,
-		_encoded_params: Vec<u8>,
+		_pvd: Arc<PersistedValidationData>,
+		_pov: Arc<PoV>,
 		_prepare_priority: polkadot_node_core_pvf::Priority,
 	) -> Result<WasmValidationResult, ValidationError> {
 		unreachable!()
diff --git a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs
index 97a03e6596d16e2d10219331e58dd2d19647b531..342128b7cca21d5fd00544062e1cb35ba4126239 100644
--- a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs
+++ b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs
@@ -116,7 +116,7 @@ fn host_prepare_rococo_runtime(c: &mut Criterion) {
 						cfg.prepare_workers_hard_max_num = 1;
 					})
 					.await,
-					pvf.clone().code(),
+					pvf.clone().maybe_compressed_code(),
 				)
 			},
 			|result| async move {
diff --git a/polkadot/node/core/pvf/common/src/error.rs b/polkadot/node/core/pvf/common/src/error.rs
index 7ee05448d3c5cb64f54d8ed64dd865da1ad8c87a..b0cdba9501dbeec396c2262da8d02f2627992ed0 100644
--- a/polkadot/node/core/pvf/common/src/error.rs
+++ b/polkadot/node/core/pvf/common/src/error.rs
@@ -94,6 +94,10 @@ pub enum PrepareError {
 	#[codec(index = 11)]
 	#[error("prepare: error interfacing with the kernel: {0}")]
 	Kernel(String),
+	/// Code blob failed to decompress
+	#[codec(index = 12)]
+	#[error("prepare: could not decompress code blob: {0}")]
+	CouldNotDecompressCodeBlob(String),
 }
 
 impl PrepareError {
@@ -106,7 +110,11 @@ impl PrepareError {
 	pub fn is_deterministic(&self) -> bool {
 		use PrepareError::*;
 		match self {
-			Prevalidation(_) | Preparation(_) | JobError(_) | OutOfMemory => true,
+			Prevalidation(_) |
+			Preparation(_) |
+			JobError(_) |
+			OutOfMemory |
+			CouldNotDecompressCodeBlob(_) => true,
 			IoErr(_) |
 			JobDied { .. } |
 			CreateTmpFile(_) |
diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs
index 46862f9f80b60324185363c595e221582ce53b1e..cff3f3b86e95265956ef935427a570e8565a7d7f 100644
--- a/polkadot/node/core/pvf/common/src/execute.rs
+++ b/polkadot/node/core/pvf/common/src/execute.rs
@@ -35,6 +35,8 @@ pub struct WorkerResponse {
 	pub job_response: JobResponse,
 	/// The amount of CPU time taken by the job.
 	pub duration: Duration,
+	/// The uncompressed PoV size.
+	pub pov_size: u32,
 }
 
 /// An error occurred in the worker process.
@@ -77,6 +79,8 @@ pub enum JobResponse {
 	RuntimeConstruction(String),
 	/// The candidate is invalid.
 	InvalidCandidate(String),
+	/// PoV decompression failed
+	PoVDecompressionFailure,
 }
 
 impl JobResponse {
diff --git a/polkadot/node/core/pvf/common/src/prepare.rs b/polkadot/node/core/pvf/common/src/prepare.rs
index 81e165a7b8a499432b4b766b43b9400172ba5822..4cd1beb309918b1dedb7e3a0e7ee2dd03794c50d 100644
--- a/polkadot/node/core/pvf/common/src/prepare.rs
+++ b/polkadot/node/core/pvf/common/src/prepare.rs
@@ -44,6 +44,8 @@ pub struct PrepareStats {
 	pub cpu_time_elapsed: std::time::Duration,
 	/// The observed memory statistics for the preparation job.
 	pub memory_stats: MemoryStats,
+	/// The decompressed Wasm code length observed during the preparation.
+	pub observed_wasm_code_len: u32,
 }
 
 /// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if
diff --git a/polkadot/node/core/pvf/common/src/pvf.rs b/polkadot/node/core/pvf/common/src/pvf.rs
index e2ac36a2406ac28213107d1e0b42d8f215e04ea3..4019a8d8b0d0031a1a44b345b59124a994606d32 100644
--- a/polkadot/node/core/pvf/common/src/pvf.rs
+++ b/polkadot/node/core/pvf/common/src/pvf.rs
@@ -26,9 +26,9 @@ use std::{fmt, sync::Arc, time::Duration};
 /// Should be cheap to clone.
 #[derive(Clone, Encode, Decode)]
 pub struct PvfPrepData {
-	/// Wasm code (uncompressed)
-	code: Arc<Vec<u8>>,
-	/// Wasm code hash
+	/// Wasm code (maybe compressed)
+	maybe_compressed_code: Arc<Vec<u8>>,
+	/// Wasm code hash.
 	code_hash: ValidationCodeHash,
 	/// Executor environment parameters for the session for which artifact is prepared
 	executor_params: Arc<ExecutorParams>,
@@ -46,20 +46,20 @@ impl PvfPrepData {
 		prep_timeout: Duration,
 		prep_kind: PrepareJobKind,
 	) -> Self {
-		let code = Arc::new(code);
-		let code_hash = sp_crypto_hashing::blake2_256(&code).into();
+		let maybe_compressed_code = Arc::new(code);
+		let code_hash = sp_crypto_hashing::blake2_256(&maybe_compressed_code).into();
 		let executor_params = Arc::new(executor_params);
-		Self { code, code_hash, executor_params, prep_timeout, prep_kind }
+		Self { maybe_compressed_code, code_hash, executor_params, prep_timeout, prep_kind }
 	}
 
-	/// Returns validation code hash for the PVF
+	/// Returns validation code hash
 	pub fn code_hash(&self) -> ValidationCodeHash {
 		self.code_hash
 	}
 
-	/// Returns PVF code
-	pub fn code(&self) -> Arc<Vec<u8>> {
-		self.code.clone()
+	/// Returns PVF code blob
+	pub fn maybe_compressed_code(&self) -> Arc<Vec<u8>> {
+		self.maybe_compressed_code.clone()
 	}
 
 	/// Returns executor params
diff --git a/polkadot/node/core/pvf/execute-worker/Cargo.toml b/polkadot/node/core/pvf/execute-worker/Cargo.toml
index f24b66dc4a0e8ba20f6b7a8205c7092a47c30245..6ad340d25612860f87469497c332079e11eda266 100644
--- a/polkadot/node/core/pvf/execute-worker/Cargo.toml
+++ b/polkadot/node/core/pvf/execute-worker/Cargo.toml
@@ -19,8 +19,11 @@ libc = { workspace = true }
 codec = { features = ["derive"], workspace = true }
 
 polkadot-node-core-pvf-common = { workspace = true, default-features = true }
+polkadot-node-primitives = { workspace = true, default-features = true }
 polkadot-parachain-primitives = { workspace = true, default-features = true }
 polkadot-primitives = { workspace = true, default-features = true }
 
+sp-maybe-compressed-blob = { workspace = true, default-features = true }
+
 [features]
 builder = []
diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs
index 35858ab36cec3fd2990403ae0afcf848e5cb317e..4b7c167cc9ec322f070e9e964fecc5b782d68063 100644
--- a/polkadot/node/core/pvf/execute-worker/src/lib.rs
+++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs
@@ -22,6 +22,7 @@
 pub use polkadot_node_core_pvf_common::{
 	error::ExecuteError, executor_interface::execute_artifact,
 };
+use polkadot_parachain_primitives::primitives::ValidationParams;
 
 // NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
 //       separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`.
@@ -50,8 +51,9 @@ use polkadot_node_core_pvf_common::{
 	},
 	worker_dir,
 };
+use polkadot_node_primitives::{BlockData, PoV, POV_BOMB_LIMIT};
 use polkadot_parachain_primitives::primitives::ValidationResult;
-use polkadot_primitives::ExecutorParams;
+use polkadot_primitives::{ExecutorParams, PersistedValidationData};
 use std::{
 	io::{self, Read},
 	os::{
@@ -85,8 +87,23 @@ fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
 	Ok(handshake)
 }
 
-fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, Duration)> {
-	let params = framed_recv_blocking(stream)?;
+fn recv_request(stream: &mut UnixStream) -> io::Result<(PersistedValidationData, PoV, Duration)> {
+	let pvd = framed_recv_blocking(stream)?;
+	let pvd = PersistedValidationData::decode(&mut &pvd[..]).map_err(|_| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			"execute pvf recv_request: failed to decode persisted validation data".to_string(),
+		)
+	})?;
+
+	let pov = framed_recv_blocking(stream)?;
+	let pov = PoV::decode(&mut &pov[..]).map_err(|_| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			"execute pvf recv_request: failed to decode PoV".to_string(),
+		)
+	})?;
+
 	let execution_timeout = framed_recv_blocking(stream)?;
 	let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| {
 		io::Error::new(
@@ -94,7 +111,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, Duration)> {
 			"execute pvf recv_request: failed to decode duration".to_string(),
 		)
 	})?;
-	Ok((params, execution_timeout))
+	Ok((pvd, pov, execution_timeout))
 }
 
 /// Sends an error to the host and returns the original error wrapped in `io::Error`.
@@ -149,7 +166,7 @@ pub fn worker_entrypoint(
 			let execute_thread_stack_size = max_stack_size(&executor_params);
 
 			loop {
-				let (params, execution_timeout) = recv_request(&mut stream).map_err(|e| {
+				let (pvd, pov, execution_timeout) = recv_request(&mut stream).map_err(|e| {
 					map_and_send_err!(
 						e,
 						InternalValidationError::HostCommunication,
@@ -197,7 +214,33 @@ pub fn worker_entrypoint(
 				let stream_fd = stream.as_raw_fd();
 
 				let compiled_artifact_blob = Arc::new(compiled_artifact_blob);
-				let params = Arc::new(params);
+
+				let raw_block_data =
+					match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
+						Ok(data) => data,
+						Err(_) => {
+							send_result::<WorkerResponse, WorkerError>(
+								&mut stream,
+								Ok(WorkerResponse {
+									job_response: JobResponse::PoVDecompressionFailure,
+									duration: Duration::ZERO,
+									pov_size: 0,
+								}),
+								worker_info,
+							)?;
+							continue;
+						},
+					};
+
+				let pov_size = raw_block_data.len() as u32;
+
+				let params = ValidationParams {
+					parent_head: pvd.parent_head.clone(),
+					block_data: BlockData(raw_block_data.to_vec()),
+					relay_parent_number: pvd.relay_parent_number,
+					relay_parent_storage_root: pvd.relay_parent_storage_root,
+				};
+				let params = Arc::new(params.encode());
 
 				cfg_if::cfg_if! {
 					if #[cfg(target_os = "linux")] {
@@ -214,6 +257,7 @@ pub fn worker_entrypoint(
 								worker_info,
 								security_status.can_unshare_user_namespace_and_change_root,
 								usage_before,
+								pov_size,
 							)?
 						} else {
 							// Fall back to using fork.
@@ -228,6 +272,7 @@ pub fn worker_entrypoint(
 								execute_thread_stack_size,
 								worker_info,
 								usage_before,
+								pov_size,
 							)?
 						};
 					} else {
@@ -242,6 +287,7 @@ pub fn worker_entrypoint(
 							execute_thread_stack_size,
 							worker_info,
 							usage_before,
+							pov_size,
 						)?;
 					}
 				}
@@ -300,6 +346,7 @@ fn handle_clone(
 	worker_info: &WorkerInfo,
 	have_unshare_newuser: bool,
 	usage_before: Usage,
+	pov_size: u32,
 ) -> io::Result<Result<WorkerResponse, WorkerError>> {
 	use polkadot_node_core_pvf_common::worker::security;
 
@@ -329,6 +376,7 @@ fn handle_clone(
 			worker_info,
 			child,
 			usage_before,
+			pov_size,
 			execution_timeout,
 		),
 		Err(security::clone::Error::Clone(errno)) =>
@@ -347,6 +395,7 @@ fn handle_fork(
 	execute_worker_stack_size: usize,
 	worker_info: &WorkerInfo,
 	usage_before: Usage,
+	pov_size: u32,
 ) -> io::Result<Result<WorkerResponse, WorkerError>> {
 	// SAFETY: new process is spawned within a single threaded process. This invariant
 	// is enforced by tests.
@@ -367,6 +416,7 @@ fn handle_fork(
 			worker_info,
 			child,
 			usage_before,
+			pov_size,
 			execution_timeout,
 		),
 		Err(errno) => Ok(Err(internal_error_from_errno("fork", errno))),
@@ -513,6 +563,7 @@ fn handle_parent_process(
 	worker_info: &WorkerInfo,
 	job_pid: Pid,
 	usage_before: Usage,
+	pov_size: u32,
 	timeout: Duration,
 ) -> io::Result<Result<WorkerResponse, WorkerError>> {
 	// the read end will wait until all write ends have been closed,
@@ -578,7 +629,7 @@ fn handle_parent_process(
 						))));
 					}
 
-					Ok(Ok(WorkerResponse { job_response, duration: cpu_tv }))
+					Ok(Ok(WorkerResponse { job_response, pov_size, duration: cpu_tv }))
 				},
 				Err(job_error) => {
 					gum::warn!(
diff --git a/polkadot/node/core/pvf/prepare-worker/Cargo.toml b/polkadot/node/core/pvf/prepare-worker/Cargo.toml
index 9e0d01fc438b0257b87b9bf6791f466d8691049e..56235bd82192f2e671eb0b9e08006a186966a814 100644
--- a/polkadot/node/core/pvf/prepare-worker/Cargo.toml
+++ b/polkadot/node/core/pvf/prepare-worker/Cargo.toml
@@ -23,10 +23,12 @@ nix = { features = ["process", "resource", "sched"], workspace = true }
 codec = { features = ["derive"], workspace = true }
 
 polkadot-node-core-pvf-common = { workspace = true, default-features = true }
+polkadot-node-primitives = { workspace = true, default-features = true }
 polkadot-primitives = { workspace = true, default-features = true }
 
 sc-executor-common = { workspace = true, default-features = true }
 sc-executor-wasmtime = { workspace = true, default-features = true }
+sp-maybe-compressed-blob = { workspace = true, default-features = true }
 
 [target.'cfg(target_os = "linux")'.dependencies]
 tikv-jemallocator = "0.5.0"
diff --git a/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs b/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs
index d531c90b64b578e31f42a13f2399b4343469fa6d..49b30dc33ceb7695f0c6aba19279d6ed870edb7b 100644
--- a/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs
+++ b/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs
@@ -24,7 +24,11 @@ use polkadot_primitives::ExecutorParams;
 use std::time::Duration;
 
 fn do_prepare_runtime(pvf: PvfPrepData) {
-	let blob = match prevalidate(&pvf.code()) {
+	let maybe_compressed_code = pvf.maybe_compressed_code();
+	let raw_validation_code =
+		sp_maybe_compressed_blob::decompress(&maybe_compressed_code, usize::MAX).unwrap();
+
+	let blob = match prevalidate(&raw_validation_code) {
 		Err(err) => panic!("{:?}", err),
 		Ok(b) => b,
 	};
diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs
index ef33d11720eb5a7c4b7fa11b3484acbd5ad84d43..f8ebb6effcecdc4d3e71a95bbdaecfe51705352e 100644
--- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs
+++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs
@@ -38,6 +38,7 @@ use polkadot_node_core_pvf_common::{
 	executor_interface::{prepare, prevalidate},
 	worker::{pipe2_cloexec, PipeFd, WorkerInfo},
 };
+use polkadot_node_primitives::VALIDATION_CODE_BOMB_LIMIT;
 
 use codec::{Decode, Encode};
 use polkadot_node_core_pvf_common::{
@@ -105,6 +106,12 @@ impl AsRef<[u8]> for CompiledArtifact {
 	}
 }
 
+#[derive(Encode, Decode)]
+pub struct PrepareOutcome {
+	pub compiled_artifact: CompiledArtifact,
+	pub observed_wasm_code_len: u32,
+}
+
 /// Get a worker request.
 fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
 	let pvf = framed_recv_blocking(stream)?;
@@ -294,14 +301,23 @@ pub fn worker_entrypoint(
 	);
 }
 
-fn prepare_artifact(pvf: PvfPrepData) -> Result<CompiledArtifact, PrepareError> {
-	let blob = match prevalidate(&pvf.code()) {
+fn prepare_artifact(pvf: PvfPrepData) -> Result<PrepareOutcome, PrepareError> {
+	let maybe_compressed_code = pvf.maybe_compressed_code();
+	let raw_validation_code =
+		sp_maybe_compressed_blob::decompress(&maybe_compressed_code, VALIDATION_CODE_BOMB_LIMIT)
+			.map_err(|e| PrepareError::CouldNotDecompressCodeBlob(e.to_string()))?;
+	let observed_wasm_code_len = raw_validation_code.len() as u32;
+
+	let blob = match prevalidate(&raw_validation_code) {
 		Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
 		Ok(b) => b,
 	};
 
 	match prepare(blob, &pvf.executor_params()) {
-		Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
+		Ok(compiled_artifact) => Ok(PrepareOutcome {
+			compiled_artifact: CompiledArtifact::new(compiled_artifact),
+			observed_wasm_code_len,
+		}),
 		Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
 	}
 }
@@ -322,6 +338,7 @@ fn runtime_construction_check(
 struct JobResponse {
 	artifact: CompiledArtifact,
 	memory_stats: MemoryStats,
+	observed_wasm_code_len: u32,
 }
 
 #[cfg(target_os = "linux")]
@@ -500,11 +517,11 @@ fn handle_child_process(
 		"prepare worker",
 		move || {
 			#[allow(unused_mut)]
-			let mut result = prepare_artifact(pvf);
+			let mut result = prepare_artifact(pvf).map(|o| (o,));
 
 			// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
 			#[cfg(target_os = "linux")]
-			let mut result = result.map(|artifact| (artifact, get_max_rss_thread()));
+			let mut result = result.map(|outcome| (outcome.0, get_max_rss_thread()));
 
 			// If we are pre-checking, check for runtime construction errors.
 			//
@@ -513,7 +530,10 @@ fn handle_child_process(
 			// anyway.
 			if let PrepareJobKind::Prechecking = prepare_job_kind {
 				result = result.and_then(|output| {
-					runtime_construction_check(output.0.as_ref(), &executor_params)?;
+					runtime_construction_check(
+						output.0.compiled_artifact.as_ref(),
+						&executor_params,
+					)?;
 					Ok(output)
 				});
 			}
@@ -553,9 +573,9 @@ fn handle_child_process(
 				Ok(ok) => {
 					cfg_if::cfg_if! {
 						if #[cfg(target_os = "linux")] {
-							let (artifact, max_rss) = ok;
+							let (PrepareOutcome { compiled_artifact, observed_wasm_code_len }, max_rss) = ok;
 						} else {
-							let artifact = ok;
+							let (PrepareOutcome { compiled_artifact, observed_wasm_code_len },) = ok;
 						}
 					}
 
@@ -574,7 +594,11 @@ fn handle_child_process(
 						peak_tracked_alloc: if peak_alloc > 0 { peak_alloc as u64 } else { 0u64 },
 					};
 
-					Ok(JobResponse { artifact, memory_stats })
+					Ok(JobResponse {
+						artifact: compiled_artifact,
+						observed_wasm_code_len,
+						memory_stats,
+					})
 				},
 			}
 		},
@@ -665,7 +689,7 @@ fn handle_parent_process(
 
 			match result {
 				Err(err) => Err(err),
-				Ok(JobResponse { artifact, memory_stats }) => {
+				Ok(JobResponse { artifact, memory_stats, observed_wasm_code_len }) => {
 					// The exit status should have been zero if no error occurred.
 					if exit_status != 0 {
 						return Err(PrepareError::JobError(format!(
@@ -696,7 +720,11 @@ fn handle_parent_process(
 					let checksum = blake3::hash(&artifact.as_ref()).to_hex().to_string();
 					Ok(PrepareWorkerSuccess {
 						checksum,
-						stats: PrepareStats { memory_stats, cpu_time_elapsed: cpu_tv },
+						stats: PrepareStats {
+							memory_stats,
+							cpu_time_elapsed: cpu_tv,
+							observed_wasm_code_len,
+						},
 					})
 				},
 			}
diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs
index 8dc96305eadb8f3ced1231889f5fa6c5ffaeac57..a0634106052d7fb197c0662854415190a59a0e70 100644
--- a/polkadot/node/core/pvf/src/error.rs
+++ b/polkadot/node/core/pvf/src/error.rs
@@ -52,6 +52,9 @@ pub enum InvalidCandidate {
 	/// PVF execution (compilation is not included) took more time than was allotted.
 	#[error("invalid: hard timeout")]
 	HardTimeout,
+	/// Proof-of-validity failed to decompress correctly
+	#[error("invalid: PoV failed to decompress")]
+	PoVDecompressionFailure,
 }
 
 /// Possibly transient issue that may resolve after retries.
diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs
index bb00a5a652d6434cd06327803d9ce51a3d5c1e93..11031bf1074a297e246a4e341efe8488a2692b1b 100644
--- a/polkadot/node/core/pvf/src/execute/queue.rs
+++ b/polkadot/node/core/pvf/src/execute/queue.rs
@@ -34,12 +34,14 @@ use polkadot_node_core_pvf_common::{
 	execute::{JobResponse, WorkerError, WorkerResponse},
 	SecurityStatus,
 };
-use polkadot_primitives::{ExecutorParams, ExecutorParamsHash};
+use polkadot_node_primitives::PoV;
+use polkadot_primitives::{ExecutorParams, ExecutorParamsHash, PersistedValidationData};
 use slotmap::HopSlotMap;
 use std::{
 	collections::VecDeque,
 	fmt,
 	path::PathBuf,
+	sync::Arc,
 	time::{Duration, Instant},
 };
 
@@ -68,7 +70,8 @@ pub enum FromQueue {
 #[derive(Debug)]
 pub struct PendingExecutionRequest {
 	pub exec_timeout: Duration,
-	pub params: Vec<u8>,
+	pub pvd: Arc<PersistedValidationData>,
+	pub pov: Arc<PoV>,
 	pub executor_params: ExecutorParams,
 	pub result_tx: ResultSender,
 }
@@ -76,7 +79,8 @@ pub struct PendingExecutionRequest {
 struct ExecuteJob {
 	artifact: ArtifactPathId,
 	exec_timeout: Duration,
-	params: Vec<u8>,
+	pvd: Arc<PersistedValidationData>,
+	pov: Arc<PoV>,
 	executor_params: ExecutorParams,
 	result_tx: ResultSender,
 	waiting_since: Instant,
@@ -293,18 +297,20 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
 
 fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
 	let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue;
-	let PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } =
+	let PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx } =
 		pending_execution_request;
 	gum::debug!(
 		target: LOG_TARGET,
 		validation_code_hash = ?artifact.id.code_hash,
 		"enqueueing an artifact for execution",
 	);
+	queue.metrics.observe_pov_size(pov.block_data.0.len(), true);
 	queue.metrics.execute_enqueued();
 	let job = ExecuteJob {
 		artifact,
 		exec_timeout,
-		params,
+		pvd,
+		pov,
 		executor_params,
 		result_tx,
 		waiting_since: Instant::now(),
@@ -352,15 +358,19 @@ async fn handle_job_finish(
 	artifact_id: ArtifactId,
 	result_tx: ResultSender,
 ) {
-	let (idle_worker, result, duration, sync_channel) = match worker_result {
+	let (idle_worker, result, duration, sync_channel, pov_size) = match worker_result {
 		Ok(WorkerInterfaceResponse {
 			worker_response:
-				WorkerResponse { job_response: JobResponse::Ok { result_descriptor }, duration },
+				WorkerResponse {
+					job_response: JobResponse::Ok { result_descriptor },
+					duration,
+					pov_size,
+				},
 			idle_worker,
 		}) => {
 			// TODO: propagate the soft timeout
 
-			(Some(idle_worker), Ok(result_descriptor), Some(duration), None)
+			(Some(idle_worker), Ok(result_descriptor), Some(duration), None, Some(pov_size))
 		},
 		Ok(WorkerInterfaceResponse {
 			worker_response: WorkerResponse { job_response: JobResponse::InvalidCandidate(err), .. },
@@ -370,6 +380,18 @@ async fn handle_job_finish(
 			Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))),
 			None,
 			None,
+			None,
+		),
+		Ok(WorkerInterfaceResponse {
+			worker_response:
+				WorkerResponse { job_response: JobResponse::PoVDecompressionFailure, .. },
+			idle_worker,
+		}) => (
+			Some(idle_worker),
+			Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure)),
+			None,
+			None,
+			None,
 		),
 		Ok(WorkerInterfaceResponse {
 			worker_response:
@@ -393,39 +415,46 @@ async fn handle_job_finish(
 				))),
 				None,
 				Some(result_rx),
+				None,
 			)
 		},
 
 		Err(WorkerInterfaceError::InternalError(err)) |
 		Err(WorkerInterfaceError::WorkerError(WorkerError::InternalError(err))) =>
-			(None, Err(ValidationError::Internal(err)), None, None),
+			(None, Err(ValidationError::Internal(err)), None, None, None),
 		// Either the worker or the job timed out. Kill the worker in either case. Treated as
 		// definitely-invalid, because if we timed out, there's no time left for a retry.
 		Err(WorkerInterfaceError::HardTimeout) |
 		Err(WorkerInterfaceError::WorkerError(WorkerError::JobTimedOut)) =>
-			(None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None),
+			(None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None, None),
 		// "Maybe invalid" errors (will retry).
 		Err(WorkerInterfaceError::CommunicationErr(_err)) => (
 			None,
 			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)),
 			None,
 			None,
+			None,
 		),
 		Err(WorkerInterfaceError::WorkerError(WorkerError::JobDied { err, .. })) => (
 			None,
 			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))),
 			None,
 			None,
+			None,
 		),
 		Err(WorkerInterfaceError::WorkerError(WorkerError::JobError(err))) => (
 			None,
 			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err.to_string()))),
 			None,
 			None,
+			None,
 		),
 	};
 
 	queue.metrics.execute_finished();
+	if let Some(pov_size) = pov_size {
+		queue.metrics.observe_pov_size(pov_size as usize, false)
+	}
 	if let Err(ref err) = result {
 		gum::warn!(
 			target: LOG_TARGET,
@@ -573,7 +602,8 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
 				idle,
 				job.artifact.clone(),
 				job.exec_timeout,
-				job.params,
+				job.pvd,
+				job.pov,
 			)
 			.await;
 			QueueEvent::StartWork(worker, result, job.artifact.id, job.result_tx)
diff --git a/polkadot/node/core/pvf/src/execute/worker_interface.rs b/polkadot/node/core/pvf/src/execute/worker_interface.rs
index d15d7c15426ebd63d33aaf1166bc8f000e28eb7a..77bd6bedd75c7f9932bc548d019870f87a1e201e 100644
--- a/polkadot/node/core/pvf/src/execute/worker_interface.rs
+++ b/polkadot/node/core/pvf/src/execute/worker_interface.rs
@@ -32,8 +32,9 @@ use polkadot_node_core_pvf_common::{
 	execute::{Handshake, WorkerError, WorkerResponse},
 	worker_dir, SecurityStatus,
 };
-use polkadot_primitives::ExecutorParams;
-use std::{path::Path, time::Duration};
+use polkadot_node_primitives::PoV;
+use polkadot_primitives::{ExecutorParams, PersistedValidationData};
+use std::{path::Path, sync::Arc, time::Duration};
 use tokio::{io, net::UnixStream};
 
 /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
@@ -123,7 +124,8 @@ pub async fn start_work(
 	worker: IdleWorker,
 	artifact: ArtifactPathId,
 	execution_timeout: Duration,
-	validation_params: Vec<u8>,
+	pvd: Arc<PersistedValidationData>,
+	pov: Arc<PoV>,
 ) -> Result<Response, Error> {
 	let IdleWorker { mut stream, pid, worker_dir } = worker;
 
@@ -137,18 +139,16 @@ pub async fn start_work(
 	);
 
 	with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move {
-		send_request(&mut stream, &validation_params, execution_timeout).await.map_err(
-			|error| {
-				gum::warn!(
-					target: LOG_TARGET,
-					worker_pid = %pid,
-					validation_code_hash = ?artifact.id.code_hash,
-					"failed to send an execute request: {}",
-					error,
-				);
-				Error::InternalError(InternalValidationError::HostCommunication(error.to_string()))
-			},
-		)?;
+		send_request(&mut stream, pvd, pov, execution_timeout).await.map_err(|error| {
+			gum::warn!(
+				target: LOG_TARGET,
+				worker_pid = %pid,
+				validation_code_hash = ?artifact.id.code_hash,
+				"failed to send an execute request: {}",
+				error,
+			);
+			Error::InternalError(InternalValidationError::HostCommunication(error.to_string()))
+		})?;
 
 		// We use a generous timeout here. This is in addition to the one in the child process, in
 		// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
@@ -288,10 +288,12 @@ async fn send_execute_handshake(stream: &mut UnixStream, handshake: Handshake) -
 
 async fn send_request(
 	stream: &mut UnixStream,
-	validation_params: &[u8],
+	pvd: Arc<PersistedValidationData>,
+	pov: Arc<PoV>,
 	execution_timeout: Duration,
 ) -> io::Result<()> {
-	framed_send(stream, validation_params).await?;
+	framed_send(stream, &pvd.encode()).await?;
+	framed_send(stream, &pov.encode()).await?;
 	framed_send(stream, &execution_timeout.encode()).await
 }
 
diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs
index 462631d33b525a7d77817de87711a650517d4204..44a4cba2fbf864d16117ee98ff4034e2469f55a2 100644
--- a/polkadot/node/core/pvf/src/host.rs
+++ b/polkadot/node/core/pvf/src/host.rs
@@ -36,11 +36,14 @@ use polkadot_node_core_pvf_common::{
 	prepare::PrepareSuccess,
 	pvf::PvfPrepData,
 };
+use polkadot_node_primitives::PoV;
 use polkadot_node_subsystem::{SubsystemError, SubsystemResult};
 use polkadot_parachain_primitives::primitives::ValidationResult;
+use polkadot_primitives::PersistedValidationData;
 use std::{
 	collections::HashMap,
 	path::PathBuf,
+	sync::Arc,
 	time::{Duration, SystemTime},
 };
 
@@ -108,7 +111,8 @@ impl ValidationHost {
 		&mut self,
 		pvf: PvfPrepData,
 		exec_timeout: Duration,
-		params: Vec<u8>,
+		pvd: Arc<PersistedValidationData>,
+		pov: Arc<PoV>,
 		priority: Priority,
 		result_tx: ResultSender,
 	) -> Result<(), String> {
@@ -116,7 +120,8 @@ impl ValidationHost {
 			.send(ToHost::ExecutePvf(ExecutePvfInputs {
 				pvf,
 				exec_timeout,
-				params,
+				pvd,
+				pov,
 				priority,
 				result_tx,
 			}))
@@ -147,7 +152,8 @@ enum ToHost {
 struct ExecutePvfInputs {
 	pvf: PvfPrepData,
 	exec_timeout: Duration,
-	params: Vec<u8>,
+	pvd: Arc<PersistedValidationData>,
+	pov: Arc<PoV>,
 	priority: Priority,
 	result_tx: ResultSender,
 }
@@ -539,7 +545,7 @@ async fn handle_execute_pvf(
 	awaiting_prepare: &mut AwaitingPrepare,
 	inputs: ExecutePvfInputs,
 ) -> Result<(), Fatal> {
-	let ExecutePvfInputs { pvf, exec_timeout, params, priority, result_tx } = inputs;
+	let ExecutePvfInputs { pvf, exec_timeout, pvd, pov, priority, result_tx } = inputs;
 	let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
 	let executor_params = (*pvf.executor_params()).clone();
 
@@ -558,7 +564,8 @@ async fn handle_execute_pvf(
 							artifact: ArtifactPathId::new(artifact_id, path),
 							pending_execution_request: PendingExecutionRequest {
 								exec_timeout,
-								params,
+								pvd,
+								pov,
 								executor_params,
 								result_tx,
 							},
@@ -587,7 +594,8 @@ async fn handle_execute_pvf(
 						artifact_id,
 						PendingExecutionRequest {
 							exec_timeout,
-							params,
+							pvd,
+							pov,
 							executor_params,
 							result_tx,
 						},
@@ -598,7 +606,7 @@ async fn handle_execute_pvf(
 			ArtifactState::Preparing { .. } => {
 				awaiting_prepare.add(
 					artifact_id,
-					PendingExecutionRequest { exec_timeout, params, executor_params, result_tx },
+					PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx },
 				);
 			},
 			ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
@@ -627,7 +635,8 @@ async fn handle_execute_pvf(
 						artifact_id,
 						PendingExecutionRequest {
 							exec_timeout,
-							params,
+							pvd,
+							pov,
 							executor_params,
 							result_tx,
 						},
@@ -648,7 +657,7 @@ async fn handle_execute_pvf(
 			pvf,
 			priority,
 			artifact_id,
-			PendingExecutionRequest { exec_timeout, params, executor_params, result_tx },
+			PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx },
 		)
 		.await?;
 	}
@@ -770,7 +779,7 @@ async fn handle_prepare_done(
 	// It's finally time to dispatch all the execution requests that were waiting for this artifact
 	// to be prepared.
 	let pending_requests = awaiting_prepare.take(&artifact_id);
-	for PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } in
+	for PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx } in
 		pending_requests
 	{
 		if result_tx.is_canceled() {
@@ -793,7 +802,8 @@ async fn handle_prepare_done(
 				artifact: ArtifactPathId::new(artifact_id.clone(), &path),
 				pending_execution_request: PendingExecutionRequest {
 					exec_timeout,
-					params,
+					pvd,
+					pov,
 					executor_params,
 					result_tx,
 				},
@@ -967,6 +977,8 @@ pub(crate) mod tests {
 	use assert_matches::assert_matches;
 	use futures::future::BoxFuture;
 	use polkadot_node_core_pvf_common::prepare::PrepareStats;
+	use polkadot_node_primitives::BlockData;
+	use sp_core::H256;
 
 	const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
 	pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
@@ -1223,12 +1235,21 @@ pub(crate) mod tests {
 	async fn execute_pvf_requests() {
 		let mut test = Builder::default().build();
 		let mut host = test.host_handle();
+		let pvd = Arc::new(PersistedValidationData {
+			parent_head: Default::default(),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		});
+		let pov1 = Arc::new(PoV { block_data: BlockData(b"pov1".to_vec()) });
+		let pov2 = Arc::new(PoV { block_data: BlockData(b"pov2".to_vec()) });
 
 		let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(1),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf1".to_vec(),
+			pvd.clone(),
+			pov1.clone(),
 			Priority::Normal,
 			result_tx,
 		)
@@ -1239,7 +1260,8 @@ pub(crate) mod tests {
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(1),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf1".to_vec(),
+			pvd.clone(),
+			pov1,
 			Priority::Critical,
 			result_tx,
 		)
@@ -1250,7 +1272,8 @@ pub(crate) mod tests {
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(2),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf2".to_vec(),
+			pvd,
+			pov2,
 			Priority::Normal,
 			result_tx,
 		)
@@ -1382,6 +1405,13 @@ pub(crate) mod tests {
 	async fn test_prepare_done() {
 		let mut test = Builder::default().build();
 		let mut host = test.host_handle();
+		let pvd = Arc::new(PersistedValidationData {
+			parent_head: Default::default(),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		});
+		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
 
 		// Test mixed cases of receiving execute and precheck requests
 		// for the same PVF.
@@ -1391,7 +1421,8 @@ pub(crate) mod tests {
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(1),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf2".to_vec(),
+			pvd.clone(),
+			pov.clone(),
 			Priority::Critical,
 			result_tx,
 		)
@@ -1438,7 +1469,8 @@ pub(crate) mod tests {
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(2),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf2".to_vec(),
+			pvd,
+			pov,
 			Priority::Critical,
 			result_tx,
 		)
@@ -1534,13 +1566,21 @@ pub(crate) mod tests {
 	async fn test_execute_prepare_retry() {
 		let mut test = Builder::default().build();
 		let mut host = test.host_handle();
+		let pvd = Arc::new(PersistedValidationData {
+			parent_head: Default::default(),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		});
+		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
 
 		// Submit a execute request that fails.
 		let (result_tx, result_rx) = oneshot::channel();
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(1),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf".to_vec(),
+			pvd.clone(),
+			pov.clone(),
 			Priority::Critical,
 			result_tx,
 		)
@@ -1570,7 +1610,8 @@ pub(crate) mod tests {
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(1),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf".to_vec(),
+			pvd.clone(),
+			pov.clone(),
 			Priority::Critical,
 			result_tx_2,
 		)
@@ -1592,7 +1633,8 @@ pub(crate) mod tests {
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(1),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf".to_vec(),
+			pvd.clone(),
+			pov.clone(),
 			Priority::Critical,
 			result_tx_3,
 		)
@@ -1636,13 +1678,21 @@ pub(crate) mod tests {
 	async fn test_execute_prepare_no_retry() {
 		let mut test = Builder::default().build();
 		let mut host = test.host_handle();
+		let pvd = Arc::new(PersistedValidationData {
+			parent_head: Default::default(),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		});
+		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
 
 		// Submit an execute request that fails.
 		let (result_tx, result_rx) = oneshot::channel();
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(1),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf".to_vec(),
+			pvd.clone(),
+			pov.clone(),
 			Priority::Critical,
 			result_tx,
 		)
@@ -1672,7 +1722,8 @@ pub(crate) mod tests {
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(1),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf".to_vec(),
+			pvd.clone(),
+			pov.clone(),
 			Priority::Critical,
 			result_tx_2,
 		)
@@ -1694,7 +1745,8 @@ pub(crate) mod tests {
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(1),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf".to_vec(),
+			pvd.clone(),
+			pov.clone(),
 			Priority::Critical,
 			result_tx_3,
 		)
@@ -1755,12 +1807,20 @@ pub(crate) mod tests {
 	async fn cancellation() {
 		let mut test = Builder::default().build();
 		let mut host = test.host_handle();
+		let pvd = Arc::new(PersistedValidationData {
+			parent_head: Default::default(),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		});
+		let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) });
 
 		let (result_tx, result_rx) = oneshot::channel();
 		host.execute_pvf(
 			PvfPrepData::from_discriminator(1),
 			TEST_EXECUTION_TIMEOUT,
-			b"pvf1".to_vec(),
+			pvd,
+			pov,
 			Priority::Normal,
 			result_tx,
 		)
diff --git a/polkadot/node/core/pvf/src/metrics.rs b/polkadot/node/core/pvf/src/metrics.rs
index bc8d300037fe8e1d7c70b575d99b101b8343e94b..c59cab4641805eca5dac8d60e554e21b79c97606 100644
--- a/polkadot/node/core/pvf/src/metrics.rs
+++ b/polkadot/node/core/pvf/src/metrics.rs
@@ -105,6 +105,21 @@ impl Metrics {
 				.observe((memory_stats.peak_tracked_alloc / 1024) as f64);
 		}
 	}
+
+	pub(crate) fn observe_code_size(&self, code_size: usize) {
+		if let Some(metrics) = &self.0 {
+			metrics.code_size.observe(code_size as f64);
+		}
+	}
+
+	pub(crate) fn observe_pov_size(&self, pov_size: usize, compressed: bool) {
+		if let Some(metrics) = &self.0 {
+			metrics
+				.pov_size
+				.with_label_values(&[if compressed { "true" } else { "false" }])
+				.observe(pov_size as f64);
+		}
+	}
 }
 
 #[derive(Clone)]
@@ -129,6 +144,8 @@ struct MetricsInner {
 	preparation_max_resident: prometheus::Histogram,
 	// Peak allocation value, tracked by tracking-allocator
 	preparation_peak_tracked_allocation: prometheus::Histogram,
+	pov_size: prometheus::HistogramVec,
+	code_size: prometheus::Histogram,
 }
 
 impl metrics::Metrics for Metrics {
@@ -323,6 +340,35 @@ impl metrics::Metrics for Metrics {
 				)?,
 				registry,
 			)?,
+			// The following metrics was moved here from the candidate valiidation subsystem.
+			// Names are kept to avoid breaking dashboards and stuff.
+			pov_size: prometheus::register(
+				prometheus::HistogramVec::new(
+					prometheus::HistogramOpts::new(
+						"polkadot_parachain_candidate_validation_pov_size",
+						"The compressed and decompressed size of the proof of validity of a candidate",
+					)
+					.buckets(
+						prometheus::exponential_buckets(16384.0, 2.0, 10)
+							.expect("arguments are always valid; qed"),
+					),
+					&["compressed"],
+				)?,
+				registry,
+			)?,
+			code_size: prometheus::register(
+				prometheus::Histogram::with_opts(
+					prometheus::HistogramOpts::new(
+						"polkadot_parachain_candidate_validation_code_size",
+						"The size of the decompressed WASM validation blob used for checking a candidate",
+					)
+					.buckets(
+						prometheus::exponential_buckets(16384.0, 2.0, 10)
+							.expect("arguments are always valid; qed"),
+					),
+				)?,
+				registry,
+			)?,
 		};
 		Ok(Metrics(Some(inner)))
 	}
diff --git a/polkadot/node/core/pvf/src/prepare/worker_interface.rs b/polkadot/node/core/pvf/src/prepare/worker_interface.rs
index 22ee93319d84db14ed53a89efe872c7121d87cbd..d29d2717c4b6e38239c8357a711a2899f4010274 100644
--- a/polkadot/node/core/pvf/src/prepare/worker_interface.rs
+++ b/polkadot/node/core/pvf/src/prepare/worker_interface.rs
@@ -211,7 +211,7 @@ async fn handle_response(
 	//       https://github.com/paritytech/polkadot-sdk/issues/2399
 	let PrepareWorkerSuccess {
 		checksum: _,
-		stats: PrepareStats { cpu_time_elapsed, memory_stats },
+		stats: PrepareStats { cpu_time_elapsed, memory_stats, observed_wasm_code_len },
 	} = match result.clone() {
 		Ok(result) => result,
 		// Timed out on the child. This should already be logged by the child.
@@ -221,6 +221,8 @@ async fn handle_response(
 		Err(err) => return Outcome::Concluded { worker, result: Err(err) },
 	};
 
+	metrics.observe_code_size(observed_wasm_code_len as usize);
+
 	if cpu_time_elapsed > preparation_timeout {
 		// The job didn't complete within the timeout.
 		gum::warn!(
@@ -267,7 +269,11 @@ async fn handle_response(
 			result: Ok(PrepareSuccess {
 				path: artifact_path,
 				size,
-				stats: PrepareStats { cpu_time_elapsed, memory_stats: memory_stats.clone() },
+				stats: PrepareStats {
+					cpu_time_elapsed,
+					memory_stats: memory_stats.clone(),
+					observed_wasm_code_len,
+				},
 			}),
 		},
 		Err(err) => {
diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/tests/it/adder.rs
index 455e8c36c88d7c6718a2ce949c0fc961455d06f4..1a95a28fe07732f51d5a8bdd988ee51c79e66a3e 100644
--- a/polkadot/node/core/pvf/tests/it/adder.rs
+++ b/polkadot/node/core/pvf/tests/it/adder.rs
@@ -18,29 +18,33 @@
 
 use super::TestHost;
 use codec::{Decode, Encode};
+use polkadot_node_primitives::PoV;
 use polkadot_parachain_primitives::primitives::{
-	BlockData as GenericBlockData, HeadData as GenericHeadData, RelayChainBlockNumber,
-	ValidationParams,
+	BlockData as GenericBlockData, HeadData as GenericHeadData,
 };
+use polkadot_primitives::PersistedValidationData;
+use sp_core::H256;
 use test_parachain_adder::{hash_state, BlockData, HeadData};
 
 #[tokio::test]
 async fn execute_good_block_on_parent() {
 	let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
-
 	let block_data = BlockData { state: 0, add: 512 };
+	let pvd = PersistedValidationData {
+		parent_head: GenericHeadData(parent_head.encode()),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: GenericBlockData(block_data.encode()) };
 
 	let host = TestHost::new().await;
 
 	let ret = host
 		.validate_candidate(
 			test_parachain_adder::wasm_binary_unwrap(),
-			ValidationParams {
-				parent_head: GenericHeadData(parent_head.encode()),
-				block_data: GenericBlockData(block_data.encode()),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
+			pvd,
+			pov,
 			Default::default(),
 		)
 		.await
@@ -63,18 +67,20 @@ async fn execute_good_chain_on_parent() {
 	for (number, add) in (0..10).enumerate() {
 		let parent_head =
 			HeadData { number: number as u64, parent_hash, post_state: hash_state(last_state) };
-
 		let block_data = BlockData { state: last_state, add };
+		let pvd = PersistedValidationData {
+			parent_head: GenericHeadData(parent_head.encode()),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		};
+		let pov = PoV { block_data: GenericBlockData(block_data.encode()) };
 
 		let ret = host
 			.validate_candidate(
 				test_parachain_adder::wasm_binary_unwrap(),
-				ValidationParams {
-					parent_head: GenericHeadData(parent_head.encode()),
-					block_data: GenericBlockData(block_data.encode()),
-					relay_parent_number: number as RelayChainBlockNumber + 1,
-					relay_parent_storage_root: Default::default(),
-				},
+				pvd,
+				pov,
 				Default::default(),
 			)
 			.await
@@ -94,23 +100,25 @@ async fn execute_good_chain_on_parent() {
 #[tokio::test]
 async fn execute_bad_block_on_parent() {
 	let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
-
 	let block_data = BlockData {
 		state: 256, // start state is wrong.
 		add: 256,
 	};
+	let pvd = PersistedValidationData {
+		parent_head: GenericHeadData(parent_head.encode()),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: GenericBlockData(block_data.encode()) };
 
 	let host = TestHost::new().await;
 
 	let _err = host
 		.validate_candidate(
 			test_parachain_adder::wasm_binary_unwrap(),
-			ValidationParams {
-				parent_head: GenericHeadData(parent_head.encode()),
-				block_data: GenericBlockData(block_data.encode()),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
+			pvd,
+			pov,
 			Default::default(),
 		)
 		.await
@@ -124,15 +132,18 @@ async fn stress_spawn() {
 	async fn execute(host: std::sync::Arc<TestHost>) {
 		let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
 		let block_data = BlockData { state: 0, add: 512 };
+		let pvd = PersistedValidationData {
+			parent_head: GenericHeadData(parent_head.encode()),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		};
+		let pov = PoV { block_data: GenericBlockData(block_data.encode()) };
 		let ret = host
 			.validate_candidate(
 				test_parachain_adder::wasm_binary_unwrap(),
-				ValidationParams {
-					parent_head: GenericHeadData(parent_head.encode()),
-					block_data: GenericBlockData(block_data.encode()),
-					relay_parent_number: 1,
-					relay_parent_storage_root: Default::default(),
-				},
+				pvd,
+				pov,
 				Default::default(),
 			)
 			.await
@@ -161,15 +172,18 @@ async fn execute_can_run_serially() {
 	async fn execute(host: std::sync::Arc<TestHost>) {
 		let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
 		let block_data = BlockData { state: 0, add: 512 };
+		let pvd = PersistedValidationData {
+			parent_head: GenericHeadData(parent_head.encode()),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		};
+		let pov = PoV { block_data: GenericBlockData(block_data.encode()) };
 		let ret = host
 			.validate_candidate(
 				test_parachain_adder::wasm_binary_unwrap(),
-				ValidationParams {
-					parent_head: GenericHeadData(parent_head.encode()),
-					block_data: GenericBlockData(block_data.encode()),
-					relay_parent_number: 1,
-					relay_parent_storage_root: Default::default(),
-				},
+				pvd,
+				pov,
 				Default::default(),
 			)
 			.await
diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs
index 9ad486657512c7a6dda6c3b907500c074aef7433..a4a0853189579ce96cf96d49f685c3de7de5af00 100644
--- a/polkadot/node/core/pvf/tests/it/main.rs
+++ b/polkadot/node/core/pvf/tests/it/main.rs
@@ -17,7 +17,6 @@
 //! General PVF host integration tests checking the functionality of the PVF host itself.
 
 use assert_matches::assert_matches;
-use codec::Encode as _;
 #[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
 use polkadot_node_core_pvf::SecurityStatus;
 use polkadot_node_core_pvf::{
@@ -25,10 +24,14 @@ use polkadot_node_core_pvf::{
 	PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError,
 	ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
 };
-use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
-use polkadot_primitives::{ExecutorParam, ExecutorParams, PvfExecKind, PvfPrepKind};
+use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT};
+use polkadot_parachain_primitives::primitives::{BlockData, ValidationResult};
+use polkadot_primitives::{
+	ExecutorParam, ExecutorParams, PersistedValidationData, PvfExecKind, PvfPrepKind,
+};
+use sp_core::H256;
 
-use std::{io::Write, time::Duration};
+use std::{io::Write, sync::Arc, time::Duration};
 use tokio::sync::Mutex;
 
 mod adder;
@@ -80,9 +83,6 @@ impl TestHost {
 	) -> Result<(), PrepareError> {
 		let (result_tx, result_rx) = futures::channel::oneshot::channel();
 
-		let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
-			.expect("Compression works");
-
 		self.host
 			.lock()
 			.await
@@ -103,14 +103,12 @@ impl TestHost {
 	async fn validate_candidate(
 		&self,
 		code: &[u8],
-		params: ValidationParams,
+		pvd: PersistedValidationData,
+		pov: PoV,
 		executor_params: ExecutorParams,
 	) -> Result<ValidationResult, ValidationError> {
 		let (result_tx, result_rx) = futures::channel::oneshot::channel();
 
-		let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
-			.expect("Compression works");
-
 		self.host
 			.lock()
 			.await
@@ -122,7 +120,8 @@ impl TestHost {
 					PrepareJobKind::Compilation,
 				),
 				TEST_EXECUTION_TIMEOUT,
-				params.encode(),
+				Arc::new(pvd),
+				Arc::new(pov),
 				polkadot_node_core_pvf::Priority::Normal,
 				result_tx,
 			)
@@ -159,19 +158,17 @@ async fn prepare_job_terminates_on_timeout() {
 #[tokio::test]
 async fn execute_job_terminates_on_timeout() {
 	let host = TestHost::new().await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 
 	let start = std::time::Instant::now();
 	let result = host
-		.validate_candidate(
-			test_parachain_halt::wasm_binary_unwrap(),
-			ValidationParams {
-				block_data: BlockData(Vec::new()),
-				parent_head: Default::default(),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
-			Default::default(),
-		)
+		.validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default())
 		.await;
 
 	match result {
@@ -189,24 +186,23 @@ async fn execute_job_terminates_on_timeout() {
 async fn ensure_parallel_execution() {
 	// Run some jobs that do not complete, thus timing out.
 	let host = TestHost::new().await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 	let execute_pvf_future_1 = host.validate_candidate(
 		test_parachain_halt::wasm_binary_unwrap(),
-		ValidationParams {
-			block_data: BlockData(Vec::new()),
-			parent_head: Default::default(),
-			relay_parent_number: 1,
-			relay_parent_storage_root: Default::default(),
-		},
+		pvd.clone(),
+		pov.clone(),
 		Default::default(),
 	);
 	let execute_pvf_future_2 = host.validate_candidate(
 		test_parachain_halt::wasm_binary_unwrap(),
-		ValidationParams {
-			block_data: BlockData(Vec::new()),
-			parent_head: Default::default(),
-			relay_parent_number: 1,
-			relay_parent_storage_root: Default::default(),
-		},
+		pvd,
+		pov,
 		Default::default(),
 	);
 
@@ -237,6 +233,13 @@ async fn execute_queue_doesnt_stall_if_workers_died() {
 		cfg.execute_workers_max_num = 5;
 	})
 	.await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 
 	// Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The
 	// first five jobs should timeout and the workers killed. For the next 3 jobs a new batch of
@@ -245,12 +248,8 @@ async fn execute_queue_doesnt_stall_if_workers_died() {
 	futures::future::join_all((0u8..=8).map(|_| {
 		host.validate_candidate(
 			test_parachain_halt::wasm_binary_unwrap(),
-			ValidationParams {
-				block_data: BlockData(Vec::new()),
-				parent_head: Default::default(),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
+			pvd.clone(),
+			pov.clone(),
 			Default::default(),
 		)
 	}))
@@ -275,6 +274,13 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() {
 		cfg.execute_workers_max_num = 2;
 	})
 	.await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 
 	let executor_params_1 = ExecutorParams::default();
 	let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]);
@@ -288,12 +294,8 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() {
 	futures::future::join_all((0u8..6).map(|i| {
 		host.validate_candidate(
 			test_parachain_halt::wasm_binary_unwrap(),
-			ValidationParams {
-				block_data: BlockData(Vec::new()),
-				parent_head: Default::default(),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
+			pvd.clone(),
+			pov.clone(),
 			match i % 3 {
 				0 => executor_params_1.clone(),
 				_ => executor_params_2.clone(),
@@ -324,6 +326,13 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() {
 async fn deleting_prepared_artifact_does_not_dispute() {
 	let host = TestHost::new().await;
 	let cache_dir = host.cache_dir.path();
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 
 	let _stats = host
 		.precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default())
@@ -347,16 +356,7 @@ async fn deleting_prepared_artifact_does_not_dispute() {
 
 	// Try to validate, artifact should get recreated.
 	let result = host
-		.validate_candidate(
-			test_parachain_halt::wasm_binary_unwrap(),
-			ValidationParams {
-				block_data: BlockData(Vec::new()),
-				parent_head: Default::default(),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
-			Default::default(),
-		)
+		.validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default())
 		.await;
 
 	assert_matches!(result, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)));
@@ -367,6 +367,13 @@ async fn deleting_prepared_artifact_does_not_dispute() {
 async fn corrupted_prepared_artifact_does_not_dispute() {
 	let host = TestHost::new().await;
 	let cache_dir = host.cache_dir.path();
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 
 	let _stats = host
 		.precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default())
@@ -400,16 +407,7 @@ async fn corrupted_prepared_artifact_does_not_dispute() {
 
 	// Try to validate, artifact should get removed because of the corruption.
 	let result = host
-		.validate_candidate(
-			test_parachain_halt::wasm_binary_unwrap(),
-			ValidationParams {
-				block_data: BlockData(Vec::new()),
-				parent_head: Default::default(),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
-			Default::default(),
-		)
+		.validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default())
 		.await;
 
 	assert_matches!(
@@ -652,3 +650,65 @@ async fn artifact_does_reprepare_on_meaningful_exec_parameter_change() {
 
 	assert_eq!(cache_dir_contents.len(), 3); // new artifact has been added
 }
+
+// Checks that we cannot prepare oversized compressed code
+#[tokio::test]
+async fn invalid_compressed_code_fails_prechecking() {
+	let host = TestHost::new().await;
+	let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1];
+	let validation_code =
+		sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1).unwrap();
+
+	let res = host.precheck_pvf(&validation_code, Default::default()).await;
+
+	assert_matches!(res, Err(PrepareError::CouldNotDecompressCodeBlob(_)));
+}
+
+// Checks that we cannot validate with oversized compressed code
+#[tokio::test]
+async fn invalid_compressed_code_fails_validation() {
+	let host = TestHost::new().await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
+
+	let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1];
+	let validation_code =
+		sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1).unwrap();
+
+	let result = host.validate_candidate(&validation_code, pvd, pov, Default::default()).await;
+
+	assert_matches!(
+		result,
+		Err(ValidationError::Preparation(PrepareError::CouldNotDecompressCodeBlob(_)))
+	);
+}
+
+// Checks that we cannot validate with an oversized PoV
+#[tokio::test]
+async fn invalid_compressed_pov_fails_validation() {
+	let host = TestHost::new().await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let raw_block_data = vec![1u8; POV_BOMB_LIMIT + 1];
+	let block_data =
+		sp_maybe_compressed_blob::compress(&raw_block_data, POV_BOMB_LIMIT + 1).unwrap();
+	let pov = PoV { block_data: BlockData(block_data) };
+
+	let result = host
+		.validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default())
+		.await;
+
+	assert_matches!(
+		result,
+		Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure))
+	);
+}
diff --git a/polkadot/node/core/pvf/tests/it/process.rs b/polkadot/node/core/pvf/tests/it/process.rs
index b8fd2cdce0ce67e0b1cfbfc57634bfa4d8c63d46..b3023c8a45c3b3087015e714c22f9bd0474d251e 100644
--- a/polkadot/node/core/pvf/tests/it/process.rs
+++ b/polkadot/node/core/pvf/tests/it/process.rs
@@ -23,11 +23,14 @@ use codec::Encode;
 use polkadot_node_core_pvf::{
 	InvalidCandidate, PossiblyInvalidError, PrepareError, ValidationError,
 };
+use polkadot_node_primitives::PoV;
 use polkadot_parachain_primitives::primitives::{
-	BlockData as GenericBlockData, HeadData as GenericHeadData, ValidationParams,
+	BlockData as GenericBlockData, HeadData as GenericHeadData,
 };
+use polkadot_primitives::PersistedValidationData;
 use procfs::process;
 use rusty_fork::rusty_fork_test;
+use sp_core::H256;
 use std::{future::Future, sync::Arc, time::Duration};
 use test_parachain_adder::{hash_state, BlockData, HeadData};
 
@@ -125,15 +128,18 @@ rusty_fork_test! {
 		test_wrapper(|host, _sid| async move {
 			let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
 			let block_data = BlockData { state: 0, add: 512 };
+			let pvd = PersistedValidationData {
+				parent_head: GenericHeadData(parent_head.encode()),
+				relay_parent_number: 1u32,
+				relay_parent_storage_root: H256::default(),
+				max_pov_size: 4096 * 1024,
+			};
+			let pov = PoV { block_data: GenericBlockData(block_data.encode()) };
 			host
 				.validate_candidate(
 					test_parachain_adder::wasm_binary_unwrap(),
-					ValidationParams {
-						parent_head: GenericHeadData(parent_head.encode()),
-						block_data: GenericBlockData(block_data.encode()),
-						relay_parent_number: 1,
-						relay_parent_storage_root: Default::default(),
-					},
+					pvd,
+					pov,
 					Default::default(),
 				)
 				.await
@@ -166,17 +172,20 @@ rusty_fork_test! {
 			// Prepare the artifact ahead of time.
 			let binary = test_parachain_halt::wasm_binary_unwrap();
 			host.precheck_pvf(binary, Default::default()).await.unwrap();
+			let pvd = PersistedValidationData {
+				parent_head: GenericHeadData(HeadData::default().encode()),
+				relay_parent_number: 1u32,
+				relay_parent_storage_root: H256::default(),
+				max_pov_size: 4096 * 1024,
+			};
+			let pov = PoV { block_data: GenericBlockData(Vec::new()) };
 
 			let (result, _) = futures::join!(
 				// Choose an job that would normally take the entire timeout.
 				host.validate_candidate(
 					binary,
-					ValidationParams {
-						block_data: GenericBlockData(Vec::new()),
-						parent_head: Default::default(),
-						relay_parent_number: 1,
-						relay_parent_storage_root: Default::default(),
-					},
+					pvd,
+					pov,
 					Default::default(),
 				),
 				// Send a stop signal to pause the worker.
@@ -218,17 +227,20 @@ rusty_fork_test! {
 			// Prepare the artifact ahead of time.
 			let binary = test_parachain_halt::wasm_binary_unwrap();
 			host.precheck_pvf(binary, Default::default()).await.unwrap();
+			let pvd = PersistedValidationData {
+				parent_head: GenericHeadData(HeadData::default().encode()),
+				relay_parent_number: 1u32,
+				relay_parent_storage_root: H256::default(),
+				max_pov_size: 4096 * 1024,
+			};
+			let pov = PoV { block_data: GenericBlockData(Vec::new()) };
 
 			let (result, _) = futures::join!(
 				// Choose an job that would normally take the entire timeout.
 				host.validate_candidate(
 					binary,
-					ValidationParams {
-						block_data: GenericBlockData(Vec::new()),
-						parent_head: Default::default(),
-						relay_parent_number: 1,
-						relay_parent_storage_root: Default::default(),
-					},
+					pvd,
+					pov,
 					Default::default(),
 				),
 				// Run a future that kills the job while it's running.
@@ -274,17 +286,20 @@ rusty_fork_test! {
 			// Prepare the artifact ahead of time.
 			let binary = test_parachain_halt::wasm_binary_unwrap();
 			host.precheck_pvf(binary, Default::default()).await.unwrap();
+			let pvd = PersistedValidationData {
+				parent_head: GenericHeadData(HeadData::default().encode()),
+				relay_parent_number: 1u32,
+				relay_parent_storage_root: H256::default(),
+				max_pov_size: 4096 * 1024,
+			};
+			let pov = PoV { block_data: GenericBlockData(Vec::new()) };
 
 			let (result, _) = futures::join!(
 				// Choose a job that would normally take the entire timeout.
 				host.validate_candidate(
 					binary,
-					ValidationParams {
-						block_data: GenericBlockData(Vec::new()),
-						parent_head: Default::default(),
-						relay_parent_number: 1,
-						relay_parent_storage_root: Default::default(),
-					},
+					pvd,
+					pov,
 					Default::default(),
 				),
 				// Run a future that kills the job while it's running.
@@ -342,17 +357,20 @@ rusty_fork_test! {
 			// Prepare the artifact ahead of time.
 			let binary = test_parachain_halt::wasm_binary_unwrap();
 			host.precheck_pvf(binary, Default::default()).await.unwrap();
+			let pvd = PersistedValidationData {
+				parent_head: GenericHeadData(HeadData::default().encode()),
+				relay_parent_number: 1u32,
+				relay_parent_storage_root: H256::default(),
+				max_pov_size: 4096 * 1024,
+			};
+			let pov = PoV { block_data: GenericBlockData(Vec::new()) };
 
 			let _ = futures::join!(
 				// Choose a job that would normally take the entire timeout.
 				host.validate_candidate(
 					binary,
-					ValidationParams {
-						block_data: GenericBlockData(Vec::new()),
-						parent_head: Default::default(),
-						relay_parent_number: 1,
-						relay_parent_storage_root: Default::default(),
-					},
+					pvd,
+					pov,
 					Default::default(),
 				),
 				// Run a future that tests the thread count while the worker is running.
diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 4e13d5eda76f61c646ad0814e1ec3d8068a11ca1..baaff9c7c9f63c5cff812e5ae01097ca538c4d75 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -466,7 +466,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand
 	message_capacity=2048,
 )]
 pub struct Overseer<SupportsParachains> {
-	#[subsystem(blocking, CandidateValidationMessage, sends: [
+	#[subsystem(CandidateValidationMessage, sends: [
 		RuntimeApiMessage,
 	])]
 	candidate_validation: CandidateValidation,
diff --git a/prdoc/pr_5142.prdoc b/prdoc/pr_5142.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..4083e5bf53cdb420cdb236e920b5e409b6c82149
--- /dev/null
+++ b/prdoc/pr_5142.prdoc
@@ -0,0 +1,26 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: "Move decompression to worker processes"
+
+doc:
+  - audience: Node Dev
+    description: |
+      Candidate validation subsystem performed the PVF code decompression as well as the PoV
+      decompression itself which might affect the subsystem main loop performance and required
+      it to run on the blocking threadpool. This change moves the decompression to PVF host
+      workers running synchronously in separate processes.
+
+crates:
+  - name: polkadot-node-core-candidate-validation
+    bump: patch
+  - name: polkadot-overseer
+    bump: patch
+  - name: polkadot-node-core-pvf
+    bump: major
+  - name: polkadot-node-core-pvf-common
+    bump: major
+  - name: polkadot-node-core-pvf-execute-worker
+    bump: patch
+  - name: polkadot-node-core-pvf-prepare-worker
+    bump: patch