From 5889119fc9d6479398618a31ffa9c1c3952ec3b7 Mon Sep 17 00:00:00 2001
From: Marcin S <marcin@realemail.net>
Date: Fri, 7 Apr 2023 12:14:53 +0200
Subject: [PATCH] PVF: Minor refactor in workers code (#7012)

* Move version check to `worker_event_loop`

* More minor refactors

- More consistent use of `format_invalid` and `format_internal`.
- Fix a doc error.
- Fix some poorly-named local variables.
---
 polkadot/node/core/pvf/src/error.rs          |  5 +--
 polkadot/node/core/pvf/src/execute/worker.rs | 34 +++++++---------
 polkadot/node/core/pvf/src/lib.rs            |  1 -
 polkadot/node/core/pvf/src/prepare/worker.rs | 13 +-----
 polkadot/node/core/pvf/src/worker_common.rs  | 43 ++++++++++++--------
 5 files changed, 45 insertions(+), 51 deletions(-)

diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs
index 5edf435e719..adcc6b7b1b9 100644
--- a/polkadot/node/core/pvf/src/error.rs
+++ b/polkadot/node/core/pvf/src/error.rs
@@ -29,12 +29,11 @@ pub enum PrepareError {
 	Prevalidation(String),
 	/// Compilation failed for the given PVF.
 	Preparation(String),
-	/// An unexpected panic has occured in the preparation worker.
+	/// An unexpected panic has occurred in the preparation worker.
 	Panic(String),
 	/// Failed to prepare the PVF due to the time limit.
 	TimedOut,
-	/// An IO error occurred while receiving the result from the worker process. This state is reported by the
-	/// validation host (not by the worker).
+	/// An IO error occurred. This state is reported by either the validation host or by the worker.
 	IoErr(String),
 	/// The temporary file for the artifact could not be created at the given cache path. This state is reported by the
 	/// validation host (not by the worker).
diff --git a/polkadot/node/core/pvf/src/execute/worker.rs b/polkadot/node/core/pvf/src/execute/worker.rs
index 04357d8704b..6627337c21f 100644
--- a/polkadot/node/core/pvf/src/execute/worker.rs
+++ b/polkadot/node/core/pvf/src/execute/worker.rs
@@ -261,6 +261,13 @@ impl Response {
 			Self::InvalidCandidate(format!("{}: {}", ctx, msg))
 		}
 	}
+	fn format_internal(ctx: &'static str, msg: &str) -> Self {
+		if msg.is_empty() {
+			Self::InternalError(ctx.to_string())
+		} else {
+			Self::InternalError(format!("{}: {}", ctx, msg))
+		}
+	}
 }
 
 /// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
@@ -268,19 +275,8 @@ impl Response {
 /// is checked against the worker version. A mismatch results in immediate worker termination.
 /// `None` is used for tests and in other situations when version check is not necessary.
 pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
-	worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
+	worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move {
 		let worker_pid = std::process::id();
-		if let Some(version) = node_version {
-			if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
-				gum::error!(
-					target: LOG_TARGET,
-					%worker_pid,
-					"Node and worker version mismatch, node needs restarting, forcing shutdown",
-				);
-				crate::kill_parent_node_in_emergency();
-				return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
-			}
-		}
 
 		let handshake = recv_handshake(&mut stream).await?;
 		let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
@@ -301,7 +297,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
 			let cpu_time_start = ProcessTime::now();
 
 			// Spawn a new thread that runs the CPU time monitor.
-			let thread_fut = rt_handle
+			let cpu_time_monitor_fut = rt_handle
 				.spawn_blocking(move || {
 					cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
 				})
@@ -313,14 +309,14 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
 				})
 				.fuse();
 
-			pin_mut!(thread_fut);
+			pin_mut!(cpu_time_monitor_fut);
 			pin_mut!(execute_fut);
 
 			let response = select_biased! {
 				// If this future is not selected, the join handle is dropped and the thread will
 				// finish in the background.
-				join_res = thread_fut => {
-					match join_res {
+				cpu_time_monitor_res = cpu_time_monitor_fut => {
+					match cpu_time_monitor_res {
 						Ok(Some(cpu_time_elapsed)) => {
 							// Log if we exceed the timeout and the other thread hasn't finished.
 							gum::warn!(
@@ -333,12 +329,12 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
 							Response::TimedOut
 						},
 						Ok(None) => Response::InternalError("error communicating over finished channel".into()),
-						Err(e) => Response::InternalError(format!("{}", e)),
+						Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()),
 					}
 				},
 				execute_res = execute_fut => {
 					let _ = finished_tx.send(());
-					execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e)))
+					execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string()))
 				},
 			};
 
@@ -367,7 +363,7 @@ fn validate_using_artifact(
 
 	let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
 		Err(err) =>
-			return Response::InvalidCandidate(format!("validation result decoding failed: {}", err)),
+			return Response::format_invalid("validation result decoding failed", &err.to_string()),
 		Ok(r) => r,
 	};
 
diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs
index 88134529bc4..8c40bbb8b93 100644
--- a/polkadot/node/core/pvf/src/lib.rs
+++ b/polkadot/node/core/pvf/src/lib.rs
@@ -114,7 +114,6 @@ pub use pvf::PvfPrepData;
 
 pub use host::{start, Config, ValidationHost};
 pub use metrics::Metrics;
-pub(crate) use worker_common::kill_parent_node_in_emergency;
 pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;
 
 pub use execute::worker_entrypoint as execute_worker_entrypoint;
diff --git a/polkadot/node/core/pvf/src/prepare/worker.rs b/polkadot/node/core/pvf/src/prepare/worker.rs
index 1ccba603c1f..43926e6b64a 100644
--- a/polkadot/node/core/pvf/src/prepare/worker.rs
+++ b/polkadot/node/core/pvf/src/prepare/worker.rs
@@ -351,19 +351,8 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
 ///	7. Send the result of preparation back to the host. If any error occurred in the above steps, we
 ///	   send that in the `PrepareResult`.
 pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
-	worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
+	worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move {
 		let worker_pid = std::process::id();
-		if let Some(version) = node_version {
-			if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
-				gum::error!(
-					target: LOG_TARGET,
-					%worker_pid,
-					"Node and worker version mismatch, node needs restarting, forcing shutdown",
-				);
-				crate::kill_parent_node_in_emergency();
-				return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
-			}
-		}
 
 		loop {
 			let (pvf, dest) = recv_request(&mut stream).await?;
diff --git a/polkadot/node/core/pvf/src/worker_common.rs b/polkadot/node/core/pvf/src/worker_common.rs
index 11b2050f2b1..a7d5d901450 100644
--- a/polkadot/node/core/pvf/src/worker_common.rs
+++ b/polkadot/node/core/pvf/src/worker_common.rs
@@ -171,18 +171,35 @@ pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
 	tmpfile_in(prefix, &temp_dir).await
 }
 
-pub fn worker_event_loop<F, Fut>(debug_id: &'static str, socket_path: &str, mut event_loop: F)
-where
+pub fn worker_event_loop<F, Fut>(
+	debug_id: &'static str,
+	socket_path: &str,
+	node_version: Option<&str>,
+	mut event_loop: F,
+) where
 	F: FnMut(Handle, UnixStream) -> Fut,
 	Fut: futures::Future<Output = io::Result<Never>>,
 {
-	gum::debug!(
-		target: LOG_TARGET,
-		worker_pid = %std::process::id(),
-		"starting pvf worker ({})",
-		debug_id,
-	);
+	let worker_pid = std::process::id();
+	gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id);
+
+	// Check for a mismatch between the node and worker versions.
+	if let Some(version) = node_version {
+		if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
+			gum::error!(
+				target: LOG_TARGET,
+				%worker_pid,
+				"Node and worker version mismatch, node needs restarting, forcing shutdown",
+			);
+			kill_parent_node_in_emergency();
+			let err: io::Result<Never> =
+				Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"));
+			gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker({}): {:?}", debug_id, err);
+			return
+		}
+	}
 
+	// Run the main worker loop.
 	let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
 	let handle = rt.handle();
 	let err = rt
@@ -197,13 +214,7 @@ where
 		// It's never `Ok` because it's `Ok(Never)`.
 		.unwrap_err();
 
-	gum::debug!(
-		target: LOG_TARGET,
-		worker_pid = %std::process::id(),
-		"quitting pvf worker ({}): {:?}",
-		debug_id,
-		err,
-	);
+	gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err);
 
 	// We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast
 	// as possible and not wait for stalled validation to finish. This isn't strictly necessary now,
@@ -422,7 +433,7 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>
 /// get closed by the OS and other workers receive error on socket read and also exit. Preparation
 /// jobs are written to the temporary files that are renamed to real artifacts on the node side, so
 /// no leftover artifacts are possible.
-pub(crate) fn kill_parent_node_in_emergency() {
+fn kill_parent_node_in_emergency() {
 	unsafe {
 		// SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in
 		// some corner cases, which is checked. `kill()` never fails.
-- 
GitLab