From 51c0c24213aeed7b7c230835a7e381cc2147df82 Mon Sep 17 00:00:00 2001
From: Marcin S <marcin@realemail.net>
Date: Thu, 5 Oct 2023 18:37:54 +0200
Subject: [PATCH] PVF: Add back socket path parameter, use tmp socket path
 (#1780)

---
 .../node/core/pvf/common/src/worker/mod.rs    |  24 ++-
 .../node/core/pvf/common/src/worker_dir.rs    |   5 -
 .../node/core/pvf/execute-worker/src/lib.rs   |   4 +
 .../node/core/pvf/prepare-worker/src/lib.rs   |   4 +
 polkadot/node/core/pvf/src/worker_intf.rs     | 140 +++++++++++-------
 5 files changed, 111 insertions(+), 66 deletions(-)

diff --git a/polkadot/node/core/pvf/common/src/worker/mod.rs b/polkadot/node/core/pvf/common/src/worker/mod.rs
index 59973f6cbbc..d0bd5b6bd7c 100644
--- a/polkadot/node/core/pvf/common/src/worker/mod.rs
+++ b/polkadot/node/core/pvf/common/src/worker/mod.rs
@@ -18,7 +18,7 @@
 
 pub mod security;
 
-use crate::{worker_dir, SecurityStatus, LOG_TARGET};
+use crate::{SecurityStatus, LOG_TARGET};
 use cpu_time::ProcessTime;
 use futures::never::Never;
 use std::{
@@ -115,6 +115,7 @@ macro_rules! decl_worker_main {
 				},
 			}
 
+			let mut socket_path = None;
 			let mut worker_dir_path = None;
 			let mut node_version = None;
 			let mut can_enable_landlock = false;
@@ -123,6 +124,10 @@ macro_rules! decl_worker_main {
 			let mut i = 2;
 			while i < args.len() {
 				match args[i].as_ref() {
+					"--socket-path" => {
+						socket_path = Some(args[i + 1].as_str());
+						i += 1
+					},
 					"--worker-dir-path" => {
 						worker_dir_path = Some(args[i + 1].as_str());
 						i += 1
@@ -138,16 +143,24 @@ macro_rules! decl_worker_main {
 				}
 				i += 1;
 			}
+			let socket_path = socket_path.expect("the --socket-path argument is required");
 			let worker_dir_path =
 				worker_dir_path.expect("the --worker-dir-path argument is required");
 
+			let socket_path = std::path::Path::new(socket_path).to_owned();
 			let worker_dir_path = std::path::Path::new(worker_dir_path).to_owned();
 			let security_status = $crate::SecurityStatus {
 				can_enable_landlock,
 				can_unshare_user_namespace_and_change_root,
 			};
 
-			$entrypoint(worker_dir_path, node_version, Some($worker_version), security_status);
+			$entrypoint(
+				socket_path,
+				worker_dir_path,
+				node_version,
+				Some($worker_version),
+				security_status,
+			);
 		}
 	};
 }
@@ -177,6 +190,7 @@ impl fmt::Display for WorkerKind {
 // the version that this crate was compiled with.
 pub fn worker_event_loop<F, Fut>(
 	worker_kind: WorkerKind,
+	socket_path: PathBuf,
 	#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut worker_dir_path: PathBuf,
 	node_version: Option<&str>,
 	worker_version: Option<&str>,
@@ -190,6 +204,7 @@ pub fn worker_event_loop<F, Fut>(
 	gum::debug!(
 		target: LOG_TARGET,
 		%worker_pid,
+		?socket_path,
 		?worker_dir_path,
 		?security_status,
 		"starting pvf worker ({})",
@@ -237,12 +252,9 @@ pub fn worker_event_loop<F, Fut>(
 	}
 
 	// Connect to the socket.
-	let socket_path = worker_dir::socket(&worker_dir_path);
 	let stream = || -> std::io::Result<UnixStream> {
 		let stream = UnixStream::connect(&socket_path)?;
-		// Remove the socket here. We don't also need to do this on the host-side; on failed
-		// rendezvous, the host will delete the whole worker dir.
-		std::fs::remove_file(&socket_path)?;
+		let _ = std::fs::remove_file(&socket_path);
 		Ok(stream)
 	}();
 	let stream = match stream {
diff --git a/polkadot/node/core/pvf/common/src/worker_dir.rs b/polkadot/node/core/pvf/common/src/worker_dir.rs
index c2610a4d112..1cdf43a61e4 100644
--- a/polkadot/node/core/pvf/common/src/worker_dir.rs
+++ b/polkadot/node/core/pvf/common/src/worker_dir.rs
@@ -20,7 +20,6 @@ use std::path::{Path, PathBuf};
 
 const WORKER_EXECUTE_ARTIFACT_NAME: &str = "artifact";
 const WORKER_PREPARE_TMP_ARTIFACT_NAME: &str = "tmp-artifact";
-const WORKER_SOCKET_NAME: &str = "socket";
 
 pub fn execute_artifact(worker_dir_path: &Path) -> PathBuf {
 	worker_dir_path.join(WORKER_EXECUTE_ARTIFACT_NAME)
@@ -29,7 +28,3 @@ pub fn execute_artifact(worker_dir_path: &Path) -> PathBuf {
 pub fn prepare_tmp_artifact(worker_dir_path: &Path) -> PathBuf {
 	worker_dir_path.join(WORKER_PREPARE_TMP_ARTIFACT_NAME)
 }
-
-pub fn socket(worker_dir_path: &Path) -> PathBuf {
-	worker_dir_path.join(WORKER_SOCKET_NAME)
-}
diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs
index 9d7bfdf2866..02eaedb96f2 100644
--- a/polkadot/node/core/pvf/execute-worker/src/lib.rs
+++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs
@@ -111,6 +111,8 @@ fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()>
 ///
 /// # Parameters
 ///
+/// - `socket_path`: specifies the path to the socket used to communicate with the host.
+///
 /// - `worker_dir_path`: specifies the path to the worker-specific temporary directory.
 ///
 /// - `node_version`: if `Some`, is checked against the `worker_version`. A mismatch results in
@@ -121,6 +123,7 @@ fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()>
 ///
 /// - `security_status`: contains the detected status of security features.
 pub fn worker_entrypoint(
+	socket_path: PathBuf,
 	worker_dir_path: PathBuf,
 	node_version: Option<&str>,
 	worker_version: Option<&str>,
@@ -128,6 +131,7 @@ pub fn worker_entrypoint(
 ) {
 	worker_event_loop(
 		WorkerKind::Execute,
+		socket_path,
 		worker_dir_path,
 		node_version,
 		worker_version,
diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs
index a24f5024722..fcc7f6754a7 100644
--- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs
+++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs
@@ -87,6 +87,8 @@ fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<(
 ///
 /// # Parameters
 ///
+/// - `socket_path`: specifies the path to the socket used to communicate with the host.
+///
 /// - `worker_dir_path`: specifies the path to the worker-specific temporary directory.
 ///
 /// - `node_version`: if `Some`, is checked against the `worker_version`. A mismatch results in
@@ -116,6 +118,7 @@ fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<(
 /// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
 ///    send that in the `PrepareResult`.
 pub fn worker_entrypoint(
+	socket_path: PathBuf,
 	worker_dir_path: PathBuf,
 	node_version: Option<&str>,
 	worker_version: Option<&str>,
@@ -123,6 +126,7 @@ pub fn worker_entrypoint(
 ) {
 	worker_event_loop(
 		WorkerKind::Prepare,
+		socket_path,
 		worker_dir_path,
 		node_version,
 		worker_version,
diff --git a/polkadot/node/core/pvf/src/worker_intf.rs b/polkadot/node/core/pvf/src/worker_intf.rs
index 9825506ba88..bd85d84055c 100644
--- a/polkadot/node/core/pvf/src/worker_intf.rs
+++ b/polkadot/node/core/pvf/src/worker_intf.rs
@@ -20,7 +20,7 @@ use crate::LOG_TARGET;
 use futures::FutureExt as _;
 use futures_timer::Delay;
 use pin_project::pin_project;
-use polkadot_node_core_pvf_common::{worker_dir, SecurityStatus};
+use polkadot_node_core_pvf_common::SecurityStatus;
 use rand::Rng;
 use std::{
 	fmt, mem,
@@ -67,71 +67,99 @@ pub async fn spawn_with_program_path(
 ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
 	let program_path = program_path.into();
 	let worker_dir = WorkerDir::new(debug_id, cache_path).await?;
-	let socket_path = worker_dir::socket(&worker_dir.path);
-
 	let extra_args: Vec<String> = extra_args.iter().map(|arg| arg.to_string()).collect();
 
-	let listener = UnixListener::bind(&socket_path).map_err(|err| {
-		gum::warn!(
-			target: LOG_TARGET,
-			%debug_id,
-			?program_path,
-			?extra_args,
-			?worker_dir,
-			?socket_path,
-			"cannot bind unix socket: {:?}",
-			err,
-		);
-		SpawnErr::Bind
-	})?;
-
-	let handle = WorkerHandle::spawn(&program_path, &extra_args, &worker_dir.path, security_status)
-		.map_err(|err| {
-			gum::warn!(
-				target: LOG_TARGET,
-				%debug_id,
-				?program_path,
-				?extra_args,
-				?worker_dir.path,
-				?socket_path,
-				"cannot spawn a worker: {:?}",
-				err,
-			);
-			SpawnErr::ProcessSpawn
-		})?;
-
-	let worker_dir_path = worker_dir.path.clone();
-	futures::select! {
-		accept_result = listener.accept().fuse() => {
-			let (stream, _) = accept_result.map_err(|err| {
+	with_transient_socket_path(debug_id, |socket_path| {
+		let socket_path = socket_path.to_owned();
+
+		async move {
+			let listener = UnixListener::bind(&socket_path).map_err(|err| {
 				gum::warn!(
 					target: LOG_TARGET,
 					%debug_id,
 					?program_path,
 					?extra_args,
-					?worker_dir_path,
+					?worker_dir,
 					?socket_path,
-					"cannot accept a worker: {:?}",
+					"cannot bind unix socket: {:?}",
 					err,
 				);
-				SpawnErr::Accept
+				SpawnErr::Bind
 			})?;
-			Ok((IdleWorker { stream, pid: handle.id(), worker_dir }, handle))
-		}
-		_ = Delay::new(spawn_timeout).fuse() => {
-			gum::warn!(
-				target: LOG_TARGET,
-				%debug_id,
-				?program_path,
-				?extra_args,
-				?worker_dir_path,
-				?socket_path,
-				?spawn_timeout,
-				"spawning and connecting to socket timed out",
-			);
-			Err(SpawnErr::AcceptTimeout)
+
+			let handle = WorkerHandle::spawn(
+				&program_path,
+				&extra_args,
+				&socket_path,
+				&worker_dir.path,
+				security_status,
+			)
+			.map_err(|err| {
+				gum::warn!(
+					target: LOG_TARGET,
+					%debug_id,
+					?program_path,
+					?extra_args,
+					?worker_dir.path,
+					?socket_path,
+					"cannot spawn a worker: {:?}",
+					err,
+				);
+				SpawnErr::ProcessSpawn
+			})?;
+
+			let worker_dir_path = worker_dir.path.clone();
+			futures::select! {
+				accept_result = listener.accept().fuse() => {
+					let (stream, _) = accept_result.map_err(|err| {
+						gum::warn!(
+							target: LOG_TARGET,
+							%debug_id,
+							?program_path,
+							?extra_args,
+							?worker_dir_path,
+							?socket_path,
+							"cannot accept a worker: {:?}",
+							err,
+						);
+						SpawnErr::Accept
+					})?;
+					Ok((IdleWorker { stream, pid: handle.id(), worker_dir }, handle))
+				}
+				_ = Delay::new(spawn_timeout).fuse() => {
+					gum::warn!(
+						target: LOG_TARGET,
+						%debug_id,
+						?program_path,
+						?extra_args,
+						?worker_dir_path,
+						?socket_path,
+						?spawn_timeout,
+						"spawning and connecting to socket timed out",
+					);
+					Err(SpawnErr::AcceptTimeout)
+				}
+			}
 		}
-	}
+	})
+	.await
+}
+
+async fn with_transient_socket_path<T, F, Fut>(debug_id: &'static str, f: F) -> Result<T, SpawnErr>
+where
+	F: FnOnce(&Path) -> Fut,
+	Fut: futures::Future<Output = Result<T, SpawnErr>> + 'static,
+{
+	let socket_path = tmppath(&format!("pvf-host-{}", debug_id))
+		.await
+		.map_err(|_| SpawnErr::TmpPath)?;
+	let result = f(&socket_path).await;
+
+	// Best effort to remove the socket file. Under normal circumstances the socket will be removed
+	// by the worker. We make sure that it is removed here, just in case a failed rendezvous.
+	let _ = tokio::fs::remove_file(socket_path).await;
+
+	result
 }
 
 /// Returns a path under the given `dir`. The path name will start with the given prefix.
@@ -169,7 +197,6 @@ pub async fn tmppath_in(prefix: &str, dir: &Path) -> io::Result<PathBuf> {
 }
 
 /// The same as [`tmppath_in`], but uses [`std::env::temp_dir`] as the directory.
-#[cfg(test)]
 pub async fn tmppath(prefix: &str) -> io::Result<PathBuf> {
 	let temp_dir = PathBuf::from(std::env::temp_dir());
 	tmppath_in(prefix, &temp_dir).await
@@ -234,6 +261,7 @@ impl WorkerHandle {
 	fn spawn(
 		program: impl AsRef<Path>,
 		extra_args: &[String],
+		socket_path: impl AsRef<Path>,
 		worker_dir_path: impl AsRef<Path>,
 		security_status: SecurityStatus,
 	) -> io::Result<Self> {
@@ -257,6 +285,8 @@ impl WorkerHandle {
 		}
 		let mut child = command
 			.args(extra_args)
+			.arg("--socket-path")
+			.arg(socket_path.as_ref().as_os_str())
 			.arg("--worker-dir-path")
 			.arg(worker_dir_path.as_ref().as_os_str())
 			.args(&security_args)
-- 
GitLab