From e277f95b3b184e7ec49c14c6ab407f23d92a3427 Mon Sep 17 00:00:00 2001
From: Marcin S <marcin@realemail.net>
Date: Fri, 21 Apr 2023 12:40:09 +0200
Subject: [PATCH] PVF: Move PVF workers into separate crate (#7101)

* Move PVF workers into separate crate

* Fix indentation

* Fix compilation errors

* Fix more compilation errors

* Rename `worker.rs` files, make host interface to worker more clear

* Fix more compilation errors

* Fix more compilation errors

* Add link to issue

* Address review comments

* Update comment
---
 polkadot/Cargo.lock                           |  83 ++++---
 polkadot/Cargo.toml                           |   5 +-
 polkadot/cli/Cargo.toml                       |   5 +-
 polkadot/cli/src/command.rs                   |   4 +-
 polkadot/cli/src/host_perf_check.rs           |   1 -
 polkadot/node/core/pvf/Cargo.toml             |  27 +--
 polkadot/node/core/pvf/src/artifacts.rs       |   2 +
 polkadot/node/core/pvf/src/error.rs           |  16 +-
 polkadot/node/core/pvf/src/execute/mod.rs     |   6 +-
 polkadot/node/core/pvf/src/execute/queue.rs   |  15 +-
 .../src/execute/{worker.rs => worker_intf.rs} | 183 ++-------------
 polkadot/node/core/pvf/src/lib.rs             |  32 +--
 polkadot/node/core/pvf/src/prepare/mod.rs     |  35 ++-
 polkadot/node/core/pvf/src/prepare/pool.rs    |   6 +-
 polkadot/node/core/pvf/src/prepare/queue.rs   |   2 +-
 .../src/prepare/{worker.rs => worker_intf.rs} | 196 +---------------
 polkadot/node/core/pvf/src/pvf.rs             |  17 +-
 polkadot/node/core/pvf/src/worker_common.rs   | 120 +---------
 polkadot/node/core/pvf/worker/Cargo.toml      |  49 ++++
 .../pvf/{ => worker}/bin/puppet_worker.rs     |   2 +-
 polkadot/node/core/pvf/worker/build.rs        |  19 ++
 polkadot/node/core/pvf/worker/src/common.rs   | 142 +++++++++++
 polkadot/node/core/pvf/worker/src/execute.rs  | 175 ++++++++++++++
 .../pvf/{ => worker}/src/executor_intf.rs     |   0
 polkadot/node/core/pvf/worker/src/lib.rs      |  73 ++++++
 .../prepare => worker/src}/memory_stats.rs    |  26 +-
 polkadot/node/core/pvf/worker/src/prepare.rs  | 222 ++++++++++++++++++
 .../node/core/pvf/{ => worker}/src/testing.rs |   4 -
 .../core/pvf/{ => worker}/tests/it/adder.rs   |   0
 .../core/pvf/{ => worker}/tests/it/main.rs    |   0
 .../{ => worker}/tests/it/worker_common.rs    |   2 +-
 polkadot/node/malus/Cargo.toml                |   2 +-
 polkadot/node/malus/src/malus.rs              |  10 +-
 .../node/test/performance-test/Cargo.toml     |   5 +-
 .../performance-test/src/gen_ref_constants.rs |   1 -
 .../node/test/performance-test/src/lib.rs     |   6 +-
 .../test-parachains/adder/collator/Cargo.toml |   2 +-
 .../adder/collator/bin/puppet_worker.rs       |   2 +-
 .../test-parachains/adder/collator/src/lib.rs |   2 +-
 .../undying/collator/Cargo.toml               |   2 +-
 .../undying/collator/bin/puppet_worker.rs     |   2 +-
 .../undying/collator/src/lib.rs               |   2 +-
 42 files changed, 878 insertions(+), 627 deletions(-)
 rename polkadot/node/core/pvf/src/execute/{worker.rs => worker_intf.rs} (55%)
 rename polkadot/node/core/pvf/src/prepare/{worker.rs => worker_intf.rs} (55%)
 create mode 100644 polkadot/node/core/pvf/worker/Cargo.toml
 rename polkadot/node/core/pvf/{ => worker}/bin/puppet_worker.rs (92%)
 create mode 100644 polkadot/node/core/pvf/worker/build.rs
 create mode 100644 polkadot/node/core/pvf/worker/src/common.rs
 create mode 100644 polkadot/node/core/pvf/worker/src/execute.rs
 rename polkadot/node/core/pvf/{ => worker}/src/executor_intf.rs (100%)
 create mode 100644 polkadot/node/core/pvf/worker/src/lib.rs
 rename polkadot/node/core/pvf/{src/prepare => worker/src}/memory_stats.rs (89%)
 create mode 100644 polkadot/node/core/pvf/worker/src/prepare.rs
 rename polkadot/node/core/pvf/{ => worker}/src/testing.rs (96%)
 rename polkadot/node/core/pvf/{ => worker}/tests/it/adder.rs (100%)
 rename polkadot/node/core/pvf/{ => worker}/tests/it/main.rs (100%)
 rename polkadot/node/core/pvf/{ => worker}/tests/it/worker_common.rs (94%)

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 9381afc4521..23abf35bbab 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -3123,6 +3123,12 @@ version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
 
+[[package]]
+name = "hex-literal"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ebdb29d2ea9ed0083cd8cece49bbd968021bd99b0849edb4a9a7ee0fdf6a4e0"
+
 [[package]]
 name = "hex-literal"
 version = "0.4.1"
@@ -3693,7 +3699,7 @@ dependencies = [
  "frame-system-benchmarking",
  "frame-system-rpc-runtime-api",
  "frame-try-runtime",
- "hex-literal",
+ "hex-literal 0.4.1",
  "kusama-runtime-constants",
  "log",
  "pallet-authority-discovery",
@@ -6580,7 +6586,7 @@ dependencies = [
  "nix 0.26.2",
  "polkadot-cli",
  "polkadot-core-primitives",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
  "polkadot-overseer",
  "substrate-rpc-client",
  "tempfile",
@@ -6706,7 +6712,7 @@ dependencies = [
  "futures",
  "log",
  "polkadot-client",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
  "polkadot-node-metrics",
  "polkadot-performance-test",
  "polkadot-service",
@@ -6720,6 +6726,7 @@ dependencies = [
  "sp-core",
  "sp-io",
  "sp-keyring",
+ "sp-maybe-compressed-blob",
  "substrate-build-script-utils",
  "thiserror",
  "try-runtime-cli",
@@ -7175,10 +7182,9 @@ version = "0.9.41"
 dependencies = [
  "always-assert",
  "assert_matches",
- "cpu-time",
  "futures",
  "futures-timer",
- "hex-literal",
+ "hex-literal 0.3.4",
  "libc",
  "parity-scale-codec",
  "pin-project",
@@ -7188,22 +7194,13 @@ dependencies = [
  "polkadot-parachain",
  "polkadot-primitives",
  "rand 0.8.5",
- "rayon",
- "sc-executor",
- "sc-executor-common",
- "sc-executor-wasmtime",
  "slotmap",
  "sp-core",
- "sp-externalities",
- "sp-io",
  "sp-maybe-compressed-blob",
  "sp-tracing",
  "sp-wasm-interface",
  "substrate-build-script-utils",
  "tempfile",
- "test-parachain-adder",
- "test-parachain-halt",
- "tikv-jemalloc-ctl",
  "tokio",
  "tracing-gum",
 ]
@@ -7231,6 +7228,36 @@ dependencies = [
  "tracing-gum",
 ]
 
+[[package]]
+name = "polkadot-node-core-pvf-worker"
+version = "0.9.41"
+dependencies = [
+ "assert_matches",
+ "cpu-time",
+ "futures",
+ "libc",
+ "parity-scale-codec",
+ "polkadot-node-core-pvf",
+ "polkadot-parachain",
+ "polkadot-primitives",
+ "rayon",
+ "sc-executor",
+ "sc-executor-common",
+ "sc-executor-wasmtime",
+ "sp-core",
+ "sp-externalities",
+ "sp-io",
+ "sp-maybe-compressed-blob",
+ "sp-tracing",
+ "substrate-build-script-utils",
+ "tempfile",
+ "test-parachain-adder",
+ "test-parachain-halt",
+ "tikv-jemalloc-ctl",
+ "tokio",
+ "tracing-gum",
+]
+
 [[package]]
 name = "polkadot-node-core-runtime-api"
 version = "0.9.41"
@@ -7480,10 +7507,12 @@ dependencies = [
  "kusama-runtime",
  "log",
  "polkadot-erasure-coding",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
  "polkadot-node-primitives",
  "polkadot-primitives",
  "quote",
+ "sc-executor-common",
+ "sp-maybe-compressed-blob",
  "thiserror",
 ]
 
@@ -7492,7 +7521,7 @@ name = "polkadot-primitives"
 version = "0.9.41"
 dependencies = [
  "bitvec",
- "hex-literal",
+ "hex-literal 0.4.1",
  "parity-scale-codec",
  "polkadot-core-primitives",
  "polkadot-parachain",
@@ -7569,7 +7598,7 @@ dependencies = [
  "frame-system-benchmarking",
  "frame-system-rpc-runtime-api",
  "frame-try-runtime",
- "hex-literal",
+ "hex-literal 0.4.1",
  "log",
  "pallet-authority-discovery",
  "pallet-authorship",
@@ -7666,7 +7695,7 @@ dependencies = [
  "frame-support",
  "frame-support-test",
  "frame-system",
- "hex-literal",
+ "hex-literal 0.4.1",
  "impl-trait-for-tuples",
  "libsecp256k1",
  "log",
@@ -7744,7 +7773,7 @@ dependencies = [
  "frame-support-test",
  "frame-system",
  "futures",
- "hex-literal",
+ "hex-literal 0.4.1",
  "log",
  "pallet-authority-discovery",
  "pallet-authorship",
@@ -7795,7 +7824,7 @@ dependencies = [
  "frame-support",
  "frame-system-rpc-runtime-api",
  "futures",
- "hex-literal",
+ "hex-literal 0.4.1",
  "kusama-runtime",
  "kusama-runtime-constants",
  "kvdb",
@@ -7981,7 +8010,7 @@ dependencies = [
  "polkadot-node-core-backing",
  "polkadot-node-core-candidate-validation",
  "polkadot-node-core-dispute-coordinator",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
  "polkadot-node-primitives",
  "polkadot-node-subsystem",
  "polkadot-node-subsystem-test-helpers",
@@ -8004,7 +8033,7 @@ dependencies = [
  "frame-support",
  "frame-system",
  "frame-system-rpc-runtime-api",
- "hex-literal",
+ "hex-literal 0.4.1",
  "log",
  "pallet-authority-discovery",
  "pallet-authorship",
@@ -8887,7 +8916,7 @@ dependencies = [
  "frame-system-benchmarking",
  "frame-system-rpc-runtime-api",
  "frame-try-runtime",
- "hex-literal",
+ "hex-literal 0.4.1",
  "log",
  "pallet-authority-discovery",
  "pallet-authorship",
@@ -12041,7 +12070,7 @@ dependencies = [
  "log",
  "parity-scale-codec",
  "polkadot-cli",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
  "polkadot-node-primitives",
  "polkadot-node-subsystem",
  "polkadot-parachain",
@@ -12089,7 +12118,7 @@ dependencies = [
  "log",
  "parity-scale-codec",
  "polkadot-cli",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
  "polkadot-node-primitives",
  "polkadot-node-subsystem",
  "polkadot-parachain",
@@ -13649,7 +13678,7 @@ dependencies = [
  "frame-system-benchmarking",
  "frame-system-rpc-runtime-api",
  "frame-try-runtime",
- "hex-literal",
+ "hex-literal 0.4.1",
  "log",
  "pallet-authority-discovery",
  "pallet-authorship",
@@ -14048,7 +14077,7 @@ dependencies = [
  "bounded-collections",
  "derivative",
  "hex",
- "hex-literal",
+ "hex-literal 0.4.1",
  "impl-trait-for-tuples",
  "log",
  "parity-scale-codec",
diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml
index ded5dda0aa2..886b489a502 100644
--- a/polkadot/Cargo.toml
+++ b/polkadot/Cargo.toml
@@ -24,7 +24,7 @@ tikv-jemallocator = "0.5.0"
 
 # Crates in our workspace, defined as dependencies so we can pass them feature flags.
 polkadot-cli = { path = "cli", features = [ "kusama-native", "westend-native", "rococo-native" ]  }
-polkadot-node-core-pvf = { path = "node/core/pvf" }
+polkadot-node-core-pvf-worker = { path = "node/core/pvf/worker" }
 polkadot-overseer = { path = "node/overseer" }
 
 [dev-dependencies]
@@ -80,6 +80,7 @@ members = [
 	"node/core/parachains-inherent",
 	"node/core/provisioner",
 	"node/core/pvf",
+	"node/core/pvf/worker",
 	"node/core/pvf-checker",
 	"node/core/runtime-api",
 	"node/network/approval-distribution",
@@ -206,7 +207,7 @@ try-runtime = [ "polkadot-cli/try-runtime" ]
 fast-runtime = [ "polkadot-cli/fast-runtime" ]
 runtime-metrics = [ "polkadot-cli/runtime-metrics" ]
 pyroscope = ["polkadot-cli/pyroscope"]
-jemalloc-allocator = ["polkadot-node-core-pvf/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"]
+jemalloc-allocator = ["polkadot-node-core-pvf-worker/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"]
 
 # Configuration for building a .deb package - for use with `cargo-deb`
 [package.metadata.deb]
diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml
index 01247bbc996..4d08ee18ed1 100644
--- a/polkadot/cli/Cargo.toml
+++ b/polkadot/cli/Cargo.toml
@@ -22,12 +22,13 @@ pyro = { package = "pyroscope", version = "0.3.1", optional = true }
 
 service = { package = "polkadot-service", path = "../node/service", default-features = false, optional = true }
 polkadot-client = { path = "../node/client", optional = true }
-polkadot-node-core-pvf = { path = "../node/core/pvf", optional = true }
+polkadot-node-core-pvf-worker = { path = "../node/core/pvf/worker", optional = true }
 polkadot-performance-test = { path = "../node/test/performance-test", optional = true }
 
 sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
 frame-benchmarking-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
 try-runtime-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
 sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
@@ -52,7 +53,7 @@ cli = [
 	"frame-benchmarking-cli",
 	"try-runtime-cli",
 	"polkadot-client",
-	"polkadot-node-core-pvf",
+	"polkadot-node-core-pvf-worker",
 ]
 runtime-benchmarks = [
 	"service/runtime-benchmarks",
diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs
index 2f0bc9e2f85..c0e96de2a54 100644
--- a/polkadot/cli/src/command.rs
+++ b/polkadot/cli/src/command.rs
@@ -494,7 +494,7 @@ pub fn run() -> Result<()> {
 
 			#[cfg(not(target_os = "android"))]
 			{
-				polkadot_node_core_pvf::prepare_worker_entrypoint(
+				polkadot_node_core_pvf_worker::prepare_worker_entrypoint(
 					&cmd.socket_path,
 					Some(&cmd.node_impl_version),
 				);
@@ -516,7 +516,7 @@ pub fn run() -> Result<()> {
 
 			#[cfg(not(target_os = "android"))]
 			{
-				polkadot_node_core_pvf::execute_worker_entrypoint(
+				polkadot_node_core_pvf_worker::execute_worker_entrypoint(
 					&cmd.socket_path,
 					Some(&cmd.node_impl_version),
 				);
diff --git a/polkadot/cli/src/host_perf_check.rs b/polkadot/cli/src/host_perf_check.rs
index 1225c4708a3..adfdebce677 100644
--- a/polkadot/cli/src/host_perf_check.rs
+++ b/polkadot/cli/src/host_perf_check.rs
@@ -15,7 +15,6 @@
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
 use log::info;
-use polkadot_node_core_pvf::sp_maybe_compressed_blob;
 use polkadot_performance_test::{
 	measure_erasure_coding, measure_pvf_prepare, PerfCheckError, ERASURE_CODING_N_VALIDATORS,
 	ERASURE_CODING_TIME_LIMIT, PVF_PREPARE_TIME_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml
index 20dcd5afdfa..026930758b8 100644
--- a/polkadot/node/core/pvf/Cargo.toml
+++ b/polkadot/node/core/pvf/Cargo.toml
@@ -4,24 +4,15 @@ version.workspace = true
 authors.workspace = true
 edition.workspace = true
 
-[[bin]]
-name = "puppet_worker"
-path = "bin/puppet_worker.rs"
-
 [dependencies]
 always-assert = "0.1"
-assert_matches = "1.4.0"
-cpu-time = "1.0.0"
 futures = "0.3.21"
 futures-timer = "3.0.2"
 gum = { package = "tracing-gum", path = "../../gum" }
 libc = "0.2.139"
 pin-project = "1.0.9"
 rand = "0.8.5"
-rayon = "1.5.1"
 slotmap = "1.0"
-tempfile = "3.3.0"
-tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
 tokio = { version = "1.24.2", features = ["fs", "process"] }
 
 parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] }
@@ -30,13 +21,8 @@ polkadot-parachain = { path = "../../../parachain" }
 polkadot-core-primitives = { path = "../../../core-primitives" }
 polkadot-node-metrics = { path = "../../metrics" }
 polkadot-node-primitives = { path = "../../primitives" }
-
 polkadot-primitives = { path = "../../../primitives" }
-sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-externalities = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
 sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -45,14 +31,7 @@ sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master
 [build-dependencies]
 substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }
 
-[target.'cfg(target_os = "linux")'.dependencies]
-tikv-jemalloc-ctl = "0.5.0"
-
 [dev-dependencies]
-adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" }
-halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }
-hex-literal = "0.4.1"
+assert_matches = "1.4.0"
+hex-literal = "0.3.4"
 tempfile = "3.3.0"
-
-[features]
-jemalloc-allocator = ["dep:tikv-jemalloc-ctl"]
diff --git a/polkadot/node/core/pvf/src/artifacts.rs b/polkadot/node/core/pvf/src/artifacts.rs
index f0f8a0f8af2..d5a660cc3aa 100644
--- a/polkadot/node/core/pvf/src/artifacts.rs
+++ b/polkadot/node/core/pvf/src/artifacts.rs
@@ -65,9 +65,11 @@ use std::{
 	time::{Duration, SystemTime},
 };
 
+/// Contains the bytes for a successfully compiled artifact.
 pub struct CompiledArtifact(Vec<u8>);
 
 impl CompiledArtifact {
+	/// Creates a `CompiledArtifact`.
 	pub fn new(code: Vec<u8>) -> Self {
 		Self(code)
 	}
diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs
index 662fcc22cd3..21f23d515fd 100644
--- a/polkadot/node/core/pvf/src/error.rs
+++ b/polkadot/node/core/pvf/src/error.rs
@@ -16,7 +16,7 @@
 
 use crate::prepare::PrepareStats;
 use parity_scale_codec::{Decode, Encode};
-use std::{any::Any, fmt};
+use std::fmt;
 
 /// Result of PVF preparation performed by the validation host. Contains stats about the preparation if
 /// successful
@@ -126,17 +126,3 @@ impl From<PrepareError> for ValidationError {
 		}
 	}
 }
-
-/// Attempt to convert an opaque panic payload to a string.
-///
-/// This is a best effort, and is not guaranteed to provide the most accurate value.
-pub(crate) fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
-	match payload.downcast::<&'static str>() {
-		Ok(msg) => msg.to_string(),
-		Err(payload) => match payload.downcast::<String>() {
-			Ok(msg) => *msg,
-			// At least we tried...
-			Err(_) => "unknown panic payload".to_string(),
-		},
-	}
-}
diff --git a/polkadot/node/core/pvf/src/execute/mod.rs b/polkadot/node/core/pvf/src/execute/mod.rs
index b0e8cc48256..8e3b17d7156 100644
--- a/polkadot/node/core/pvf/src/execute/mod.rs
+++ b/polkadot/node/core/pvf/src/execute/mod.rs
@@ -18,10 +18,10 @@
 //!
 //! The validation host [runs the queue][`start`] communicating with it by sending [`ToQueue`]
 //! messages. The queue will spawn workers in new processes. Those processes should jump to
-//! [`worker_entrypoint`].
+//! `polkadot_node_core_pvf_worker::execute_worker_entrypoint`.
 
 mod queue;
-mod worker;
+mod worker_intf;
 
 pub use queue::{start, PendingExecutionRequest, ToQueue};
-pub use worker::{worker_entrypoint, Response as ExecuteResponse};
+pub use worker_intf::{Handshake as ExecuteHandshake, Response as ExecuteResponse};
diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs
index e1e02205256..5b3e21cee07 100644
--- a/polkadot/node/core/pvf/src/execute/queue.rs
+++ b/polkadot/node/core/pvf/src/execute/queue.rs
@@ -16,7 +16,7 @@
 
 //! A queue that handles requests for PVF execution.
 
-use super::worker::Outcome;
+use super::worker_intf::Outcome;
 use crate::{
 	artifacts::{ArtifactId, ArtifactPathId},
 	host::ResultSender,
@@ -416,7 +416,8 @@ async fn spawn_worker_task(
 	use futures_timer::Delay;
 
 	loop {
-		match super::worker::spawn(&program_path, job.executor_params.clone(), spawn_timeout).await
+		match super::worker_intf::spawn(&program_path, job.executor_params.clone(), spawn_timeout)
+			.await
 		{
 			Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job),
 			Err(err) => {
@@ -460,9 +461,13 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
 	queue.mux.push(
 		async move {
 			let _timer = execution_timer;
-			let outcome =
-				super::worker::start_work(idle, job.artifact.clone(), job.exec_timeout, job.params)
-					.await;
+			let outcome = super::worker_intf::start_work(
+				idle,
+				job.artifact.clone(),
+				job.exec_timeout,
+				job.params,
+			)
+			.await;
 			QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
 		}
 		.boxed(),
diff --git a/polkadot/node/core/pvf/src/execute/worker.rs b/polkadot/node/core/pvf/src/execute/worker_intf.rs
similarity index 55%
rename from polkadot/node/core/pvf/src/execute/worker.rs
rename to polkadot/node/core/pvf/src/execute/worker_intf.rs
index f20874083be..bc467cf90de 100644
--- a/polkadot/node/core/pvf/src/execute/worker.rs
+++ b/polkadot/node/core/pvf/src/execute/worker_intf.rs
@@ -14,28 +14,23 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
+//! Host interface to the execute worker.
+
 use crate::{
 	artifacts::ArtifactPathId,
-	executor_intf::Executor,
 	worker_common::{
-		bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
-		spawn_with_program_path, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
-		JOB_TIMEOUT_WALL_CLOCK_FACTOR,
+		framed_recv, framed_send, path_to_bytes, spawn_with_program_path, IdleWorker, SpawnErr,
+		WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
 	},
 	LOG_TARGET,
 };
-use cpu_time::ProcessTime;
-use futures::{pin_mut, select_biased, FutureExt};
+use futures::FutureExt;
 use futures_timer::Delay;
 use parity_scale_codec::{Decode, Encode};
 
 use polkadot_parachain::primitives::ValidationResult;
 use polkadot_primitives::ExecutorParams;
-use std::{
-	path::{Path, PathBuf},
-	sync::{mpsc::channel, Arc},
-	time::Duration,
-};
+use std::{path::Path, time::Duration};
 use tokio::{io, net::UnixStream};
 
 /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
@@ -185,17 +180,6 @@ async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Re
 	framed_send(stream, &handshake.encode()).await
 }
 
-async fn recv_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
-	let handshake_enc = framed_recv(stream).await?;
-	let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| {
-		io::Error::new(
-			io::ErrorKind::Other,
-			"execute pvf recv_handshake: failed to decode Handshake".to_owned(),
-		)
-	})?;
-	Ok(handshake)
-}
-
 async fn send_request(
 	stream: &mut UnixStream,
 	artifact_path: &Path,
@@ -207,29 +191,6 @@ async fn send_request(
 	framed_send(stream, &execution_timeout.encode()).await
 }
 
-async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec<u8>, Duration)> {
-	let artifact_path = framed_recv(stream).await?;
-	let artifact_path = bytes_to_path(&artifact_path).ok_or_else(|| {
-		io::Error::new(
-			io::ErrorKind::Other,
-			"execute pvf recv_request: non utf-8 artifact path".to_string(),
-		)
-	})?;
-	let params = framed_recv(stream).await?;
-	let execution_timeout = framed_recv(stream).await?;
-	let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| {
-		io::Error::new(
-			io::ErrorKind::Other,
-			"execute pvf recv_request: failed to decode duration".to_string(),
-		)
-	})?;
-	Ok((artifact_path, params, execution_timeout))
-}
-
-async fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> {
-	framed_send(stream, &response.encode()).await
-}
-
 async fn recv_response(stream: &mut UnixStream) -> io::Result<Response> {
 	let response_bytes = framed_recv(stream).await?;
 	Response::decode(&mut &response_bytes[..]).map_err(|e| {
@@ -240,28 +201,43 @@ async fn recv_response(stream: &mut UnixStream) -> io::Result<Response> {
 	})
 }
 
+/// The payload of the one-time handshake that is done when a worker process is created. Carries
+/// data from the host to the worker.
 #[derive(Encode, Decode)]
-struct Handshake {
-	executor_params: ExecutorParams,
+pub struct Handshake {
+	/// The executor parameters.
+	pub executor_params: ExecutorParams,
 }
 
+/// The response from an execution job on the worker.
 #[derive(Encode, Decode)]
 pub enum Response {
-	Ok { result_descriptor: ValidationResult, duration: Duration },
+	/// The job completed successfully.
+	Ok {
+		/// The result of parachain validation.
+		result_descriptor: ValidationResult,
+		/// The amount of CPU time taken by the job.
+		duration: Duration,
+	},
+	/// The candidate is invalid.
 	InvalidCandidate(String),
+	/// The job timed out.
 	TimedOut,
+	/// Some internal error occurred. Should only be used for errors independent of the candidate.
 	InternalError(String),
 }
 
 impl Response {
-	fn format_invalid(ctx: &'static str, msg: &str) -> Self {
+	/// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty).
+	pub fn format_invalid(ctx: &'static str, msg: &str) -> Self {
 		if msg.is_empty() {
 			Self::InvalidCandidate(ctx.to_string())
 		} else {
 			Self::InvalidCandidate(format!("{}: {}", ctx, msg))
 		}
 	}
-	fn format_internal(ctx: &'static str, msg: &str) -> Self {
+	/// Creates an internal response from a context `ctx` and a message `msg` (which can be empty).
+	pub fn format_internal(ctx: &'static str, msg: &str) -> Self {
 		if msg.is_empty() {
 			Self::InternalError(ctx.to_string())
 		} else {
@@ -269,110 +245,3 @@ impl Response {
 		}
 	}
 }
-
-/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
-/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
-/// is checked against the worker version. A mismatch results in immediate worker termination.
-/// `None` is used for tests and in other situations when version check is not necessary.
-pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
-	worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move {
-		let worker_pid = std::process::id();
-
-		let handshake = recv_handshake(&mut stream).await?;
-		let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
-			io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
-		})?);
-
-		loop {
-			let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
-			gum::debug!(
-				target: LOG_TARGET,
-				%worker_pid,
-				"worker: validating artifact {}",
-				artifact_path.display(),
-			);
-
-			// Used to signal to the cpu time monitor thread that it can finish.
-			let (finished_tx, finished_rx) = channel::<()>();
-			let cpu_time_start = ProcessTime::now();
-
-			// Spawn a new thread that runs the CPU time monitor.
-			let cpu_time_monitor_fut = rt_handle
-				.spawn_blocking(move || {
-					cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
-				})
-				.fuse();
-			let executor_2 = executor.clone();
-			let execute_fut = rt_handle
-				.spawn_blocking(move || {
-					validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start)
-				})
-				.fuse();
-
-			pin_mut!(cpu_time_monitor_fut);
-			pin_mut!(execute_fut);
-
-			let response = select_biased! {
-				// If this future is not selected, the join handle is dropped and the thread will
-				// finish in the background.
-				cpu_time_monitor_res = cpu_time_monitor_fut => {
-					match cpu_time_monitor_res {
-						Ok(Some(cpu_time_elapsed)) => {
-							// Log if we exceed the timeout and the other thread hasn't finished.
-							gum::warn!(
-								target: LOG_TARGET,
-								%worker_pid,
-								"execute job took {}ms cpu time, exceeded execute timeout {}ms",
-								cpu_time_elapsed.as_millis(),
-								execution_timeout.as_millis(),
-							);
-							Response::TimedOut
-						},
-						Ok(None) => Response::InternalError("error communicating over finished channel".into()),
-						Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()),
-					}
-				},
-				execute_res = execute_fut => {
-					let _ = finished_tx.send(());
-					execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string()))
-				},
-			};
-
-			send_response(&mut stream, response).await?;
-		}
-	});
-}
-
-fn validate_using_artifact(
-	artifact_path: &Path,
-	params: &[u8],
-	executor: Arc<Executor>,
-	cpu_time_start: ProcessTime,
-) -> Response {
-	// Check here if the file exists, because the error from Substrate is not match-able.
-	// TODO: Re-evaluate after <https://github.com/paritytech/substrate/issues/13860>.
-	let file_metadata = std::fs::metadata(artifact_path);
-	if let Err(err) = file_metadata {
-		return Response::format_internal("execute: could not find or open file", &err.to_string())
-	}
-
-	let descriptor_bytes = match unsafe {
-		// SAFETY: this should be safe since the compiled artifact passed here comes from the
-		//         file created by the prepare workers. These files are obtained by calling
-		//         [`executor_intf::prepare`].
-		executor.execute(artifact_path.as_ref(), params)
-	} {
-		Err(err) => return Response::format_invalid("execute", &err),
-		Ok(d) => d,
-	};
-
-	let duration = cpu_time_start.elapsed();
-
-	let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
-		Err(err) =>
-			return Response::format_invalid("validation result decoding failed", &err.to_string()),
-		Ok(r) => r,
-	};
-
-	Response::Ok { result_descriptor, duration }
-}
diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs
index e8f174b89cd..cdaee334140 100644
--- a/polkadot/node/core/pvf/src/lib.rs
+++ b/polkadot/node/core/pvf/src/lib.rs
@@ -29,11 +29,11 @@
 //!
 //! Then using the handle the client can send three types of requests:
 //!
-//! (a) PVF pre-checking. This takes the PVF [code][`Pvf`] and tries to prepare it (verify and
+//! (a) PVF pre-checking. This takes the `Pvf` code and tries to prepare it (verify and
 //! compile) in order to pre-check its validity.
 //!
 //! (b) PVF execution. This accepts the PVF [`params`][`polkadot_parachain::primitives::ValidationParams`]
-//!     and the PVF [code][`Pvf`], prepares (verifies and compiles) the code, and then executes PVF
+//!     and the `Pvf` code, prepares (verifies and compiles) the code, and then executes PVF
 //!     with the `params`.
 //!
 //! (c) Heads up. This request allows to signal that the given PVF may be needed soon and that it
@@ -91,7 +91,6 @@
 mod artifacts;
 mod error;
 mod execute;
-mod executor_intf;
 mod host;
 mod metrics;
 mod prepare;
@@ -99,27 +98,22 @@ mod priority;
 mod pvf;
 mod worker_common;
 
-#[doc(hidden)]
-pub mod testing;
-
-#[doc(hidden)]
-pub use sp_tracing;
-
+pub use artifacts::CompiledArtifact;
 pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
-pub use prepare::PrepareStats;
+pub use execute::{ExecuteHandshake, ExecuteResponse};
+#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+pub use prepare::MemoryAllocationStats;
+pub use prepare::{MemoryStats, PrepareStats};
 pub use priority::Priority;
 pub use pvf::PvfPrepData;
 
 pub use host::{start, Config, ValidationHost};
 pub use metrics::Metrics;
-pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;
-
-pub use execute::worker_entrypoint as execute_worker_entrypoint;
-pub use prepare::worker_entrypoint as prepare_worker_entrypoint;
-
-pub use executor_intf::{prepare, prevalidate};
-
-pub use sc_executor_common;
-pub use sp_maybe_compressed_blob;
+pub use worker_common::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};
 
 const LOG_TARGET: &str = "parachain::pvf";
+
+#[doc(hidden)]
+pub mod testing {
+	pub use crate::worker_common::{spawn_with_program_path, SpawnErr};
+}
diff --git a/polkadot/node/core/pvf/src/prepare/mod.rs b/polkadot/node/core/pvf/src/prepare/mod.rs
index d8d036a8223..de40c48464c 100644
--- a/polkadot/node/core/pvf/src/prepare/mod.rs
+++ b/polkadot/node/core/pvf/src/prepare/mod.rs
@@ -20,23 +20,44 @@
 //! (by running [`start_pool`]).
 //!
 //! The pool will spawn workers in new processes and those should execute pass control to
-//! [`worker_entrypoint`].
+//! `polkadot_node_core_pvf_worker::prepare_worker_entrypoint`.
 
-mod memory_stats;
 mod pool;
 mod queue;
-mod worker;
+mod worker_intf;
 
-pub use memory_stats::MemoryStats;
 pub use pool::start as start_pool;
 pub use queue::{start as start_queue, FromQueue, ToQueue};
-pub use worker::worker_entrypoint;
 
 use parity_scale_codec::{Decode, Encode};
 
 /// Preparation statistics, including the CPU time and memory taken.
 #[derive(Debug, Clone, Default, Encode, Decode)]
 pub struct PrepareStats {
-	cpu_time_elapsed: std::time::Duration,
-	memory_stats: MemoryStats,
+	/// The CPU time that elapsed for the preparation job.
+	pub cpu_time_elapsed: std::time::Duration,
+	/// The observed memory statistics for the preparation job.
+	pub memory_stats: MemoryStats,
+}
+
+/// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if
+/// supported by the OS, `ru_maxrss`.
+#[derive(Clone, Debug, Default, Encode, Decode)]
+pub struct MemoryStats {
+	/// Memory stats from `tikv_jemalloc_ctl`.
+	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+	pub memory_tracker_stats: Option<MemoryAllocationStats>,
+	/// `ru_maxrss` from `getrusage`. `None` if an error occurred.
+	#[cfg(target_os = "linux")]
+	pub max_rss: Option<i64>,
+}
+
+/// Statistics of collected memory metrics.
+#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+#[derive(Clone, Debug, Default, Encode, Decode)]
+pub struct MemoryAllocationStats {
+	/// Total resident memory, in bytes.
+	pub resident: u64,
+	/// Total allocated memory, in bytes.
+	pub allocated: u64,
 }
diff --git a/polkadot/node/core/pvf/src/prepare/pool.rs b/polkadot/node/core/pvf/src/prepare/pool.rs
index f8435a40348..d151f097805 100644
--- a/polkadot/node/core/pvf/src/prepare/pool.rs
+++ b/polkadot/node/core/pvf/src/prepare/pool.rs
@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
-use super::worker::{self, Outcome};
+use super::worker_intf::{self, Outcome};
 use crate::{
 	error::{PrepareError, PrepareResult},
 	metrics::Metrics,
@@ -250,7 +250,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Po
 	use futures_timer::Delay;
 
 	loop {
-		match worker::spawn(&program_path, spawn_timeout).await {
+		match worker_intf::spawn(&program_path, spawn_timeout).await {
 			Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle),
 			Err(err) => {
 				gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: {:?}", err);
@@ -271,7 +271,7 @@ async fn start_work_task<Timer>(
 	artifact_path: PathBuf,
 	_preparation_timer: Option<Timer>,
 ) -> PoolEvent {
-	let outcome = worker::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await;
+	let outcome = worker_intf::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await;
 	PoolEvent::StartWork(worker, outcome)
 }
 
diff --git a/polkadot/node/core/pvf/src/prepare/queue.rs b/polkadot/node/core/pvf/src/prepare/queue.rs
index 20ee95a435b..f84d5ab0e56 100644
--- a/polkadot/node/core/pvf/src/prepare/queue.rs
+++ b/polkadot/node/core/pvf/src/prepare/queue.rs
@@ -226,7 +226,7 @@ async fn handle_enqueue(
 		target: LOG_TARGET,
 		validation_code_hash = ?pvf.code_hash(),
 		?priority,
-		preparation_timeout = ?pvf.prep_timeout,
+		preparation_timeout = ?pvf.prep_timeout(),
 		"PVF is enqueued for preparation.",
 	);
 	queue.metrics.prepare_enqueued();
diff --git a/polkadot/node/core/pvf/src/prepare/worker.rs b/polkadot/node/core/pvf/src/prepare/worker_intf.rs
similarity index 55%
rename from polkadot/node/core/pvf/src/prepare/worker.rs
rename to polkadot/node/core/pvf/src/prepare/worker_intf.rs
index 3b2ae211e6c..daf94aadc67 100644
--- a/polkadot/node/core/pvf/src/prepare/worker.rs
+++ b/polkadot/node/core/pvf/src/prepare/worker_intf.rs
@@ -14,33 +14,24 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
-#[cfg(target_os = "linux")]
-use super::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
-#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-use super::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
-use super::memory_stats::MemoryStats;
+//! Host interface to the prepare worker.
+
 use crate::{
-	artifacts::CompiledArtifact,
 	error::{PrepareError, PrepareResult},
 	metrics::Metrics,
 	prepare::PrepareStats,
 	pvf::PvfPrepData,
 	worker_common::{
-		bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
-		spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
-		JOB_TIMEOUT_WALL_CLOCK_FACTOR,
+		framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, IdleWorker,
+		SpawnErr, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
 	},
 	LOG_TARGET,
 };
-use cpu_time::ProcessTime;
-use futures::{pin_mut, select_biased, FutureExt};
 use parity_scale_codec::{Decode, Encode};
 
 use sp_core::hexdisplay::HexDisplay;
 use std::{
-	panic,
 	path::{Path, PathBuf},
-	sync::mpsc::channel,
 	time::Duration,
 };
 use tokio::{io, net::UnixStream};
@@ -104,7 +95,7 @@ pub async fn start_work(
 	);
 
 	with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move {
-		let preparation_timeout = pvf.prep_timeout;
+		let preparation_timeout = pvf.prep_timeout();
 		if let Err(err) = send_request(&mut stream, pvf, &tmp_file).await {
 			gum::warn!(
 				target: LOG_TARGET,
@@ -285,28 +276,6 @@ async fn send_request(
 	Ok(())
 }
 
-async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> {
-	let pvf = framed_recv(stream).await?;
-	let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
-		io::Error::new(
-			io::ErrorKind::Other,
-			format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e),
-		)
-	})?;
-	let tmp_file = framed_recv(stream).await?;
-	let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
-		io::Error::new(
-			io::ErrorKind::Other,
-			"prepare pvf recv_request: non utf-8 artifact path".to_string(),
-		)
-	})?;
-	Ok((pvf, tmp_file))
-}
-
-async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
-	framed_send(stream, &result.encode()).await
-}
-
 async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareResult> {
 	let result = framed_recv(stream).await?;
 	let result = PrepareResult::decode(&mut &result[..]).map_err(|e| {
@@ -325,158 +294,3 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
 	})?;
 	Ok(result)
 }
-
-/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
-/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
-/// is checked against the worker version. A mismatch results in immediate worker termination.
-/// `None` is used for tests and in other situations when version check is not necessary.
-///
-/// # Flow
-///
-///	This runs the following in a loop:
-///
-///	1. Get the code and parameters for preparation from the host.
-///
-///	2. Start a memory tracker in a separate thread.
-///
-///	3. Start the CPU time monitor loop and the actual preparation in two separate threads.
-///
-///	4. Select on the two threads created in step 3. If the CPU timeout was hit, the CPU time monitor
-///	   thread will trigger first.
-///
-///	5. Stop the memory tracker and get the stats.
-///
-/// 6. If compilation succeeded, write the compiled artifact into a temporary file.
-///
-///	7. Send the result of preparation back to the host. If any error occurred in the above steps, we
-///	   send that in the `PrepareResult`.
-pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
-	worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move {
-		let worker_pid = std::process::id();
-
-		loop {
-			let (pvf, dest) = recv_request(&mut stream).await?;
-			gum::debug!(
-				target: LOG_TARGET,
-				%worker_pid,
-				"worker: preparing artifact",
-			);
-
-			let cpu_time_start = ProcessTime::now();
-			let preparation_timeout = pvf.prep_timeout;
-
-			// Run the memory tracker.
-			#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-			let (memory_tracker_tx, memory_tracker_rx) = channel::<()>();
-			#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-			let memory_tracker_fut = rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx));
-
-			// Spawn a new thread that runs the CPU time monitor.
-			let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
-			let cpu_time_monitor_fut = rt_handle
-				.spawn_blocking(move || {
-					cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx)
-				})
-				.fuse();
-			// Spawn another thread for preparation.
-			let prepare_fut = rt_handle
-				.spawn_blocking(move || {
-					let result = prepare_artifact(pvf);
-
-					// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
-					#[cfg(target_os = "linux")]
-					let result = result.map(|artifact| (artifact, get_max_rss_thread()));
-
-					result
-				})
-				.fuse();
-
-			pin_mut!(cpu_time_monitor_fut);
-			pin_mut!(prepare_fut);
-
-			let result = select_biased! {
-				// If this future is not selected, the join handle is dropped and the thread will
-				// finish in the background.
-				join_res = cpu_time_monitor_fut => {
-					match join_res {
-						Ok(Some(cpu_time_elapsed)) => {
-							// Log if we exceed the timeout and the other thread hasn't finished.
-							gum::warn!(
-								target: LOG_TARGET,
-								%worker_pid,
-								"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
-								cpu_time_elapsed.as_millis(),
-								preparation_timeout.as_millis(),
-							);
-							Err(PrepareError::TimedOut)
-						},
-						Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
-						Err(err) => Err(PrepareError::IoErr(err.to_string())),
-					}
-				},
-				prepare_res = prepare_fut => {
-					let cpu_time_elapsed = cpu_time_start.elapsed();
-					let _ = cpu_time_monitor_tx.send(());
-
-					match prepare_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) {
-						Err(err) => {
-							// Serialized error will be written into the socket.
-							Err(err)
-						},
-						Ok(ok) => {
-							// Stop the memory stats worker and get its observed memory stats.
-							#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-							let memory_tracker_stats =
-								get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx, worker_pid).await;
-							#[cfg(target_os = "linux")]
-							let (ok, max_rss) = ok;
-							let memory_stats = MemoryStats {
-								#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-								memory_tracker_stats,
-								#[cfg(target_os = "linux")]
-								max_rss: extract_max_rss_stat(max_rss, worker_pid),
-							};
-
-							// Write the serialized artifact into a temp file.
-							//
-							// PVF host only keeps artifacts statuses in its memory, successfully
-							// compiled code gets stored on the disk (and consequently deserialized
-							// by execute-workers). The prepare worker is only required to send `Ok`
-							// to the pool to indicate the success.
-
-							gum::debug!(
-								target: LOG_TARGET,
-								%worker_pid,
-								"worker: writing artifact to {}",
-								dest.display(),
-							);
-							tokio::fs::write(&dest, &ok).await?;
-
-							Ok(PrepareStats{cpu_time_elapsed, memory_stats})
-						},
-					}
-				},
-			};
-
-			send_response(&mut stream, result).await?;
-		}
-	});
-}
-
-fn prepare_artifact(pvf: PvfPrepData) -> Result<CompiledArtifact, PrepareError> {
-	panic::catch_unwind(|| {
-		let blob = match crate::executor_intf::prevalidate(&pvf.code()) {
-			Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
-			Ok(b) => b,
-		};
-
-		match crate::executor_intf::prepare(blob, &pvf.executor_params()) {
-			Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
-			Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
-		}
-	})
-	.map_err(|panic_payload| {
-		PrepareError::Panic(crate::error::stringify_panic_payload(panic_payload))
-	})
-	.and_then(|inner_result| inner_result)
-}
diff --git a/polkadot/node/core/pvf/src/pvf.rs b/polkadot/node/core/pvf/src/pvf.rs
index ad2dc5fcd91..c134cacb4ac 100644
--- a/polkadot/node/core/pvf/src/pvf.rs
+++ b/polkadot/node/core/pvf/src/pvf.rs
@@ -36,13 +36,13 @@ use crate::host::tests::TEST_PREPARATION_TIMEOUT;
 #[derive(Clone, Encode, Decode)]
 pub struct PvfPrepData {
 	/// Wasm code (uncompressed)
-	pub(crate) code: Arc<Vec<u8>>,
+	code: Arc<Vec<u8>>,
 	/// Wasm code hash
-	pub(crate) code_hash: ValidationCodeHash,
+	code_hash: ValidationCodeHash,
 	/// Executor environment parameters for the session for which artifact is prepared
-	pub(crate) executor_params: Arc<ExecutorParams>,
+	executor_params: Arc<ExecutorParams>,
 	/// Preparation timeout
-	pub(crate) prep_timeout: Duration,
+	prep_timeout: Duration,
 }
 
 impl PvfPrepData {
@@ -69,15 +69,20 @@ impl PvfPrepData {
 	}
 
 	/// Returns PVF code
-	pub(crate) fn code(&self) -> Arc<Vec<u8>> {
+	pub fn code(&self) -> Arc<Vec<u8>> {
 		self.code.clone()
 	}
 
 	/// Returns executor params
-	pub(crate) fn executor_params(&self) -> Arc<ExecutorParams> {
+	pub fn executor_params(&self) -> Arc<ExecutorParams> {
 		self.executor_params.clone()
 	}
 
+	/// Returns preparation timeout.
+	pub fn prep_timeout(&self) -> Duration {
+		self.prep_timeout
+	}
+
 	/// Creates a structure for tests
 	#[cfg(test)]
 	pub(crate) fn from_discriminator_and_timeout(num: u32, timeout: Duration) -> Self {
diff --git a/polkadot/node/core/pvf/src/worker_common.rs b/polkadot/node/core/pvf/src/worker_common.rs
index 3caee34a5d0..33144616601 100644
--- a/polkadot/node/core/pvf/src/worker_common.rs
+++ b/polkadot/node/core/pvf/src/worker_common.rs
@@ -17,8 +17,7 @@
 //! Common logic for implementation of worker processes.
 
 use crate::LOG_TARGET;
-use cpu_time::ProcessTime;
-use futures::{never::Never, FutureExt as _};
+use futures::FutureExt as _;
 use futures_timer::Delay;
 use pin_project::pin_project;
 use rand::Rng;
@@ -26,7 +25,6 @@ use std::{
 	fmt, mem,
 	path::{Path, PathBuf},
 	pin::Pin,
-	sync::mpsc::{Receiver, RecvTimeoutError},
 	task::{Context, Poll},
 	time::Duration,
 };
@@ -34,17 +32,12 @@ use tokio::{
 	io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf},
 	net::{UnixListener, UnixStream},
 	process,
-	runtime::{Handle, Runtime},
 };
 
 /// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in
 /// wall clock time). This is lenient because CPU time may go slower than wall clock time.
 pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
 
-/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
-/// child process.
-pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);
-
 /// This is publicly exposed only for integration tests.
 #[doc(hidden)]
 pub async fn spawn_with_program_path(
@@ -171,92 +164,6 @@ pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
 	tmpfile_in(prefix, &temp_dir).await
 }
 
-pub fn worker_event_loop<F, Fut>(
-	debug_id: &'static str,
-	socket_path: &str,
-	node_version: Option<&str>,
-	mut event_loop: F,
-) where
-	F: FnMut(Handle, UnixStream) -> Fut,
-	Fut: futures::Future<Output = io::Result<Never>>,
-{
-	let worker_pid = std::process::id();
-	gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id);
-
-	// Check for a mismatch between the node and worker versions.
-	if let Some(version) = node_version {
-		if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
-			gum::error!(
-				target: LOG_TARGET,
-				%worker_pid,
-				"Node and worker version mismatch, node needs restarting, forcing shutdown",
-			);
-			kill_parent_node_in_emergency();
-			let err: io::Result<Never> =
-				Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"));
-			gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker({}): {:?}", debug_id, err);
-			return
-		}
-	}
-
-	// Run the main worker loop.
-	let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
-	let handle = rt.handle();
-	let err = rt
-		.block_on(async move {
-			let stream = UnixStream::connect(socket_path).await?;
-			let _ = tokio::fs::remove_file(socket_path).await;
-
-			let result = event_loop(handle.clone(), stream).await;
-
-			result
-		})
-		// It's never `Ok` because it's `Ok(Never)`.
-		.unwrap_err();
-
-	gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err);
-
-	// We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast
-	// as possible and not wait for stalled validation to finish. This isn't strictly necessary now,
-	// but may be in the future.
-	rt.shutdown_background();
-}
-
-/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up
-/// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout.
-///
-/// Returning `Some` indicates that we should send a `TimedOut` error to the host. Will return
-/// `None` if the other thread finishes first, without us timing out.
-///
-/// NOTE: Sending a `TimedOut` error to the host will cause the worker, whether preparation or
-/// execution, to be killed by the host. We do not kill the process here because it would interfere
-/// with the proper handling of this error.
-pub fn cpu_time_monitor_loop(
-	cpu_time_start: ProcessTime,
-	timeout: Duration,
-	finished_rx: Receiver<()>,
-) -> Option<Duration> {
-	loop {
-		let cpu_time_elapsed = cpu_time_start.elapsed();
-
-		// Treat the timeout as CPU time, which is less subject to variance due to load.
-		if cpu_time_elapsed <= timeout {
-			// Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep
-			// is wall clock time. The CPU clock may be slower than the wall clock.
-			let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD;
-			match finished_rx.recv_timeout(sleep_interval) {
-				// Received finish signal.
-				Ok(()) => return None,
-				// Timed out, restart loop.
-				Err(RecvTimeoutError::Timeout) => continue,
-				Err(RecvTimeoutError::Disconnected) => return None,
-			}
-		}
-
-		return Some(cpu_time_elapsed)
-	}
-}
-
 /// A struct that represents an idle worker.
 ///
 /// This struct is supposed to be used as a token that is passed by move into a subroutine that
@@ -405,12 +312,7 @@ pub fn path_to_bytes(path: &Path) -> &[u8] {
 	path.to_str().expect("non-UTF-8 path").as_bytes()
 }
 
-/// Interprets the given bytes as a path. Returns `None` if the given bytes do not constitute a
-/// a proper utf-8 string.
-pub fn bytes_to_path(bytes: &[u8]) -> Option<PathBuf> {
-	std::str::from_utf8(bytes).ok().map(PathBuf::from)
-}
-
+/// Write some data prefixed by its length into `w`.
 pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> {
 	let len_buf = buf.len().to_le_bytes();
 	w.write_all(&len_buf).await?;
@@ -418,6 +320,7 @@ pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::R
 	Ok(())
 }
 
+/// Read some data prefixed by its length from `r`.
 pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>> {
 	let mut len_buf = [0u8; mem::size_of::<usize>()];
 	r.read_exact(&mut len_buf).await?;
@@ -426,20 +329,3 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>
 	r.read_exact(&mut buf).await?;
 	Ok(buf)
 }
-
-/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM`
-/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node
-/// restart should be handled by the node owner. As node exits, unix sockets opened to workers
-/// get closed by the OS and other workers receive error on socket read and also exit. Preparation
-/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so
-/// no leftover artifacts are possible.
-fn kill_parent_node_in_emergency() {
-	unsafe {
-		// SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in
-		// some corner cases, which is checked. `kill()` never fails.
-		let ppid = libc::getppid();
-		if ppid > 1 {
-			libc::kill(ppid, libc::SIGTERM);
-		}
-	}
-}
diff --git a/polkadot/node/core/pvf/worker/Cargo.toml b/polkadot/node/core/pvf/worker/Cargo.toml
new file mode 100644
index 00000000000..260c6217eb6
--- /dev/null
+++ b/polkadot/node/core/pvf/worker/Cargo.toml
@@ -0,0 +1,49 @@
+[package]
+name = "polkadot-node-core-pvf-worker"
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+
+[[bin]]
+name = "puppet_worker"
+path = "bin/puppet_worker.rs"
+
+[dependencies]
+assert_matches = "1.4.0"
+cpu-time = "1.0.0"
+futures = "0.3.21"
+gum = { package = "tracing-gum", path = "../../../gum" }
+libc = "0.2.139"
+rayon = "1.5.1"
+tempfile = "3.3.0"
+tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
+tokio = "1.24.2"
+
+parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] }
+
+polkadot-node-core-pvf = { path = ".." }
+polkadot-parachain = { path = "../../../../parachain" }
+polkadot-primitives = { path = "../../../../primitives" }
+
+sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-externalities = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
+[target.'cfg(target_os = "linux")'.dependencies]
+tikv-jemalloc-ctl = "0.5.0"
+
+[build-dependencies]
+substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
+[dev-dependencies]
+adder = { package = "test-parachain-adder", path = "../../../../parachain/test-parachains/adder" }
+halt = { package = "test-parachain-halt", path = "../../../../parachain/test-parachains/halt" }
+tempfile = "3.3.0"
+
+[features]
+jemalloc-allocator = ["dep:tikv-jemalloc-ctl"]
diff --git a/polkadot/node/core/pvf/bin/puppet_worker.rs b/polkadot/node/core/pvf/worker/bin/puppet_worker.rs
similarity index 92%
rename from polkadot/node/core/pvf/bin/puppet_worker.rs
rename to polkadot/node/core/pvf/worker/bin/puppet_worker.rs
index 7f93519d845..ddd81971292 100644
--- a/polkadot/node/core/pvf/bin/puppet_worker.rs
+++ b/polkadot/node/core/pvf/worker/bin/puppet_worker.rs
@@ -14,4 +14,4 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
-polkadot_node_core_pvf::decl_puppet_worker_main!();
+polkadot_node_core_pvf_worker::decl_puppet_worker_main!();
diff --git a/polkadot/node/core/pvf/worker/build.rs b/polkadot/node/core/pvf/worker/build.rs
new file mode 100644
index 00000000000..40e9f832586
--- /dev/null
+++ b/polkadot/node/core/pvf/worker/build.rs
@@ -0,0 +1,19 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+fn main() {
+	substrate_build_script_utils::generate_cargo_keys();
+}
diff --git a/polkadot/node/core/pvf/worker/src/common.rs b/polkadot/node/core/pvf/worker/src/common.rs
new file mode 100644
index 00000000000..84bc88701d6
--- /dev/null
+++ b/polkadot/node/core/pvf/worker/src/common.rs
@@ -0,0 +1,142 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use crate::LOG_TARGET;
+use cpu_time::ProcessTime;
+use futures::never::Never;
+use std::{
+	path::PathBuf,
+	sync::mpsc::{Receiver, RecvTimeoutError},
+	time::Duration,
+};
+use tokio::{
+	io,
+	net::UnixStream,
+	runtime::{Handle, Runtime},
+};
+
+/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
+/// child process.
+pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);
+
+/// Interprets the given bytes as a path. Returns `None` if the given bytes do not constitute a
+/// a proper utf-8 string.
+pub fn bytes_to_path(bytes: &[u8]) -> Option<PathBuf> {
+	std::str::from_utf8(bytes).ok().map(PathBuf::from)
+}
+
+pub fn worker_event_loop<F, Fut>(
+	debug_id: &'static str,
+	socket_path: &str,
+	node_version: Option<&str>,
+	mut event_loop: F,
+) where
+	F: FnMut(Handle, UnixStream) -> Fut,
+	Fut: futures::Future<Output = io::Result<Never>>,
+{
+	let worker_pid = std::process::id();
+	gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id);
+
+	// Check for a mismatch between the node and worker versions.
+	if let Some(version) = node_version {
+		if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
+			gum::error!(
+				target: LOG_TARGET,
+				%worker_pid,
+				"Node and worker version mismatch, node needs restarting, forcing shutdown",
+			);
+			kill_parent_node_in_emergency();
+			let err: io::Result<Never> =
+				Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"));
+			gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker({}): {:?}", debug_id, err);
+			return
+		}
+	}
+
+	// Run the main worker loop.
+	let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
+	let handle = rt.handle();
+	let err = rt
+		.block_on(async move {
+			let stream = UnixStream::connect(socket_path).await?;
+			let _ = tokio::fs::remove_file(socket_path).await;
+
+			let result = event_loop(handle.clone(), stream).await;
+
+			result
+		})
+		// It's never `Ok` because it's `Ok(Never)`.
+		.unwrap_err();
+
+	gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err);
+
+	// We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast
+	// as possible and not wait for stalled validation to finish. This isn't strictly necessary now,
+	// but may be in the future.
+	rt.shutdown_background();
+}
+
+/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up
+/// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout.
+///
+/// Returning `Some` indicates that we should send a `TimedOut` error to the host. Will return
+/// `None` if the other thread finishes first, without us timing out.
+///
+/// NOTE: Sending a `TimedOut` error to the host will cause the worker, whether preparation or
+/// execution, to be killed by the host. We do not kill the process here because it would interfere
+/// with the proper handling of this error.
+pub fn cpu_time_monitor_loop(
+	cpu_time_start: ProcessTime,
+	timeout: Duration,
+	finished_rx: Receiver<()>,
+) -> Option<Duration> {
+	loop {
+		let cpu_time_elapsed = cpu_time_start.elapsed();
+
+		// Treat the timeout as CPU time, which is less subject to variance due to load.
+		if cpu_time_elapsed <= timeout {
+			// Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep
+			// is wall clock time. The CPU clock may be slower than the wall clock.
+			let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD;
+			match finished_rx.recv_timeout(sleep_interval) {
+				// Received finish signal.
+				Ok(()) => return None,
+				// Timed out, restart loop.
+				Err(RecvTimeoutError::Timeout) => continue,
+				Err(RecvTimeoutError::Disconnected) => return None,
+			}
+		}
+
+		return Some(cpu_time_elapsed)
+	}
+}
+
+/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM`
+/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node
+/// restart should be handled by the node owner. As node exits, unix sockets opened to workers
+/// get closed by the OS and other workers receive error on socket read and also exit. Preparation
+/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so
+/// no leftover artifacts are possible.
+fn kill_parent_node_in_emergency() {
+	unsafe {
+		// SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in
+		// some corner cases, which is checked. `kill()` never fails.
+		let ppid = libc::getppid();
+		if ppid > 1 {
+			libc::kill(ppid, libc::SIGTERM);
+		}
+	}
+}
diff --git a/polkadot/node/core/pvf/worker/src/execute.rs b/polkadot/node/core/pvf/worker/src/execute.rs
new file mode 100644
index 00000000000..9f6ff164a2b
--- /dev/null
+++ b/polkadot/node/core/pvf/worker/src/execute.rs
@@ -0,0 +1,175 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use crate::{
+	common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop},
+	executor_intf::Executor,
+	LOG_TARGET,
+};
+use cpu_time::ProcessTime;
+use futures::{pin_mut, select_biased, FutureExt};
+use parity_scale_codec::{Decode, Encode};
+use polkadot_node_core_pvf::{
+	framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response,
+};
+use polkadot_parachain::primitives::ValidationResult;
+use std::{
+	path::{Path, PathBuf},
+	sync::{mpsc::channel, Arc},
+	time::Duration,
+};
+use tokio::{io, net::UnixStream};
+
+async fn recv_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
+	let handshake_enc = framed_recv(stream).await?;
+	let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			"execute pvf recv_handshake: failed to decode Handshake".to_owned(),
+		)
+	})?;
+	Ok(handshake)
+}
+
+async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec<u8>, Duration)> {
+	let artifact_path = framed_recv(stream).await?;
+	let artifact_path = bytes_to_path(&artifact_path).ok_or_else(|| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			"execute pvf recv_request: non utf-8 artifact path".to_string(),
+		)
+	})?;
+	let params = framed_recv(stream).await?;
+	let execution_timeout = framed_recv(stream).await?;
+	let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			"execute pvf recv_request: failed to decode duration".to_string(),
+		)
+	})?;
+	Ok((artifact_path, params, execution_timeout))
+}
+
+async fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> {
+	framed_send(stream, &response.encode()).await
+}
+
+/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
+/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
+/// is checked against the worker version. A mismatch results in immediate worker termination.
+/// `None` is used for tests and in other situations when version check is not necessary.
+pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
+	worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move {
+		let worker_pid = std::process::id();
+
+		let handshake = recv_handshake(&mut stream).await?;
+		let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
+			io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
+		})?);
+
+		loop {
+			let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
+			gum::debug!(
+				target: LOG_TARGET,
+				%worker_pid,
+				"worker: validating artifact {}",
+				artifact_path.display(),
+			);
+
+			// Used to signal to the cpu time monitor thread that it can finish.
+			let (finished_tx, finished_rx) = channel::<()>();
+			let cpu_time_start = ProcessTime::now();
+
+			// Spawn a new thread that runs the CPU time monitor.
+			let cpu_time_monitor_fut = rt_handle
+				.spawn_blocking(move || {
+					cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
+				})
+				.fuse();
+			let executor_2 = executor.clone();
+			let execute_fut = rt_handle
+				.spawn_blocking(move || {
+					validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start)
+				})
+				.fuse();
+
+			pin_mut!(cpu_time_monitor_fut);
+			pin_mut!(execute_fut);
+
+			let response = select_biased! {
+				// If this future is not selected, the join handle is dropped and the thread will
+				// finish in the background.
+				cpu_time_monitor_res = cpu_time_monitor_fut => {
+					match cpu_time_monitor_res {
+						Ok(Some(cpu_time_elapsed)) => {
+							// Log if we exceed the timeout and the other thread hasn't finished.
+							gum::warn!(
+								target: LOG_TARGET,
+								%worker_pid,
+								"execute job took {}ms cpu time, exceeded execute timeout {}ms",
+								cpu_time_elapsed.as_millis(),
+								execution_timeout.as_millis(),
+							);
+							Response::TimedOut
+						},
+						Ok(None) => Response::InternalError("error communicating over finished channel".into()),
+						Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()),
+					}
+				},
+				execute_res = execute_fut => {
+					let _ = finished_tx.send(());
+					execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string()))
+				},
+			};
+
+			send_response(&mut stream, response).await?;
+		}
+	});
+}
+
+fn validate_using_artifact(
+	artifact_path: &Path,
+	params: &[u8],
+	executor: Arc<Executor>,
+	cpu_time_start: ProcessTime,
+) -> Response {
+	// Check here if the file exists, because the error from Substrate is not match-able.
+	// TODO: Re-evaluate after <https://github.com/paritytech/substrate/issues/13860>.
+	let file_metadata = std::fs::metadata(artifact_path);
+	if let Err(err) = file_metadata {
+		return Response::format_internal("execute: could not find or open file", &err.to_string())
+	}
+
+	let descriptor_bytes = match unsafe {
+		// SAFETY: this should be safe since the compiled artifact passed here comes from the
+		//         file created by the prepare workers. These files are obtained by calling
+		//         [`executor_intf::prepare`].
+		executor.execute(artifact_path.as_ref(), params)
+	} {
+		Err(err) => return Response::format_invalid("execute", &err),
+		Ok(d) => d,
+	};
+
+	let duration = cpu_time_start.elapsed();
+
+	let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
+		Err(err) =>
+			return Response::format_invalid("validation result decoding failed", &err.to_string()),
+		Ok(r) => r,
+	};
+
+	Response::Ok { result_descriptor, duration }
+}
diff --git a/polkadot/node/core/pvf/src/executor_intf.rs b/polkadot/node/core/pvf/worker/src/executor_intf.rs
similarity index 100%
rename from polkadot/node/core/pvf/src/executor_intf.rs
rename to polkadot/node/core/pvf/worker/src/executor_intf.rs
diff --git a/polkadot/node/core/pvf/worker/src/lib.rs b/polkadot/node/core/pvf/worker/src/lib.rs
new file mode 100644
index 00000000000..456362cf8f5
--- /dev/null
+++ b/polkadot/node/core/pvf/worker/src/lib.rs
@@ -0,0 +1,73 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+mod common;
+mod execute;
+mod executor_intf;
+mod memory_stats;
+mod prepare;
+
+#[doc(hidden)]
+pub mod testing;
+
+#[doc(hidden)]
+pub use sp_tracing;
+
+pub use execute::worker_entrypoint as execute_worker_entrypoint;
+pub use prepare::worker_entrypoint as prepare_worker_entrypoint;
+
+pub use executor_intf::{prepare, prevalidate};
+
+// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
+//       separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-worker=trace`.
+const LOG_TARGET: &str = "parachain::pvf-worker";
+
+/// Use this macro to declare a `fn main() {}` that will create an executable that can be used for
+/// spawning the desired worker.
+#[macro_export(local_inner_macros)]
+macro_rules! decl_worker_main {
+	($command:tt) => {
+		fn main() {
+			$crate::sp_tracing::try_init_simple();
+
+			let args = std::env::args().collect::<Vec<_>>();
+
+			let mut version = None;
+			let mut socket_path: &str = "";
+
+			for i in 1..args.len() {
+				match args[i].as_ref() {
+					"--socket-path" => socket_path = args[i + 1].as_str(),
+					"--node-version" => version = Some(args[i + 1].as_str()),
+					_ => (),
+				}
+			}
+
+			decl_worker_main_command!($command, socket_path, version)
+		}
+	};
+}
+
+#[macro_export]
+#[doc(hidden)]
+macro_rules! decl_worker_main_command {
+	(prepare, $socket_path:expr, $version: expr) => {
+		$crate::prepare_worker_entrypoint(&$socket_path, $version)
+	};
+	(execute, $socket_path:expr, $version: expr) => {
+		$crate::execute_worker_entrypoint(&$socket_path, $version)
+	};
+}
diff --git a/polkadot/node/core/pvf/src/prepare/memory_stats.rs b/polkadot/node/core/pvf/worker/src/memory_stats.rs
similarity index 89%
rename from polkadot/node/core/pvf/src/prepare/memory_stats.rs
rename to polkadot/node/core/pvf/worker/src/memory_stats.rs
index 3513a68c79e..945c849eb1d 100644
--- a/polkadot/node/core/pvf/src/prepare/memory_stats.rs
+++ b/polkadot/node/core/pvf/worker/src/memory_stats.rs
@@ -27,38 +27,14 @@
 //! <https://github.com/paritytech/polkadot/issues/6472#issuecomment-1381941762> for more
 //! background.
 
-use parity_scale_codec::{Decode, Encode};
-
-/// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if
-/// supported by the OS, `ru_maxrss`.
-#[derive(Clone, Debug, Default, Encode, Decode)]
-pub struct MemoryStats {
-	/// Memory stats from `tikv_jemalloc_ctl`.
-	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-	pub memory_tracker_stats: Option<MemoryAllocationStats>,
-	/// `ru_maxrss` from `getrusage`. A string error since `io::Error` is not `Encode`able.
-	#[cfg(target_os = "linux")]
-	pub max_rss: Option<i64>,
-}
-
-/// Statistics of collected memory metrics.
-#[non_exhaustive]
-#[derive(Clone, Debug, Default, Encode, Decode)]
-pub struct MemoryAllocationStats {
-	/// Total resident memory, in bytes.
-	pub resident: u64,
-	/// Total allocated memory, in bytes.
-	pub allocated: u64,
-}
-
 /// Module for the memory tracker. The memory tracker runs in its own thread, where it polls memory
 /// usage at an interval.
 ///
 /// NOTE: Requires jemalloc enabled.
 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
 pub mod memory_tracker {
-	use super::*;
 	use crate::LOG_TARGET;
+	use polkadot_node_core_pvf::MemoryAllocationStats;
 	use std::{
 		sync::mpsc::{Receiver, RecvTimeoutError, Sender},
 		time::Duration,
diff --git a/polkadot/node/core/pvf/worker/src/prepare.rs b/polkadot/node/core/pvf/worker/src/prepare.rs
new file mode 100644
index 00000000000..3cec7439f8d
--- /dev/null
+++ b/polkadot/node/core/pvf/worker/src/prepare.rs
@@ -0,0 +1,222 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+#[cfg(target_os = "linux")]
+use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
+#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
+use crate::{
+	common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop},
+	prepare, prevalidate, LOG_TARGET,
+};
+use cpu_time::ProcessTime;
+use futures::{pin_mut, select_biased, FutureExt};
+use parity_scale_codec::{Decode, Encode};
+use polkadot_node_core_pvf::{
+	framed_recv, framed_send, CompiledArtifact, MemoryStats, PrepareError, PrepareResult,
+	PrepareStats, PvfPrepData,
+};
+use std::{any::Any, panic, path::PathBuf, sync::mpsc::channel};
+use tokio::{io, net::UnixStream};
+
+async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> {
+	let pvf = framed_recv(stream).await?;
+	let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e),
+		)
+	})?;
+	let tmp_file = framed_recv(stream).await?;
+	let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			"prepare pvf recv_request: non utf-8 artifact path".to_string(),
+		)
+	})?;
+	Ok((pvf, tmp_file))
+}
+
+async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
+	framed_send(stream, &result.encode()).await
+}
+
+/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
+/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
+/// is checked against the worker version. A mismatch results in immediate worker termination.
+/// `None` is used for tests and in other situations when version check is not necessary.
+///
+/// # Flow
+///
+/// This runs the following in a loop:
+///
+/// 1. Get the code and parameters for preparation from the host.
+///
+/// 2. Start a memory tracker in a separate thread.
+///
+/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads.
+///
+/// 4. Select on the two threads created in step 3. If the CPU timeout was hit, the CPU time monitor
+///    thread will trigger first.
+///
+/// 5. Stop the memory tracker and get the stats.
+///
+/// 6. If compilation succeeded, write the compiled artifact into a temporary file.
+///
+/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
+///    send that in the `PrepareResult`.
+pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
+	worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move {
+		let worker_pid = std::process::id();
+
+		loop {
+			let (pvf, dest) = recv_request(&mut stream).await?;
+			gum::debug!(
+				target: LOG_TARGET,
+				%worker_pid,
+				"worker: preparing artifact",
+			);
+
+			let cpu_time_start = ProcessTime::now();
+			let preparation_timeout = pvf.prep_timeout();
+
+			// Run the memory tracker.
+			#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+			let (memory_tracker_tx, memory_tracker_rx) = channel::<()>();
+			#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+			let memory_tracker_fut = rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx));
+
+			// Spawn a new thread that runs the CPU time monitor.
+			let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
+			let cpu_time_monitor_fut = rt_handle
+				.spawn_blocking(move || {
+					cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx)
+				})
+				.fuse();
+			// Spawn another thread for preparation.
+			let prepare_fut = rt_handle
+				.spawn_blocking(move || {
+					let result = prepare_artifact(pvf);
+
+					// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
+					#[cfg(target_os = "linux")]
+					let result = result.map(|artifact| (artifact, get_max_rss_thread()));
+
+					result
+				})
+				.fuse();
+
+			pin_mut!(cpu_time_monitor_fut);
+			pin_mut!(prepare_fut);
+
+			let result = select_biased! {
+				// If this future is not selected, the join handle is dropped and the thread will
+				// finish in the background.
+				join_res = cpu_time_monitor_fut => {
+					match join_res {
+						Ok(Some(cpu_time_elapsed)) => {
+							// Log if we exceed the timeout and the other thread hasn't finished.
+							gum::warn!(
+								target: LOG_TARGET,
+								%worker_pid,
+								"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
+								cpu_time_elapsed.as_millis(),
+								preparation_timeout.as_millis(),
+							);
+							Err(PrepareError::TimedOut)
+						},
+						Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
+						Err(err) => Err(PrepareError::IoErr(err.to_string())),
+					}
+				},
+				prepare_res = prepare_fut => {
+					let cpu_time_elapsed = cpu_time_start.elapsed();
+					let _ = cpu_time_monitor_tx.send(());
+
+					match prepare_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) {
+						Err(err) => {
+							// Serialized error will be written into the socket.
+							Err(err)
+						},
+						Ok(ok) => {
+							// Stop the memory stats worker and get its observed memory stats.
+							#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+							let memory_tracker_stats =
+								get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx, worker_pid).await;
+							#[cfg(target_os = "linux")]
+							let (ok, max_rss) = ok;
+							let memory_stats = MemoryStats {
+								#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+								memory_tracker_stats,
+								#[cfg(target_os = "linux")]
+								max_rss: extract_max_rss_stat(max_rss, worker_pid),
+							};
+
+							// Write the serialized artifact into a temp file.
+							//
+							// PVF host only keeps artifacts statuses in its memory, successfully
+							// compiled code gets stored on the disk (and consequently deserialized
+							// by execute-workers). The prepare worker is only required to send `Ok`
+							// to the pool to indicate the success.
+
+							gum::debug!(
+								target: LOG_TARGET,
+								%worker_pid,
+								"worker: writing artifact to {}",
+								dest.display(),
+							);
+							tokio::fs::write(&dest, &ok).await?;
+
+							Ok(PrepareStats{cpu_time_elapsed, memory_stats})
+						},
+					}
+				},
+			};
+
+			send_response(&mut stream, result).await?;
+		}
+	});
+}
+
+fn prepare_artifact(pvf: PvfPrepData) -> Result<CompiledArtifact, PrepareError> {
+	panic::catch_unwind(|| {
+		let blob = match prevalidate(&pvf.code()) {
+			Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
+			Ok(b) => b,
+		};
+
+		match prepare(blob, &pvf.executor_params()) {
+			Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
+			Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
+		}
+	})
+	.map_err(|panic_payload| PrepareError::Panic(stringify_panic_payload(panic_payload)))
+	.and_then(|inner_result| inner_result)
+}
+
+/// Attempt to convert an opaque panic payload to a string.
+///
+/// This is a best effort, and is not guaranteed to provide the most accurate value.
+fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
+	match payload.downcast::<&'static str>() {
+		Ok(msg) => msg.to_string(),
+		Err(payload) => match payload.downcast::<String>() {
+			Ok(msg) => *msg,
+			// At least we tried...
+			Err(_) => "unknown panic payload".to_string(),
+		},
+	}
+}
diff --git a/polkadot/node/core/pvf/src/testing.rs b/polkadot/node/core/pvf/worker/src/testing.rs
similarity index 96%
rename from polkadot/node/core/pvf/src/testing.rs
rename to polkadot/node/core/pvf/worker/src/testing.rs
index fb1b406cdad..d09b68bf8b3 100644
--- a/polkadot/node/core/pvf/src/testing.rs
+++ b/polkadot/node/core/pvf/worker/src/testing.rs
@@ -21,10 +21,6 @@
 
 use polkadot_primitives::ExecutorParams;
 
-pub mod worker_common {
-	pub use crate::worker_common::{spawn_with_program_path, SpawnErr};
-}
-
 /// A function that emulates the stitches together behaviors of the preparation and the execution
 /// worker in a single synchronous function.
 pub fn validate_candidate(
diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/worker/tests/it/adder.rs
similarity index 100%
rename from polkadot/node/core/pvf/tests/it/adder.rs
rename to polkadot/node/core/pvf/worker/tests/it/adder.rs
diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/worker/tests/it/main.rs
similarity index 100%
rename from polkadot/node/core/pvf/tests/it/main.rs
rename to polkadot/node/core/pvf/worker/tests/it/main.rs
diff --git a/polkadot/node/core/pvf/tests/it/worker_common.rs b/polkadot/node/core/pvf/worker/tests/it/worker_common.rs
similarity index 94%
rename from polkadot/node/core/pvf/tests/it/worker_common.rs
rename to polkadot/node/core/pvf/worker/tests/it/worker_common.rs
index 3a17efc8df5..439ac8538c9 100644
--- a/polkadot/node/core/pvf/tests/it/worker_common.rs
+++ b/polkadot/node/core/pvf/worker/tests/it/worker_common.rs
@@ -15,7 +15,7 @@
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
 use crate::PUPPET_EXE;
-use polkadot_node_core_pvf::testing::worker_common::{spawn_with_program_path, SpawnErr};
+use polkadot_node_core_pvf::testing::{spawn_with_program_path, SpawnErr};
 use std::time::Duration;
 
 // Test spawning a program that immediately exits with a failure code.
diff --git a/polkadot/node/malus/Cargo.toml b/polkadot/node/malus/Cargo.toml
index c783693ca52..3c6aa5c2d39 100644
--- a/polkadot/node/malus/Cargo.toml
+++ b/polkadot/node/malus/Cargo.toml
@@ -20,9 +20,9 @@ polkadot-node-subsystem-types = { path = "../subsystem-types" }
 polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator" }
 polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" }
 polkadot-node-core-backing = { path = "../core/backing" }
+polkadot-node-core-pvf-worker = { path = "../core/pvf/worker" }
 polkadot-node-primitives = { path = "../primitives" }
 polkadot-primitives = { path = "../../primitives" }
-polkadot-node-core-pvf = { path = "../core/pvf" }
 color-eyre = { version = "0.6.1", default-features = false }
 assert_matches = "1.5"
 async-trait = "0.1.57"
diff --git a/polkadot/node/malus/src/malus.rs b/polkadot/node/malus/src/malus.rs
index f202996aca1..36cf0cca06b 100644
--- a/polkadot/node/malus/src/malus.rs
+++ b/polkadot/node/malus/src/malus.rs
@@ -97,7 +97,10 @@ impl MalusCli {
 
 				#[cfg(not(target_os = "android"))]
 				{
-					polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path, None);
+					polkadot_node_core_pvf_worker::prepare_worker_entrypoint(
+						&cmd.socket_path,
+						None,
+					);
 				}
 			},
 			NemesisVariant::PvfExecuteWorker(cmd) => {
@@ -108,7 +111,10 @@ impl MalusCli {
 
 				#[cfg(not(target_os = "android"))]
 				{
-					polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path, None);
+					polkadot_node_core_pvf_worker::execute_worker_entrypoint(
+						&cmd.socket_path,
+						None,
+					);
 				}
 			},
 		}
diff --git a/polkadot/node/test/performance-test/Cargo.toml b/polkadot/node/test/performance-test/Cargo.toml
index c83557f124d..70f072c03ae 100644
--- a/polkadot/node/test/performance-test/Cargo.toml
+++ b/polkadot/node/test/performance-test/Cargo.toml
@@ -10,11 +10,14 @@ quote = "1.0.26"
 env_logger = "0.9"
 log = "0.4"
 
-polkadot-node-core-pvf = { path = "../../core/pvf" }
+polkadot-node-core-pvf-worker = { path = "../../core/pvf/worker" }
 polkadot-erasure-coding = { path = "../../../erasure-coding" }
 polkadot-node-primitives = { path = "../../primitives" }
 polkadot-primitives = { path = "../../../primitives" }
 
+sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
 kusama-runtime = { path = "../../../runtime/kusama" }
 
 [[bin]]
diff --git a/polkadot/node/test/performance-test/src/gen_ref_constants.rs b/polkadot/node/test/performance-test/src/gen_ref_constants.rs
index 0f06af1580e..ba10ed21555 100644
--- a/polkadot/node/test/performance-test/src/gen_ref_constants.rs
+++ b/polkadot/node/test/performance-test/src/gen_ref_constants.rs
@@ -31,7 +31,6 @@ fn main() -> Result<(), PerfCheckError> {
 
 #[cfg(build_type = "release")]
 mod run {
-	use polkadot_node_core_pvf::sp_maybe_compressed_blob;
 	use polkadot_node_primitives::VALIDATION_CODE_BOMB_LIMIT;
 	use polkadot_performance_test::{
 		measure_erasure_coding, measure_pvf_prepare, PerfCheckError, ERASURE_CODING_N_VALIDATORS,
diff --git a/polkadot/node/test/performance-test/src/lib.rs b/polkadot/node/test/performance-test/src/lib.rs
index e426cc4e514..1afa43cc62b 100644
--- a/polkadot/node/test/performance-test/src/lib.rs
+++ b/polkadot/node/test/performance-test/src/lib.rs
@@ -17,7 +17,6 @@
 //! A Polkadot performance tests utilities.
 
 use polkadot_erasure_coding::{obtain_chunks, reconstruct};
-use polkadot_node_core_pvf::{sc_executor_common, sp_maybe_compressed_blob};
 use polkadot_primitives::ExecutorParams;
 use std::time::{Duration, Instant};
 
@@ -66,8 +65,9 @@ pub fn measure_pvf_prepare(wasm_code: &[u8]) -> Result<Duration, PerfCheckError>
 		.or(Err(PerfCheckError::CodeDecompressionFailed))?;
 
 	// Recreate the pipeline from the pvf prepare worker.
-	let blob = polkadot_node_core_pvf::prevalidate(code.as_ref()).map_err(PerfCheckError::from)?;
-	polkadot_node_core_pvf::prepare(blob, &ExecutorParams::default())
+	let blob =
+		polkadot_node_core_pvf_worker::prevalidate(code.as_ref()).map_err(PerfCheckError::from)?;
+	polkadot_node_core_pvf_worker::prepare(blob, &ExecutorParams::default())
 		.map_err(PerfCheckError::from)?;
 
 	Ok(start.elapsed())
diff --git a/polkadot/parachain/test-parachains/adder/collator/Cargo.toml b/polkadot/parachain/test-parachains/adder/collator/Cargo.toml
index 7fe4aefc688..ee20cb0b0d1 100644
--- a/polkadot/parachain/test-parachains/adder/collator/Cargo.toml
+++ b/polkadot/parachain/test-parachains/adder/collator/Cargo.toml
@@ -34,7 +34,7 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master
 # This one is tricky. Even though it is not used directly by the collator, we still need it for the
 # `puppet_worker` binary, which is required for the integration test. However, this shouldn't be
 # a big problem since it is used transitively anyway.
-polkadot-node-core-pvf = { path = "../../../../node/core/pvf" }
+polkadot-node-core-pvf-worker = { path = "../../../../node/core/pvf/worker" }
 
 [dev-dependencies]
 polkadot-parachain = { path = "../../.." }
diff --git a/polkadot/parachain/test-parachains/adder/collator/bin/puppet_worker.rs b/polkadot/parachain/test-parachains/adder/collator/bin/puppet_worker.rs
index 7f93519d845..ddd81971292 100644
--- a/polkadot/parachain/test-parachains/adder/collator/bin/puppet_worker.rs
+++ b/polkadot/parachain/test-parachains/adder/collator/bin/puppet_worker.rs
@@ -14,4 +14,4 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
-polkadot_node_core_pvf::decl_puppet_worker_main!();
+polkadot_node_core_pvf_worker::decl_puppet_worker_main!();
diff --git a/polkadot/parachain/test-parachains/adder/collator/src/lib.rs b/polkadot/parachain/test-parachains/adder/collator/src/lib.rs
index 02a4598f9e4..4b2b9248de2 100644
--- a/polkadot/parachain/test-parachains/adder/collator/src/lib.rs
+++ b/polkadot/parachain/test-parachains/adder/collator/src/lib.rs
@@ -272,7 +272,7 @@ mod tests {
 	}
 
 	fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) {
-		use polkadot_node_core_pvf::testing::validate_candidate;
+		use polkadot_node_core_pvf_worker::testing::validate_candidate;
 
 		let block_data = match collation.proof_of_validity {
 			MaybeCompressedPoV::Raw(pov) => pov.block_data,
diff --git a/polkadot/parachain/test-parachains/undying/collator/Cargo.toml b/polkadot/parachain/test-parachains/undying/collator/Cargo.toml
index 2b9d80401f5..1b2ccf3be0c 100644
--- a/polkadot/parachain/test-parachains/undying/collator/Cargo.toml
+++ b/polkadot/parachain/test-parachains/undying/collator/Cargo.toml
@@ -34,7 +34,7 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master
 # This one is tricky. Even though it is not used directly by the collator, we still need it for the
 # `puppet_worker` binary, which is required for the integration test. However, this shouldn't be
 # a big problem since it is used transitively anyway.
-polkadot-node-core-pvf = { path = "../../../../node/core/pvf" }
+polkadot-node-core-pvf-worker = { path = "../../../../node/core/pvf/worker" }
 
 [dev-dependencies]
 polkadot-parachain = { path = "../../.." }
diff --git a/polkadot/parachain/test-parachains/undying/collator/bin/puppet_worker.rs b/polkadot/parachain/test-parachains/undying/collator/bin/puppet_worker.rs
index 7f93519d845..ddd81971292 100644
--- a/polkadot/parachain/test-parachains/undying/collator/bin/puppet_worker.rs
+++ b/polkadot/parachain/test-parachains/undying/collator/bin/puppet_worker.rs
@@ -14,4 +14,4 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
-polkadot_node_core_pvf::decl_puppet_worker_main!();
+polkadot_node_core_pvf_worker::decl_puppet_worker_main!();
diff --git a/polkadot/parachain/test-parachains/undying/collator/src/lib.rs b/polkadot/parachain/test-parachains/undying/collator/src/lib.rs
index 838590fa16f..dcaf9b63296 100644
--- a/polkadot/parachain/test-parachains/undying/collator/src/lib.rs
+++ b/polkadot/parachain/test-parachains/undying/collator/src/lib.rs
@@ -354,7 +354,7 @@ mod tests {
 	}
 
 	fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) {
-		use polkadot_node_core_pvf::testing::validate_candidate;
+		use polkadot_node_core_pvf_worker::testing::validate_candidate;
 
 		let block_data = match collation.proof_of_validity {
 			MaybeCompressedPoV::Raw(pov) => pov.block_data,
-- 
GitLab