diff --git a/.gitlab/pipeline/zombienet.yml b/.gitlab/pipeline/zombienet.yml
index 558cc549cb08e62fdf2723e601e45f67306f8fe6..d5845611c60d14f619c5a27d68822967a23474e4 100644
--- a/.gitlab/pipeline/zombienet.yml
+++ b/.gitlab/pipeline/zombienet.yml
@@ -1,7 +1,7 @@
 .zombienet-refs:
   extends: .build-refs
   variables:
-    ZOMBIENET_IMAGE: "docker.io/paritytech/zombienet:v1.3.83"
+    ZOMBIENET_IMAGE: "docker.io/paritytech/zombienet:v1.3.86"
 
 include:
   # substrate tests
diff --git a/Cargo.lock b/Cargo.lock
index e9c334efff0f9c50f8bbdad9b93c5820102d4813..f8e1401d03ba8033dacf585c6644efdf81950b47 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11846,6 +11846,7 @@ dependencies = [
 name = "polkadot-cli"
 version = "1.1.0"
 dependencies = [
+ "cfg-if",
  "clap 4.4.10",
  "frame-benchmarking-cli",
  "futures",
@@ -12346,6 +12347,7 @@ dependencies = [
  "tempfile",
  "test-parachain-adder",
  "test-parachain-halt",
+ "thiserror",
  "tokio",
  "tracing-gum",
 ]
diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs
index 4096cd2523f150ea5301a0d087f949ae54905b56..d384c9d9bd22028e835c80870306b2b0bf88121b 100644
--- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs
+++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs
@@ -291,6 +291,7 @@ fn build_polkadot_full_node(
 
 			// Cumulus doesn't spawn PVF workers, so we can disable version checks.
 			node_version: None,
+			secure_validator_mode: false,
 			workers_path: None,
 			workers_names: None,
 
diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml
index 0882d20f2b7386f22c276bc82e2b460e80e40263..72b2a18f36b343b5620ed37bd9d4b20725780c60 100644
--- a/polkadot/cli/Cargo.toml
+++ b/polkadot/cli/Cargo.toml
@@ -15,6 +15,7 @@ wasm-opt = false
 crate-type = ["cdylib", "rlib"]
 
 [dependencies]
+cfg-if = "1.0"
 clap = { version = "4.4.10", features = ["derive"], optional = true }
 log = "0.4.17"
 thiserror = "1.0.48"
diff --git a/polkadot/cli/src/cli.rs b/polkadot/cli/src/cli.rs
index f1e0d1e99d7d2d25aa07092cc92afad931dac61c..30f35ebcb6ffa95f0f2384821e168353d95df94b 100644
--- a/polkadot/cli/src/cli.rs
+++ b/polkadot/cli/src/cli.rs
@@ -88,6 +88,12 @@ pub struct RunCmd {
 	#[arg(long)]
 	pub no_beefy: bool,
 
+	/// Allows a validator to run insecurely outside of Secure Validator Mode. Security features
+	/// are still enabled on a best-effort basis, but missing features are no longer required. For
+	/// more information see <https://github.com/w3f/polkadot-wiki/issues/4881>.
+	#[arg(long = "insecure-validator-i-know-what-i-do", requires = "validator")]
+	pub insecure_validator: bool,
+
 	/// Enable the block authoring backoff that is triggered when finality is lagging.
 	#[arg(long)]
 	pub force_authoring_backoff: bool,
diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs
index ba41383c279cf995852b216fc6a5caa437cf6a98..018400fbcf8bf0513ebd3cae3c6ae6e6a0230cd3 100644
--- a/polkadot/cli/src/command.rs
+++ b/polkadot/cli/src/command.rs
@@ -238,6 +238,8 @@ where
 	let node_version =
 		if cli.run.disable_worker_version_check { None } else { Some(NODE_VERSION.to_string()) };
 
+	let secure_validator_mode = cli.run.base.validator && !cli.run.insecure_validator;
+
 	runner.run_node_until_exit(move |config| async move {
 		let hwbench = (!cli.run.no_hardware_benchmarks)
 			.then_some(config.database.path().map(|database_path| {
@@ -256,6 +258,7 @@ where
 				jaeger_agent,
 				telemetry_worker_handle: None,
 				node_version,
+				secure_validator_mode,
 				workers_path: cli.run.workers_path,
 				workers_names: None,
 				overseer_gen,
diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs
index 9f7b17f6129968dca8d85f45ae9ca2f30e261455..5c4e449b2c9025ec1b22ee4fda36331c245d3551 100644
--- a/polkadot/node/core/candidate-validation/src/lib.rs
+++ b/polkadot/node/core/candidate-validation/src/lib.rs
@@ -88,6 +88,8 @@ pub struct Config {
 	pub artifacts_cache_path: PathBuf,
 	/// The version of the node. `None` can be passed to skip the version check (only for tests).
 	pub node_version: Option<String>,
+	/// Whether the node is attempting to run as a secure validator.
+	pub secure_validator_mode: bool,
 	/// Path to the preparation worker binary
 	pub prep_worker_path: PathBuf,
 	/// Path to the execution worker binary
@@ -133,12 +135,19 @@ async fn run<Context>(
 	mut ctx: Context,
 	metrics: Metrics,
 	pvf_metrics: polkadot_node_core_pvf::Metrics,
-	Config { artifacts_cache_path, node_version, prep_worker_path, exec_worker_path }: Config,
+	Config {
+		artifacts_cache_path,
+		node_version,
+		secure_validator_mode,
+		prep_worker_path,
+		exec_worker_path,
+	}: Config,
 ) -> SubsystemResult<()> {
 	let (validation_host, task) = polkadot_node_core_pvf::start(
 		polkadot_node_core_pvf::Config::new(
 			artifacts_cache_path,
 			node_version,
+			secure_validator_mode,
 			prep_worker_path,
 			exec_worker_path,
 		),
diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml
index 2d15a25b88737ed79d2703e39fa535574c9e7ea4..a1e70eabc0e7110528aab78a23344a04470b4292 100644
--- a/polkadot/node/core/pvf/Cargo.toml
+++ b/polkadot/node/core/pvf/Cargo.toml
@@ -19,6 +19,7 @@ pin-project = "1.0.9"
 rand = "0.8.5"
 slotmap = "1.0"
 tempfile = "3.3.0"
+thiserror = "1.0.31"
 tokio = { version = "1.24.2", features = ["fs", "process"] }
 
 parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
diff --git a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs
index e625a2303b5e533b3ccb7364c355dbe9d4193847..368649a8d71358c6dd02bdec7427e7b4bc7b696b 100644
--- a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs
+++ b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs
@@ -28,6 +28,8 @@ use tokio::{runtime::Handle, sync::Mutex};
 const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
 
 struct TestHost {
+	// Keep a reference to the tempdir as it gets deleted on drop.
+	cache_dir: tempfile::TempDir,
 	host: Mutex<ValidationHost>,
 }
 
@@ -42,13 +44,14 @@ impl TestHost {
 		let mut config = Config::new(
 			cache_dir.path().to_owned(),
 			None,
+			false,
 			prepare_worker_path,
 			execute_worker_path,
 		);
 		f(&mut config);
 		let (host, task) = start(config, Metrics::default()).await.unwrap();
 		let _ = handle.spawn(task);
-		Self { host: Mutex::new(host) }
+		Self { host: Mutex::new(host), cache_dir }
 	}
 
 	async fn precheck_pvf(
diff --git a/polkadot/node/core/pvf/common/src/lib.rs b/polkadot/node/core/pvf/common/src/lib.rs
index dced43ef21340379d9ea4fc8d766d6e986c242a7..abebd06f71a45738402909a53f795a75867e58d7 100644
--- a/polkadot/node/core/pvf/common/src/lib.rs
+++ b/polkadot/node/core/pvf/common/src/lib.rs
@@ -33,6 +33,7 @@ const LOG_TARGET: &str = "parachain::pvf-common";
 
 pub const RUNTIME_VERSION: &str = env!("SUBSTRATE_WASMTIME_VERSION");
 
+use parity_scale_codec::{Decode, Encode};
 use std::{
 	io::{self, Read, Write},
 	mem,
@@ -47,8 +48,11 @@ pub mod tests {
 }
 
 /// Status of security features on the current system.
-#[derive(Debug, Clone, Default, PartialEq, Eq)]
+#[derive(Debug, Clone, Default, PartialEq, Eq, Encode, Decode)]
 pub struct SecurityStatus {
+	/// Whether Secure Validator Mode is enabled. This mode enforces that all required security
+	/// features are present. All features are enabled on a best-effort basis regardless.
+	pub secure_validator_mode: bool,
 	/// Whether the landlock features we use are fully available on this system.
 	pub can_enable_landlock: bool,
 	/// Whether the seccomp features we use are fully available on this system.
@@ -57,6 +61,12 @@ pub struct SecurityStatus {
 	pub can_unshare_user_namespace_and_change_root: bool,
 }
 
+/// A handshake with information for the worker.
+#[derive(Debug, Encode, Decode)]
+pub struct WorkerHandshake {
+	pub security_status: SecurityStatus,
+}
+
 /// Write some data prefixed by its length into `w`. Sync version of `framed_send` to avoid
 /// dependency on tokio.
 pub fn framed_send_blocking(w: &mut (impl Write + Unpin), buf: &[u8]) -> io::Result<()> {
diff --git a/polkadot/node/core/pvf/common/src/worker/mod.rs b/polkadot/node/core/pvf/common/src/worker/mod.rs
index d4f9bbc27ea62e930dd98ef3a277f9eac6ada7d8..5e7deb5ca782e91ad19dd492e013c43fd12a9237 100644
--- a/polkadot/node/core/pvf/common/src/worker/mod.rs
+++ b/polkadot/node/core/pvf/common/src/worker/mod.rs
@@ -18,9 +18,10 @@
 
 pub mod security;
 
-use crate::{SecurityStatus, LOG_TARGET};
+use crate::{framed_recv_blocking, WorkerHandshake, LOG_TARGET};
 use cpu_time::ProcessTime;
 use futures::never::Never;
+use parity_scale_codec::Decode;
 use std::{
 	any::Any,
 	fmt, io,
@@ -50,8 +51,6 @@ macro_rules! decl_worker_main {
 			#[cfg(target_os = "linux")]
 			use $crate::worker::security;
 
-			// TODO: Remove this dependency, and `pub use sp_tracing` in `lib.rs`.
-			// See <https://github.com/paritytech/polkadot/issues/7117>.
 			$crate::sp_tracing::try_init_simple();
 
 			let worker_pid = std::process::id();
@@ -79,14 +78,26 @@ macro_rules! decl_worker_main {
 
 				"--check-can-enable-landlock" => {
 					#[cfg(target_os = "linux")]
-					let status = if security::landlock::check_is_fully_enabled() { 0 } else { -1 };
+					let status = if let Err(err) = security::landlock::check_is_fully_enabled() {
+						// Write the error to stderr, log it on the host-side.
+						eprintln!("{}", err);
+						-1
+					} else {
+						0
+					};
 					#[cfg(not(target_os = "linux"))]
 					let status = -1;
 					std::process::exit(status)
 				},
 				"--check-can-enable-seccomp" => {
 					#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
-					let status = if security::seccomp::check_is_fully_enabled() { 0 } else { -1 };
+					let status = if let Err(err) = security::seccomp::check_is_fully_enabled() {
+						// Write the error to stderr, log it on the host-side.
+						eprintln!("{}", err);
+						-1
+					} else {
+						0
+					};
 					#[cfg(not(all(target_os = "linux", target_arch = "x86_64")))]
 					let status = -1;
 					std::process::exit(status)
@@ -95,11 +106,9 @@ macro_rules! decl_worker_main {
 					#[cfg(target_os = "linux")]
 					let cache_path_tempdir = std::path::Path::new(&args[2]);
 					#[cfg(target_os = "linux")]
-					let status = if let Err(err) = security::unshare_user_namespace_and_change_root(
-						$crate::worker::WorkerKind::CheckPivotRoot,
-						worker_pid,
-						&cache_path_tempdir,
-					) {
+					let status = if let Err(err) =
+						security::change_root::check_is_fully_enabled(&cache_path_tempdir)
+					{
 						// Write the error to stderr, log it on the host-side.
 						eprintln!("{}", err);
 						-1
@@ -107,11 +116,7 @@ macro_rules! decl_worker_main {
 						0
 					};
 					#[cfg(not(target_os = "linux"))]
-					let status = {
-						// Write the error to stderr, log it on the host-side.
-						eprintln!("not available on macos");
-						-1
-					};
+					let status = -1;
 					std::process::exit(status)
 				},
 
@@ -134,9 +139,6 @@ macro_rules! decl_worker_main {
 			let mut socket_path = None;
 			let mut worker_dir_path = None;
 			let mut node_version = None;
-			let mut can_enable_landlock = false;
-			let mut can_enable_seccomp = false;
-			let mut can_unshare_user_namespace_and_change_root = false;
 
 			let mut i = 2;
 			while i < args.len() {
@@ -153,10 +155,6 @@ macro_rules! decl_worker_main {
 						node_version = Some(args[i + 1].as_str());
 						i += 1
 					},
-					"--can-enable-landlock" => can_enable_landlock = true,
-					"--can-enable-seccomp" => can_enable_seccomp = true,
-					"--can-unshare-user-namespace-and-change-root" =>
-						can_unshare_user_namespace_and_change_root = true,
 					arg => panic!("Unexpected argument found: {}", arg),
 				}
 				i += 1;
@@ -167,19 +165,8 @@ macro_rules! decl_worker_main {
 
 			let socket_path = std::path::Path::new(socket_path).to_owned();
 			let worker_dir_path = std::path::Path::new(worker_dir_path).to_owned();
-			let security_status = $crate::SecurityStatus {
-				can_enable_landlock,
-				can_enable_seccomp,
-				can_unshare_user_namespace_and_change_root,
-			};
-
-			$entrypoint(
-				socket_path,
-				worker_dir_path,
-				node_version,
-				Some($worker_version),
-				security_status,
-			);
+
+			$entrypoint(socket_path, worker_dir_path, node_version, Some($worker_version));
 		}
 	};
 }
@@ -205,73 +192,75 @@ impl fmt::Display for WorkerKind {
 	}
 }
 
+// Some fields are only used for logging, and dead-code analysis ignores Debug.
+#[allow(dead_code)]
+#[derive(Debug)]
+pub struct WorkerInfo {
+	pid: u32,
+	kind: WorkerKind,
+	version: Option<String>,
+	worker_dir_path: PathBuf,
+}
+
 // NOTE: The worker version must be passed in so that we accurately get the version of the worker,
 // and not the version that this crate was compiled with.
 //
 // NOTE: This must not spawn any threads due to safety requirements in `event_loop` and to avoid
-// errors in [`security::unshare_user_namespace_and_change_root`].
+// errors in [`security::change_root::try_restrict`].
 //
 /// Initializes the worker process, then runs the given event loop, which spawns a new job process
 /// to securely handle each incoming request.
 pub fn run_worker<F>(
 	worker_kind: WorkerKind,
 	socket_path: PathBuf,
-	#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut worker_dir_path: PathBuf,
+	worker_dir_path: PathBuf,
 	node_version: Option<&str>,
 	worker_version: Option<&str>,
-	security_status: &SecurityStatus,
 	mut event_loop: F,
 ) where
 	F: FnMut(UnixStream, PathBuf) -> io::Result<Never>,
 {
-	let worker_pid = std::process::id();
+	#[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
+	let mut worker_info = WorkerInfo {
+		pid: std::process::id(),
+		kind: worker_kind,
+		version: worker_version.map(|v| v.to_string()),
+		worker_dir_path,
+	};
 	gum::debug!(
 		target: LOG_TARGET,
-		%worker_pid,
+		?worker_info,
 		?socket_path,
-		?worker_dir_path,
-		?security_status,
 		"starting pvf worker ({})",
-		worker_kind
+		worker_info.kind
 	);
 
 	// Check for a mismatch between the node and worker versions.
-	if let (Some(node_version), Some(worker_version)) = (node_version, worker_version) {
+	if let (Some(node_version), Some(worker_version)) = (node_version, &worker_info.version) {
 		if node_version != worker_version {
 			gum::error!(
 				target: LOG_TARGET,
-				%worker_kind,
-				%worker_pid,
+				?worker_info,
 				%node_version,
-				%worker_version,
 				"Node and worker version mismatch, node needs restarting, forcing shutdown",
 			);
 			kill_parent_node_in_emergency();
-			worker_shutdown_message(worker_kind, worker_pid, "Version mismatch");
-			return
+			worker_shutdown(worker_info, "Version mismatch");
 		}
 	}
 
 	// Make sure that we can read the worker dir path, and log its contents.
 	let entries = || -> Result<Vec<_>, io::Error> {
-		std::fs::read_dir(&worker_dir_path)?
+		std::fs::read_dir(&worker_info.worker_dir_path)?
 			.map(|res| res.map(|e| e.file_name()))
 			.collect()
 	}();
 	match entries {
 		Ok(entries) =>
-			gum::trace!(target: LOG_TARGET, %worker_pid, ?worker_dir_path, "content of worker dir: {:?}", entries),
+			gum::trace!(target: LOG_TARGET, ?worker_info, "content of worker dir: {:?}", entries),
 		Err(err) => {
-			gum::error!(
-				target: LOG_TARGET,
-				%worker_kind,
-				%worker_pid,
-				?worker_dir_path,
-				"Could not read worker dir: {}",
-				err.to_string()
-			);
-			worker_shutdown_message(worker_kind, worker_pid, &err.to_string());
-			return
+			let err = format!("Could not read worker dir: {}", err.to_string());
+			worker_shutdown_error(worker_info, &err);
 		},
 	}
 
@@ -281,23 +270,20 @@ pub fn run_worker<F>(
 		let _ = std::fs::remove_file(&socket_path);
 		Ok(stream)
 	}();
-	let stream = match stream {
-		Ok(s) => s,
-		Err(err) => {
-			gum::error!(
-				target: LOG_TARGET,
-				%worker_kind,
-				%worker_pid,
-				"{}",
-				err
-			);
-			worker_shutdown_message(worker_kind, worker_pid, &err.to_string());
-			return
-		},
+	let mut stream = match stream {
+		Ok(ok) => ok,
+		Err(err) => worker_shutdown_error(worker_info, &err.to_string()),
+	};
+
+	let WorkerHandshake { security_status } = match recv_worker_handshake(&mut stream) {
+		Ok(ok) => ok,
+		Err(err) => worker_shutdown_error(worker_info, &err.to_string()),
 	};
 
 	// Enable some security features.
 	{
+		gum::trace!(target: LOG_TARGET, ?security_status, "Enabling security features");
+
 		// Call based on whether we can change root. Error out if it should work but fails.
 		//
 		// NOTE: This should not be called in a multi-threaded context (i.e. inside the tokio
@@ -306,39 +292,29 @@ pub fn run_worker<F>(
 		//       > CLONE_NEWUSER requires that the calling process is not threaded.
 		#[cfg(target_os = "linux")]
 		if security_status.can_unshare_user_namespace_and_change_root {
-			if let Err(err) = security::unshare_user_namespace_and_change_root(
-				worker_kind,
-				worker_pid,
-				&worker_dir_path,
-			) {
-				// The filesystem may be in an inconsistent state, bail out.
-				gum::error!(
-					target: LOG_TARGET,
-					%worker_kind,
-					%worker_pid,
-					?worker_dir_path,
-					"Could not change root to be the worker cache path: {}",
-					err
-				);
-				worker_shutdown_message(worker_kind, worker_pid, &err);
-				return
+			if let Err(err) = security::change_root::enable_for_worker(&worker_info) {
+				// The filesystem may be in an inconsistent state, always bail out.
+				let err = format!("Could not change root to be the worker cache path: {}", err);
+				worker_shutdown_error(worker_info, &err);
 			}
-			worker_dir_path = std::path::Path::new("/").to_owned();
+			worker_info.worker_dir_path = std::path::Path::new("/").to_owned();
 		}
 
 		#[cfg(target_os = "linux")]
 		if security_status.can_enable_landlock {
-			let landlock_status =
-				security::landlock::enable_for_worker(worker_kind, worker_pid, &worker_dir_path);
-			if !matches!(landlock_status, Ok(landlock::RulesetStatus::FullyEnforced)) {
-				// We previously were able to enable, so this should never happen.
+			if let Err(err) = security::landlock::enable_for_worker(&worker_info) {
+				// We previously were able to enable, so this should never happen. Shutdown if
+				// running in secure mode.
+				let err = format!("could not fully enable landlock: {:?}", err);
 				gum::error!(
 					target: LOG_TARGET,
-					%worker_kind,
-					%worker_pid,
-					"could not fully enable landlock: {:?}. This should not happen, please report an issue",
-					landlock_status
+					?worker_info,
+					"{}. This should not happen, please report an issue",
+					err
 				);
+				if security_status.secure_validator_mode {
+					worker_shutdown(worker_info, &err);
+				}
 			}
 		}
 
@@ -346,48 +322,54 @@ pub fn run_worker<F>(
 		//       job to catch regressions. See <https://github.com/paritytech/ci_cd/issues/609>.
 		#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
 		if security_status.can_enable_seccomp {
-			let seccomp_status =
-				security::seccomp::enable_for_worker(worker_kind, worker_pid, &worker_dir_path);
-			if !matches!(seccomp_status, Ok(())) {
-				// We previously were able to enable, so this should never happen.
-				//
-				// TODO: Make this a real error in secure-mode. See:
-				// <https://github.com/paritytech/polkadot-sdk/issues/1444>
+			if let Err(err) = security::seccomp::enable_for_worker(&worker_info) {
+				// We previously were able to enable, so this should never happen. Shutdown if
+				// running in secure mode.
+				let err = format!("could not fully enable seccomp: {:?}", err);
 				gum::error!(
 					target: LOG_TARGET,
-					%worker_kind,
-					%worker_pid,
-					"could not fully enable seccomp: {:?}. This should not happen, please report an issue",
-					seccomp_status
+					?worker_info,
+					"{}. This should not happen, please report an issue",
+					err
 				);
+				if security_status.secure_validator_mode {
+					worker_shutdown(worker_info, &err);
+				}
 			}
 		}
 
-		if !security::check_env_vars_were_cleared(worker_kind, worker_pid) {
+		if !security::check_env_vars_were_cleared(&worker_info) {
 			let err = "not all env vars were cleared when spawning the process";
 			gum::error!(
 				target: LOG_TARGET,
-				%worker_kind,
-				%worker_pid,
+				?worker_info,
 				"{}",
 				err
 			);
-			worker_shutdown_message(worker_kind, worker_pid, err);
-			return
+			if security_status.secure_validator_mode {
+				worker_shutdown(worker_info, err);
+			}
 		}
 	}
 
 	// Run the main worker loop.
-	let err = event_loop(stream, worker_dir_path)
+	let err = event_loop(stream, worker_info.worker_dir_path.clone())
 		// It's never `Ok` because it's `Ok(Never)`.
 		.unwrap_err();
 
-	worker_shutdown_message(worker_kind, worker_pid, &err.to_string());
+	worker_shutdown(worker_info, &err.to_string());
+}
+
+/// Provide a consistent message on unexpected worker shutdown.
+fn worker_shutdown(worker_info: WorkerInfo, err: &str) -> ! {
+	gum::warn!(target: LOG_TARGET, ?worker_info, "quitting pvf worker ({}): {}", worker_info.kind, err);
+	std::process::exit(1);
 }
 
-/// Provide a consistent message on worker shutdown.
-fn worker_shutdown_message(worker_kind: WorkerKind, worker_pid: u32, err: &str) {
-	gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {}", worker_kind, err);
+/// Provide a consistent error on unexpected worker shutdown.
+fn worker_shutdown_error(worker_info: WorkerInfo, err: &str) -> ! {
+	gum::error!(target: LOG_TARGET, ?worker_info, "quitting pvf worker ({}): {}", worker_info.kind, err);
+	std::process::exit(1);
 }
 
 /// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up
@@ -458,6 +440,18 @@ fn kill_parent_node_in_emergency() {
 	}
 }
 
+/// Receives a handshake with information for the worker.
+fn recv_worker_handshake(stream: &mut UnixStream) -> io::Result<WorkerHandshake> {
+	let worker_handshake = framed_recv_blocking(stream)?;
+	let worker_handshake = WorkerHandshake::decode(&mut &worker_handshake[..]).map_err(|e| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			format!("recv_worker_handshake: failed to decode WorkerHandshake: {}", e),
+		)
+	})?;
+	Ok(worker_handshake)
+}
+
 /// Functionality related to threads spawned by the workers.
 ///
 /// The motivation for this module is to coordinate worker threads without using async Rust.
diff --git a/polkadot/node/core/pvf/common/src/worker/security/change_root.rs b/polkadot/node/core/pvf/common/src/worker/security/change_root.rs
new file mode 100644
index 0000000000000000000000000000000000000000..375cc8ff6f28e5ff10d33fd9f1cac35fa16de7b1
--- /dev/null
+++ b/polkadot/node/core/pvf/common/src/worker/security/change_root.rs
@@ -0,0 +1,173 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Functionality for securing workers by unsharing some namespaces from other processes and
+//! changing the root.
+
+use crate::{
+	worker::{WorkerInfo, WorkerKind},
+	LOG_TARGET,
+};
+use std::{env, ffi::CString, io, os::unix::ffi::OsStrExt, path::Path, ptr};
+
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+	#[error("{0}")]
+	OsErrWithContext(String),
+	#[error(transparent)]
+	Io(#[from] io::Error),
+	#[error("assertion failed: {0}")]
+	AssertionFailed(String),
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+/// Try to enable for the given kind of worker.
+///
+/// NOTE: This should not be called in a multi-threaded context. `unshare(2)`:
+///       "CLONE_NEWUSER requires that the calling process is not threaded."
+pub fn enable_for_worker(worker_info: &WorkerInfo) -> Result<()> {
+	gum::trace!(
+		target: LOG_TARGET,
+		?worker_info,
+		"enabling change-root",
+	);
+
+	try_restrict(worker_info)
+}
+
+/// Runs a check for unshare-and-change-root and returns an error indicating whether it can be fully
+/// enabled on the current Linux environment.
+///
+/// NOTE: This should not be called in a multi-threaded context. `unshare(2)`:
+///       "CLONE_NEWUSER requires that the calling process is not threaded."
+#[cfg(target_os = "linux")]
+pub fn check_is_fully_enabled(tempdir: &Path) -> Result<()> {
+	let worker_dir_path = tempdir.to_owned();
+	try_restrict(&WorkerInfo {
+		pid: std::process::id(),
+		kind: WorkerKind::CheckPivotRoot,
+		version: None,
+		worker_dir_path,
+	})
+}
+
+/// Unshare the user namespace and change root to be the worker directory.
+///
+/// NOTE: This should not be called in a multi-threaded context. `unshare(2)`:
+///       "CLONE_NEWUSER requires that the calling process is not threaded."
+#[cfg(target_os = "linux")]
+fn try_restrict(worker_info: &WorkerInfo) -> Result<()> {
+	// TODO: Remove this once this is stable: https://github.com/rust-lang/rust/issues/105723
+	macro_rules! cstr_ptr {
+		($e:expr) => {
+			concat!($e, "\0").as_ptr().cast::<core::ffi::c_char>()
+		};
+	}
+
+	gum::trace!(
+		target: LOG_TARGET,
+		?worker_info,
+		"unsharing the user namespace and calling pivot_root",
+	);
+
+	let worker_dir_path_c = CString::new(worker_info.worker_dir_path.as_os_str().as_bytes())
+		.expect("on unix; the path will never contain 0 bytes; qed");
+
+	// Wrapper around all the work to prevent repetitive error handling.
+	//
+	// # Errors
+	//
+	// It's the caller's responsibility to call `Error::last_os_error`. Note that that alone does
+	// not give the context of which call failed, so we return a &str error.
+	|| -> std::result::Result<(), &'static str> {
+		// SAFETY: We pass null-terminated C strings and use the APIs as documented. In fact, steps
+		//         (2) and (3) are adapted from the example in pivot_root(2), with the additional
+		//         change described in the `pivot_root(".", ".")` section.
+		unsafe {
+			// 1. `unshare` the user and the mount namespaces.
+			if libc::unshare(libc::CLONE_NEWUSER | libc::CLONE_NEWNS) < 0 {
+				return Err("unshare user and mount namespaces")
+			}
+
+			// 2. Setup mounts.
+			//
+			// Ensure that new root and its parent mount don't have shared propagation (which would
+			// cause pivot_root() to return an error), and prevent propagation of mount events to
+			// the initial mount namespace.
+			if libc::mount(
+				ptr::null(),
+				cstr_ptr!("/"),
+				ptr::null(),
+				libc::MS_REC | libc::MS_PRIVATE,
+				ptr::null(),
+			) < 0
+			{
+				return Err("mount MS_PRIVATE")
+			}
+			// Ensure that the new root is a mount point.
+			let additional_flags =
+				if let WorkerKind::Execute | WorkerKind::CheckPivotRoot = worker_info.kind {
+					libc::MS_RDONLY
+				} else {
+					0
+				};
+			if libc::mount(
+				worker_dir_path_c.as_ptr(),
+				worker_dir_path_c.as_ptr(),
+				ptr::null(), // ignored when MS_BIND is used
+				libc::MS_BIND |
+					libc::MS_REC | libc::MS_NOEXEC |
+					libc::MS_NODEV | libc::MS_NOSUID |
+					libc::MS_NOATIME | additional_flags,
+				ptr::null(), // ignored when MS_BIND is used
+			) < 0
+			{
+				return Err("mount MS_BIND")
+			}
+
+			// 3. `pivot_root` to the artifact directory.
+			if libc::chdir(worker_dir_path_c.as_ptr()) < 0 {
+				return Err("chdir to worker dir path")
+			}
+			if libc::syscall(libc::SYS_pivot_root, cstr_ptr!("."), cstr_ptr!(".")) < 0 {
+				return Err("pivot_root")
+			}
+			if libc::umount2(cstr_ptr!("."), libc::MNT_DETACH) < 0 {
+				return Err("umount the old root mount point")
+			}
+		}
+
+		Ok(())
+	}()
+	.map_err(|err_ctx| {
+		let err = io::Error::last_os_error();
+		Error::OsErrWithContext(format!("{}: {}", err_ctx, err))
+	})?;
+
+	// Do some assertions.
+	if env::current_dir()? != Path::new("/") {
+		return Err(Error::AssertionFailed("expected current dir after pivot_root to be `/`".into()))
+	}
+	env::set_current_dir("..")?;
+	if env::current_dir()? != Path::new("/") {
+		return Err(Error::AssertionFailed(
+			"expected not to be able to break out of new root by doing `..`".into(),
+		))
+	}
+
+	Ok(())
+}
diff --git a/polkadot/node/core/pvf/common/src/worker/security/landlock.rs b/polkadot/node/core/pvf/common/src/worker/security/landlock.rs
index 51500c733b8cea52805f0f6acdfdeb99ef4d7b68..211d12c2e443aacd6b11b6ef9e4cfddf5aa9bf26 100644
--- a/polkadot/node/core/pvf/common/src/worker/security/landlock.rs
+++ b/polkadot/node/core/pvf/common/src/worker/security/landlock.rs
@@ -28,7 +28,7 @@
 pub use landlock::RulesetStatus;
 
 use crate::{
-	worker::{stringify_panic_payload, WorkerKind},
+	worker::{stringify_panic_payload, WorkerInfo, WorkerKind},
 	LOG_TARGET,
 };
 use landlock::*;
@@ -74,6 +74,8 @@ pub const LANDLOCK_ABI: ABI = ABI::V1;
 
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
+	#[error("Could not fully enable: {0:?}")]
+	NotFullyEnabled(RulesetStatus),
 	#[error("Invalid exception path: {0:?}")]
 	InvalidExceptionPath(PathBuf),
 	#[error(transparent)]
@@ -85,17 +87,13 @@ pub enum Error {
 pub type Result<T> = std::result::Result<T, Error>;
 
 /// Try to enable landlock for the given kind of worker.
-pub fn enable_for_worker(
-	worker_kind: WorkerKind,
-	worker_pid: u32,
-	worker_dir_path: &Path,
-) -> Result<RulesetStatus> {
-	let exceptions: Vec<(PathBuf, BitFlags<AccessFs>)> = match worker_kind {
+pub fn enable_for_worker(worker_info: &WorkerInfo) -> Result<()> {
+	let exceptions: Vec<(PathBuf, BitFlags<AccessFs>)> = match worker_info.kind {
 		WorkerKind::Prepare => {
-			vec![(worker_dir_path.to_owned(), AccessFs::WriteFile.into())]
+			vec![(worker_info.worker_dir_path.to_owned(), AccessFs::WriteFile.into())]
 		},
 		WorkerKind::Execute => {
-			vec![(worker_dir_path.to_owned(), AccessFs::ReadFile.into())]
+			vec![(worker_info.worker_dir_path.to_owned(), AccessFs::ReadFile.into())]
 		},
 		WorkerKind::CheckPivotRoot =>
 			panic!("this should only be passed for checking pivot_root; qed"),
@@ -103,9 +101,7 @@ pub fn enable_for_worker(
 
 	gum::trace!(
 		target: LOG_TARGET,
-		%worker_kind,
-		%worker_pid,
-		?worker_dir_path,
+		?worker_info,
 		"enabling landlock with exceptions: {:?}",
 		exceptions,
 	);
@@ -114,18 +110,14 @@ pub fn enable_for_worker(
 }
 
 // TODO: <https://github.com/landlock-lsm/rust-landlock/issues/36>
-/// Runs a check for landlock and returns a single bool indicating whether the given landlock
-/// ABI is fully enabled on the current Linux environment.
-pub fn check_is_fully_enabled() -> bool {
-	let status_from_thread: Result<RulesetStatus> =
-		match std::thread::spawn(|| try_restrict(std::iter::empty::<(PathBuf, AccessFs)>())).join()
-		{
-			Ok(Ok(status)) => Ok(status),
-			Ok(Err(ruleset_err)) => Err(ruleset_err.into()),
-			Err(err) => Err(Error::Panic(stringify_panic_payload(err))),
-		};
-
-	matches!(status_from_thread, Ok(RulesetStatus::FullyEnforced))
+/// Runs a check for landlock in its own thread, and returns an error indicating whether the given
+/// landlock ABI is fully enabled on the current Linux environment.
+pub fn check_is_fully_enabled() -> Result<()> {
+	match std::thread::spawn(|| try_restrict(std::iter::empty::<(PathBuf, AccessFs)>())).join() {
+		Ok(Ok(())) => Ok(()),
+		Ok(Err(err)) => Err(err),
+		Err(err) => Err(Error::Panic(stringify_panic_payload(err))),
+	}
 }
 
 /// Tries to restrict the current thread (should only be called in a process' main thread) with
@@ -139,7 +131,7 @@ pub fn check_is_fully_enabled() -> bool {
 /// # Returns
 ///
 /// The status of the restriction (whether it was fully, partially, or not-at-all enforced).
-fn try_restrict<I, P, A>(fs_exceptions: I) -> Result<RulesetStatus>
+fn try_restrict<I, P, A>(fs_exceptions: I) -> Result<()>
 where
 	I: IntoIterator<Item = (P, A)>,
 	P: AsRef<Path>,
@@ -156,8 +148,13 @@ where
 		}
 		ruleset = ruleset.add_rules(rules)?;
 	}
+
 	let status = ruleset.restrict_self()?;
-	Ok(status.ruleset)
+	if !matches!(status.ruleset, RulesetStatus::FullyEnforced) {
+		return Err(Error::NotFullyEnabled(status.ruleset))
+	}
+
+	Ok(())
 }
 
 #[cfg(test)]
@@ -168,7 +165,7 @@ mod tests {
 	#[test]
 	fn restricted_thread_cannot_read_file() {
 		// TODO: This would be nice: <https://github.com/rust-lang/rust/issues/68007>.
-		if !check_is_fully_enabled() {
+		if check_is_fully_enabled().is_err() {
 			return
 		}
 
@@ -191,7 +188,7 @@ mod tests {
 
 			// Apply Landlock with a read exception for only one of the files.
 			let status = try_restrict(vec![(path1, AccessFs::ReadFile)]);
-			if !matches!(status, Ok(RulesetStatus::FullyEnforced)) {
+			if !matches!(status, Ok(())) {
 				panic!(
 					"Ruleset should be enforced since we checked if landlock is enabled: {:?}",
 					status
@@ -212,7 +209,7 @@ mod tests {
 
 			// Apply Landlock for all files.
 			let status = try_restrict(std::iter::empty::<(PathBuf, AccessFs)>());
-			if !matches!(status, Ok(RulesetStatus::FullyEnforced)) {
+			if !matches!(status, Ok(())) {
 				panic!(
 					"Ruleset should be enforced since we checked if landlock is enabled: {:?}",
 					status
@@ -233,7 +230,7 @@ mod tests {
 	#[test]
 	fn restricted_thread_cannot_write_file() {
 		// TODO: This would be nice: <https://github.com/rust-lang/rust/issues/68007>.
-		if !check_is_fully_enabled() {
+		if check_is_fully_enabled().is_err() {
 			return
 		}
 
@@ -252,7 +249,7 @@ mod tests {
 
 			// Apply Landlock with a write exception for only one of the files.
 			let status = try_restrict(vec![(path1, AccessFs::WriteFile)]);
-			if !matches!(status, Ok(RulesetStatus::FullyEnforced)) {
+			if !matches!(status, Ok(())) {
 				panic!(
 					"Ruleset should be enforced since we checked if landlock is enabled: {:?}",
 					status
@@ -270,7 +267,7 @@ mod tests {
 
 			// Apply Landlock for all files.
 			let status = try_restrict(std::iter::empty::<(PathBuf, AccessFs)>());
-			if !matches!(status, Ok(RulesetStatus::FullyEnforced)) {
+			if !matches!(status, Ok(())) {
 				panic!(
 					"Ruleset should be enforced since we checked if landlock is enabled: {:?}",
 					status
@@ -292,7 +289,7 @@ mod tests {
 	#[test]
 	fn restricted_thread_can_truncate_file() {
 		// TODO: This would be nice: <https://github.com/rust-lang/rust/issues/68007>.
-		if !check_is_fully_enabled() {
+		if check_is_fully_enabled().is_err() {
 			return
 		}
 
@@ -308,7 +305,7 @@ mod tests {
 
 			// Apply Landlock with all exceptions under the current ABI.
 			let status = try_restrict(vec![(path, AccessFs::from_all(LANDLOCK_ABI))]);
-			if !matches!(status, Ok(RulesetStatus::FullyEnforced)) {
+			if !matches!(status, Ok(())) {
 				panic!(
 					"Ruleset should be enforced since we checked if landlock is enabled: {:?}",
 					status
diff --git a/polkadot/node/core/pvf/common/src/worker/security/mod.rs b/polkadot/node/core/pvf/common/src/worker/security/mod.rs
index 9a38ed172773dff10f533434c1c928d8ed99868d..ff4c712f6bdca1351dcc7da79aee3558b8121a44 100644
--- a/polkadot/node/core/pvf/common/src/worker/security/mod.rs
+++ b/polkadot/node/core/pvf/common/src/worker/security/mod.rs
@@ -27,134 +27,21 @@
 //! - Restrict networking by blocking socket creation and io_uring.
 //! - Remove env vars
 
-use crate::{worker::WorkerKind, LOG_TARGET};
+use crate::{worker::WorkerInfo, LOG_TARGET};
 
+#[cfg(target_os = "linux")]
+pub mod change_root;
 #[cfg(target_os = "linux")]
 pub mod landlock;
-
 #[cfg(all(target_os = "linux", target_arch = "x86_64"))]
 pub mod seccomp;
 
-/// Unshare the user namespace and change root to be the artifact directory.
-///
-/// NOTE: This should not be called in a multi-threaded context. `unshare(2)`:
-///       "CLONE_NEWUSER requires that the calling process is not threaded."
-#[cfg(target_os = "linux")]
-pub fn unshare_user_namespace_and_change_root(
-	worker_kind: WorkerKind,
-	worker_pid: u32,
-	worker_dir_path: &std::path::Path,
-) -> Result<(), String> {
-	use std::{env, ffi::CString, os::unix::ffi::OsStrExt, path::Path, ptr};
-
-	// TODO: Remove this once this is stable: https://github.com/rust-lang/rust/issues/105723
-	macro_rules! cstr_ptr {
-		($e:expr) => {
-			concat!($e, "\0").as_ptr().cast::<core::ffi::c_char>()
-		};
-	}
-
-	gum::trace!(
-		target: LOG_TARGET,
-		%worker_kind,
-		%worker_pid,
-		?worker_dir_path,
-		"unsharing the user namespace and calling pivot_root",
-	);
-
-	let worker_dir_path_c = CString::new(worker_dir_path.as_os_str().as_bytes())
-		.expect("on unix; the path will never contain 0 bytes; qed");
-
-	// Wrapper around all the work to prevent repetitive error handling.
-	//
-	// # Errors
-	//
-	// It's the caller's responsibility to call `Error::last_os_error`. Note that that alone does
-	// not give the context of which call failed, so we return a &str error.
-	|| -> Result<(), &'static str> {
-		// SAFETY: We pass null-terminated C strings and use the APIs as documented. In fact, steps
-		//         (2) and (3) are adapted from the example in pivot_root(2), with the additional
-		//         change described in the `pivot_root(".", ".")` section.
-		unsafe {
-			// 1. `unshare` the user and the mount namespaces.
-			if libc::unshare(libc::CLONE_NEWUSER | libc::CLONE_NEWNS) < 0 {
-				return Err("unshare user and mount namespaces")
-			}
-
-			// 2. Setup mounts.
-			//
-			// Ensure that new root and its parent mount don't have shared propagation (which would
-			// cause pivot_root() to return an error), and prevent propagation of mount events to
-			// the initial mount namespace.
-			if libc::mount(
-				ptr::null(),
-				cstr_ptr!("/"),
-				ptr::null(),
-				libc::MS_REC | libc::MS_PRIVATE,
-				ptr::null(),
-			) < 0
-			{
-				return Err("mount MS_PRIVATE")
-			}
-			// Ensure that the new root is a mount point.
-			let additional_flags =
-				if let WorkerKind::Execute | WorkerKind::CheckPivotRoot = worker_kind {
-					libc::MS_RDONLY
-				} else {
-					0
-				};
-			if libc::mount(
-				worker_dir_path_c.as_ptr(),
-				worker_dir_path_c.as_ptr(),
-				ptr::null(), // ignored when MS_BIND is used
-				libc::MS_BIND |
-					libc::MS_REC | libc::MS_NOEXEC |
-					libc::MS_NODEV | libc::MS_NOSUID |
-					libc::MS_NOATIME | additional_flags,
-				ptr::null(), // ignored when MS_BIND is used
-			) < 0
-			{
-				return Err("mount MS_BIND")
-			}
-
-			// 3. `pivot_root` to the artifact directory.
-			if libc::chdir(worker_dir_path_c.as_ptr()) < 0 {
-				return Err("chdir to worker dir path")
-			}
-			if libc::syscall(libc::SYS_pivot_root, cstr_ptr!("."), cstr_ptr!(".")) < 0 {
-				return Err("pivot_root")
-			}
-			if libc::umount2(cstr_ptr!("."), libc::MNT_DETACH) < 0 {
-				return Err("umount the old root mount point")
-			}
-		}
-
-		Ok(())
-	}()
-	.map_err(|err_ctx| {
-		let err = std::io::Error::last_os_error();
-		format!("{}: {}", err_ctx, err)
-	})?;
-
-	// Do some assertions.
-	if env::current_dir().map_err(|err| err.to_string())? != Path::new("/") {
-		return Err("expected current dir after pivot_root to be `/`".into())
-	}
-	env::set_current_dir("..").map_err(|err| err.to_string())?;
-	if env::current_dir().map_err(|err| err.to_string())? != Path::new("/") {
-		return Err("expected not to be able to break out of new root by doing `..`".into())
-	}
-
-	Ok(())
-}
-
 /// Require env vars to have been removed when spawning the process, to prevent malicious code from
 /// accessing them.
-pub fn check_env_vars_were_cleared(worker_kind: WorkerKind, worker_pid: u32) -> bool {
+pub fn check_env_vars_were_cleared(worker_info: &WorkerInfo) -> bool {
 	gum::trace!(
 		target: LOG_TARGET,
-		%worker_kind,
-		%worker_pid,
+		?worker_info,
 		"clearing env vars in worker",
 	);
 
@@ -162,8 +49,8 @@ pub fn check_env_vars_were_cleared(worker_kind: WorkerKind, worker_pid: u32) ->
 
 	for (key, value) in std::env::vars_os() {
 		// TODO: *theoretically* the value (or mere presence) of `RUST_LOG` can be a source of
-		// randomness for malicious code. In the future we can remove it also and log in the host;
-		// see <https://github.com/paritytech/polkadot/issues/7117>.
+		// randomness for malicious code. It should be removed in the job process, which does no
+		// logging.
 		if key == "RUST_LOG" {
 			continue
 		}
@@ -175,8 +62,7 @@ pub fn check_env_vars_were_cleared(worker_kind: WorkerKind, worker_pid: u32) ->
 
 		gum::error!(
 			target: LOG_TARGET,
-			%worker_kind,
-			%worker_pid,
+			?worker_info,
 			?key,
 			?value,
 			"env var was present that should have been removed",
diff --git a/polkadot/node/core/pvf/common/src/worker/security/seccomp.rs b/polkadot/node/core/pvf/common/src/worker/security/seccomp.rs
index c3822d3c4c698834c4a92de5a9204398641e016f..4f270f75b345c96fc1118f5373b3fdca229e8e52 100644
--- a/polkadot/node/core/pvf/common/src/worker/security/seccomp.rs
+++ b/polkadot/node/core/pvf/common/src/worker/security/seccomp.rs
@@ -72,11 +72,11 @@
 //! candidate.
 
 use crate::{
-	worker::{stringify_panic_payload, WorkerKind},
+	worker::{stringify_panic_payload, WorkerInfo},
 	LOG_TARGET,
 };
 use seccompiler::*;
-use std::{collections::BTreeMap, path::Path};
+use std::collections::BTreeMap;
 
 /// The action to take on caught syscalls.
 #[cfg(not(test))]
@@ -98,36 +98,28 @@ pub enum Error {
 pub type Result<T> = std::result::Result<T, Error>;
 
 /// Try to enable seccomp for the given kind of worker.
-pub fn enable_for_worker(
-	worker_kind: WorkerKind,
-	worker_pid: u32,
-	worker_dir_path: &Path,
-) -> Result<()> {
+pub fn enable_for_worker(worker_info: &WorkerInfo) -> Result<()> {
 	gum::trace!(
 		target: LOG_TARGET,
-		%worker_kind,
-		%worker_pid,
-		?worker_dir_path,
+		?worker_info,
 		"enabling seccomp",
 	);
 
 	try_restrict()
 }
 
-/// Runs a check for seccomp and returns a single bool indicating whether seccomp with our rules is
-/// fully enabled on the current Linux environment.
-pub fn check_is_fully_enabled() -> bool {
-	let status_from_thread: Result<()> = match std::thread::spawn(|| try_restrict()).join() {
+/// Runs a check for seccomp in its own thread, and returns an error indicating whether seccomp with
+/// our rules is fully enabled on the current Linux environment.
+pub fn check_is_fully_enabled() -> Result<()> {
+	match std::thread::spawn(|| try_restrict()).join() {
 		Ok(Ok(())) => Ok(()),
-		Ok(Err(err)) => Err(err.into()),
+		Ok(Err(err)) => Err(err),
 		Err(err) => Err(Error::Panic(stringify_panic_payload(err))),
-	};
-
-	matches!(status_from_thread, Ok(()))
+	}
 }
 
 /// Applies a `seccomp` filter to disable networking for the PVF threads.
-pub fn try_restrict() -> Result<()> {
+fn try_restrict() -> Result<()> {
 	// Build a `seccomp` filter which by default allows all syscalls except those blocked in the
 	// blacklist.
 	let mut blacklisted_rules = BTreeMap::default();
@@ -169,7 +161,7 @@ mod tests {
 	#[test]
 	fn sandboxed_thread_cannot_use_sockets() {
 		// TODO: This would be nice: <https://github.com/rust-lang/rust/issues/68007>.
-		if !check_is_fully_enabled() {
+		if check_is_fully_enabled().is_err() {
 			return
 		}
 
diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs
index 8c985fc7996ea28417fff0776625232dd40c15cb..b33a9d5069dffaa0d4264897022e1e7709577991 100644
--- a/polkadot/node/core/pvf/execute-worker/src/lib.rs
+++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs
@@ -16,9 +16,7 @@
 
 //! Contains the logic for executing PVFs. Used by the polkadot-execute-worker binary.
 
-pub use polkadot_node_core_pvf_common::{
-	executor_interface::execute_artifact, worker_dir, SecurityStatus,
-};
+pub use polkadot_node_core_pvf_common::{executor_interface::execute_artifact, worker_dir};
 
 // NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
 //       separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`.
@@ -92,12 +90,13 @@ use std::{
 /// The stack size for the execute thread.
 pub const EXECUTE_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 + DEFAULT_NATIVE_STACK_MAX as usize;
 
-fn recv_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
+/// Receives a handshake with information specific to the execute worker.
+fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
 	let handshake_enc = framed_recv_blocking(stream)?;
 	let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| {
 		io::Error::new(
 			io::ErrorKind::Other,
-			"execute pvf recv_handshake: failed to decode Handshake".to_owned(),
+			"execute pvf recv_execute_handshake: failed to decode Handshake".to_owned(),
 		)
 	})?;
 	Ok(handshake)
@@ -139,7 +138,6 @@ pub fn worker_entrypoint(
 	worker_dir_path: PathBuf,
 	node_version: Option<&str>,
 	worker_version: Option<&str>,
-	security_status: SecurityStatus,
 ) {
 	run_worker(
 		WorkerKind::Execute,
@@ -147,12 +145,11 @@ pub fn worker_entrypoint(
 		worker_dir_path,
 		node_version,
 		worker_version,
-		&security_status,
 		|mut stream, worker_dir_path| {
 			let worker_pid = process::id();
 			let artifact_path = worker_dir::execute_artifact(&worker_dir_path);
 
-			let Handshake { executor_params } = recv_handshake(&mut stream)?;
+			let Handshake { executor_params } = recv_execute_handshake(&mut stream)?;
 
 			loop {
 				let (params, execution_timeout) = recv_request(&mut stream)?;
diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs
index b3f8a6b11ba43b5e9e9f27679f2a35464f7857ec..af5ac8c5974900055a9623fe5ec44242d2d77a2d 100644
--- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs
+++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs
@@ -50,7 +50,7 @@ use polkadot_node_core_pvf_common::{
 		thread::{self, spawn_worker_thread, WaitOutcome},
 		WorkerKind,
 	},
-	worker_dir, ProcessTime, SecurityStatus,
+	worker_dir, ProcessTime,
 };
 use polkadot_primitives::ExecutorParams;
 use std::{
@@ -193,7 +193,6 @@ pub fn worker_entrypoint(
 	worker_dir_path: PathBuf,
 	node_version: Option<&str>,
 	worker_version: Option<&str>,
-	security_status: SecurityStatus,
 ) {
 	run_worker(
 		WorkerKind::Prepare,
@@ -201,7 +200,6 @@ pub fn worker_entrypoint(
 		worker_dir_path,
 		node_version,
 		worker_version,
-		&security_status,
 		|mut stream, worker_dir_path| {
 			let worker_pid = process::id();
 			let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_dir_path);
diff --git a/polkadot/node/core/pvf/src/artifacts.rs b/polkadot/node/core/pvf/src/artifacts.rs
index 79b53467b4e3305e2148d990b829ee9e77f98a48..710e266841f510f6b8c16352a64bbb55134f41b1 100644
--- a/polkadot/node/core/pvf/src/artifacts.rs
+++ b/polkadot/node/core/pvf/src/artifacts.rs
@@ -66,6 +66,7 @@ use polkadot_parachain_primitives::primitives::ValidationCodeHash;
 use polkadot_primitives::ExecutorParamsHash;
 use std::{
 	collections::HashMap,
+	io,
 	path::{Path, PathBuf},
 	str::FromStr as _,
 	time::{Duration, SystemTime},
@@ -290,7 +291,17 @@ impl Artifacts {
 		}
 
 		// Make sure that the cache path directory and all its parents are created.
-		let _ = tokio::fs::create_dir_all(cache_path).await;
+		if let Err(err) = tokio::fs::create_dir_all(cache_path).await {
+			if err.kind() != io::ErrorKind::AlreadyExists {
+				gum::error!(
+					target: LOG_TARGET,
+					?err,
+					"failed to create dir {:?}",
+					cache_path,
+				);
+				return
+			}
+		}
 
 		let mut dir = match tokio::fs::read_dir(cache_path).await {
 			Ok(dir) => dir,
diff --git a/polkadot/node/core/pvf/src/execute/worker_interface.rs b/polkadot/node/core/pvf/src/execute/worker_interface.rs
index 7864ce548c24da58ca6ffc3ce397abc8a6ce96f6..9f7738f00e699ab981d7fa4396fcd09d5e1a4abe 100644
--- a/polkadot/node/core/pvf/src/execute/worker_interface.rs
+++ b/polkadot/node/core/pvf/src/execute/worker_interface.rs
@@ -62,16 +62,16 @@ pub async fn spawn(
 		security_status,
 	)
 	.await?;
-	send_handshake(&mut idle_worker.stream, Handshake { executor_params })
+	send_execute_handshake(&mut idle_worker.stream, Handshake { executor_params })
 		.await
 		.map_err(|error| {
+			let err = SpawnErr::Handshake { err: error.to_string() };
 			gum::warn!(
 				target: LOG_TARGET,
 				worker_pid = %idle_worker.pid,
-				?error,
-				"failed to send a handshake to the spawned worker",
+				%err
 			);
-			SpawnErr::Handshake
+			err
 		})?;
 	Ok((idle_worker, worker_handle))
 }
@@ -286,7 +286,8 @@ where
 	outcome
 }
 
-async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> {
+/// Sends a handshake with information specific to the execute worker.
+async fn send_execute_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> {
 	framed_send(stream, &handshake.encode()).await
 }
 
diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs
index be8f7aee778477973ef800a48b84d025ab254765..f7817853dd1be261a9d52df978521f1459057617 100644
--- a/polkadot/node/core/pvf/src/host.rs
+++ b/polkadot/node/core/pvf/src/host.rs
@@ -36,7 +36,7 @@ use polkadot_node_core_pvf_common::{
 	prepare::PrepareSuccess,
 	pvf::PvfPrepData,
 };
-use polkadot_node_subsystem::SubsystemResult;
+use polkadot_node_subsystem::{SubsystemError, SubsystemResult};
 use polkadot_parachain_primitives::primitives::ValidationResult;
 use std::{
 	collections::HashMap,
@@ -156,6 +156,8 @@ pub struct Config {
 	pub cache_path: PathBuf,
 	/// The version of the node. `None` can be passed to skip the version check (only for tests).
 	pub node_version: Option<String>,
+	/// Whether the node is attempting to run as a secure validator.
+	pub secure_validator_mode: bool,
 
 	/// The path to the program that can be used to spawn the prepare workers.
 	pub prepare_worker_program_path: PathBuf,
@@ -180,12 +182,14 @@ impl Config {
 	pub fn new(
 		cache_path: PathBuf,
 		node_version: Option<String>,
+		secure_validator_mode: bool,
 		prepare_worker_program_path: PathBuf,
 		execute_worker_program_path: PathBuf,
 	) -> Self {
 		Self {
 			cache_path,
 			node_version,
+			secure_validator_mode,
 
 			prepare_worker_program_path,
 			prepare_worker_spawn_timeout: Duration::from_secs(3),
@@ -213,8 +217,12 @@ pub async fn start(
 ) -> SubsystemResult<(ValidationHost, impl Future<Output = ()>)> {
 	gum::debug!(target: LOG_TARGET, ?config, "starting PVF validation host");
 
-	// Run checks for supported security features once per host startup. Warn here if not enabled.
-	let security_status = security::check_security_status(&config).await;
+	// Run checks for supported security features once per host startup. If some checks fail, warn
+	// if Secure Validator Mode is disabled and return an error otherwise.
+	let security_status = match security::check_security_status(&config).await {
+		Ok(ok) => ok,
+		Err(err) => return Err(SubsystemError::Context(err)),
+	};
 
 	let (to_host_tx, to_host_rx) = mpsc::channel(10);
 
diff --git a/polkadot/node/core/pvf/src/security.rs b/polkadot/node/core/pvf/src/security.rs
index 2fd3b53e96b4bf4e31cbda32410dd159ddfb6bde..9d0d4cf49afe940a3376097744ed28dcd71f5e7c 100644
--- a/polkadot/node/core/pvf/src/security.rs
+++ b/polkadot/node/core/pvf/src/security.rs
@@ -18,18 +18,19 @@ use crate::{Config, SecurityStatus, LOG_TARGET};
 use futures::join;
 use std::{fmt, path::Path};
 
-const SECURE_MODE_ANNOUNCEMENT: &'static str =
-	"In the next release this will be a hard error by default.
-     \nMore information: https://wiki.polkadot.network/docs/maintain-guides-secure-validator#secure-validator-mode";
-
 /// Run checks for supported security features.
 ///
 /// # Returns
 ///
 /// Returns the set of security features that we were able to enable. If an error occurs while
 /// enabling a security feature we set the corresponding status to `false`.
-pub async fn check_security_status(config: &Config) -> SecurityStatus {
-	let Config { prepare_worker_program_path, cache_path, .. } = config;
+///
+/// # Errors
+///
+/// Returns an error only if we could not fully enforce the security level required by the current
+/// configuration.
+pub async fn check_security_status(config: &Config) -> Result<SecurityStatus, String> {
+	let Config { prepare_worker_program_path, secure_validator_mode, cache_path, .. } = config;
 
 	let (landlock, seccomp, change_root) = join!(
 		check_landlock(prepare_worker_program_path),
@@ -37,26 +38,81 @@ pub async fn check_security_status(config: &Config) -> SecurityStatus {
 		check_can_unshare_user_namespace_and_change_root(prepare_worker_program_path, cache_path)
 	);
 
-	let security_status = SecurityStatus {
-		can_enable_landlock: landlock.is_ok(),
-		can_enable_seccomp: seccomp.is_ok(),
-		can_unshare_user_namespace_and_change_root: change_root.is_ok(),
-	};
+	let full_security_status =
+		FullSecurityStatus::new(*secure_validator_mode, landlock, seccomp, change_root);
+	let security_status = full_security_status.as_partial();
 
-	let errs: Vec<SecureModeError> = [landlock, seccomp, change_root]
-		.into_iter()
-		.filter_map(|result| result.err())
-		.collect();
-	let err_occurred = print_secure_mode_message(errs);
-	if err_occurred {
-		gum::error!(
+	if full_security_status.err_occurred() {
+		print_secure_mode_error_or_warning(&full_security_status);
+		if !full_security_status.all_errs_allowed() {
+			return Err("could not enable Secure Validator Mode; check logs".into())
+		}
+	}
+
+	if security_status.secure_validator_mode {
+		gum::info!(
 			target: LOG_TARGET,
-			"{}",
-			SECURE_MODE_ANNOUNCEMENT,
+			"👮‍♀️ Running in Secure Validator Mode. \
+			 It is highly recommended that you operate according to our security guidelines. \
+			 \nMore information: https://wiki.polkadot.network/docs/maintain-guides-secure-validator#secure-validator-mode"
 		);
 	}
 
-	security_status
+	Ok(security_status)
+}
+
+/// Contains the full security status including error states.
+struct FullSecurityStatus {
+	partial: SecurityStatus,
+	errs: Vec<SecureModeError>,
+}
+
+impl FullSecurityStatus {
+	fn new(
+		secure_validator_mode: bool,
+		landlock: SecureModeResult,
+		seccomp: SecureModeResult,
+		change_root: SecureModeResult,
+	) -> Self {
+		Self {
+			partial: SecurityStatus {
+				secure_validator_mode,
+				can_enable_landlock: landlock.is_ok(),
+				can_enable_seccomp: seccomp.is_ok(),
+				can_unshare_user_namespace_and_change_root: change_root.is_ok(),
+			},
+			errs: [landlock, seccomp, change_root]
+				.into_iter()
+				.filter_map(|result| result.err())
+				.collect(),
+		}
+	}
+
+	fn as_partial(&self) -> SecurityStatus {
+		self.partial.clone()
+	}
+
+	fn err_occurred(&self) -> bool {
+		!self.errs.is_empty()
+	}
+
+	fn all_errs_allowed(&self) -> bool {
+		!self.partial.secure_validator_mode ||
+			self.errs.iter().all(|err| err.is_allowed_in_secure_mode(&self.partial))
+	}
+
+	fn errs_string(&self) -> String {
+		self.errs
+			.iter()
+			.map(|err| {
+				format!(
+					"\n  - {}{}",
+					if err.is_allowed_in_secure_mode(&self.partial) { "Optional: " } else { "" },
+					err
+				)
+			})
+			.collect()
+	}
 }
 
 type SecureModeResult = std::result::Result<(), SecureModeError>;
@@ -71,12 +127,17 @@ enum SecureModeError {
 
 impl SecureModeError {
 	/// Whether this error is allowed with Secure Validator Mode enabled.
-	fn is_allowed_in_secure_mode(&self) -> bool {
+	fn is_allowed_in_secure_mode(&self, security_status: &SecurityStatus) -> bool {
 		use SecureModeError::*;
 		match self {
-			CannotEnableLandlock(_) => true,
+			// Landlock is present on relatively recent Linuxes. This is optional if the unshare
+			// capability is present, providing FS sandboxing a different way.
+			CannotEnableLandlock(_) => security_status.can_unshare_user_namespace_and_change_root,
+			// seccomp should be present on all modern Linuxes unless it's been disabled.
 			CannotEnableSeccomp(_) => false,
-			CannotUnshareUserNamespaceAndChangeRoot(_) => false,
+			// Should always be present on modern Linuxes. If not, Landlock also provides FS
+			// sandboxing, so don't enforce this.
+			CannotUnshareUserNamespaceAndChangeRoot(_) => security_status.can_enable_landlock,
 		}
 	}
 }
@@ -92,12 +153,8 @@ impl fmt::Display for SecureModeError {
 	}
 }
 
-/// Errors if Secure Validator Mode and some mandatory errors occurred, warn otherwise.
-///
-/// # Returns
-///
-/// `true` if an error was printed, `false` otherwise.
-fn print_secure_mode_message(errs: Vec<SecureModeError>) -> bool {
+/// Print an error if Secure Validator Mode and some mandatory errors occurred, warn otherwise.
+fn print_secure_mode_error_or_warning(security_status: &FullSecurityStatus) {
 	// Trying to run securely and some mandatory errors occurred.
 	const SECURE_MODE_ERROR: &'static str = "🚨 Your system cannot securely run a validator. \
 		 \nRunning validation of malicious PVF code has a higher risk of compromising this machine.";
@@ -105,39 +162,31 @@ fn print_secure_mode_message(errs: Vec<SecureModeError>) -> bool {
 	// securely.
 	const SECURE_MODE_WARNING: &'static str = "🚨 Some security issues have been detected. \
 		 \nRunning validation of malicious PVF code has a higher risk of compromising this machine.";
+	// Message to be printed only when running securely and mandatory errors occurred.
+	const IGNORE_SECURE_MODE_TIP: &'static str =
+		"\nYou can ignore this error with the `--insecure-validator-i-know-what-i-do` \
+		 command line argument if you understand and accept the risks of running insecurely. \
+		 With this flag, security features are enabled on a best-effort basis, but not mandatory. \
+		 \nMore information: https://wiki.polkadot.network/docs/maintain-guides-secure-validator#secure-validator-mode";
 
-	if errs.is_empty() {
-		return false
-	}
-
-	let errs_allowed = errs.iter().all(|err| err.is_allowed_in_secure_mode());
-	let errs_string: String = errs
-		.iter()
-		.map(|err| {
-			format!(
-				"\n  - {}{}",
-				if err.is_allowed_in_secure_mode() { "Optional: " } else { "" },
-				err
-			)
-		})
-		.collect();
+	let all_errs_allowed = security_status.all_errs_allowed();
+	let errs_string = security_status.errs_string();
 
-	if errs_allowed {
+	if all_errs_allowed {
 		gum::warn!(
 			target: LOG_TARGET,
 			"{}{}",
 			SECURE_MODE_WARNING,
 			errs_string,
 		);
-		false
 	} else {
 		gum::error!(
 			target: LOG_TARGET,
-			"{}{}",
+			"{}{}{}",
 			SECURE_MODE_ERROR,
 			errs_string,
+			IGNORE_SECURE_MODE_TIP
 		);
-		true
 	}
 }
 
@@ -298,3 +347,53 @@ async fn check_seccomp(
 		}
 	}
 }
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn test_secure_mode_error_optionality() {
+		let err = SecureModeError::CannotEnableLandlock(String::new());
+		assert!(err.is_allowed_in_secure_mode(&SecurityStatus {
+			secure_validator_mode: true,
+			can_enable_landlock: false,
+			can_enable_seccomp: false,
+			can_unshare_user_namespace_and_change_root: true
+		}));
+		assert!(!err.is_allowed_in_secure_mode(&SecurityStatus {
+			secure_validator_mode: true,
+			can_enable_landlock: false,
+			can_enable_seccomp: true,
+			can_unshare_user_namespace_and_change_root: false
+		}));
+
+		let err = SecureModeError::CannotEnableSeccomp(String::new());
+		assert!(!err.is_allowed_in_secure_mode(&SecurityStatus {
+			secure_validator_mode: true,
+			can_enable_landlock: false,
+			can_enable_seccomp: false,
+			can_unshare_user_namespace_and_change_root: true
+		}));
+		assert!(!err.is_allowed_in_secure_mode(&SecurityStatus {
+			secure_validator_mode: true,
+			can_enable_landlock: false,
+			can_enable_seccomp: true,
+			can_unshare_user_namespace_and_change_root: false
+		}));
+
+		let err = SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(String::new());
+		assert!(err.is_allowed_in_secure_mode(&SecurityStatus {
+			secure_validator_mode: true,
+			can_enable_landlock: true,
+			can_enable_seccomp: false,
+			can_unshare_user_namespace_and_change_root: false
+		}));
+		assert!(!err.is_allowed_in_secure_mode(&SecurityStatus {
+			secure_validator_mode: true,
+			can_enable_landlock: false,
+			can_enable_seccomp: true,
+			can_unshare_user_namespace_and_change_root: false
+		}));
+	}
+}
diff --git a/polkadot/node/core/pvf/src/worker_interface.rs b/polkadot/node/core/pvf/src/worker_interface.rs
index 9d6907c10929f1f7ee8ddb16c7330262a5a9beac..c68ff92b06eb35216a2d9f661d1b60d09847042c 100644
--- a/polkadot/node/core/pvf/src/worker_interface.rs
+++ b/polkadot/node/core/pvf/src/worker_interface.rs
@@ -19,8 +19,9 @@
 use crate::LOG_TARGET;
 use futures::FutureExt as _;
 use futures_timer::Delay;
+use parity_scale_codec::Encode;
 use pin_project::pin_project;
-use polkadot_node_core_pvf_common::SecurityStatus;
+use polkadot_node_core_pvf_common::{SecurityStatus, WorkerHandshake};
 use rand::Rng;
 use std::{
 	fmt, mem,
@@ -68,83 +69,54 @@ pub async fn spawn_with_program_path(
 	let program_path = program_path.into();
 	let worker_dir = WorkerDir::new(debug_id, cache_path).await?;
 	let extra_args: Vec<String> = extra_args.iter().map(|arg| arg.to_string()).collect();
+	// Hack the borrow-checker.
+	let program_path_clone = program_path.clone();
+	let worker_dir_clone = worker_dir.path().to_owned();
+	let extra_args_clone = extra_args.clone();
 
 	with_transient_socket_path(debug_id, |socket_path| {
 		let socket_path = socket_path.to_owned();
-		let worker_dir_path = worker_dir.path().to_owned();
 
 		async move {
-			let listener = UnixListener::bind(&socket_path).map_err(|err| {
-				gum::warn!(
-					target: LOG_TARGET,
-					%debug_id,
-					?program_path,
-					?extra_args,
-					?worker_dir,
-					?socket_path,
-					"cannot bind unix socket: {:?}",
-					err,
-				);
-				SpawnErr::Bind
-			})?;
-
-			let handle = WorkerHandle::spawn(
-				&program_path,
-				&extra_args,
-				&socket_path,
-				&worker_dir_path,
-				security_status,
-			)
-			.map_err(|err| {
-				gum::warn!(
-					target: LOG_TARGET,
-					%debug_id,
-					?program_path,
-					?extra_args,
-					?worker_dir_path,
-					?socket_path,
-					"cannot spawn a worker: {:?}",
-					err,
-				);
-				SpawnErr::ProcessSpawn
-			})?;
+			let listener = match UnixListener::bind(&socket_path) {
+				Ok(ok) => ok,
+				Err(err) => return Err(SpawnErr::Bind { socket_path, err: err.to_string() }),
+			};
+
+			let handle =
+				WorkerHandle::spawn(&program_path, &extra_args, &socket_path, &worker_dir.path())
+					.map_err(|err| SpawnErr::ProcessSpawn { program_path, err: err.to_string() })?;
 
 			futures::select! {
 				accept_result = listener.accept().fuse() => {
-					let (stream, _) = accept_result.map_err(|err| {
-						gum::warn!(
-							target: LOG_TARGET,
-							%debug_id,
-							?program_path,
-							?extra_args,
-							?worker_dir_path,
-							?socket_path,
-							"cannot accept a worker: {:?}",
-							err,
-						);
-						SpawnErr::Accept
-					})?;
+					let (mut stream, _) = accept_result
+						.map_err(|err| SpawnErr::Accept { socket_path, err: err.to_string() })?;
+					send_worker_handshake(&mut stream, WorkerHandshake { security_status })
+						.await
+						.map_err(|err| SpawnErr::Handshake { err: err.to_string() })?;
 					Ok((IdleWorker { stream, pid: handle.id(), worker_dir }, handle))
 				}
-				_ = Delay::new(spawn_timeout).fuse() => {
-					gum::warn!(
-						target: LOG_TARGET,
-						%debug_id,
-						?program_path,
-						?extra_args,
-						?worker_dir_path,
-						?socket_path,
-						?spawn_timeout,
-						"spawning and connecting to socket timed out",
-					);
-					Err(SpawnErr::AcceptTimeout)
-				}
+				_ = Delay::new(spawn_timeout).fuse() => Err(SpawnErr::AcceptTimeout{spawn_timeout}),
 			}
 		}
 	})
 	.await
+	.map_err(|err| {
+		gum::warn!(
+			target: LOG_TARGET,
+			%debug_id,
+			?program_path_clone,
+			?extra_args_clone,
+			?worker_dir_clone,
+			"error spawning worker: {}",
+			err,
+		);
+		err
+	})
 }
 
+/// A temporary, random, free path that is necessary only to establish socket communications. If a
+/// directory exists at the path at the end of this function, it is removed then.
 async fn with_transient_socket_path<T, F, Fut>(debug_id: &'static str, f: F) -> Result<T, SpawnErr>
 where
 	F: FnOnce(&Path) -> Fut,
@@ -214,21 +186,26 @@ pub struct IdleWorker {
 	pub worker_dir: WorkerDir,
 }
 
+/// This is publicly exposed only for integration tests.
+///
 /// An error happened during spawning a worker process.
-#[derive(Clone, Debug)]
+#[derive(thiserror::Error, Clone, Debug)]
+#[doc(hidden)]
 pub enum SpawnErr {
-	/// Cannot obtain a temporary path location.
+	#[error("cannot obtain a temporary path location")]
 	TmpPath,
-	/// Cannot bind the socket to the given path.
-	Bind,
-	/// An error happened during accepting a connection to the socket.
-	Accept,
-	/// An error happened during spawning the process.
-	ProcessSpawn,
-	/// The deadline allotted for the worker spawning and connecting to the socket has elapsed.
-	AcceptTimeout,
-	/// Failed to send handshake after successful spawning was signaled
-	Handshake,
+	#[error("cannot bind the socket to the given path {socket_path:?}: {err}")]
+	Bind { socket_path: PathBuf, err: String },
+	#[error(
+		"an error happened during accepting a connection to the socket {socket_path:?}: {err}"
+	)]
+	Accept { socket_path: PathBuf, err: String },
+	#[error("an error happened during spawning the process at path {program_path:?}: {err}")]
+	ProcessSpawn { program_path: PathBuf, err: String },
+	#[error("the deadline {}ms allotted for the worker spawning and connecting to the socket has elapsed", .spawn_timeout.as_millis())]
+	AcceptTimeout { spawn_timeout: Duration },
+	#[error("failed to send handshake after successful spawning was signaled: {err}")]
+	Handshake { err: String },
 }
 
 /// This is a representation of a potentially running worker. Drop it and the process will be
@@ -256,22 +233,7 @@ impl WorkerHandle {
 		extra_args: &[String],
 		socket_path: impl AsRef<Path>,
 		worker_dir_path: impl AsRef<Path>,
-		security_status: SecurityStatus,
 	) -> io::Result<Self> {
-		let security_args = {
-			let mut args = vec![];
-			if security_status.can_enable_landlock {
-				args.push("--can-enable-landlock".to_string());
-			}
-			if security_status.can_enable_seccomp {
-				args.push("--can-enable-seccomp".to_string());
-			}
-			if security_status.can_unshare_user_namespace_and_change_root {
-				args.push("--can-unshare-user-namespace-and-change-root".to_string());
-			}
-			args
-		};
-
 		// Clear all env vars from the spawned process.
 		let mut command = process::Command::new(program.as_ref());
 		command.env_clear();
@@ -286,7 +248,6 @@ impl WorkerHandle {
 			.arg(socket_path.as_ref().as_os_str())
 			.arg("--worker-dir-path")
 			.arg(worker_dir_path.as_ref().as_os_str())
-			.args(&security_args)
 			.stdout(std::process::Stdio::piped())
 			.kill_on_drop(true)
 			.spawn()?;
@@ -386,6 +347,14 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>
 	Ok(buf)
 }
 
+/// Sends a handshake with information for the worker.
+async fn send_worker_handshake(
+	stream: &mut UnixStream,
+	handshake: WorkerHandshake,
+) -> io::Result<()> {
+	framed_send(stream, &handshake.encode()).await
+}
+
 /// A temporary worker dir that contains only files needed by the worker. The worker will change its
 /// root (the `/` directory) to this directory; it should have access to no other paths on its
 /// filesystem.
@@ -433,8 +402,6 @@ impl WorkerDir {
 
 // Not async since Rust has trouble with async recursion. There should be few files here anyway.
 //
-// TODO: A lingering malicious job can still access future files in this dir. See
-// <https://github.com/paritytech/polkadot-sdk/issues/574> for how to fully secure this.
 /// Clear the temporary worker dir without deleting it. Not deleting is important because the worker
 /// has mounted its own separate filesystem here.
 ///
diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs
index bd6b04182fd7b4b159db4a706a6a45b44154f299..e82ade5edfa1e51c827abeaf00989edcb3743acb 100644
--- a/polkadot/node/core/pvf/tests/it/main.rs
+++ b/polkadot/node/core/pvf/tests/it/main.rs
@@ -39,6 +39,7 @@ const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(6);
 const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(6);
 
 struct TestHost {
+	// Keep a reference to the tempdir as it gets deleted on drop.
 	cache_dir: tempfile::TempDir,
 	host: Mutex<ValidationHost>,
 }
@@ -58,6 +59,7 @@ impl TestHost {
 		let mut config = Config::new(
 			cache_dir.path().to_owned(),
 			None,
+			false,
 			prepare_worker_path,
 			execute_worker_path,
 		);
@@ -415,19 +417,28 @@ async fn prepare_can_run_serially() {
 #[tokio::test]
 async fn all_security_features_work() {
 	// Landlock is only available starting Linux 5.13, and we may be testing on an old kernel.
-	let sysinfo = sc_sysinfo::gather_sysinfo();
-	// The version will look something like "5.15.0-87-generic".
-	let version = sysinfo.linux_kernel.unwrap();
-	let version_split: Vec<&str> = version.split(".").collect();
-	let major: u32 = version_split[0].parse().unwrap();
-	let minor: u32 = version_split[1].parse().unwrap();
-	let can_enable_landlock = if major >= 6 { true } else { minor >= 13 };
+	let can_enable_landlock = {
+		let sysinfo = sc_sysinfo::gather_sysinfo();
+		// The version will look something like "5.15.0-87-generic".
+		let version = sysinfo.linux_kernel.unwrap();
+		let version_split: Vec<&str> = version.split(".").collect();
+		let major: u32 = version_split[0].parse().unwrap();
+		let minor: u32 = version_split[1].parse().unwrap();
+		if major >= 6 {
+			true
+		} else if major == 5 {
+			minor >= 13
+		} else {
+			false
+		}
+	};
 
 	let host = TestHost::new().await;
 
 	assert_eq!(
 		host.security_status().await,
 		SecurityStatus {
+			secure_validator_mode: false,
 			can_enable_landlock,
 			can_enable_seccomp: true,
 			can_unshare_user_namespace_and_change_root: true,
diff --git a/polkadot/node/core/pvf/tests/it/worker_common.rs b/polkadot/node/core/pvf/tests/it/worker_common.rs
index 4b736b08ba60537fc3d8d185347591997137e4fa..2c24a15b682df651e1c033f686f0549488b32ffa 100644
--- a/polkadot/node/core/pvf/tests/it/worker_common.rs
+++ b/polkadot/node/core/pvf/tests/it/worker_common.rs
@@ -27,6 +27,7 @@ async fn spawn_immediate_exit() {
 
 	// There's no explicit `exit` subcommand in the worker; it will panic on an unknown
 	// subcommand anyway
+	let spawn_timeout = Duration::from_secs(2);
 	let result = spawn_with_program_path(
 		"integration-test",
 		prepare_worker_path,
@@ -36,23 +37,28 @@ async fn spawn_immediate_exit() {
 		SecurityStatus::default(),
 	)
 	.await;
-	assert!(matches!(result, Err(SpawnErr::AcceptTimeout)));
+	assert!(
+		matches!(result, Err(SpawnErr::AcceptTimeout { spawn_timeout: s }) if s == spawn_timeout)
+	);
 }
 
 #[tokio::test]
 async fn spawn_timeout() {
 	let (_, execute_worker_path) = build_workers_and_get_paths();
 
+	let spawn_timeout = Duration::from_secs(2);
 	let result = spawn_with_program_path(
 		"integration-test",
 		execute_worker_path,
 		&env::temp_dir(),
 		&["test-sleep"],
-		Duration::from_secs(2),
+		spawn_timeout,
 		SecurityStatus::default(),
 	)
 	.await;
-	assert!(matches!(result, Err(SpawnErr::AcceptTimeout)));
+	assert!(
+		matches!(result, Err(SpawnErr::AcceptTimeout { spawn_timeout: s }) if s == spawn_timeout)
+	);
 }
 
 #[tokio::test]
diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs
index 70159301fc4121b1c8565044581bda16a04d79a0..e92e15fc0e0058c4318df3ad88b5f7259dd94774 100644
--- a/polkadot/node/service/src/lib.rs
+++ b/polkadot/node/service/src/lib.rs
@@ -633,6 +633,8 @@ pub struct NewFullParams<OverseerGenerator: OverseerGen> {
 	/// The version of the node. TESTING ONLY: `None` can be passed to skip the node/worker version
 	/// check, both on startup and in the workers.
 	pub node_version: Option<String>,
+	/// Whether the node is attempting to run as a secure validator.
+	pub secure_validator_mode: bool,
 	/// An optional path to a directory containing the workers.
 	pub workers_path: Option<std::path::PathBuf>,
 	/// Optional custom names for the prepare and execute workers.
@@ -722,6 +724,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
 		jaeger_agent,
 		telemetry_worker_handle,
 		node_version,
+		secure_validator_mode,
 		workers_path,
 		workers_names,
 		overseer_gen,
@@ -953,6 +956,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
 				.ok_or(Error::DatabasePathRequired)?
 				.join("pvf-artifacts"),
 			node_version,
+			secure_validator_mode,
 			prep_worker_path,
 			exec_worker_path,
 		})
diff --git a/polkadot/node/test/service/src/lib.rs b/polkadot/node/test/service/src/lib.rs
index 312113869bc3faa11cc02888993213aff3af49e9..e9423d513bf023c59887c2c8c459eef2299ee269 100644
--- a/polkadot/node/test/service/src/lib.rs
+++ b/polkadot/node/test/service/src/lib.rs
@@ -85,6 +85,7 @@ pub fn new_full(
 			jaeger_agent: None,
 			telemetry_worker_handle: None,
 			node_version: None,
+			secure_validator_mode: false,
 			workers_path,
 			workers_names: None,
 			overseer_gen: polkadot_service::RealOverseerGen,
diff --git a/polkadot/parachain/test-parachains/adder/collator/src/main.rs b/polkadot/parachain/test-parachains/adder/collator/src/main.rs
index 3984026f5112e429e806b012c27e23f3d52022e7..6ce93ef4ad148341b4aece7668261cb5d1284751 100644
--- a/polkadot/parachain/test-parachains/adder/collator/src/main.rs
+++ b/polkadot/parachain/test-parachains/adder/collator/src/main.rs
@@ -69,6 +69,7 @@ fn main() -> Result<()> {
 
 						// Collators don't spawn PVF workers, so we can disable version checks.
 						node_version: None,
+						secure_validator_mode: false,
 						workers_path: None,
 						workers_names: None,
 
diff --git a/polkadot/parachain/test-parachains/undying/collator/src/main.rs b/polkadot/parachain/test-parachains/undying/collator/src/main.rs
index d70a98c7ef6935c1d5861ebe8dfe7f4b5dc80c07..4a15cdd697c4c1fae5a530224884a7293ff82b6a 100644
--- a/polkadot/parachain/test-parachains/undying/collator/src/main.rs
+++ b/polkadot/parachain/test-parachains/undying/collator/src/main.rs
@@ -89,6 +89,7 @@ fn main() -> Result<()> {
 
 						// Collators don't spawn PVF workers, so we can disable version checks.
 						node_version: None,
+						secure_validator_mode: false,
 						workers_path: None,
 						workers_names: None,
 
diff --git a/polkadot/zombienet_tests/misc/0002-update-cmd.sh b/polkadot/zombienet_tests/misc/0002-update-cmd.sh
new file mode 100755
index 0000000000000000000000000000000000000000..7d0dc53ca0df264f9926b0038137c3586a340adc
--- /dev/null
+++ b/polkadot/zombienet_tests/misc/0002-update-cmd.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+set -euxo pipefail
+
+if [[ $(grep "insecure-validator-i-know-what-i-do" /cfg/zombie.cmd) ]]; then
+  echo "insecure flag is already part of the cmd";
+else
+  echo -n " --insecure-validator-i-know-what-i-do" >> /cfg/zombie.cmd;
+fi;
+
+echo "update-cmd" > /tmp/zombiepipe;
\ No newline at end of file
diff --git a/polkadot/zombienet_tests/misc/0002-upgrade-node.zndsl b/polkadot/zombienet_tests/misc/0002-upgrade-node.zndsl
index 9191fb027de0d0240805d6cb65e68adfc4f15942..db0a60ac1df617e5c89dc6a1385c4c106c1ead05 100644
--- a/polkadot/zombienet_tests/misc/0002-upgrade-node.zndsl
+++ b/polkadot/zombienet_tests/misc/0002-upgrade-node.zndsl
@@ -13,6 +13,11 @@ dave: parachain 2001 block height is at least 10 within 200 seconds
 # avg 30s in our infra
 alice: run ./0002-download-polkadot-from-pr.sh with "{{POLKADOT_PR_ARTIFACTS_URL}}" within 60 seconds
 bob: run ./0002-download-polkadot-from-pr.sh with "{{POLKADOT_PR_ARTIFACTS_URL}}" within 60 seconds
+# update the cmd to add the flag '--insecure-validator-i-know-what-i-do'
+# once the base image include the version with this flag we can remove this logic.
+alice: run ./0002-update-cmd.sh within 60 seconds
+bob: run ./0002-update-cmd.sh within 60 seconds
+# restart
 alice: restart after 5 seconds
 bob: restart after 5 seconds
 
diff --git a/prdoc/pr_2486.prdoc b/prdoc/pr_2486.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..0d50a7279d10e063ccda9b389ec16632b00207b8
--- /dev/null
+++ b/prdoc/pr_2486.prdoc
@@ -0,0 +1,22 @@
+title: "PVF: Add Secure Validator Mode"
+
+doc:
+  - audience: Node Operator
+    description: |
+      Secure Validator Mode has been enabled for Polkadot validators by default.
+      This enforces PVF validation security, and prevents starting a validator node if some security features are missing on the machine.
+      SVM can be disabled using the `--insecure-validator-i-know-what-i-do` flag.
+
+migrations:
+  db: []
+
+  runtime: []
+
+crates:
+  - name: polkadot-cli
+  - name: polkadot-node-core-pvf
+  - name: polkadot-node-core-pvf-common
+  - name: polkadot-node-core-pvf-prepare-worker
+  - name: polkadot-node-core-pvf-execute-worker
+
+host_functions: []