diff --git a/Cargo.lock b/Cargo.lock
index c57a5ce1393d0e7bf75e0dea2f125d3c3dfe1abd..d83080868221faff7eeeb2d39e1a6cb9bc1ca7e3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8627,6 +8627,17 @@ dependencies = [
  "static_assertions",
 ]
 
+[[package]]
+name = "nix"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
+dependencies = [
+ "bitflags 2.4.0",
+ "cfg-if",
+ "libc",
+]
+
 [[package]]
 name = "no-std-net"
 version = "0.6.0"
@@ -9103,6 +9114,16 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "os_pipe"
+version = "1.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ae859aa07428ca9a929b936690f8b12dc5f11dd8c6992a18ca93919f28bc177"
+dependencies = [
+ "libc",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "os_str_bytes"
 version = "6.5.1"
@@ -12525,6 +12546,9 @@ name = "polkadot-node-core-pvf-execute-worker"
 version = "1.0.0"
 dependencies = [
  "cpu-time",
+ "libc",
+ "nix 0.27.1",
+ "os_pipe",
  "parity-scale-codec",
  "polkadot-node-core-pvf-common",
  "polkadot-parachain-primitives",
@@ -12539,6 +12563,8 @@ dependencies = [
  "cfg-if",
  "criterion 0.4.0",
  "libc",
+ "nix 0.27.1",
+ "os_pipe",
  "parity-scale-codec",
  "polkadot-node-core-pvf-common",
  "polkadot-primitives",
diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs
index 4232e5f1cdd88ac09d2fd5e0b30e811a0590c1a8..a3d6f04731363452532cf6d1e8c879b48a2105aa 100644
--- a/polkadot/node/core/candidate-validation/src/lib.rs
+++ b/polkadot/node/core/candidate-validation/src/lib.rs
@@ -642,14 +642,19 @@ async fn validate_candidate_exhaustive(
 		},
 		Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout)) =>
 			Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
-		Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedError(e))) =>
+		Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedInvalid(e))) =>
 			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))),
 		Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =>
 			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
 				"ambiguous worker death".to_string(),
 			))),
-		Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(err))) =>
+		Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError(err))) =>
 			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
+		Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousJobDeath(err))) =>
+			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
+				"ambiguous job death: {err}"
+			)))),
 		Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::PrepareError(e))) => {
 			// In principle if preparation of the `WASM` fails, the current candidate can not be the
 			// reason for that. So we can't say whether it is invalid or not. In addition, with
@@ -741,9 +746,9 @@ trait ValidationBackend {
 		};
 
 		// Allow limited retries for each kind of error.
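+		// Worker/job deaths and job errors can be caused by malicious code, so we cannot treat
+		// them as internal errors; but in the non-malicious case they are likely spurious, so we
+		// retry each kind once.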
+		let mut num_death_retries_left = 1;
+		let mut num_job_error_retries_left = 1;
 		let mut num_internal_retries_left = 1;
-		let mut num_awd_retries_left = 1;
-		let mut num_panic_retries_left = 1;
 		loop {
 			// Stop retrying if we exceeded the timeout.
 			if total_time_start.elapsed() + retry_delay > exec_timeout {
@@ -752,11 +757,12 @@ trait ValidationBackend {
 
 			match validation_result {
 				Err(ValidationError::InvalidCandidate(
-					WasmInvalidCandidate::AmbiguousWorkerDeath,
-				)) if num_awd_retries_left > 0 => num_awd_retries_left -= 1,
-				Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(_)))
-					if num_panic_retries_left > 0 =>
-					num_panic_retries_left -= 1,
+					WasmInvalidCandidate::AmbiguousWorkerDeath |
+					WasmInvalidCandidate::AmbiguousJobDeath(_),
+				)) if num_death_retries_left > 0 => num_death_retries_left -= 1,
+				Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError(_)))
+					if num_job_error_retries_left > 0 =>
+					num_job_error_retries_left -= 1,
 				Err(ValidationError::InternalError(_)) if num_internal_retries_left > 0 =>
 					num_internal_retries_left -= 1,
 				_ => break,
diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs
index af530a20c4e0ca61fcf44f4ce0e3121ffeab1363..cab823e1e63774196ea72db6365914e5ba45e472 100644
--- a/polkadot/node/core/candidate-validation/src/tests.rs
+++ b/polkadot/node/core/candidate-validation/src/tests.rs
@@ -695,11 +695,13 @@ fn candidate_validation_retry_panic_errors() {
 
 	let v = executor::block_on(validate_candidate_exhaustive(
 		MockValidateCandidateBackend::with_hardcoded_result_list(vec![
-			Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("foo".into()))),
-			// Throw an AWD error, we should still retry again.
-			Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
+			Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError("foo".into()))),
+			// Throw an AJD (ambiguous job death) error; we should still retry.
+			Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousJobDeath(
+				"baz".into(),
+			))),
 			// Throw another panic error.
-			Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("bar".into()))),
+			Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError("bar".into()))),
 		]),
 		validation_data,
 		validation_code,
@@ -1216,7 +1218,7 @@ fn precheck_properly_classifies_outcomes() {
 
 	inner(Err(PrepareError::Prevalidation("foo".to_owned())), PreCheckOutcome::Invalid);
 	inner(Err(PrepareError::Preparation("bar".to_owned())), PreCheckOutcome::Invalid);
-	inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid);
+	inner(Err(PrepareError::JobError("baz".to_owned())), PreCheckOutcome::Invalid);
 
 	inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed);
 	inner(Err(PrepareError::IoErr("fizz".to_owned())), PreCheckOutcome::Failed);
diff --git a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs
index d0cefae6cdbf249af27e744e3a8212f0483f018d..378374a10b39a9b014ebe639691f05cc72c4af52 100644
--- a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs
+++ b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs
@@ -37,7 +37,7 @@ impl TestHost {
 	where
 		F: FnOnce(&mut Config),
 	{
-		let (prepare_worker_path, execute_worker_path) = testing::get_and_check_worker_paths();
+		let (prepare_worker_path, execute_worker_path) = testing::build_workers_and_get_paths(true);
 
 		let cache_dir = tempfile::tempdir().unwrap();
 		let mut config = Config::new(
diff --git a/polkadot/node/core/pvf/common/Cargo.toml b/polkadot/node/core/pvf/common/Cargo.toml
index 7dc8d307026e09f80e8ce0c4d168aa3146714cd7..e3fda06963e31b8b3929ee61076b8e6d0b445419 100644
--- a/polkadot/node/core/pvf/common/Cargo.toml
+++ b/polkadot/node/core/pvf/common/Cargo.toml
@@ -12,6 +12,7 @@ cpu-time = "1.0.0"
 futures = "0.3.21"
 gum = { package = "tracing-gum", path = "../../../gum" }
 libc = "0.2.139"
+thiserror = "1.0.31"
 
 parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
 
@@ -30,7 +31,6 @@ sp-tracing = { path = "../../../../../substrate/primitives/tracing" }
 [target.'cfg(target_os = "linux")'.dependencies]
 landlock = "0.3.0"
 seccompiler = "0.4.0"
-thiserror = "1.0.31"
 
 [dev-dependencies]
 assert_matches = "1.4.0"
diff --git a/polkadot/node/core/pvf/common/src/error.rs b/polkadot/node/core/pvf/common/src/error.rs
index 82b56562d8cc38463e8c029aa21ef3e1a68e992c..34475c481f73172b611c1db73142d86f84352a4b 100644
--- a/polkadot/node/core/pvf/common/src/error.rs
+++ b/polkadot/node/core/pvf/common/src/error.rs
@@ -35,9 +35,9 @@ pub enum PrepareError {
 	/// Instantiation of the WASM module instance failed.
 	#[codec(index = 2)]
 	RuntimeConstruction(String),
-	/// An unexpected panic has occurred in the preparation worker.
+	/// An unexpected error has occurred in the preparation job.
 	#[codec(index = 3)]
-	Panic(String),
+	JobError(String),
 	/// Failed to prepare the PVF due to the time limit.
 	#[codec(index = 4)]
 	TimedOut,
@@ -48,12 +48,12 @@ pub enum PrepareError {
 	/// The temporary file for the artifact could not be created at the given cache path. This
 	/// state is reported by the validation host (not by the worker).
 	#[codec(index = 6)]
-	CreateTmpFileErr(String),
+	CreateTmpFile(String),
 	/// The response from the worker is received, but the file cannot be renamed (moved) to the
 	/// final destination location. This state is reported by the validation host (not by the
 	/// worker).
 	#[codec(index = 7)]
-	RenameTmpFileErr {
+	RenameTmpFile {
 		err: String,
 		// Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible
 		// conversion to `Option<String>`.
@@ -68,11 +68,14 @@ pub enum PrepareError {
 	/// reported by the validation host (not by the worker).
 	#[codec(index = 9)]
 	ClearWorkerDir(String),
+	/// The preparation job process died, due to OOM, a seccomp violation, or some other factor.
+	#[codec(index = 10)]
+	JobDied(String),
+	/// Some error occurred when interfacing with the kernel.
+	#[codec(index = 11)]
+	Kernel(String),
 }
 
-/// Pre-encoded length-prefixed `PrepareResult::Err(PrepareError::OutOfMemory)`
-pub const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";
-
 impl PrepareError {
 	/// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those
 	/// errors depend on the PVF itself and the sc-executor/wasmtime logic.
@@ -83,12 +86,15 @@ impl PrepareError {
 	pub fn is_deterministic(&self) -> bool {
 		use PrepareError::*;
 		match self {
-			Prevalidation(_) | Preparation(_) | Panic(_) | OutOfMemory => true,
-			TimedOut |
+			Prevalidation(_) | Preparation(_) | JobError(_) | OutOfMemory => true,
 			IoErr(_) |
-			CreateTmpFileErr(_) |
-			RenameTmpFileErr { .. } |
-			ClearWorkerDir(_) => false,
+			JobDied(_) |
+			CreateTmpFile(_) |
+			RenameTmpFile { .. } |
+			ClearWorkerDir(_) |
+			Kernel(_) => false,
+			// Can occur due to issues with the PVF, but also due to factors like local load.
+			TimedOut => false,
 			// Can occur due to issues with the PVF, but also due to local errors.
 			RuntimeConstruction(_) => false,
 		}
@@ -102,14 +108,16 @@ impl fmt::Display for PrepareError {
 			Prevalidation(err) => write!(f, "prevalidation: {}", err),
 			Preparation(err) => write!(f, "preparation: {}", err),
 			RuntimeConstruction(err) => write!(f, "runtime construction: {}", err),
-			Panic(err) => write!(f, "panic: {}", err),
+			JobError(err) => write!(f, "job error: {}", err),
 			TimedOut => write!(f, "prepare: timeout"),
 			IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err),
-			CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
-			RenameTmpFileErr { err, src, dest } =>
+			JobDied(err) => write!(f, "prepare: prepare job died: {}", err),
+			CreateTmpFile(err) => write!(f, "prepare: error creating tmp file: {}", err),
+			RenameTmpFile { err, src, dest } =>
 				write!(f, "prepare: error renaming tmp file ({:?} -> {:?}): {}", src, dest, err),
 			OutOfMemory => write!(f, "prepare: out of memory"),
 			ClearWorkerDir(err) => write!(f, "prepare: error clearing worker cache: {}", err),
+			Kernel(err) => write!(f, "prepare: error interfacing with the kernel: {}", err),
 		}
 	}
 }
@@ -133,9 +141,9 @@ pub enum InternalValidationError {
 		// conversion to `Option<String>`.
 		path: Option<String>,
 	},
-	/// An error occurred in the CPU time monitor thread. Should be totally unrelated to
-	/// validation.
-	CpuTimeMonitorThread(String),
+	/// Some error occurred when interfacing with the kernel.
+	Kernel(String),
+
 	/// Some non-deterministic preparation error occurred.
 	NonDeterministicPrepareError(PrepareError),
 }
@@ -158,17 +166,8 @@ impl fmt::Display for InternalValidationError {
 				"validation: host could not clear the worker cache ({:?}) after a job: {}",
 				path, err
 			),
-			CpuTimeMonitorThread(err) =>
-				write!(f, "validation: an error occurred in the CPU time monitor thread: {}", err),
+			Kernel(err) => write!(f, "validation: error interfacing with the kernel: {}", err),
 			NonDeterministicPrepareError(err) => write!(f, "validation: prepare: {}", err),
 		}
 	}
 }
-
-#[test]
-fn pre_encoded_payloads() {
-	let oom_enc = PrepareResult::Err(PrepareError::OutOfMemory).encode();
-	let mut oom_payload = oom_enc.len().to_le_bytes().to_vec();
-	oom_payload.extend(oom_enc);
-	assert_eq!(oom_payload, OOM_PAYLOAD);
-}
diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs
index b89ab089af1c02eba401517e8248c292dd7040f8..89e7c8e471a55d36e8441a55a6ba1a533cfc2cbc 100644
--- a/polkadot/node/core/pvf/common/src/execute.rs
+++ b/polkadot/node/core/pvf/common/src/execute.rs
@@ -28,9 +28,9 @@ pub struct Handshake {
 	pub executor_params: ExecutorParams,
 }
 
-/// The response from an execution job on the worker.
+/// The response from the execution worker.
 #[derive(Debug, Encode, Decode)]
-pub enum Response {
+pub enum WorkerResponse {
 	/// The job completed successfully.
 	Ok {
 		/// The result of parachain validation.
@@ -41,14 +41,38 @@ pub enum Response {
 	/// The candidate is invalid.
 	InvalidCandidate(String),
 	/// The job timed out.
-	TimedOut,
-	/// An unexpected panic has occurred in the execution worker.
-	Panic(String),
+	JobTimedOut,
+	/// The job process has died. We must kill the worker just in case.
+	///
+	/// We cannot treat this as an internal error because malicious code may have killed the job.
+	/// We still retry it, because in the non-malicious case it is likely spurious.
+	JobDied(String),
+	/// An unexpected error occurred in the job process, e.g. failing to spawn a thread, panic,
+	/// etc.
+	///
+	/// Because malicious code can cause a job error, we must not treat it as an internal error. We
+	/// still retry it, because in the non-malicious case it is likely spurious.
+	JobError(String),
+
 	/// Some internal error occurred.
 	InternalError(InternalValidationError),
 }
 
-impl Response {
+/// The result of a job on the execution worker.
+pub type JobResult = Result<JobResponse, JobError>;
+
+/// The successful response from a job on the execution worker.
+#[derive(Debug, Encode, Decode)]
+pub enum JobResponse {
+	Ok {
+		/// The result of parachain validation.
+		result_descriptor: ValidationResult,
+	},
+	/// The candidate is invalid.
+	InvalidCandidate(String),
+}
+
+impl JobResponse {
 	/// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty).
 	pub fn format_invalid(ctx: &'static str, msg: &str) -> Self {
 		if msg.is_empty() {
@@ -58,3 +82,18 @@ impl Response {
 		}
 	}
 }
+
+/// An unexpected error occurred in the execution job process. Because this comes from the job,
+/// which executes untrusted code, this error must likewise be treated as untrusted. That is, we
+/// cannot raise an internal error based on this.
+#[derive(thiserror::Error, Debug, Encode, Decode)]
+pub enum JobError {
+	#[error("The job timed out")]
+	TimedOut,
+	#[error("An unexpected panic has occurred in the execution job: {0}")]
+	Panic(String),
+	#[error("Could not spawn the requested thread: {0}")]
+	CouldNotSpawnThread(String),
+	#[error("An error occurred in the CPU time monitor thread: {0}")]
+	CpuTimeMonitorThread(String),
+}
diff --git a/polkadot/node/core/pvf/common/src/worker/mod.rs b/polkadot/node/core/pvf/common/src/worker/mod.rs
index f6a67b98321a3f7a96cc6319825b5067f7203cf3..86f47acccac63768e14b6fe44c9b44853d2b0ab7 100644
--- a/polkadot/node/core/pvf/common/src/worker/mod.rs
+++ b/polkadot/node/core/pvf/common/src/worker/mod.rs
@@ -205,9 +205,15 @@ impl fmt::Display for WorkerKind {
 	}
 }
 
-// The worker version must be passed in so that we accurately get the version of the worker, and not
-// the version that this crate was compiled with.
-pub fn worker_event_loop<F>(
+// NOTE: The worker version must be passed in so that we accurately get the version of the worker,
+// and not the version that this crate was compiled with.
+//
+// NOTE: This must not spawn any threads due to safety requirements in `event_loop` and to avoid
+// errors in [`security::unshare_user_namespace_and_change_root`].
+//
+/// Initializes the worker process, then runs the given event loop, which spawns a new job process
+/// to securely handle each incoming request.
+pub fn run_worker<F>(
 	worker_kind: WorkerKind,
 	socket_path: PathBuf,
 	#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut worker_dir_path: PathBuf,
diff --git a/polkadot/node/core/pvf/execute-worker/Cargo.toml b/polkadot/node/core/pvf/execute-worker/Cargo.toml
index 77a9420961c00a41c5cd3cf3470e57a9a39be2f0..40e0ff4f0a195cc06c41f9fa4295ea4d42acb569 100644
--- a/polkadot/node/core/pvf/execute-worker/Cargo.toml
+++ b/polkadot/node/core/pvf/execute-worker/Cargo.toml
@@ -9,6 +9,9 @@ license.workspace = true
 [dependencies]
 cpu-time = "1.0.0"
 gum = { package = "tracing-gum", path = "../../../gum" }
+os_pipe = "1.1.4"
+nix = { version = "0.27.1", features = ["resource", "process"] }
+libc = "0.2.139"
 
 parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
 
diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs
index 8872f9bc8dd302bba29bd054224c3cecd2fb0d8f..9ec811686b893c67ee6fcef6d494816209552547 100644
--- a/polkadot/node/core/pvf/execute-worker/src/lib.rs
+++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs
@@ -25,23 +25,33 @@ pub use polkadot_node_core_pvf_common::{
 const LOG_TARGET: &str = "parachain::pvf-execute-worker";
 
 use cpu_time::ProcessTime;
+use nix::{
+	errno::Errno,
+	sys::{
+		resource::{Usage, UsageWho},
+		wait::WaitStatus,
+	},
+	unistd::{ForkResult, Pid},
+};
+use os_pipe::{self, PipeReader, PipeWriter};
 use parity_scale_codec::{Decode, Encode};
 use polkadot_node_core_pvf_common::{
 	error::InternalValidationError,
-	execute::{Handshake, Response},
+	execute::{Handshake, JobError, JobResponse, JobResult, WorkerResponse},
 	framed_recv_blocking, framed_send_blocking,
 	worker::{
-		cpu_time_monitor_loop, stringify_panic_payload,
+		cpu_time_monitor_loop, run_worker, stringify_panic_payload,
 		thread::{self, WaitOutcome},
-		worker_event_loop, WorkerKind,
+		WorkerKind,
 	},
 };
 use polkadot_parachain_primitives::primitives::ValidationResult;
 use polkadot_primitives::{executor_params::DEFAULT_NATIVE_STACK_MAX, ExecutorParams};
 use std::{
-	io,
+	io::{self, Read},
 	os::unix::net::UnixStream,
 	path::PathBuf,
+	process,
 	sync::{mpsc::channel, Arc},
 	time::Duration,
 };
@@ -105,7 +115,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, Duration)> {
 	Ok((params, execution_timeout))
 }
 
-fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> {
+fn send_response(stream: &mut UnixStream, response: WorkerResponse) -> io::Result<()> {
 	framed_send_blocking(stream, &response.encode())
 }
 
@@ -131,7 +141,7 @@ pub fn worker_entrypoint(
 	worker_version: Option<&str>,
 	security_status: SecurityStatus,
 ) {
-	worker_event_loop(
+	run_worker(
 		WorkerKind::Execute,
 		socket_path,
 		worker_dir_path,
@@ -139,7 +149,7 @@ pub fn worker_entrypoint(
 		worker_version,
 		&security_status,
 		|mut stream, worker_dir_path| {
-			let worker_pid = std::process::id();
+			let worker_pid = process::id();
 			let artifact_path = worker_dir::execute_artifact(&worker_dir_path);
 
 			let Handshake { executor_params } = recv_handshake(&mut stream)?;
@@ -157,7 +167,7 @@ pub fn worker_entrypoint(
 				let compiled_artifact_blob = match std::fs::read(&artifact_path) {
 					Ok(bytes) => bytes,
 					Err(err) => {
-						let response = Response::InternalError(
+						let response = WorkerResponse::InternalError(
 							InternalValidationError::CouldNotOpenFile(err.to_string()),
 						);
 						send_response(&mut stream, response)?;
@@ -165,82 +175,51 @@ pub fn worker_entrypoint(
 					},
 				};
 
-				// Conditional variable to notify us when a thread is done.
-				let condvar = thread::get_condvar();
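+				// A pipe which the job uses to send its framed response back to this worker.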
+				let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
 
-				let cpu_time_start = ProcessTime::now();
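+				// Snapshot the cumulative CPU usage of terminated children now; after the job
+				// exits, the delta gives the job's own CPU time (see `handle_parent_process`).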
+				let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
+					Ok(usage) => usage,
+					Err(errno) => {
+						let response = internal_error_from_errno("getrusage before", errno);
+						send_response(&mut stream, response)?;
+						continue
+					},
+				};
+
+				// SAFETY: new process is spawned within a single-threaded process. This invariant
+				// is enforced by tests.
+				let response = match unsafe { nix::unistd::fork() } {
+					Err(errno) => internal_error_from_errno("fork", errno),
+					Ok(ForkResult::Child) => {
+						// Dropping the stream closes the underlying socket. We want to make sure
+						// that the sandboxed child can't get any kind of information from the
+						// outside world. The only IPC it should be able to do is sending its
+						// response over the pipe.
+						drop(stream);
+						// Drop the read end so we don't have too many FDs open.
+						drop(pipe_reader);
 
-				// Spawn a new thread that runs the CPU time monitor.
-				let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
-				let cpu_time_monitor_thread = thread::spawn_worker_thread(
-					"cpu time monitor thread",
-					move || {
-						cpu_time_monitor_loop(
-							cpu_time_start,
+						handle_child_process(
+							pipe_writer,
+							compiled_artifact_blob,
+							executor_params,
+							params,
 							execution_timeout,
-							cpu_time_monitor_rx,
-						)
-					},
-					Arc::clone(&condvar),
-					WaitOutcome::TimedOut,
-				)?;
-
-				let executor_params_2 = executor_params.clone();
-				let execute_thread = thread::spawn_worker_thread_with_stack_size(
-					"execute thread",
-					move || {
-						validate_using_artifact(
-							&compiled_artifact_blob,
-							&executor_params_2,
-							&params,
-							cpu_time_start,
 						)
 					},
-					Arc::clone(&condvar),
-					WaitOutcome::Finished,
-					EXECUTE_THREAD_STACK_SIZE,
-				)?;
-
-				let outcome = thread::wait_for_threads(condvar);
-
-				let response = match outcome {
-					WaitOutcome::Finished => {
-						let _ = cpu_time_monitor_tx.send(());
-						execute_thread
-							.join()
-							.unwrap_or_else(|e| Response::Panic(stringify_panic_payload(e)))
-					},
-					// If the CPU thread is not selected, we signal it to end, the join handle is
-					// dropped and the thread will finish in the background.
-					WaitOutcome::TimedOut => {
-						match cpu_time_monitor_thread.join() {
-							Ok(Some(cpu_time_elapsed)) => {
-								// Log if we exceed the timeout and the other thread hasn't
-								// finished.
-								gum::warn!(
-									target: LOG_TARGET,
-									%worker_pid,
-									"execute job took {}ms cpu time, exceeded execute timeout {}ms",
-									cpu_time_elapsed.as_millis(),
-									execution_timeout.as_millis(),
-								);
-								Response::TimedOut
-							},
-							Ok(None) => Response::InternalError(
-								InternalValidationError::CpuTimeMonitorThread(
-									"error communicating over finished channel".into(),
-								),
-							),
-							Err(e) => Response::InternalError(
-								InternalValidationError::CpuTimeMonitorThread(
-									stringify_panic_payload(e),
-								),
-							),
-						}
+					Ok(ForkResult::Parent { child }) => {
+						// The read end will wait until all write ends have been closed; this
+						// drop is necessary to avoid a deadlock.
+						drop(pipe_writer);
+
+						handle_parent_process(
+							pipe_reader,
+							child,
+							worker_pid,
+							usage_before,
+							execution_timeout,
+						)?
 					},
-					WaitOutcome::Pending => unreachable!(
-						"we run wait_while until the outcome is no longer pending; qed"
-					),
 				};
 
 				gum::trace!(
@@ -259,27 +238,275 @@ fn validate_using_artifact(
 	compiled_artifact_blob: &[u8],
 	executor_params: &ExecutorParams,
 	params: &[u8],
-	cpu_time_start: ProcessTime,
-) -> Response {
+) -> JobResponse {
 	let descriptor_bytes = match unsafe {
 		// SAFETY: this should be safe since the compiled artifact passed here comes from the
 		//         file created by the prepare workers. These files are obtained by calling
 		//         [`executor_intf::prepare`].
 		execute_artifact(compiled_artifact_blob, executor_params, params)
 	} {
-		Err(err) => return Response::format_invalid("execute", &err),
+		Err(err) => return JobResponse::format_invalid("execute", &err),
 		Ok(d) => d,
 	};
 
 	let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
 		Err(err) =>
-			return Response::format_invalid("validation result decoding failed", &err.to_string()),
+			return JobResponse::format_invalid(
+				"validation result decoding failed",
+				&err.to_string(),
+			),
 		Ok(r) => r,
 	};
 
-	// Include the decoding in the measured time, to prevent any potential attacks exploiting some
-	// bug in decoding.
-	let duration = cpu_time_start.elapsed();
+	JobResponse::Ok { result_descriptor }
+}
+
+/// Entry point of the child process forked by the execute worker. It executes the artifact and
+/// pipes the response back to the parent (worker) process.
+///
+/// # Arguments
+///
+/// - `pipe_write`: A `PipeWriter`, the writing end of the pipe.
+///
+/// - `compiled_artifact_blob`: The artifact bytes, as compiled by the prepare worker.
+///
+/// - `executor_params`: Deterministically serialized execution environment semantics.
+///
+/// - `params`: Validation parameters.
+///
+/// - `execution_timeout`: The timeout in `Duration`.
+///
+/// # Returns
+///
+/// This function never returns: it pipes the `JobResult` back to the parent process and exits.
+fn handle_child_process(
+	mut pipe_write: PipeWriter,
+	compiled_artifact_blob: Vec<u8>,
+	executor_params: ExecutorParams,
+	params: Vec<u8>,
+	execution_timeout: Duration,
+) -> ! {
+	gum::debug!(
+		target: LOG_TARGET,
+		worker_job_pid = %process::id(),
+		"worker job: executing artifact",
+	);
+
+	// Conditional variable to notify us when a thread is done.
+	let condvar = thread::get_condvar();
+	let cpu_time_start = ProcessTime::now();
+
+	// Spawn a new thread that runs the CPU time monitor.
+	let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
+	let cpu_time_monitor_thread = thread::spawn_worker_thread(
+		"cpu time monitor thread",
+		move || cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx),
+		Arc::clone(&condvar),
+		WaitOutcome::TimedOut,
+	)
+	.unwrap_or_else(|err| {
+		send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
+	});
+
+	let executor_params_2 = executor_params.clone();
+	let execute_thread = thread::spawn_worker_thread_with_stack_size(
+		"execute thread",
+		move || validate_using_artifact(&compiled_artifact_blob, &executor_params_2, &params),
+		Arc::clone(&condvar),
+		WaitOutcome::Finished,
+		EXECUTE_THREAD_STACK_SIZE,
+	)
+	.unwrap_or_else(|err| {
+		send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
+	});
+
+	let outcome = thread::wait_for_threads(condvar);
+
+	let response = match outcome {
+		WaitOutcome::Finished => {
+			let _ = cpu_time_monitor_tx.send(());
+			execute_thread.join().map_err(|e| JobError::Panic(stringify_panic_payload(e)))
+		},
+		// If the CPU thread is not selected, we signal it to end, the join handle is
+		// dropped and the thread will finish in the background.
+		WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() {
+			Ok(Some(_cpu_time_elapsed)) => Err(JobError::TimedOut),
+			Ok(None) => Err(JobError::CpuTimeMonitorThread(
+				"error communicating over finished channel".into(),
+			)),
+			Err(e) => Err(JobError::CpuTimeMonitorThread(stringify_panic_payload(e))),
+		},
+		WaitOutcome::Pending =>
+			unreachable!("we run wait_while until the outcome is no longer pending; qed"),
+	};
+
+	send_child_response(&mut pipe_write, response);
+}
+
+/// Waits for the child process to finish and handles its response read from the pipe.
+///
+/// # Arguments
+///
+/// - `pipe_read`: A `PipeReader` used to read data from the child process.
+///
+/// - `child`: The child process's PID.
+///
+/// - `worker_pid`: The PID of this worker process, used for logging.
+///
+/// - `usage_before`: Resource usage statistics taken before spawning the child process.
+///
+/// - `timeout`: The maximum allowed CPU time for the child process, as a `Duration`.
+///
+/// # Returns
+///
+/// - The `WorkerResponse`, either `Ok` or some error state.
+fn handle_parent_process(
+	mut pipe_read: PipeReader,
+	child: Pid,
+	worker_pid: u32,
+	usage_before: Usage,
+	timeout: Duration,
+) -> io::Result<WorkerResponse> {
+	// Read from the child. Don't decode unless the process exited normally, which we check later.
+	let mut received_data = Vec::new();
+	pipe_read
+		.read_to_end(&mut received_data)
+		// Could not read the job response. There is either a bug or the job was hijacked.
+		// Should retry at any rate.
+		.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
+
+	let status = nix::sys::wait::waitpid(child, None);
+	gum::trace!(
+		target: LOG_TARGET,
+		%worker_pid,
+		"execute worker received wait status from job: {:?}",
+		status,
+	);
+
+	let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
+		Ok(usage) => usage,
+		Err(errno) => return Ok(internal_error_from_errno("getrusage after", errno)),
+	};
+
+	// Using `getrusage` is needed to check whether the child has timed out, since we cannot rely
+	// on the child to report its own time.
+	// As `getrusage` returns resource usage from all terminated child processes, it is necessary
+	// to subtract the usage measured before the current child process to isolate its CPU time.
+	let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before);
+	if cpu_tv >= timeout {
+		gum::warn!(
+			target: LOG_TARGET,
+			%worker_pid,
+			"execute job took {}ms cpu time, exceeded execute timeout {}ms",
+			cpu_tv.as_millis(),
+			timeout.as_millis(),
+		);
+		return Ok(WorkerResponse::JobTimedOut)
+	}
+
+	match status {
+		Ok(WaitStatus::Exited(_, exit_status)) => {
+			let mut reader = io::BufReader::new(received_data.as_slice());
+			let result = match recv_child_response(&mut reader) {
+				Ok(result) => result,
+				Err(err) => return Ok(WorkerResponse::JobError(err.to_string())),
+			};
+
+			match result {
+				Ok(JobResponse::Ok { result_descriptor }) => {
+					// The exit status should have been zero if no error occurred.
+					if exit_status != 0 {
+						return Ok(WorkerResponse::JobError(format!(
+							"unexpected exit status: {}",
+							exit_status
+						)))
+					}
+
+					Ok(WorkerResponse::Ok { result_descriptor, duration: cpu_tv })
+				},
+				Ok(JobResponse::InvalidCandidate(err)) => Ok(WorkerResponse::InvalidCandidate(err)),
+				Err(job_error) => {
+					gum::warn!(
+						target: LOG_TARGET,
+						%worker_pid,
+						"execute job error: {}",
+						job_error,
+					);
+					if matches!(job_error, JobError::TimedOut) {
+						Ok(WorkerResponse::JobTimedOut)
+					} else {
+						Ok(WorkerResponse::JobError(job_error.to_string()))
+					}
+				},
+			}
+		},
+		// The job was killed by the given signal.
+		//
+		// The job gets SIGSYS on seccomp violations, but this signal may have been sent for some
+		// other reason, so we still need to check for seccomp violations elsewhere.
+		Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) =>
+			Ok(WorkerResponse::JobDied(format!("received signal: {signal:?}"))),
+		Err(errno) => Ok(internal_error_from_errno("waitpid", errno)),
+
+		// It is within an attacker's power to send an unexpected exit status. So we cannot treat
+		// this as an internal error (which would make us abstain), but must vote against.
+		Ok(unexpected_wait_status) => Ok(WorkerResponse::JobDied(format!(
+			"unexpected status from wait: {unexpected_wait_status:?}"
+		))),
+	}
+}
+
+/// Calculates the total CPU time spent, including both user and system time, from the given
+/// `Usage` structure returned by [`nix::sys::resource::getrusage`].
+///
+/// # Arguments
+///
+/// - `rusage`: Contains resource usage information.
+///
+/// # Returns
+///
+/// Returns a `Duration` representing the total CPU time.
+fn get_total_cpu_usage(rusage: Usage) -> Duration {
+	let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) +
+		(rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64;
+
+	Duration::from_micros(micros)
+}
+
+/// Get a job response.
+fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result<JobResult> {
+	let response_bytes = framed_recv_blocking(received_data)?;
+	JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			format!("execute pvf recv_child_response: decode error: {:?}", e),
+		)
+	})
+}
+
+/// Writes the response to the pipe and then exits the process.
+///
+/// # Arguments
+///
+/// - `pipe_write`: A `PipeWriter`, the writing end of the pipe.
+///
+/// - `response`: The child process's response, or an error.
+fn send_child_response(pipe_write: &mut PipeWriter, response: JobResult) -> ! {
+	framed_send_blocking(pipe_write, response.encode().as_slice())
+		.unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));
+
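+	// Mirror the result in the exit code so the parent can cross-check it against the decoded
+	// response: a nonzero status alongside an `Ok` payload is treated as a job error.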
+	if response.is_ok() {
+		process::exit(libc::EXIT_SUCCESS)
+	} else {
+		process::exit(libc::EXIT_FAILURE)
+	}
+}
 
-	Response::Ok { result_descriptor, duration }
+fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerResponse {
+	WorkerResponse::InternalError(InternalValidationError::Kernel(format!(
+		"{}: {}: {}",
+		context,
+		errno,
+		io::Error::last_os_error()
+	)))
 }
diff --git a/polkadot/node/core/pvf/prepare-worker/Cargo.toml b/polkadot/node/core/pvf/prepare-worker/Cargo.toml
index e21583ecc8b7595599fe775eb003b97f54bd31db..1cd221533f48fd0dda4b88c3593565387eee3893 100644
--- a/polkadot/node/core/pvf/prepare-worker/Cargo.toml
+++ b/polkadot/node/core/pvf/prepare-worker/Cargo.toml
@@ -14,6 +14,8 @@ rayon = "1.5.1"
 tracking-allocator = { package = "staging-tracking-allocator", path = "../../../tracking-allocator" }
 tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
 tikv-jemallocator = { version = "0.5.0", optional = true }
+os_pipe = "1.1.4"
+nix = { version = "0.27.1", features = ["resource", "process"] }
 
 parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
 
diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs
index 37a4dd06075e9a8aa1b6d90bd80dce8830fb0415..151b54efc2d133fb97a5e855d34ff988c0137c21 100644
--- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs
+++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs
@@ -28,28 +28,40 @@ const LOG_TARGET: &str = "parachain::pvf-prepare-worker";
 use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
 use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
+use nix::{
+	errno::Errno,
+	sys::{
+		resource::{Usage, UsageWho},
+		wait::WaitStatus,
+	},
+	unistd::{ForkResult, Pid},
+};
+use os_pipe::{self, PipeReader, PipeWriter};
 use parity_scale_codec::{Decode, Encode};
 use polkadot_node_core_pvf_common::{
-	error::{PrepareError, PrepareResult, OOM_PAYLOAD},
+	error::{PrepareError, PrepareResult},
 	executor_intf::create_runtime_from_artifact_bytes,
 	framed_recv_blocking, framed_send_blocking,
 	prepare::{MemoryStats, PrepareJobKind, PrepareStats},
 	pvf::PvfPrepData,
 	worker::{
-		cpu_time_monitor_loop, stringify_panic_payload,
-		thread::{self, WaitOutcome},
-		worker_event_loop, WorkerKind,
+		cpu_time_monitor_loop, run_worker, stringify_panic_payload,
+		thread::{self, spawn_worker_thread, WaitOutcome},
+		WorkerKind,
 	},
 	worker_dir, ProcessTime, SecurityStatus,
 };
 use polkadot_primitives::ExecutorParams;
 use std::{
-	fs, io,
+	fs,
+	io::{self, Read},
 	os::{
 		fd::{AsRawFd, RawFd},
 		unix::net::UnixStream,
 	},
 	path::PathBuf,
+	process,
 	sync::{mpsc::channel, Arc},
 	time::Duration,
 };
@@ -65,6 +77,7 @@ static ALLOC: TrackingAllocator<tikv_jemallocator::Jemalloc> =
 static ALLOC: TrackingAllocator<std::alloc::System> = TrackingAllocator(std::alloc::System);
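+
+/// Pre-encoded length-prefixed `Result::Err(PrepareError::OutOfMemory)`, previously defined in
+/// `error.rs` and still used by `start_memory_tracking` below. The bytes are kept identical to
+/// the removed definition: the `Err` discriminant and the `OutOfMemory` codec index, behind the
+/// little-endian length prefix that `framed_send_blocking` produces.
+const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";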
 
 /// Contains the bytes for a successfully compiled artifact.
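+/// NOTE: `Encode`/`Decode` are needed because the artifact is now sent from the job process back
+/// to the worker over the pipe, as part of the job response.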
+#[derive(Encode, Decode)]
 pub struct CompiledArtifact(Vec<u8>);
 
 impl CompiledArtifact {
@@ -80,6 +93,7 @@ impl AsRef<[u8]> for CompiledArtifact {
 	}
 }
 
+/// Get a worker request.
 fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
 	let pvf = framed_recv_blocking(stream)?;
 	let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
@@ -91,6 +105,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
 	Ok(pvf)
 }
 
+/// Send a worker response.
 fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
 	framed_send_blocking(stream, &result.encode())
 }
@@ -111,18 +126,22 @@ fn start_memory_tracking(fd: RawFd, limit: Option<isize>) {
 					// Syscalls never allocate or deallocate, so this is safe.
 					libc::syscall(libc::SYS_write, fd, OOM_PAYLOAD.as_ptr(), OOM_PAYLOAD.len());
 					libc::syscall(libc::SYS_close, fd);
-					libc::syscall(libc::SYS_exit, 1);
+					// Make sure we exit from all threads. Copied from glibc.
+					libc::syscall(libc::SYS_exit_group, 1);
+					loop {
+						libc::syscall(libc::SYS_exit, 1);
+					}
 				}
 				#[cfg(not(target_os = "linux"))]
 				{
 					// Syscalls are not available on MacOS, so we have to use `libc` wrappers.
-					// Technicaly, there may be allocations inside, although they shouldn't be
+					// Technically, there may be allocations inside, although they shouldn't be
 					// there. In that case, we'll see deadlocks on MacOS after the OOM condition
 					// triggered. As we consider running a validator on MacOS unsafe, and this
 					// code is only run by a validator, it's a lesser evil.
 					libc::write(fd, OOM_PAYLOAD.as_ptr().cast(), OOM_PAYLOAD.len());
 					libc::close(fd);
-					std::process::exit(1);
+					libc::_exit(1);
 				}
 			})),
 		);
@@ -155,17 +174,19 @@ fn end_memory_tracking() -> isize {
 ///
 /// 1. Get the code and parameters for preparation from the host.
 ///
-/// 2. Start a memory tracker in a separate thread.
+/// 2. Start a new child process.
 ///
-/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads.
+/// 3. Start the memory tracker and the actual preparation in two separate threads.
 ///
 /// 4. Wait on the two threads created in step 3.
 ///
 /// 5. Stop the memory tracker and get the stats.
 ///
-/// 6. If compilation succeeded, write the compiled artifact into a temporary file.
+/// 6. Pipe the result back to the parent process and exit from the child process.
 ///
-/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
+/// 7. If compilation succeeded, write the compiled artifact into a temporary file.
+///
+/// 8. Send the result of preparation back to the host. If any error occurred in the above steps, we
 ///    send that in the `PrepareResult`.
 pub fn worker_entrypoint(
 	socket_path: PathBuf,
@@ -174,7 +195,7 @@ pub fn worker_entrypoint(
 	worker_version: Option<&str>,
 	security_status: SecurityStatus,
 ) {
-	worker_event_loop(
+	run_worker(
 		WorkerKind::Prepare,
 		socket_path,
 		worker_dir_path,
@@ -182,7 +203,7 @@ pub fn worker_entrypoint(
 		worker_version,
 		&security_status,
 		|mut stream, worker_dir_path| {
-			let worker_pid = std::process::id();
+			let worker_pid = process::id();
 			let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_dir_path);
 
 			loop {
@@ -197,186 +218,58 @@ pub fn worker_entrypoint(
 				let prepare_job_kind = pvf.prep_kind();
 				let executor_params = pvf.executor_params();
 
-				// Conditional variable to notify us when a thread is done.
-				let condvar = thread::get_condvar();
-
-				// Run the memory tracker in a regular, non-worker thread.
-				#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-				let condvar_memory = Arc::clone(&condvar);
-				#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-				let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory));
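+				// A pipe which the job uses to send its framed response back to this worker.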
+				let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
 
-				let cpu_time_start = ProcessTime::now();
-
-				// Spawn a new thread that runs the CPU time monitor.
-				let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
-				let cpu_time_monitor_thread = thread::spawn_worker_thread(
-					"cpu time monitor thread",
-					move || {
-						cpu_time_monitor_loop(
-							cpu_time_start,
-							preparation_timeout,
-							cpu_time_monitor_rx,
-						)
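+				// Snapshot the cumulative CPU usage of terminated children now; after the job
+				// exits, the delta gives the job's own CPU time (see `handle_parent_process`).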
+				let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
+					Ok(usage) => usage,
+					Err(errno) => {
+						let result = Err(error_from_errno("getrusage before", errno));
+						send_response(&mut stream, result)?;
+						continue
 					},
-					Arc::clone(&condvar),
-					WaitOutcome::TimedOut,
-				)?;
-
-				start_memory_tracking(
-					stream.as_raw_fd(),
-					executor_params.prechecking_max_memory().map(|v| {
-						v.try_into().unwrap_or_else(|_| {
-							gum::warn!(
-								LOG_TARGET,
-								%worker_pid,
-								"Illegal pre-checking max memory value {} discarded",
-								v,
-							);
-							0
-						})
-					}),
-				);
-
-				// Spawn another thread for preparation.
-				let prepare_thread = thread::spawn_worker_thread(
-					"prepare thread",
-					move || {
-						#[allow(unused_mut)]
-						let mut result = prepare_artifact(pvf, cpu_time_start);
-
-						// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
-						#[cfg(target_os = "linux")]
-						let mut result = result
-							.map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread()));
-
-						// If we are pre-checking, check for runtime construction errors.
-						//
-						// As pre-checking is more strict than just preparation in terms of memory
-						// and time, it is okay to do extra checks here. This takes negligible time
-						// anyway.
-						if let PrepareJobKind::Prechecking = prepare_job_kind {
-							result = result.and_then(|output| {
-								runtime_construction_check(
-									output.0.as_ref(),
-									executor_params.as_ref(),
-								)?;
-								Ok(output)
-							});
-						}
-
-						result
-					},
-					Arc::clone(&condvar),
-					WaitOutcome::Finished,
-				)?;
-
-				let outcome = thread::wait_for_threads(condvar);
-
-				let peak_alloc = {
-					let peak = end_memory_tracking();
-					gum::debug!(
-						target: LOG_TARGET,
-						%worker_pid,
-						"prepare job peak allocation is {} bytes",
-						peak,
-					);
-					peak
 				};
 
-				let result = match outcome {
-					WaitOutcome::Finished => {
-						let _ = cpu_time_monitor_tx.send(());
-
-						match prepare_thread.join().unwrap_or_else(|err| {
-							Err(PrepareError::Panic(stringify_panic_payload(err)))
-						}) {
-							Err(err) => {
-								// Serialized error will be written into the socket.
-								Err(err)
-							},
-							Ok(ok) => {
-								cfg_if::cfg_if! {
-									if #[cfg(target_os = "linux")] {
-										let (artifact, cpu_time_elapsed, max_rss) = ok;
-									} else {
-										let (artifact, cpu_time_elapsed) = ok;
-									}
-								}
-
-								// Stop the memory stats worker and get its observed memory stats.
-								#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-								let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid);
-								let memory_stats = MemoryStats {
-									#[cfg(any(
-										target_os = "linux",
-										feature = "jemalloc-allocator"
-									))]
-									memory_tracker_stats,
-									#[cfg(target_os = "linux")]
-									max_rss: extract_max_rss_stat(max_rss, worker_pid),
-									// Negative peak allocation values are legit; they are narrow
-									// corner cases and shouldn't affect overall statistics
-									// significantly
-									peak_tracked_alloc: if peak_alloc > 0 {
-										peak_alloc as u64
-									} else {
-										0u64
-									},
-								};
-
-								// Write the serialized artifact into a temp file.
-								//
-								// PVF host only keeps artifacts statuses in its memory,
-								// successfully compiled code gets stored on the disk (and
-								// consequently deserialized by execute-workers). The prepare worker
-								// is only required to send `Ok` to the pool to indicate the
-								// success.
-
-								gum::debug!(
-									target: LOG_TARGET,
-									%worker_pid,
-									"worker: writing artifact to {}",
-									temp_artifact_dest.display(),
-								);
-								fs::write(&temp_artifact_dest, &artifact)?;
-
-								Ok(PrepareStats { cpu_time_elapsed, memory_stats })
-							},
-						}
+				// SAFETY: new process is spawned within a single-threaded process. This invariant
+				// is enforced by tests.
+				let result = match unsafe { nix::unistd::fork() } {
+					Err(errno) => Err(error_from_errno("fork", errno)),
+					Ok(ForkResult::Child) => {
+						// Dropping the stream closes the underlying socket. We want to make sure
+						// that the sandboxed child can't get any kind of information from the
+						// outside world. The only IPC it should be able to do is sending its
+						// response over the pipe.
+						drop(stream);
+						// Drop the read end so we don't have too many FDs open.
+						drop(pipe_reader);
+
+						handle_child_process(
+							pvf,
+							pipe_writer,
+							preparation_timeout,
+							prepare_job_kind,
+							executor_params,
+						)
 					},
-					// If the CPU thread is not selected, we signal it to end, the join handle is
-					// dropped and the thread will finish in the background.
-					WaitOutcome::TimedOut => {
-						match cpu_time_monitor_thread.join() {
-							Ok(Some(cpu_time_elapsed)) => {
-								// Log if we exceed the timeout and the other thread hasn't
-								// finished.
-								gum::warn!(
-									target: LOG_TARGET,
-									%worker_pid,
-									"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
-									cpu_time_elapsed.as_millis(),
-									preparation_timeout.as_millis(),
-								);
-								Err(PrepareError::TimedOut)
-							},
-							Ok(None) => Err(PrepareError::IoErr(
-								"error communicating over closed channel".into(),
-							)),
-							// Errors in this thread are independent of the PVF.
-							Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))),
-						}
+					Ok(ForkResult::Parent { child }) => {
+						// The read end will wait until all write ends have been closed; this
+						// drop is necessary to avoid a deadlock.
+						drop(pipe_writer);
+
+						handle_parent_process(
+							pipe_reader,
+							child,
+							temp_artifact_dest.clone(),
+							worker_pid,
+							usage_before,
+							preparation_timeout,
+						)
 					},
-					WaitOutcome::Pending => unreachable!(
-						"we run wait_while until the outcome is no longer pending; qed"
-					),
 				};
 
 				gum::trace!(
 					target: LOG_TARGET,
 					%worker_pid,
-					"worker: sending response to host: {:?}",
+					"worker: sending result to host: {:?}",
 					result
 				);
 				send_response(&mut stream, result)?;
@@ -385,10 +278,7 @@ pub fn worker_entrypoint(
 	);
 }
 
-fn prepare_artifact(
-	pvf: PvfPrepData,
-	cpu_time_start: ProcessTime,
-) -> Result<(CompiledArtifact, Duration), PrepareError> {
+fn prepare_artifact(pvf: PvfPrepData) -> Result<CompiledArtifact, PrepareError> {
 	let blob = match prevalidate(&pvf.code()) {
 		Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
 		Ok(b) => b,
@@ -398,7 +288,6 @@ fn prepare_artifact(
 		Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
 		Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
 	}
-	.map(|artifact| (artifact, cpu_time_start.elapsed()))
 }
 
 /// Try constructing the runtime to catch any instantiation errors during pre-checking.
@@ -412,3 +301,372 @@ fn runtime_construction_check(
 		.map(|_runtime| ())
 		.map_err(|err| PrepareError::RuntimeConstruction(format!("{:?}", err)))
 }
+
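+/// The payload the prepare job sends back to the worker over the pipe on success: the compiled
+/// artifact plus the memory statistics gathered during preparation.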
+#[derive(Encode, Decode)]
+struct JobResponse {
+	artifact: CompiledArtifact,
+	memory_stats: MemoryStats,
+}
+
+/// Entry point of the child process forked by the prepare worker. It prepares the artifact,
+/// tracks memory stats during preparation, and pipes the response back to the parent (worker)
+/// process.
+///
+/// # Arguments
+///
+/// - `pvf`: A `PvfPrepData` structure, containing the data needed to prepare the artifact.
+///
+/// - `pipe_write`: A `PipeWriter`, the writing end of the pipe.
+///
+/// - `preparation_timeout`: The timeout in `Duration`.
+///
+/// - `prepare_job_kind`: The kind of prepare job.
+///
+/// - `executor_params`: Deterministically serialized execution environment semantics.
+///
+/// # Returns
+///
+/// This function never returns. If any error occurs, it pipes back a `PrepareError`; on success,
+/// it pipes back a `JobResponse`.
+fn handle_child_process(
+	pvf: PvfPrepData,
+	mut pipe_write: PipeWriter,
+	preparation_timeout: Duration,
+	prepare_job_kind: PrepareJobKind,
+	executor_params: Arc<ExecutorParams>,
+) -> ! {
+	let worker_job_pid = process::id();
+	gum::debug!(
+		target: LOG_TARGET,
+		%worker_job_pid,
+		?prepare_job_kind,
+		?preparation_timeout,
+		"worker job: preparing artifact",
+	);
+
+	// Conditional variable to notify us when a thread is done.
+	let condvar = thread::get_condvar();
+
+	// Run the memory tracker in a regular, non-worker thread.
+	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+	let condvar_memory = Arc::clone(&condvar);
+	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+	let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory));
+
+	start_memory_tracking(
+		pipe_write.as_raw_fd(),
+		executor_params.prechecking_max_memory().map(|v| {
+			v.try_into().unwrap_or_else(|_| {
+				gum::warn!(
+					LOG_TARGET,
+					%worker_job_pid,
+					"Illegal pre-checking max memory value {} discarded",
+					v,
+				);
+				0
+			})
+		}),
+	);
+
+	let cpu_time_start = ProcessTime::now();
+
+	// Spawn a new thread that runs the CPU time monitor.
+	let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
+	let cpu_time_monitor_thread = thread::spawn_worker_thread(
+		"cpu time monitor thread",
+		move || cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx),
+		Arc::clone(&condvar),
+		WaitOutcome::TimedOut,
+	)
+	.unwrap_or_else(|err| {
+		send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
+	});
+
+	let prepare_thread = spawn_worker_thread(
+		"prepare worker",
+		move || {
+			#[allow(unused_mut)]
+			let mut result = prepare_artifact(pvf);
+
+			// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
+			#[cfg(target_os = "linux")]
+			let mut result = result.map(|artifact| (artifact, get_max_rss_thread()));
+
+			// If we are pre-checking, check for runtime construction errors.
+			//
+			// As pre-checking is more strict than just preparation in terms of memory
+			// and time, it is okay to do extra checks here. This takes negligible time
+			// anyway.
+			if let PrepareJobKind::Prechecking = prepare_job_kind {
+				result = result.and_then(|output| {
+					runtime_construction_check(output.0.as_ref(), &executor_params)?;
+					Ok(output)
+				});
+			}
+			result
+		},
+		Arc::clone(&condvar),
+		WaitOutcome::Finished,
+	)
+	.unwrap_or_else(|err| {
+		send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
+	});
+
+	let outcome = thread::wait_for_threads(condvar);
+
+	let peak_alloc = {
+		let peak = end_memory_tracking();
+		gum::debug!(
+			target: LOG_TARGET,
+			%worker_job_pid,
+			"prepare job peak allocation is {} bytes",
+			peak,
+		);
+		peak
+	};
+
+	let result = match outcome {
+		WaitOutcome::Finished => {
+			let _ = cpu_time_monitor_tx.send(());
+
+			match prepare_thread.join().unwrap_or_else(|err| {
+				send_child_response(
+					&mut pipe_write,
+					Err(PrepareError::JobError(stringify_panic_payload(err))),
+				)
+			}) {
+				Err(err) => Err(err),
+				Ok(ok) => {
+					cfg_if::cfg_if! {
+						if #[cfg(target_os = "linux")] {
+							let (artifact, max_rss) = ok;
+						} else {
+							let artifact = ok;
+						}
+					}
+
+					// Stop the memory stats worker and get its observed memory stats.
+					#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+					let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, process::id());
+
+					let memory_stats = MemoryStats {
+						#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+						memory_tracker_stats,
+						#[cfg(target_os = "linux")]
+						max_rss: extract_max_rss_stat(max_rss, process::id()),
+						// Negative peak allocation values are legit; they are narrow
+						// corner cases and shouldn't affect overall statistics
+						// significantly
+						peak_tracked_alloc: if peak_alloc > 0 { peak_alloc as u64 } else { 0u64 },
+					};
+
+					Ok(JobResponse { artifact, memory_stats })
+				},
+			}
+		},
+
+		// If the CPU thread is not selected, we signal it to end, the join handle is
+		// dropped and the thread will finish in the background.
+		WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() {
+			Ok(Some(_cpu_time_elapsed)) => Err(PrepareError::TimedOut),
+			Ok(None) => Err(PrepareError::IoErr("error communicating over closed channel".into())),
+			Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))),
+		},
+		WaitOutcome::Pending =>
+			unreachable!("we run wait_while until the outcome is no longer pending; qed"),
+	};
+
+	send_child_response(&mut pipe_write, result);
+}
+
+/// Waits for the child process to finish and handles its response read from the pipe.
+///
+/// # Arguments
+///
+/// - `pipe_read`: A `PipeReader` used to read data from the child process.
+///
+/// - `child`: The child process's PID.
+///
+/// - `temp_artifact_dest`: The destination `PathBuf` to write the temporary artifact file to.
+///
+/// - `worker_pid`: The PID of this worker process, used for logging.
+///
+/// - `usage_before`: Resource usage statistics taken before spawning the child process.
+///
+/// - `timeout`: The maximum allowed CPU time for the child process, as a `Duration`.
+///
+/// # Returns
+///
+/// - If the child sends a response without an error, this function returns `Ok(PrepareStats)`
+///   containing memory and CPU usage statistics.
+///
+/// - If the child sends a response with an error, it returns that `PrepareError`.
+///
+/// - If the child process times out, it returns `PrepareError::TimedOut`.
+fn handle_parent_process(
+	mut pipe_read: PipeReader,
+	child: Pid,
+	temp_artifact_dest: PathBuf,
+	worker_pid: u32,
+	usage_before: Usage,
+	timeout: Duration,
+) -> Result<PrepareStats, PrepareError> {
+	// Read from the child. Don't decode unless the process exited normally, which we check later.
+	let mut received_data = Vec::new();
+	pipe_read
+		.read_to_end(&mut received_data)
+		.map_err(|err| PrepareError::IoErr(err.to_string()))?;
+
+	let status = nix::sys::wait::waitpid(child, None);
+	gum::trace!(
+		target: LOG_TARGET,
+		%worker_pid,
+		"prepare worker received wait status from job: {:?}",
+		status,
+	);
+
+	let usage_after = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN)
+		.map_err(|errno| error_from_errno("getrusage after", errno))?;
+
+	// Using `getrusage` is needed to check whether the child has timed out, since we cannot rely
+	// on the child to report its own time.
+	// As `getrusage` returns resource usage from all terminated child processes, it is necessary
+	// to subtract the usage measured before the current child process to isolate its CPU time.
+	let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before);
+	if cpu_tv >= timeout {
+		gum::warn!(
+			target: LOG_TARGET,
+			%worker_pid,
+			"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
+			cpu_tv.as_millis(),
+			timeout.as_millis(),
+		);
+		return Err(PrepareError::TimedOut)
+	}
+
+	match status {
+		Ok(WaitStatus::Exited(_pid, exit_status)) => {
+			let mut reader = io::BufReader::new(received_data.as_slice());
+			let result = recv_child_response(&mut reader)
+				.map_err(|err| PrepareError::JobError(err.to_string()))?;
+
+			match result {
+				Err(err) => Err(err),
+				Ok(response) => {
+					// The exit status should have been zero if no error occurred.
+					if exit_status != 0 {
+						return Err(PrepareError::JobError(format!(
+							"unexpected exit status: {}",
+							exit_status
+						)))
+					}
+
+					// Write the serialized artifact into a temp file.
+					//
+					// The PVF host only keeps artifact statuses in its memory;
+					// successfully compiled code gets stored on disk (and is
+					// later deserialized by execute-workers). The prepare worker
+					// is only required to send `Ok` to the pool to indicate
+					// success.
+					gum::debug!(
+						target: LOG_TARGET,
+						%worker_pid,
+						"worker: writing artifact to {}",
+						temp_artifact_dest.display(),
+					);
+					// Write to the temp file created by the host.
+					if let Err(err) = fs::write(&temp_artifact_dest, &response.artifact) {
+						return Err(PrepareError::IoErr(err.to_string()))
+					};
+
+					Ok(PrepareStats {
+						memory_stats: response.memory_stats,
+						cpu_time_elapsed: cpu_tv,
+					})
+				},
+			}
+		},
+		// The job was killed by the given signal.
+		//
+		// The job gets SIGSYS on seccomp violations, but this signal may have been sent for some
+		// other reason, so we still need to check for seccomp violations elsewhere.
+		Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) =>
+			Err(PrepareError::JobDied(format!("received signal: {signal:?}"))),
+		Err(errno) => Err(error_from_errno("waitpid", errno)),
+
+		// An attacker can make the child process return any exit status it wants. So we can treat
+		// all unexpected cases the same way.
+		Ok(unexpected_wait_status) => Err(PrepareError::JobDied(format!(
+			"unexpected status from wait: {unexpected_wait_status:?}"
+		))),
+	}
+}
+
+/// Calculates the total CPU time from the given `usage` structure, returned from
+/// [`nix::sys::resource::getrusage`], including both user and system time.
+///
+/// # Arguments
+///
+/// - `rusage`: Contains resource usage information.
+///
+/// # Returns
+///
+/// Returns a `Duration` representing the total CPU time.
+fn get_total_cpu_usage(rusage: Usage) -> Duration {
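+	// `tv_sec` counts whole seconds and `tv_usec` the remaining microseconds; the user and
+	// system components of each are summed before converting the total to a `Duration`.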
+	let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) +
+		(rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64;
+
+	Duration::from_micros(micros)
+}
+
+/// Receives and decodes a job response from the given reader.
+fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result<JobResult> {
+	let response_bytes = framed_recv_blocking(received_data)?;
+	JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			format!("prepare pvf recv_child_response: decode error: {:?}", e),
+		)
+	})
+}
+
+/// Writes a job response to the pipe and exits the process afterwards.
+///
+/// # Arguments
+///
+/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
+///
+/// - `response`: The child process response.
+fn send_child_response(pipe_write: &mut PipeWriter, response: JobResult) -> ! {
+	framed_send_blocking(pipe_write, response.encode().as_slice())
+		.unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));
+
+	if response.is_ok() {
+		process::exit(libc::EXIT_SUCCESS)
+	} else {
+		process::exit(libc::EXIT_FAILURE)
+	}
+}
+
+fn error_from_errno(context: &'static str, errno: Errno) -> PrepareError {
+	PrepareError::Kernel(format!("{}: {}: {}", context, errno, io::Error::last_os_error()))
+}
+
+type JobResult = Result<JobResponse, PrepareError>;
+
+/// Pre-encoded length-prefixed `Result::Err(PrepareError::OutOfMemory)`.
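+///
+/// Byte layout, matching the framing in `send_child_response`: the first eight bytes are the
+/// length prefix (`2` as a little-endian `usize`, added by `framed_send_blocking`), followed by
+/// `0x01` for the SCALE index of `Result::Err` and `0x08` for `PrepareError::OutOfMemory`.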
+const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";
+
+#[test]
+fn pre_encoded_payloads() {
+	// NOTE: This must match the type of `response` in `send_child_response`.
+	let oom_unencoded: JobResult = Result::Err(PrepareError::OutOfMemory);
+	let oom_encoded = oom_unencoded.encode();
+	// The payload is prefixed with its length in `framed_send`.
+	let mut oom_payload = oom_encoded.len().to_le_bytes().to_vec();
+	oom_payload.extend(oom_encoded);
+	assert_eq!(oom_payload, OOM_PAYLOAD);
+}
diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs
index 87ef0b54a0406776b865ea143185fd8012ef82e1..7fdb8c56ec92828348e9b7c297b2e698adc98e7e 100644
--- a/polkadot/node/core/pvf/src/error.rs
+++ b/polkadot/node/core/pvf/src/error.rs
@@ -33,36 +33,37 @@ pub enum ValidationError {
 pub enum InvalidCandidate {
 	/// PVF preparation ended up with a deterministic error.
 	PrepareError(String),
-	/// The failure is reported by the execution worker. The string contains the error message.
-	WorkerReportedError(String),
-	/// The worker has died during validation of a candidate. That may fall in one of the following
-	/// categories, which we cannot distinguish programmatically:
+	/// The candidate is reported to be invalid by the execution worker. The string contains the
+	/// error message.
+	WorkerReportedInvalid(String),
+	/// The worker process (not the job) has died during validation of a candidate.
 	///
-	/// (a) Some sort of transient glitch caused the worker process to abort. An example would be
-	/// that the host machine ran out of free memory and the OOM killer started killing the
-	/// processes, and in order to save the parent it will "sacrifice child" first.
-	///
-	/// (b) The candidate triggered a code path that has lead to the process death. For example,
-	///     the PVF found a way to consume unbounded amount of resources and then it either
-	///     exceeded an `rlimit` (if set) or, again, invited OOM killer. Another possibility is a
-	///     bug in wasmtime allowed the PVF to gain control over the execution worker.
-	///
-	/// We attribute such an event to an *invalid candidate* in either case.
-	///
-	/// The rationale for this is that a glitch may lead to unfair rejecting candidate by a single
-	/// validator. If the glitch is somewhat more persistent the validator will reject all
-	/// candidate thrown at it and hopefully the operator notices it by decreased reward
-	/// performance of the validator. On the other hand, if the worker died because of (b) we would
-	/// have better chances to stop the attack.
+	/// It's unlikely that this is caused by malicious code since workers spawn separate job
+	/// processes, and those job processes are sandboxed. But it is possible. We retry in this
+	/// case, and if the error persists, we assume it's caused by the candidate and vote against.
 	AmbiguousWorkerDeath,
 	/// PVF execution (compilation is not included) took more time than was allotted.
 	HardTimeout,
-	/// A panic occurred and we can't be sure whether the candidate is really invalid or some
-	/// internal glitch occurred. Whenever we are unsure, we can never treat an error as internal
-	/// as we would abstain from voting. This is bad because if the issue was due to the candidate,
-	/// then all validators would abstain, stalling finality on the chain. So we will first retry
-	/// the candidate, and if the issue persists we are forced to vote invalid.
-	Panic(String),
+	/// The job process (not the worker) has died for one of the following reasons:
+	///
+	/// (a) A seccomp violation occurred, most likely due to an attempt by malicious code to
+	/// execute arbitrary code. Note that there is no foolproof way to detect this if the operator
+	/// has seccomp auditing disabled.
+	///
+	/// (b) The host machine ran out of free memory and the OOM killer started killing the
+	/// processes, and in order to save the parent it will "sacrifice child" first.
+	///
+	/// (c) Some other reason, perhaps transient or perhaps caused by malicious code.
+	///
+	/// We cannot treat this as an internal error because malicious code may have caused this.
+	AmbiguousJobDeath(String),
+	/// An unexpected error occurred in the job process and we can't be sure whether the candidate
+	/// is really invalid or some internal glitch occurred. Whenever we are unsure, we can never
+	/// treat an error as internal, as we would then abstain from voting. This is bad because if
+	/// the issue was due to the candidate, then all validators would abstain, stalling finality
+	/// on the chain. So we will first retry the candidate, and if the issue persists we are
+	/// forced to vote invalid.
 }
 
 impl From<InternalValidationError> for ValidationError {
diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs
index aca604f0de21dd805e10c68c1a8d80c202010e9b..257377df3f480186748bafbd852e7b852801caca 100644
--- a/polkadot/node/core/pvf/src/execute/queue.rs
+++ b/polkadot/node/core/pvf/src/execute/queue.rs
@@ -342,20 +342,27 @@ fn handle_job_finish(
 		},
 		Outcome::InvalidCandidate { err, idle_worker } => (
 			Some(idle_worker),
-			Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(err))),
+			Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedInvalid(err))),
 			None,
 		),
 		Outcome::InternalError { err } => (None, Err(ValidationError::InternalError(err)), None),
+		// Either the worker or the job timed out. Kill the worker in either case. Treated as
+		// definitely-invalid, because if we timed out, there's no time left for a retry.
 		Outcome::HardTimeout =>
 			(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)), None),
 		// "Maybe invalid" errors (will retry).
-		Outcome::IoErr => (
+		Outcome::WorkerIntfErr => (
 			None,
 			Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)),
 			None,
 		),
-		Outcome::Panic { err } =>
-			(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::Panic(err))), None),
+		Outcome::JobDied { err } => (
+			None,
+			Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousJobDeath(err))),
+			None,
+		),
+		Outcome::JobError { err } =>
+			(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::JobError(err))), None),
 	};
 
 	queue.metrics.execute_finished();
diff --git a/polkadot/node/core/pvf/src/execute/worker_intf.rs b/polkadot/node/core/pvf/src/execute/worker_intf.rs
index 61264f7d517d8fe93273b806f0ca7f8c4b942222..bf44ba017250582075cfea4b8c5219cd6ffa0495 100644
--- a/polkadot/node/core/pvf/src/execute/worker_intf.rs
+++ b/polkadot/node/core/pvf/src/execute/worker_intf.rs
@@ -30,7 +30,7 @@ use futures_timer::Delay;
 use parity_scale_codec::{Decode, Encode};
 use polkadot_node_core_pvf_common::{
 	error::InternalValidationError,
-	execute::{Handshake, Response},
+	execute::{Handshake, WorkerResponse},
 	worker_dir, SecurityStatus,
 };
 use polkadot_parachain_primitives::primitives::ValidationResult;
@@ -88,19 +88,26 @@ pub enum Outcome {
 	/// a trap. Errors related to the preparation process are not expected to be encountered by the
 	/// execution workers.
 	InvalidCandidate { err: String, idle_worker: IdleWorker },
+	/// The execution time exceeded the hard limit. The worker is terminated.
+	HardTimeout,
+	/// An I/O error happened during communication with the worker. This may mean that the worker
+	/// process already died. The token is not returned in any case.
+	WorkerIntfErr,
+	/// The job process has died. We must kill the worker just in case.
+	///
+	/// We cannot treat this as an internal error because malicious code may have caused this.
+	JobDied { err: String },
+	/// An unexpected error occurred in the job process.
+	///
+	/// Because malicious code can cause a job error, we must not treat it as an internal error.
+	JobError { err: String },
+
 	/// An internal error happened during the validation. Such an error is most likely related to
 	/// some transient glitch.
 	///
 	/// Should only ever be used for errors independent of the candidate and PVF. Therefore it may
 	/// be a problem with the worker, so we terminate it.
 	InternalError { err: InternalValidationError },
-	/// The execution time exceeded the hard limit. The worker is terminated.
-	HardTimeout,
-	/// An I/O error happened during communication with the worker. This may mean that the worker
-	/// process already died. The token is not returned in any case.
-	IoErr,
-	/// An unexpected panic has occurred in the execution worker.
-	Panic { err: String },
 }
 
 /// Given the idle token of a worker and parameters of work, communicates with the worker and
@@ -137,7 +144,7 @@ pub async fn start_work(
 				?error,
 				"failed to send an execute request",
 			);
-			return Outcome::IoErr
+			return Outcome::WorkerIntfErr
 		}
 
 		// We use a generous timeout here. This is in addition to the one in the child process, in
@@ -173,7 +180,7 @@ pub async fn start_work(
 							);
 						}
 
-						return Outcome::IoErr
+						return Outcome::WorkerIntfErr
 					},
 					Ok(response) => {
 						// Check if any syscall violations occurred during the job. For now this is
@@ -189,7 +196,7 @@ pub async fn start_work(
 							);
 						}
 
-						if let Response::Ok{duration, ..} = response {
+						if let WorkerResponse::Ok{duration, ..} = response {
 							if duration > execution_timeout {
 								// The job didn't complete within the timeout.
 								gum::warn!(
@@ -201,7 +208,7 @@ pub async fn start_work(
 								);
 
 								// Return a timeout error.
-								return Outcome::HardTimeout;
+								return Outcome::HardTimeout
 							}
 						}
 
@@ -216,23 +223,25 @@ pub async fn start_work(
 					validation_code_hash = ?artifact.id.code_hash,
 					"execution worker exceeded lenient timeout for execution, child worker likely stalled",
 				);
-				Response::TimedOut
+				WorkerResponse::JobTimedOut
 			},
 		};
 
 		match response {
-			Response::Ok { result_descriptor, duration } => Outcome::Ok {
+			WorkerResponse::Ok { result_descriptor, duration } => Outcome::Ok {
 				result_descriptor,
 				duration,
 				idle_worker: IdleWorker { stream, pid, worker_dir },
 			},
-			Response::InvalidCandidate(err) => Outcome::InvalidCandidate {
+			WorkerResponse::InvalidCandidate(err) => Outcome::InvalidCandidate {
 				err,
 				idle_worker: IdleWorker { stream, pid, worker_dir },
 			},
-			Response::TimedOut => Outcome::HardTimeout,
-			Response::Panic(err) => Outcome::Panic { err },
-			Response::InternalError(err) => Outcome::InternalError { err },
+			WorkerResponse::JobTimedOut => Outcome::HardTimeout,
+			WorkerResponse::JobDied(err) => Outcome::JobDied { err },
+			WorkerResponse::JobError(err) => Outcome::JobError { err },
+
+			WorkerResponse::InternalError(err) => Outcome::InternalError { err },
 		}
 	})
 	.await
@@ -306,9 +315,9 @@ async fn send_request(
 	framed_send(stream, &execution_timeout.encode()).await
 }
 
-async fn recv_response(stream: &mut UnixStream) -> io::Result<Response> {
+async fn recv_response(stream: &mut UnixStream) -> io::Result<WorkerResponse> {
 	let response_bytes = framed_recv(stream).await?;
-	Response::decode(&mut &response_bytes[..]).map_err(|e| {
+	WorkerResponse::decode(&mut response_bytes.as_slice()).map_err(|e| {
 		io::Error::new(
 			io::ErrorKind::Other,
 			format!("execute pvf recv_response: decode error: {:?}", e),
diff --git a/polkadot/node/core/pvf/src/prepare/pool.rs b/polkadot/node/core/pvf/src/prepare/pool.rs
index 6bb6ca5b64453909c0b5de622dcb725de0006d32..8e02f540d321439696d3cdefeaca0892f495349c 100644
--- a/polkadot/node/core/pvf/src/prepare/pool.rs
+++ b/polkadot/node/core/pvf/src/prepare/pool.rs
@@ -339,17 +339,17 @@ fn handle_mux(
 					spawned,
 					worker,
 					idle,
-					Err(PrepareError::CreateTmpFileErr(err)),
+					Err(PrepareError::CreateTmpFile(err)),
 				),
 				// Return `Concluded`, but do not kill the worker since the error was on the host
 				// side.
-				Outcome::RenameTmpFileErr { worker: idle, result: _, err, src, dest } =>
+				Outcome::RenameTmpFile { worker: idle, result: _, err, src, dest } =>
 					handle_concluded_no_rip(
 						from_pool,
 						spawned,
 						worker,
 						idle,
-						Err(PrepareError::RenameTmpFileErr { err, src, dest }),
+						Err(PrepareError::RenameTmpFile { err, src, dest }),
 					),
 				// Could not clear worker cache. Kill the worker so other jobs can't see the data.
 				Outcome::ClearWorkerDir { err } => {
@@ -387,6 +387,21 @@ fn handle_mux(
 
 					Ok(())
 				},
+				// The worker might still be usable, but we kill it just in case.
+				Outcome::JobDied(err) => {
+					if attempt_retire(metrics, spawned, worker) {
+						reply(
+							from_pool,
+							FromPool::Concluded {
+								worker,
+								rip: true,
+								result: Err(PrepareError::JobDied(err)),
+							},
+						)?;
+					}
+
+					Ok(())
+				},
 				Outcome::TimedOut => {
 					if attempt_retire(metrics, spawned, worker) {
 						reply(
diff --git a/polkadot/node/core/pvf/src/prepare/worker_intf.rs b/polkadot/node/core/pvf/src/prepare/worker_intf.rs
index 0e50caf1feb5c857b30df07681791d21ead4c497..a22fa74b2fe1c8ebea3ebb94715d33ccc16e0a03 100644
--- a/polkadot/node/core/pvf/src/prepare/worker_intf.rs
+++ b/polkadot/node/core/pvf/src/prepare/worker_intf.rs
@@ -79,7 +79,7 @@ pub enum Outcome {
 	CreateTmpFileErr { worker: IdleWorker, err: String },
 	/// The response from the worker is received, but the tmp file cannot be renamed (moved) to the
 	/// final destination location.
-	RenameTmpFileErr {
+	RenameTmpFile {
 		worker: IdleWorker,
 		result: PrepareResult,
 		err: String,
@@ -100,6 +100,10 @@ pub enum Outcome {
 	IoErr(String),
 	/// The worker ran out of memory and is aborting. The worker should be ripped.
 	OutOfMemory,
+	/// The preparation job process died, due to OOM, a seccomp violation, or some other factor.
+	///
+	/// The worker might still be usable, but we kill it just in case.
+	JobDied(String),
 }
 
 /// Given the idle token of a worker and parameters of work, communicates with the worker and
@@ -187,21 +191,6 @@ pub async fn start_work(
 						"failed to recv a prepare response: {:?}",
 						err,
 					);
-
-					// The worker died. Check if it was due to a seccomp violation.
-					//
-					// NOTE: Log, but don't change the outcome. Not all validators may have auditing
-					// enabled, so we don't want attackers to abuse a non-deterministic outcome.
-					for syscall in security::check_seccomp_violations_for_worker(audit_log_file, pid).await {
-						gum::error!(
-							target: LOG_TARGET,
-							worker_pid = %pid,
-							%syscall,
-							?pvf,
-							"A forbidden syscall was attempted! This is a violation of our seccomp security policy. Report an issue ASAP!"
-						);
-					}
-
 					Outcome::IoErr(err.to_string())
 				},
 				Err(_) => {
@@ -236,6 +225,7 @@ async fn handle_response(
 		Ok(result) => result,
 		// Timed out on the child. This should already be logged by the child.
 		Err(PrepareError::TimedOut) => return Outcome::TimedOut,
+		Err(PrepareError::JobDied(err)) => return Outcome::JobDied(err),
 		Err(PrepareError::OutOfMemory) => return Outcome::OutOfMemory,
 		Err(_) => return Outcome::Concluded { worker, result },
 	};
@@ -272,7 +262,7 @@ async fn handle_response(
 				artifact_path.display(),
 				err,
 			);
-			Outcome::RenameTmpFileErr {
+			Outcome::RenameTmpFile {
 				worker,
 				result,
 				err: format!("{:?}", err),
diff --git a/polkadot/node/core/pvf/src/testing.rs b/polkadot/node/core/pvf/src/testing.rs
index 4c038896f7f9e29aaed7f7325a445d144ba20457..400b65bfe7d832123b8d77f1ffdad867293a6eb2 100644
--- a/polkadot/node/core/pvf/src/testing.rs
+++ b/polkadot/node/core/pvf/src/testing.rs
@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
-//! Various things for testing other crates.
+//! Various utilities for testing.
 
 pub use crate::{
 	host::{EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME},
@@ -59,27 +59,33 @@ pub fn validate_candidate(
 ///
 /// NOTE: This should only be called in dev code (tests, benchmarks) as it relies on the relative
 /// paths of the built workers.
-pub fn get_and_check_worker_paths() -> (PathBuf, PathBuf) {
+pub fn build_workers_and_get_paths(is_bench: bool) -> (PathBuf, PathBuf) {
 	// Only needs to be called once for the current process.
 	static WORKER_PATHS: OnceLock<Mutex<(PathBuf, PathBuf)>> = OnceLock::new();
 
-	fn build_workers() {
-		let build_args = vec![
+	fn build_workers(is_bench: bool) {
+		let mut build_args = vec![
 			"build",
 			"--package=polkadot",
 			"--bin=polkadot-prepare-worker",
 			"--bin=polkadot-execute-worker",
 		];
-		let exit_status = std::process::Command::new("cargo")
+		if is_bench {
+			// Benches require --release. Regular tests are debug (no flag needed).
+			build_args.push("--release");
+		}
+		let mut cargo = std::process::Command::new("cargo");
+		let cmd = cargo
 			// wasm runtime not needed
 			.env("SKIP_WASM_BUILD", "1")
 			.args(build_args)
-			.stdout(std::process::Stdio::piped())
-			.status()
-			.expect("Failed to run the build program");
+			.stdout(std::process::Stdio::piped());
+
+		println!("INFO: calling `{cmd:?}`");
+		let exit_status = cmd.status().expect("Failed to run the build program");
 
 		if !exit_status.success() {
-			eprintln!("Failed to build workers: {}", exit_status.code().unwrap());
+			eprintln!("ERROR: Failed to build workers: {}", exit_status.code().unwrap());
 			std::process::exit(1);
 		}
 	}
@@ -95,23 +101,23 @@ pub fn get_and_check_worker_paths() -> (PathBuf, PathBuf) {
 
 		// explain why a build happens
 		if !prepare_worker_path.is_executable() {
-			eprintln!("Prepare worker does not exist or is not executable. Workers directory: {:?}", workers_path);
+			println!("WARN: Prepare worker does not exist or is not executable. Workers directory: {:?}", workers_path);
 		}
 		if !execute_worker_path.is_executable() {
-			eprintln!("Execute worker does not exist or is not executable. Workers directory: {:?}", workers_path);
+			println!("WARN: Execute worker does not exist or is not executable. Workers directory: {:?}", workers_path);
 		}
 		if let Ok(ver) = get_worker_version(&prepare_worker_path) {
 			if ver != NODE_VERSION {
-				eprintln!("Prepare worker version {ver} does not match node version {NODE_VERSION}; worker path: {prepare_worker_path:?}");
+				println!("WARN: Prepare worker version {ver} does not match node version {NODE_VERSION}; worker path: {prepare_worker_path:?}");
 			}
 		}
 		if let Ok(ver) = get_worker_version(&execute_worker_path) {
 			if ver != NODE_VERSION {
-				eprintln!("Execute worker version {ver} does not match node version {NODE_VERSION}; worker path: {execute_worker_path:?}");
+				println!("WARN: Execute worker version {ver} does not match node version {NODE_VERSION}; worker path: {execute_worker_path:?}");
 			}
 		}
 
-		build_workers();
+		build_workers(is_bench);
 
 		Mutex::new((prepare_worker_path, execute_worker_path))
 	});
diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs
index 801b60884fa36ec9d93377978e297e72dfbf4066..d2d842cf84a336ad92421bd0e344b6853638c390 100644
--- a/polkadot/node/core/pvf/tests/it/main.rs
+++ b/polkadot/node/core/pvf/tests/it/main.rs
@@ -19,23 +19,23 @@
 use assert_matches::assert_matches;
 use parity_scale_codec::Encode as _;
 use polkadot_node_core_pvf::{
-	start, testing::get_and_check_worker_paths, Config, InvalidCandidate, Metrics, PrepareError,
+	start, testing::build_workers_and_get_paths, Config, InvalidCandidate, Metrics, PrepareError,
 	PrepareJobKind, PrepareStats, PvfPrepData, ValidationError, ValidationHost,
 	JOB_TIMEOUT_WALL_CLOCK_FACTOR,
 };
 use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
 use polkadot_primitives::{ExecutorParam, ExecutorParams};
-#[cfg(target_os = "linux")]
-use rusty_fork::rusty_fork_test;
 
 use std::time::Duration;
 use tokio::sync::Mutex;
 
 mod adder;
+#[cfg(target_os = "linux")]
+mod process;
 mod worker_common;
 
-const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
-const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(3);
+const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(6);
+const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(6);
 
 struct TestHost {
 	cache_dir: tempfile::TempDir,
@@ -51,7 +51,7 @@ impl TestHost {
 	where
 		F: FnOnce(&mut Config),
 	{
-		let (prepare_worker_path, execute_worker_path) = get_and_check_worker_paths();
+		let (prepare_worker_path, execute_worker_path) = build_workers_and_get_paths(false);
 
 		let cache_dir = tempfile::tempdir().unwrap();
 		let mut config = Config::new(
@@ -126,7 +126,26 @@ impl TestHost {
 }
 
 #[tokio::test]
-async fn terminates_on_timeout() {
+async fn prepare_job_terminates_on_timeout() {
+	let host = TestHost::new().await;
+
+	let start = std::time::Instant::now();
+	let result = host
+		.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default())
+		.await;
+
+	match result {
+		Err(PrepareError::TimedOut) => {},
+		r => panic!("{:?}", r),
+	}
+
+	let duration = std::time::Instant::now().duration_since(start);
+	assert!(duration >= TEST_PREPARATION_TIMEOUT);
+	assert!(duration < TEST_PREPARATION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
+}
+
+#[tokio::test]
+async fn execute_job_terminates_on_timeout() {
 	let host = TestHost::new().await;
 
 	let start = std::time::Instant::now();
@@ -153,108 +172,6 @@ async fn terminates_on_timeout() {
 	assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
 }
 
-#[cfg(target_os = "linux")]
-fn kill_by_sid_and_name(sid: i32, exe_name: &'static str) {
-	use procfs::process;
-
-	let all_processes: Vec<process::Process> = process::all_processes()
-		.expect("Can't read /proc")
-		.filter_map(|p| match p {
-			Ok(p) => Some(p), // happy path
-			Err(e) => match e {
-				// process vanished during iteration, ignore it
-				procfs::ProcError::NotFound(_) => None,
-				x => {
-					panic!("some unknown error: {}", x);
-				},
-			},
-		})
-		.collect();
-
-	for process in all_processes {
-		if process.stat().unwrap().session == sid &&
-			process.exe().unwrap().to_str().unwrap().contains(exe_name)
-		{
-			assert_eq!(unsafe { libc::kill(process.pid(), 9) }, 0);
-		}
-	}
-}
-
-// Run these tests in their own processes with rusty-fork. They work by each creating a new session,
-// then killing the worker process that matches the session ID and expected worker name.
-#[cfg(target_os = "linux")]
-rusty_fork_test! {
-	// What happens when the prepare worker dies in the middle of a job?
-	#[test]
-	fn prepare_worker_killed_during_job() {
-		const PROCESS_NAME: &'static str = "polkadot-prepare-worker";
-
-		let rt  = tokio::runtime::Runtime::new().unwrap();
-		rt.block_on(async {
-			let host = TestHost::new().await;
-
-			// Create a new session and get the session ID.
-			let sid = unsafe { libc::setsid() };
-			assert!(sid > 0);
-
-			let (result, _) = futures::join!(
-				// Choose a job that would normally take the entire timeout.
-				host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
-				// Run a future that kills the job in the middle of the timeout.
-				async {
-					tokio::time::sleep(TEST_PREPARATION_TIMEOUT / 2).await;
-					kill_by_sid_and_name(sid, PROCESS_NAME);
-				}
-			);
-
-			assert_matches!(result, Err(PrepareError::IoErr(_)));
-		})
-	}
-
-	// What happens when the execute worker dies in the middle of a job?
-	#[test]
-	fn execute_worker_killed_during_job() {
-		const PROCESS_NAME: &'static str = "polkadot-execute-worker";
-
-		let rt  = tokio::runtime::Runtime::new().unwrap();
-		rt.block_on(async {
-			let host = TestHost::new().await;
-
-			// Create a new session and get the session ID.
-			let sid = unsafe { libc::setsid() };
-			assert!(sid > 0);
-
-			// Prepare the artifact ahead of time.
-			let binary = halt::wasm_binary_unwrap();
-			host.precheck_pvf(binary, Default::default()).await.unwrap();
-
-			let (result, _) = futures::join!(
-				// Choose an job that would normally take the entire timeout.
-				host.validate_candidate(
-					binary,
-					ValidationParams {
-						block_data: BlockData(Vec::new()),
-						parent_head: Default::default(),
-						relay_parent_number: 1,
-						relay_parent_storage_root: Default::default(),
-					},
-					Default::default(),
-				),
-				// Run a future that kills the job in the middle of the timeout.
-				async {
-					tokio::time::sleep(TEST_EXECUTION_TIMEOUT / 2).await;
-					kill_by_sid_and_name(sid, PROCESS_NAME);
-				}
-			);
-
-			assert_matches!(
-				result,
-				Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
-			);
-		})
-	}
-}
-
 #[cfg(feature = "ci-only-tests")]
 #[tokio::test]
 async fn ensure_parallel_execution() {
diff --git a/polkadot/node/core/pvf/tests/it/process.rs b/polkadot/node/core/pvf/tests/it/process.rs
new file mode 100644
index 0000000000000000000000000000000000000000..725d060ab9160a4757a0d0b844f5162597b8f836
--- /dev/null
+++ b/polkadot/node/core/pvf/tests/it/process.rs
@@ -0,0 +1,383 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Test unexpected behaviors of the spawned processes. We test both worker processes (directly
+//! spawned by the host) and job processes (spawned by the workers to securely perform PVF jobs).
+
+use super::TestHost;
+use assert_matches::assert_matches;
+use polkadot_node_core_pvf::{InvalidCandidate, PrepareError, ValidationError};
+use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams};
+use procfs::process;
+use rusty_fork::rusty_fork_test;
+use std::time::Duration;
+
+const PREPARE_PROCESS_NAME: &'static str = "polkadot-prepare-worker";
+const EXECUTE_PROCESS_NAME: &'static str = "polkadot-execute-worker";
+
+const SIGNAL_KILL: i32 = 9;
+const SIGNAL_STOP: i32 = 19;
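+// These are the Linux values of `SIGKILL` and `SIGSTOP`. Neither signal can be caught, blocked,
+// or ignored, which makes them reliable for simulating sudden process death and stalls.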
+
+fn send_signal_by_sid_and_name(
+	sid: i32,
+	exe_name: &'static str,
+	is_direct_child: bool,
+	signal: i32,
+) {
+	let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child);
+	assert_eq!(unsafe { libc::kill(process.pid(), signal) }, 0);
+}
+
+fn get_num_threads_by_sid_and_name(sid: i32, exe_name: &'static str, is_direct_child: bool) -> i64 {
+	let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child);
+	process.stat().unwrap().num_threads
+}
+
+fn find_process_by_sid_and_name(
+	sid: i32,
+	exe_name: &'static str,
+	is_direct_child: bool,
+) -> process::Process {
+	let all_processes: Vec<process::Process> = process::all_processes()
+		.expect("Can't read /proc")
+		.filter_map(|p| match p {
+			Ok(p) => Some(p), // happy path
+			Err(e) => match e {
+				// process vanished during iteration, ignore it
+				procfs::ProcError::NotFound(_) => None,
+				x => {
+					panic!("some unknown error: {}", x);
+				},
+			},
+		})
+		.collect();
+
+	let mut found = None;
+	for process in all_processes {
+		let stat = process.stat().unwrap();
+
+		if stat.session != sid || !process.exe().unwrap().to_str().unwrap().contains(exe_name) {
+			continue
+		}
+		// The workers are direct children of the current process; the job processes are not
+		// (they are children of the workers).
+		let process_is_direct_child = stat.ppid as u32 == std::process::id();
+		if is_direct_child != process_is_direct_child {
+			continue
+		}
+
+		if found.is_some() {
+			panic!("Found more than one process")
+		}
+		found = Some(process);
+	}
+	found.expect("Should have found the expected process")
+}
+
+// Run these tests in their own processes with rusty-fork. They work by each creating a new session,
+// then doing something with the child process that matches the session ID and expected process
+// name.
+rusty_fork_test! {
+	// What happens when the prepare worker (not the job) times out?
+	#[test]
+	fn prepare_worker_timeout() {
+		let rt  = tokio::runtime::Runtime::new().unwrap();
+		rt.block_on(async {
+			let host = TestHost::new().await;
+
+			// Create a new session and get the session ID.
+			let sid = unsafe { libc::setsid() };
+			assert!(sid > 0);
+
+			let (result, _) = futures::join!(
+				// Choose a job that would normally take the entire timeout.
+				host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
+				// Send a stop signal to pause the worker.
+				async {
+					tokio::time::sleep(Duration::from_secs(1)).await;
+					send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true, SIGNAL_STOP);
+				}
+			);
+
+			assert_matches!(result, Err(PrepareError::TimedOut));
+		})
+	}
+
+	// What happens when the execute worker (not the job) times out?
+	#[test]
+	fn execute_worker_timeout() {
+		let rt  = tokio::runtime::Runtime::new().unwrap();
+		rt.block_on(async {
+			let host = TestHost::new().await;
+
+			// Create a new session and get the session ID.
+			let sid = unsafe { libc::setsid() };
+			assert!(sid > 0);
+
+			// Prepare the artifact ahead of time.
+			let binary = halt::wasm_binary_unwrap();
+			host.precheck_pvf(binary, Default::default()).await.unwrap();
+
+			let (result, _) = futures::join!(
+				// Choose a job that would normally take the entire timeout.
+				host.validate_candidate(
+					binary,
+					ValidationParams {
+						block_data: BlockData(Vec::new()),
+						parent_head: Default::default(),
+						relay_parent_number: 1,
+						relay_parent_storage_root: Default::default(),
+					},
+					Default::default(),
+				),
+				// Send a stop signal to pause the worker.
+				async {
+					tokio::time::sleep(Duration::from_secs(1)).await;
+					send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true, SIGNAL_STOP);
+				}
+			);
+
+			assert_matches!(
+				result,
+				Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout))
+			);
+		})
+	}
+
+	// What happens when the prepare worker dies in the middle of a job?
+	#[test]
+	fn prepare_worker_killed_during_job() {
+		let rt  = tokio::runtime::Runtime::new().unwrap();
+		rt.block_on(async {
+			let host = TestHost::new().await;
+
+			// Create a new session and get the session ID.
+			let sid = unsafe { libc::setsid() };
+			assert!(sid > 0);
+
+			let (result, _) = futures::join!(
+				// Choose a job that would normally take the entire timeout.
+				host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
+				// Run a future that kills the worker while the job is running.
+				async {
+					tokio::time::sleep(Duration::from_secs(1)).await;
+					send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true, SIGNAL_KILL);
+				}
+			);
+
+			assert_matches!(result, Err(PrepareError::IoErr(_)));
+		})
+	}
+
+	// What happens when the execute worker dies in the middle of a job?
+	#[test]
+	fn execute_worker_killed_during_job() {
+		let rt  = tokio::runtime::Runtime::new().unwrap();
+		rt.block_on(async {
+			let host = TestHost::new().await;
+
+			// Create a new session and get the session ID.
+			let sid = unsafe { libc::setsid() };
+			assert!(sid > 0);
+
+			// Prepare the artifact ahead of time.
+			let binary = halt::wasm_binary_unwrap();
+			host.precheck_pvf(binary, Default::default()).await.unwrap();
+
+			let (result, _) = futures::join!(
+				// Choose a job that would normally take the entire timeout.
+				host.validate_candidate(
+					binary,
+					ValidationParams {
+						block_data: BlockData(Vec::new()),
+						parent_head: Default::default(),
+						relay_parent_number: 1,
+						relay_parent_storage_root: Default::default(),
+					},
+					Default::default(),
+				),
+				// Run a future that kills the worker while the job is running.
+				async {
+					tokio::time::sleep(Duration::from_secs(1)).await;
+					send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true, SIGNAL_KILL);
+				}
+			);
+
+			assert_matches!(
+				result,
+				Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
+			);
+		})
+	}
+
+	// What happens when the forked prepare job dies in the middle of its job?
+	#[test]
+	fn forked_prepare_job_killed_during_job() {
+		let rt  = tokio::runtime::Runtime::new().unwrap();
+		rt.block_on(async {
+			let host = TestHost::new().await;
+
+			// Create a new session and get the session ID.
+			let sid = unsafe { libc::setsid() };
+			assert!(sid > 0);
+
+			let (result, _) = futures::join!(
+				// Choose a job that would normally take the entire timeout.
+				host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
+				// Run a future that kills the job while it's running.
+				async {
+					tokio::time::sleep(Duration::from_secs(1)).await;
+					send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false, SIGNAL_KILL);
+				}
+			);
+
+			// Note that we get a more specific error if the job died than if the whole worker died.
+			assert_matches!(
+				result,
+				Err(PrepareError::JobDied(err)) if err == "received signal: SIGKILL"
+			);
+		})
+	}
+
+	// What happens when the forked execute job dies in the middle of its job?
+	#[test]
+	fn forked_execute_job_killed_during_job() {
+		let rt  = tokio::runtime::Runtime::new().unwrap();
+		rt.block_on(async {
+			let host = TestHost::new().await;
+
+			// Create a new session and get the session ID.
+			let sid = unsafe { libc::setsid() };
+			assert!(sid > 0);
+
+			// Prepare the artifact ahead of time.
+			let binary = halt::wasm_binary_unwrap();
+			host.precheck_pvf(binary, Default::default()).await.unwrap();
+
+			let (result, _) = futures::join!(
+				// Choose a job that would normally take the entire timeout.
+				host.validate_candidate(
+					binary,
+					ValidationParams {
+						block_data: BlockData(Vec::new()),
+						parent_head: Default::default(),
+						relay_parent_number: 1,
+						relay_parent_storage_root: Default::default(),
+					},
+					Default::default(),
+				),
+				// Run a future that kills the job while it's running.
+				async {
+					tokio::time::sleep(Duration::from_secs(1)).await;
+					send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false, SIGNAL_KILL);
+				}
+			);
+
+			// Note that we get a more specific error if the job died than if the whole worker died.
+			assert_matches!(
+				result,
+				Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousJobDeath(err)))
+					if err == "received signal: SIGKILL"
+			);
+		})
+	}
+
+	// Ensure that the spawned prepare worker is single-threaded.
+	//
+	// See `run_worker` for why we need this invariant.
+	#[test]
+	fn ensure_prepare_processes_have_correct_num_threads() {
+		let rt  = tokio::runtime::Runtime::new().unwrap();
+		rt.block_on(async {
+			let host = TestHost::new().await;
+
+			// Create a new session and get the session ID.
+			let sid = unsafe { libc::setsid() };
+			assert!(sid > 0);
+
+			let _ = futures::join!(
+				// Choose a job that would normally take the entire timeout.
+				host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
+				// Run a future that checks the thread counts while the processes are running.
+				async {
+					tokio::time::sleep(Duration::from_secs(1)).await;
+					assert_eq!(
+						get_num_threads_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true),
+						1
+					);
+					// The child job should have four threads: main thread, prepare thread,
+					// CPU time monitor, and memory tracker.
+					assert_eq!(
+						get_num_threads_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false),
+						4
+					);
+
+					// End the test.
+					send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true, SIGNAL_KILL);
+				}
+			);
+		})
+	}
+
+	// Ensure that the spawned execute worker is single-threaded.
+	//
+	// See `run_worker` for why we need this invariant.
+	#[test]
+	fn ensure_execute_processes_have_correct_num_threads() {
+		let rt  = tokio::runtime::Runtime::new().unwrap();
+		rt.block_on(async {
+			let host = TestHost::new().await;
+
+			// Create a new session and get the session ID.
+			let sid = unsafe { libc::setsid() };
+			assert!(sid > 0);
+
+			// Prepare the artifact ahead of time.
+			let binary = halt::wasm_binary_unwrap();
+			host.precheck_pvf(binary, Default::default()).await.unwrap();
+
+			let _ = futures::join!(
+				// Choose a job that would normally take the entire timeout.
+				host.validate_candidate(
+					binary,
+					ValidationParams {
+						block_data: BlockData(Vec::new()),
+						parent_head: Default::default(),
+						relay_parent_number: 1,
+						relay_parent_storage_root: Default::default(),
+					},
+					Default::default(),
+				),
+				// Run a future that tests the thread count while the worker is running.
+				async {
+					tokio::time::sleep(Duration::from_secs(1)).await;
+					assert_eq!(
+						get_num_threads_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true),
+						1
+					);
+					// Child job should have three threads: main thread, execute thread, and CPU
+					// time monitor.
+					assert_eq!(
+						get_num_threads_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false),
+						3
+					);
+
+					// End the test.
+					send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true, SIGNAL_KILL);
+				}
+			);
+		})
+	}
+}
diff --git a/polkadot/node/core/pvf/tests/it/worker_common.rs b/polkadot/node/core/pvf/tests/it/worker_common.rs
index df64980dc8064d80486db002b073992593bbce9c..0d33af7e096cdcb4df8432498adde3a925ff1030 100644
--- a/polkadot/node/core/pvf/tests/it/worker_common.rs
+++ b/polkadot/node/core/pvf/tests/it/worker_common.rs
@@ -15,7 +15,7 @@
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
 use polkadot_node_core_pvf::{
-	testing::{get_and_check_worker_paths, spawn_with_program_path, SpawnErr},
+	testing::{build_workers_and_get_paths, spawn_with_program_path, SpawnErr},
 	SecurityStatus,
 };
 use std::{env, time::Duration};
@@ -23,7 +23,7 @@ use std::{env, time::Duration};
 // Test spawning a program that immediately exits with a failure code.
 #[tokio::test]
 async fn spawn_immediate_exit() {
-	let (prepare_worker_path, _) = get_and_check_worker_paths();
+	let (prepare_worker_path, _) = build_workers_and_get_paths(false);
 
 	// There's no explicit `exit` subcommand in the worker; it will panic on an unknown
 	// subcommand anyway
@@ -41,7 +41,7 @@ async fn spawn_immediate_exit() {
 
 #[tokio::test]
 async fn spawn_timeout() {
-	let (_, execute_worker_path) = get_and_check_worker_paths();
+	let (_, execute_worker_path) = build_workers_and_get_paths(false);
 
 	let result = spawn_with_program_path(
 		"integration-test",
@@ -57,7 +57,7 @@ async fn spawn_timeout() {
 
 #[tokio::test]
 async fn should_connect() {
-	let (prepare_worker_path, _) = get_and_check_worker_paths();
+	let (prepare_worker_path, _) = build_workers_and_get_paths(false);
 
 	let _ = spawn_with_program_path(
 		"integration-test",
diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md b/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md
index 52129f9eb80aff9974717d2f6a8d49060b4dacc4..4dbb7980c1be36c3750cf78c74aabc932a3fd7fc 100644
--- a/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md
+++ b/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md
@@ -1,7 +1,11 @@
 # PVF Host and Workers
 
 The PVF host is responsible for handling requests to prepare and execute PVF
-code blobs, which it sends to PVF workers running in their own child processes.
+code blobs, which it sends to PVF **workers** running in their own child
+processes.
+
+While the workers are generally long-lived, they also spawn one-off secure
+**job processes** that perform the jobs. See the "Job Processes" section below.
 
 This system has two high-level goals that we will touch on here: *determinism*
 and *security*.
@@ -36,8 +40,11 @@ execution request:
    not successful.
 2. **Artifact missing:** The prepared artifact might have been deleted due to
    operator error or some bug in the system.
-3. **Panic:** The worker thread panicked for some indeterminate reason, which
-   may or may not be independent of the candidate or PVF.
+3. **Job errors:** For example, the job process panicked for some
+   indeterminate reason, which may or may not be independent of the candidate or
+   PVF.
+4. **Internal errors:** See the "Internal Errors" section. In this case, after the
+   retry we abstain from voting.
 
 ### Preparation timeouts
 
@@ -62,10 +69,16 @@ more than the CPU time.
 
 ### Internal errors
 
+An internal, or local, error is one that we treat as independent of the PVF
+and/or candidate, i.e. local to the running machine. If this happens, we will
+first retry the job, and if the error persists, we simply do not vote. This
+prevents slashes, since otherwise our vote may not agree with that of the
+other validators.
+
 In general, for errors not raising a dispute we have to be very careful. This is
-only sound, if we either:
+only sound if either:
 
-1. Ruled out that error in pre-checking. If something is not checked in
+1. We ruled out that error in pre-checking. If something is not checked in
    pre-checking, even if independent of the candidate and PVF, we must raise a
    dispute.
 2. We are 100% confident that it is a hardware/local issue: Like corrupted file,
@@ -75,11 +88,11 @@ Reasoning: Otherwise it would be possible to register a PVF where candidates can
 not be checked, but we don't get a dispute - so nobody gets punished. Second, we
 end up with a finality stall that is not going to resolve!
 
-There are some error conditions where we can't be sure whether the candidate is
-really invalid or some internal glitch occurred, e.g. panics. Whenever we are
-unsure, we can never treat an error as internal as we would abstain from voting.
-So we will first retry the candidate, and if the issue persists we are forced to
-vote invalid.
+Note that we cannot treat any error from the job process as internal. The job
+runs untrusted code, so an attacker can make it return arbitrary errors. If we
+treated such errors as internal, an attacker could make us abstain from
+voting. Since we are unsure whether such errors are legitimate, we will first
+retry the candidate, and if the issue persists we are forced to vote invalid.
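+
+To make the dichotomy concrete, here is a minimal sketch of the resulting
+decision logic. The names are illustrative only and are not the actual types
+used by the host:
+
+```rust
+/// Illustrative outcome of a validation job after the retry was spent.
+enum JobOutcome {
+	Valid,
+	Invalid(String),
+	/// An unexpected error reported by the (untrusted) job process.
+	JobError(String),
+	/// An error local to this machine, independent of the candidate.
+	InternalError(String),
+}
+
+/// `Some(true)` = vote valid, `Some(false)` = vote invalid, `None` = abstain.
+fn vote_after_retry(outcome: JobOutcome) -> Option<bool> {
+	match outcome {
+		JobOutcome::Valid => Some(true),
+		JobOutcome::Invalid(_) => Some(false),
+		// Job errors may be attacker-controlled; abstaining here would let an
+		// attacker stall finality, so we must vote invalid.
+		JobOutcome::JobError(_) => Some(false),
+		// A local glitch tells us nothing about the candidate: abstain.
+		JobOutcome::InternalError(_) => None,
+	}
+}
+```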
 
 ## Security
 
@@ -119,6 +132,20 @@ So what are we actually worried about? Things that come to mind:
 6. **Intercepting and manipulating packages** - Effect very similar to the
    above, hard to do without also being able to do 4 or 5.
 
+### Job Processes
+
+As mentioned above, our architecture includes long-lived **worker processes**
+and one-off **job processes**. This separation is important so that the
+handling of untrusted code can be limited to the job processes. A hijacked job
+process therefore cannot interfere with other jobs running in separate
+processes.
+
+Furthermore, if an unexpected execution error occurred in the worker and not
+the job, we can generally be confident that it has nothing to do with the
+candidate, so we can abstain from voting. On the other hand, a hijacked job
+can send back erroneous responses for candidates, so we know that we should
+not abstain from voting on such errors from jobs. Otherwise, an attacker could
+trigger a finality stall. (See the "Internal Errors" section above.)
+
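+The mechanics follow a classic fork-and-pipe pattern. Below is a simplified
+sketch of the worker side, using the same `nix` and `os_pipe` crates as the
+real workers; sandboxing, timeouts, CPU-time accounting, and most error
+handling are elided:
+
+```rust
+use nix::{sys::wait::waitpid, unistd::{fork, ForkResult}};
+use std::io::{Read, Write};
+
+fn run_job_in_child_process() -> std::io::Result<Vec<u8>> {
+	let (mut reader, mut writer) = os_pipe::pipe()?;
+	// SAFETY: forking is only sound here because the worker is single-threaded
+	// (see the thread-count integration tests).
+	match unsafe { fork() }.expect("fork failed") {
+		ForkResult::Child => {
+			drop(reader);
+			// ... apply sandboxing, perform the actual job ...
+			writer.write_all(b"job response").unwrap();
+			std::process::exit(0)
+		},
+		ForkResult::Parent { child } => {
+			// Drop our copy of the write end so `read_to_end` sees EOF once the
+			// child exits.
+			drop(writer);
+			let mut response = Vec::new();
+			reader.read_to_end(&mut response)?;
+			// `WaitStatus::Signaled` here means the job process died, e.g. due
+			// to the OOM killer or a seccomp violation.
+			let _status = waitpid(child, None);
+			Ok(response)
+		},
+	}
+}
+```
+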
 ### Restricting file-system access
 
 A basic security mechanism is to make sure that any process directly interfacing
diff --git a/polkadot/scripts/list-syscalls/execute-worker-syscalls b/polkadot/scripts/list-syscalls/execute-worker-syscalls
index 4a7a66181299a6033035e596cb935cd397af80c5..349af783cf1a1340d9467187fde21e1829ec6248 100644
--- a/polkadot/scripts/list-syscalls/execute-worker-syscalls
+++ b/polkadot/scripts/list-syscalls/execute-worker-syscalls
@@ -16,6 +16,7 @@
 16 (ioctl)
 19 (readv)
 20 (writev)
+22 (pipe)
 24 (sched_yield)
 25 (mremap)
 28 (madvise)
@@ -25,7 +26,9 @@
 45 (recvfrom)
 46 (sendmsg)
 56 (clone)
+57 (fork)
 60 (exit)
+61 (wait4)
 62 (kill)
 72 (fcntl)
 79 (getcwd)
@@ -36,6 +39,7 @@
 89 (readlink)
 96 (gettimeofday)
 97 (getrlimit)
+98 (getrusage)
 99 (sysinfo)
 102 (getuid)
 110 (getppid)
@@ -47,6 +51,7 @@
 158 (arch_prctl)
 165 (mount)
 166 (umount2)
+186 (gettid)
 200 (tkill)
 202 (futex)
 204 (sched_getaffinity)
@@ -60,6 +65,7 @@
 263 (unlinkat)
 272 (unshare)
 273 (set_robust_list)
+293 (pipe2)
 302 (prlimit64)
 318 (getrandom)
 319 (memfd_create)
diff --git a/polkadot/scripts/list-syscalls/prepare-worker-syscalls b/polkadot/scripts/list-syscalls/prepare-worker-syscalls
index cab58e06692bbd46d48d36ce1ef9525f9b2e792c..05281b61591a7f9e45efa7d7bb6514fbe118f130 100644
--- a/polkadot/scripts/list-syscalls/prepare-worker-syscalls
+++ b/polkadot/scripts/list-syscalls/prepare-worker-syscalls
@@ -16,6 +16,7 @@
 16 (ioctl)
 19 (readv)
 20 (writev)
+22 (pipe)
 24 (sched_yield)
 25 (mremap)
 28 (madvise)
@@ -25,7 +26,9 @@
 45 (recvfrom)
 46 (sendmsg)
 56 (clone)
+57 (fork)
 60 (exit)
+61 (wait4)
 62 (kill)
 72 (fcntl)
 79 (getcwd)
@@ -48,6 +51,7 @@
 158 (arch_prctl)
 165 (mount)
 166 (umount2)
+186 (gettid)
 200 (tkill)
 202 (futex)
 203 (sched_setaffinity)
@@ -62,6 +66,7 @@
 263 (unlinkat)
 272 (unshare)
 273 (set_robust_list)
+293 (pipe2)
 302 (prlimit64)
 309 (getcpu)
 318 (getrandom)