diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 85f446c3d6697c3bd42c73bfe5cf91603f2aab61..6f6d3024f2a2946973e6e925af1cd600823e303a 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -7161,6 +7161,7 @@ dependencies = [ "sp-maybe-compressed-blob", "sp-tracing", "sp-wasm-interface", + "substrate-build-script-utils", "tempfile", "test-parachain-adder", "test-parachain-halt", diff --git a/polkadot/cli/src/cli.rs b/polkadot/cli/src/cli.rs index 37083eb91278e2bdafe9d8f55033709a41cadf51..c78399788a6575d1a5a6aacc12d08351dcdeb83a 100644 --- a/polkadot/cli/src/cli.rs +++ b/polkadot/cli/src/cli.rs @@ -79,7 +79,11 @@ pub enum Subcommand { #[derive(Debug, Parser)] pub struct ValidationWorkerCommand { /// The path to the validation host's socket. + #[arg(long)] pub socket_path: String, + /// Calling node implementation version + #[arg(long)] + pub node_impl_version: String, } #[allow(missing_docs)] diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index 0d1a3f81639acbe399ce4e3b28a9459a32288244..e6eaf6f09562cbcfd3ba448da106bb290c91aae8 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -494,7 +494,10 @@ pub fn run() -> Result<()> { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path); + polkadot_node_core_pvf::prepare_worker_entrypoint( + &cmd.socket_path, + Some(&cmd.node_impl_version), + ); Ok(()) } }, @@ -513,7 +516,10 @@ pub fn run() -> Result<()> { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path); + polkadot_node_core_pvf::execute_worker_entrypoint( + &cmd.socket_path, + Some(&cmd.node_impl_version), + ); Ok(()) } }, diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml index b6c56c65951b472f3be26fc258b4ae184affe7d4..6478edc44115a3f8bfdfbc8029698cb6f0960222 100644 --- a/polkadot/node/core/pvf/Cargo.toml +++ b/polkadot/node/core/pvf/Cargo.toml @@ -15,6 +15,7 @@ 
cpu-time = "1.0.0" futures = "0.3.21" futures-timer = "3.0.2" gum = { package = "tracing-gum", path = "../../gum" } +libc = "0.2.139" pin-project = "1.0.9" rand = "0.8.5" rayon = "1.5.1" @@ -41,8 +42,10 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch = sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } +[build-dependencies] +substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } + [target.'cfg(target_os = "linux")'.dependencies] -libc = "0.2.139" tikv-jemalloc-ctl = "0.5.0" [dev-dependencies] diff --git a/polkadot/node/core/pvf/build.rs b/polkadot/node/core/pvf/build.rs new file mode 100644 index 0000000000000000000000000000000000000000..805fa3446f6bb5fc36d19b8911131d2fe62047bc --- /dev/null +++ b/polkadot/node/core/pvf/build.rs @@ -0,0 +1,19 @@ +// Copyright 2017-2023 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. 
+ +fn main() { + substrate_build_script_utils::generate_cargo_keys(); +} diff --git a/polkadot/node/core/pvf/src/execute/worker.rs b/polkadot/node/core/pvf/src/execute/worker.rs index 523be67da268f5bdce4aed230893b8be4f2f9d8a..04357d8704bc8efa094aefc31d87afb537b5adcd 100644 --- a/polkadot/node/core/pvf/src/execute/worker.rs +++ b/polkadot/node/core/pvf/src/execute/worker.rs @@ -47,9 +47,13 @@ pub async fn spawn( executor_params: ExecutorParams, spawn_timeout: Duration, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { - let (mut idle_worker, worker_handle) = - spawn_with_program_path("execute", program_path, &["execute-worker"], spawn_timeout) - .await?; + let (mut idle_worker, worker_handle) = spawn_with_program_path( + "execute", + program_path, + &["execute-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")], + spawn_timeout, + ) + .await?; send_handshake(&mut idle_worker.stream, Handshake { executor_params }) .await .map_err(|error| { @@ -260,11 +264,25 @@ impl Response { } /// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies -/// the path to the socket used to communicate with the host. -pub fn worker_entrypoint(socket_path: &str) { +/// the path to the socket used to communicate with the host. The `node_version`, if `Some`, +/// is checked against the worker version. A mismatch results in immediate worker termination. +/// `None` is used for tests and in other situations when version check is not necessary. 
+pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move { - let handshake = recv_handshake(&mut stream).await?; + let worker_pid = std::process::id(); + if let Some(version) = node_version { + if version != env!("SUBSTRATE_CLI_IMPL_VERSION") { + gum::error!( + target: LOG_TARGET, + %worker_pid, + "Node and worker version mismatch, node needs restarting, forcing shutdown", + ); + crate::kill_parent_node_in_emergency(); + return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch")) + } + } + let handshake = recv_handshake(&mut stream).await?; let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) })?); @@ -273,7 +291,7 @@ pub fn worker_entrypoint(socket_path: &str) { let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; gum::debug!( target: LOG_TARGET, - worker_pid = %std::process::id(), + %worker_pid, "worker: validating artifact {}", artifact_path.display(), ); @@ -307,7 +325,7 @@ pub fn worker_entrypoint(socket_path: &str) { // Log if we exceed the timeout and the other thread hasn't finished. 
gum::warn!( target: LOG_TARGET, - worker_pid = %std::process::id(), + %worker_pid, "execute job took {}ms cpu time, exceeded execute timeout {}ms", cpu_time_elapsed.as_millis(), execution_timeout.as_millis(), diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs index 8c40bbb8b939023dd48503546ab3af005de3861f..88134529bc4b7f44664865082aad9cce0b1872ae 100644 --- a/polkadot/node/core/pvf/src/lib.rs +++ b/polkadot/node/core/pvf/src/lib.rs @@ -114,6 +114,7 @@ pub use pvf::PvfPrepData; pub use host::{start, Config, ValidationHost}; pub use metrics::Metrics; +pub(crate) use worker_common::kill_parent_node_in_emergency; pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR; pub use execute::worker_entrypoint as execute_worker_entrypoint; diff --git a/polkadot/node/core/pvf/src/prepare/worker.rs b/polkadot/node/core/pvf/src/prepare/worker.rs index 962ad2742bf87526ef1c735e1c40580423dcfb39..1ccba603c1fb96b81398b28fd662d3ad61a3482a 100644 --- a/polkadot/node/core/pvf/src/prepare/worker.rs +++ b/polkadot/node/core/pvf/src/prepare/worker.rs @@ -52,7 +52,13 @@ pub async fn spawn( program_path: &Path, spawn_timeout: Duration, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { - spawn_with_program_path("prepare", program_path, &["prepare-worker"], spawn_timeout).await + spawn_with_program_path( + "prepare", + program_path, + &["prepare-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")], + spawn_timeout, + ) + .await } pub enum Outcome { @@ -321,7 +327,9 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR } /// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies -/// the path to the socket used to communicate with the host. +/// the path to the socket used to communicate with the host. The `node_version`, if `Some`, +/// is checked against the worker version. A mismatch results in immediate worker termination. 
+/// `None` is used for tests and in other situations when version check is not necessary. /// /// # Flow /// @@ -342,10 +350,22 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR /// /// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we /// send that in the `PrepareResult`. -pub fn worker_entrypoint(socket_path: &str) { +pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { + let worker_pid = std::process::id(); + if let Some(version) = node_version { + if version != env!("SUBSTRATE_CLI_IMPL_VERSION") { + gum::error!( + target: LOG_TARGET, + %worker_pid, + "Node and worker version mismatch, node needs restarting, forcing shutdown", + ); + crate::kill_parent_node_in_emergency(); + return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch")) + } + } + loop { - let worker_pid = std::process::id(); let (pvf, dest) = recv_request(&mut stream).await?; gum::debug!( target: LOG_TARGET, diff --git a/polkadot/node/core/pvf/src/testing.rs b/polkadot/node/core/pvf/src/testing.rs index f0b6a0a60f7cbc98c49dea3662f3199df3c6e62f..9ff6d5a15e0577f3a909d13257b2a8e204e4bb19 100644 --- a/polkadot/node/core/pvf/src/testing.rs +++ b/polkadot/node/core/pvf/src/testing.rs @@ -61,22 +61,34 @@ macro_rules! 
decl_puppet_worker_main { $crate::sp_tracing::try_init_simple(); let args = std::env::args().collect::<Vec<_>>(); - if args.len() < 2 { + if args.len() < 3 { panic!("wrong number of arguments"); } + let mut version = None; + let mut socket_path: &str = ""; + + for i in 2..args.len() { + match args[i].as_ref() { + "--socket-path" => socket_path = args[i + 1].as_str(), + "--node-version" => version = Some(args[i + 1].as_str()), + _ => (), + } + } + let subcommand = &args[1]; match subcommand.as_ref() { + "exit" => { + std::process::exit(1); + }, "sleep" => { std::thread::sleep(std::time::Duration::from_secs(5)); }, "prepare-worker" => { - let socket_path = &args[2]; - $crate::prepare_worker_entrypoint(socket_path); + $crate::prepare_worker_entrypoint(&socket_path, version); }, "execute-worker" => { - let socket_path = &args[2]; - $crate::execute_worker_entrypoint(socket_path); + $crate::execute_worker_entrypoint(&socket_path, version); }, other => panic!("unknown subcommand: {}", other), } diff --git a/polkadot/node/core/pvf/src/worker_common.rs b/polkadot/node/core/pvf/src/worker_common.rs index 430a6950fb4f0718d45678a4555c719ca7ab5e29..3ed2994a2f942c344ba8d3714e96848ded9b8406 100644 --- a/polkadot/node/core/pvf/src/worker_common.rs +++ b/polkadot/node/core/pvf/src/worker_common.rs @@ -61,6 +61,8 @@ pub async fn spawn_with_program_path( gum::warn!( target: LOG_TARGET, %debug_id, + ?program_path, + ?extra_args, "cannot bind unix socket: {:?}", err, ); @@ -68,10 +70,12 @@ pub async fn spawn_with_program_path( })?; let handle = - WorkerHandle::spawn(program_path, extra_args, socket_path).map_err(|err| { + WorkerHandle::spawn(&program_path, extra_args, socket_path).map_err(|err| { gum::warn!( target: LOG_TARGET, %debug_id, + ?program_path, + ?extra_args, "cannot spawn a worker: {:?}", err, ); @@ -84,6 +88,8 @@ pub async fn spawn_with_program_path( gum::warn!( target: LOG_TARGET, %debug_id, + ?program_path, + ?extra_args, "cannot accept a worker: {:?}", err, ); @@ -92,6 
+98,14 @@ pub async fn spawn_with_program_path( Ok((IdleWorker { stream, pid: handle.id() }, handle)) } _ = Delay::new(spawn_timeout).fuse() => { + gum::warn!( + target: LOG_TARGET, + %debug_id, + ?program_path, + ?extra_args, + ?spawn_timeout, + "spawning and connecting to socket timed out", + ); Err(SpawnErr::AcceptTimeout) } } @@ -162,6 +176,13 @@ where F: FnMut(Handle, UnixStream) -> Fut, Fut: futures::Future<Output = io::Result<Never>>, { + gum::debug!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "starting pvf worker ({})", + debug_id, + ); + let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); let handle = rt.handle(); let err = rt @@ -179,7 +200,7 @@ where gum::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), - "pvf worker ({}): {:?}", + "quitting pvf worker ({}): {:?}", debug_id, err, ); @@ -280,6 +301,7 @@ impl WorkerHandle { ) -> io::Result<Self> { let mut child = process::Command::new(program.as_ref()) .args(extra_args) + .arg("--socket-path") .arg(socket_path.as_ref().as_os_str()) .stdout(std::process::Stdio::piped()) .kill_on_drop(true) @@ -393,3 +415,20 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8> r.read_exact(&mut buf).await?; Ok(buf) } + +/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGKILL` +/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node +/// restart should be handled by the node owner. As node exits, unix sockets opened to workers +/// get closed by the OS and other workers receive error on socket read and also exit. Preparation +/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so +/// no leftover artifacts are possible. 
+pub(crate) fn kill_parent_node_in_emergency() { + unsafe { + // SAFETY: `getppid()` never fails but may return "no-parent" (0) or "parent-init" (1) in + // some corner cases, which is checked. `kill()` may fail; its result is ignored. + let ppid = libc::getppid(); + if ppid > 1 { + libc::kill(ppid, libc::SIGKILL); + } + } +} diff --git a/polkadot/node/core/pvf/tests/it/worker_common.rs b/polkadot/node/core/pvf/tests/it/worker_common.rs index 7e00d005df196a7064f1079024eccbca6b2fae20..72bc80916262985d82491c8da081dcc5a895443b 100644 --- a/polkadot/node/core/pvf/tests/it/worker_common.rs +++ b/polkadot/node/core/pvf/tests/it/worker_common.rs @@ -18,6 +18,15 @@ use crate::PUPPET_EXE; use polkadot_node_core_pvf::testing::worker_common::{spawn_with_program_path, SpawnErr}; use std::time::Duration; +// Test spawning a program that immediately exits with a failure code. +#[tokio::test] +async fn spawn_immediate_exit() { + let result = + spawn_with_program_path("integration-test", PUPPET_EXE, &["exit"], Duration::from_secs(2)) + .await; + assert!(matches!(result, Err(SpawnErr::AcceptTimeout))); +} + #[tokio::test] async fn spawn_timeout() { let result = diff --git a/polkadot/node/malus/src/malus.rs b/polkadot/node/malus/src/malus.rs index bb466d4ba4dea2af39aa2d806bf168950cd253ff..2c10f75beb5a5578ee50d06bd303e78bd5f454ba 100644 --- a/polkadot/node/malus/src/malus.rs +++ b/polkadot/node/malus/src/malus.rs @@ -97,7 +97,7 @@ impl MalusCli { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path); + polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path, None); } }, NemesisVariant::PvfExecuteWorker(cmd) => { @@ -108,7 +108,7 @@ impl MalusCli { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path); + polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path, None); } }, }