diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 6c5420adb46cb1f8fd641a5a4b6b623180fc568e..ad9d2b913bca2ecc84f7cd2d2675e6cfdc5d5073 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -305,57 +305,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" -[[package]] -name = "async-attributes" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" -dependencies = [ - "quote", - "syn", -] - -[[package]] -name = "async-channel" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - -[[package]] -name = "async-executor" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965" -dependencies = [ - "async-task", - "concurrent-queue", - "fastrand", - "futures-lite", - "once_cell", - "slab", -] - -[[package]] -name = "async-global-executor" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9586ec52317f36de58453159d48351bc244bc24ced3effc1fce22f3d48664af6" -dependencies = [ - "async-channel", - "async-executor", - "async-io", - "async-mutex", - "blocking", - "futures-lite", - "num_cpus", - "once_cell", -] - [[package]] name = "async-io" version = "1.6.0" @@ -384,65 +333,6 @@ dependencies = [ "event-listener", ] -[[package]] -name = "async-mutex" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" -dependencies = [ - "event-listener", -] - -[[package]] -name = "async-process" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83137067e3a2a6a06d67168e49e68a0957d215410473a740cea95a2425c0b7c6" -dependencies = [ - "async-io", - "blocking", - "cfg-if", - "event-listener", - "futures-lite", - "libc", - "once_cell", - "signal-hook", - "winapi", -] - -[[package]] -name = "async-std" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" -dependencies = [ - "async-attributes", - "async-channel", - "async-global-executor", - "async-io", - "async-lock", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "once_cell", - "pin-project-lite 0.2.7", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - -[[package]] -name = "async-task" -version = "4.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" - [[package]] name = "async-trait" version = "0.1.58" @@ -753,20 +643,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" -[[package]] -name = "blocking" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046e47d4b2d391b1f6f8b407b1deb8dee56c1852ccd868becf2710f601b5f427" -dependencies = [ - "async-channel", - "async-task", - "atomic-waker", - "fastrand", - "futures-lite", - "once_cell", -] - [[package]] name = "bounded-vec" version = "0.6.0" @@ -2911,19 +2787,6 @@ dependencies = [ "regex", ] -[[package]] -name = "gloo-timers" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "group" version = "0.12.1" @@ -3665,15 +3528,6 @@ dependencies = [ "sp-weights", ] -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "kvdb" version = "0.13.0" @@ -6930,8 +6784,6 @@ version = "0.9.33" dependencies = [ "always-assert", "assert_matches", - "async-process", - "async-std", "cpu-time", "futures", "futures-timer", @@ -6956,6 +6808,7 @@ dependencies = [ "tempfile", "test-parachain-adder", "test-parachain-halt", + "tokio", "tracing-gum", ] diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 743a053f2ec7aa79340a1eed29a6159049b805f3..dd2827e751fe14957194bae97838b042b96131aa 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -627,7 +627,8 @@ trait ValidationBackend { self.validate_candidate(pvf.clone(), timeout, params.encode()).await; // If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the - // assumption that the conditions that caused this error may have been transient. + // assumption that the conditions that caused this error may have been transient. Note that + // this error is only a result of execution itself and not of preparation. if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) = validation_result { @@ -676,12 +677,12 @@ impl ValidationBackend for ValidationHost { async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> { let (tx, rx) = oneshot::channel(); - if let Err(_) = self.precheck_pvf(pvf, tx).await { + if let Err(err) = self.precheck_pvf(pvf, tx).await { // Return an IO error if there was an error communicating with the host. - return Err(PrepareError::IoErr) + return Err(PrepareError::IoErr(err)) } - let precheck_result = rx.await.or(Err(PrepareError::IoErr))?; + let precheck_result = rx.await.map_err(|err| PrepareError::IoErr(err.to_string()))?; precheck_result } diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index c6003c734973a062ed1cd851e4bb7e71ad7e0ce4..476e4ea7f985380be24a8467955212cf8282046b 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -1053,5 +1053,5 @@ fn precheck_properly_classifies_outcomes() { inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid); inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed); - inner(Err(PrepareError::IoErr), PreCheckOutcome::Failed); + inner(Err(PrepareError::IoErr("fizz".to_owned())), PreCheckOutcome::Failed); } diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml index 2aaf408ae56dbb1fa6016a30c1cbb7a46a52e444..e000928264289845a4af5f829f80ecdc96e7ee62 100644 --- a/polkadot/node/core/pvf/Cargo.toml +++ b/polkadot/node/core/pvf/Cargo.toml @@ -10,8 +10,6 @@ path = "bin/puppet_worker.rs" [dependencies] always-assert = "0.1" -async-std = { version = "1.11.0", features = ["attributes"] } -async-process = "1.3.0" assert_matches = "1.4.0" cpu-time = "1.0.0" futures = "0.3.21" @@ -21,6 +19,7 @@ gum = { package = "tracing-gum", path = "../../gum" } pin-project = "1.0.9" rand = "0.8.5" tempfile = "3.3.0" +tokio = { version = "1.22.0", features = ["fs", "process"] } rayon = "1.5.1" parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] } diff --git a/polkadot/node/core/pvf/src/artifacts.rs b/polkadot/node/core/pvf/src/artifacts.rs index 413d73b4c558bfccb9018a806fa977eae1d978ff..297ed0829ccaaff1453df247a6e80cd4ebd6373d 100644 --- a/polkadot/node/core/pvf/src/artifacts.rs +++ b/polkadot/node/core/pvf/src/artifacts.rs @@ -16,10 +16,10 @@ use crate::{error::PrepareError, host::PrepareResultSender}; use always_assert::always; -use async_std::path::{Path, PathBuf}; use polkadot_parachain::primitives::ValidationCodeHash; use std::{ collections::HashMap, + path::{Path, PathBuf}, time::{Duration, SystemTime}, }; @@ -136,8 +136,8 @@ impl Artifacts { pub async fn new(cache_path: &Path) -> Self { // Make sure that the cache path directory and all its parents are created. // First delete the entire cache. Nodes are long-running so this should populate shortly. - let _ = async_std::fs::remove_dir_all(cache_path).await; - let _ = async_std::fs::create_dir_all(cache_path).await; + let _ = tokio::fs::remove_dir_all(cache_path).await; + let _ = tokio::fs::create_dir_all(cache_path).await; Self { artifacts: HashMap::new() } } @@ -214,9 +214,8 @@ impl Artifacts { #[cfg(test)] mod tests { use super::{ArtifactId, Artifacts}; - use async_std::path::Path; use sp_core::H256; - use std::str::FromStr; + use std::{path::Path, str::FromStr}; #[test] fn from_file_name() { @@ -252,11 +251,9 @@ mod tests { ); } - #[test] - fn artifacts_removes_cache_on_startup() { - let fake_cache_path = async_std::task::block_on(async move { - crate::worker_common::tmpfile("test-cache").await.unwrap() - }); + #[tokio::test] + async fn artifacts_removes_cache_on_startup() { + let fake_cache_path = crate::worker_common::tmpfile("test-cache").await.unwrap(); let fake_artifact_path = { let mut p = fake_cache_path.clone(); p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234"); @@ -271,7 +268,7 @@ mod tests { // this should remove it and re-create. let p = &fake_cache_path; - async_std::task::block_on(async { Artifacts::new(p).await }); + Artifacts::new(p).await; assert_eq!(std::fs::read_dir(&fake_cache_path).unwrap().count(), 0); diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs index 01d8c78d39cac9dd6aab2d1d9666837ee73d3a60..a679b2f96062a87b7a58b62c23eaef509da99162 100644 --- a/polkadot/node/core/pvf/src/error.rs +++ b/polkadot/node/core/pvf/src/error.rs @@ -34,7 +34,7 @@ pub enum PrepareError { TimedOut, /// An IO error occurred while receiving the result from the worker process. This state is reported by the /// validation host (not by the worker). - IoErr, + IoErr(String), /// The temporary file for the artifact could not be created at the given cache path. This state is reported by the /// validation host (not by the worker). CreateTmpFileErr(String), @@ -54,7 +54,7 @@ impl PrepareError { use PrepareError::*; match self { Prevalidation(_) | Preparation(_) | Panic(_) => true, - TimedOut | IoErr | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false, + TimedOut | IoErr(_) | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false, } } } @@ -67,7 +67,7 @@ impl fmt::Display for PrepareError { Preparation(err) => write!(f, "preparation: {}", err), Panic(err) => write!(f, "panic: {}", err), TimedOut => write!(f, "prepare: timeout"), - IoErr => write!(f, "prepare: io error while receiving response"), + IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err), CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err), RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err), } diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index 72b6e450351b33257cc5beb8a26363b30134c480..f2f1b4e0cfff6584c6981085f2d620371f66df47 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -24,7 +24,6 @@ use crate::{ worker_common::{IdleWorker, WorkerHandle}, InvalidCandidate, ValidationError, LOG_TARGET, }; -use async_std::path::PathBuf; use futures::{ channel::mpsc, future::BoxFuture, @@ -32,7 +31,7 @@ use futures::{ Future, FutureExt, }; use slotmap::HopSlotMap; -use std::{collections::VecDeque, fmt, time::Duration}; +use std::{collections::VecDeque, fmt, path::PathBuf, time::Duration}; slotmap::new_key_type! { struct Worker; } diff --git a/polkadot/node/core/pvf/src/execute/worker.rs b/polkadot/node/core/pvf/src/execute/worker.rs index 105accf18e2b72f7c87ec1631a861e782412069c..df928efaa64209e6d9931f978d99d09e080e62f3 100644 --- a/polkadot/node/core/pvf/src/execute/worker.rs +++ b/polkadot/node/core/pvf/src/execute/worker.rs @@ -19,30 +19,22 @@ use crate::{ executor_intf::Executor, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, - spawn_with_program_path, worker_event_loop, IdleWorker, JobKind, SpawnErr, WorkerHandle, + spawn_with_program_path, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, }; -use async_std::{ - io, - os::unix::net::UnixStream, - path::{Path, PathBuf}, - task, -}; use cpu_time::ProcessTime; -use futures::FutureExt; +use futures::{pin_mut, select_biased, FutureExt}; use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; use polkadot_parachain::primitives::ValidationResult; use std::{ - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, + path::{Path, PathBuf}, + sync::{mpsc::channel, Arc}, time::Duration, }; +use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// @@ -235,10 +227,10 @@ impl Response { /// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies /// the path to the socket used to communicate with the host. pub fn worker_entrypoint(socket_path: &str) { - worker_event_loop("execute", socket_path, |mut stream| async move { - let executor = Executor::new().map_err(|e| { + worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move { + let executor = Arc::new(Executor::new().map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) - })?; + })?); loop { let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; @@ -249,52 +241,61 @@ pub fn worker_entrypoint(socket_path: &str) { artifact_path.display(), ); - // Create a lock flag. We set it when either thread finishes. - let lock = Arc::new(AtomicBool::new(false)); + // Used to signal to the cpu time monitor thread that it can finish. + let (finished_tx, finished_rx) = channel::<()>(); let cpu_time_start = ProcessTime::now(); - // Spawn a new thread that runs the CPU time monitor. Continuously wakes up from - // sleeping and then either sleeps for the remaining CPU time, or kills the process if - // we exceed the CPU timeout. - let (stream_2, cpu_time_start_2, execution_timeout_2, lock_2) = - (stream.clone(), cpu_time_start, execution_timeout, lock.clone()); - let handle = - thread::Builder::new().name("CPU time monitor".into()).spawn(move || { - task::block_on(async { - cpu_time_monitor_loop( - JobKind::Execute, - stream_2, - cpu_time_start_2, - execution_timeout_2, - lock_2, - ) - .await; - }) - })?; + // Spawn a new thread that runs the CPU time monitor. + let thread_fut = rt_handle + .spawn_blocking(move || { + cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx) + }) + .fuse(); + let executor_2 = executor.clone(); + let execute_fut = rt_handle + .spawn_blocking(move || { + validate_using_artifact(&artifact_path, ¶ms, executor_2, cpu_time_start) + }) + .fuse(); - let response = - validate_using_artifact(&artifact_path, ¶ms, &executor, cpu_time_start).await; + pin_mut!(thread_fut); + pin_mut!(execute_fut); - let lock_result = - lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); - if lock_result.is_err() { - // The other thread is still sending an error response over the socket. Wait on it - // and return. - let _ = handle.join(); - // Monitor thread detected timeout and likely already terminated the process, - // nothing to do. - continue - } + let response = select_biased! { + // If this future is not selected, the join handle is dropped and the thread will + // finish in the background. + join_res = thread_fut => { + match join_res { + Ok(Some(cpu_time_elapsed)) => { + // Log if we exceed the timeout and the other thread hasn't finished. + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "execute job took {}ms cpu time, exceeded execute timeout {}ms", + cpu_time_elapsed.as_millis(), + execution_timeout.as_millis(), + ); + Response::TimedOut + }, + Ok(None) => Response::InternalError("error communicating over finished channel".into()), + Err(e) => Response::InternalError(format!("{}", e)), + } + }, + execute_res = execute_fut => { + let _ = finished_tx.send(()); + execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e))) + }, + }; send_response(&mut stream, response).await?; } }); } -async fn validate_using_artifact( +fn validate_using_artifact( artifact_path: &Path, params: &[u8], - executor: &Executor, + executor: Arc<Executor>, cpu_time_start: ProcessTime, ) -> Response { let descriptor_bytes = match unsafe { diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 96aed4eae7a8be7a169620665a604e0f465d941f..b41ebd3c425b581ec17d10abc00c2ade00c572f3 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -28,7 +28,6 @@ use crate::{ prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, }; use always_assert::never; -use async_std::path::{Path, PathBuf}; use futures::{ channel::{mpsc, oneshot}, Future, FutureExt, SinkExt, StreamExt, @@ -36,6 +35,7 @@ use futures::{ use polkadot_parachain::primitives::ValidationResult; use std::{ collections::HashMap, + path::{Path, PathBuf}, time::{Duration, SystemTime}, }; @@ -171,7 +171,7 @@ pub struct Config { impl Config { /// Create a new instance of the configuration. pub fn new(cache_path: std::path::PathBuf, program_path: std::path::PathBuf) -> Self { - // Do not contaminate the other parts of the codebase with the types from `async_std`. + // Do not contaminate the other parts of the codebase with the types from `tokio`. let cache_path = PathBuf::from(cache_path); let program_path = PathBuf::from(program_path); @@ -723,10 +723,19 @@ async fn handle_prepare_done( *state = match result { Ok(cpu_time_elapsed) => ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed }, - Err(error) => ArtifactState::FailedToProcess { - last_time_failed: SystemTime::now(), - num_failures: *num_failures + 1, - error, + Err(error) => { + gum::debug!( + target: LOG_TARGET, + artifact_id = ?artifact_id, + num_failures = ?num_failures, + "Failed to process artifact: {}", + error + ); + ArtifactState::FailedToProcess { + last_time_failed: SystemTime::now(), + num_failures: *num_failures + 1, + error, + } }, }; @@ -778,7 +787,7 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) { match sweeper_rx.next().await { None => break, Some(condemned) => { - let result = async_std::fs::remove_file(&condemned).await; + let result = tokio::fs::remove_file(&condemned).await; gum::trace!( target: LOG_TARGET, ?result, @@ -827,7 +836,7 @@ mod tests { const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); - #[async_std::test] + #[tokio::test] async fn pulse_test() { let pulse = pulse_every(Duration::from_millis(100)); futures::pin_mut!(pulse); @@ -1017,19 +1026,19 @@ mod tests { } } - #[async_std::test] + #[tokio::test] async fn shutdown_on_handle_drop() { let test = Builder::default().build(); - let join_handle = async_std::task::spawn(test.run); + let join_handle = tokio::task::spawn(test.run); // Dropping the handle will lead to conclusion of the read part and thus will make the event // loop to stop, which in turn will resolve the join handle. drop(test.to_host_tx); - join_handle.await; + join_handle.await.unwrap(); } - #[async_std::test] + #[tokio::test] async fn pruning() { let mock_now = SystemTime::now() - Duration::from_millis(1000); @@ -1059,7 +1068,7 @@ mod tests { test.poll_ensure_to_sweeper_is_empty().await; } - #[async_std::test] + #[tokio::test] async fn execute_pvf_requests() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1159,7 +1168,7 @@ mod tests { ); } - #[async_std::test] + #[tokio::test] async fn precheck_pvf() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1214,7 +1223,7 @@ mod tests { } } - #[async_std::test] + #[tokio::test] async fn test_prepare_done() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1301,7 +1310,7 @@ mod tests { // Test that multiple prechecking requests do not trigger preparation retries if the first one // failed. - #[async_std::test] + #[tokio::test] async fn test_precheck_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1344,7 +1353,7 @@ mod tests { // Test that multiple execution requests trigger preparation retries if the first one failed due // to a potentially non-reproducible error. - #[async_std::test] + #[tokio::test] async fn test_execute_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1414,7 +1423,7 @@ mod tests { // Test that multiple execution requests don't trigger preparation retries if the first one // failed due to a reproducible error (e.g. Prevalidation). - #[async_std::test] + #[tokio::test] async fn test_execute_prepare_no_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1480,7 +1489,7 @@ mod tests { } // Test that multiple heads-up requests trigger preparation retries if the first one failed. - #[async_std::test] + #[tokio::test] async fn test_heads_up_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1521,7 +1530,7 @@ mod tests { ); } - #[async_std::test] + #[tokio::test] async fn cancellation() { let mut test = Builder::default().build(); let mut host = test.host_handle(); diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs index 1aabb11004370a08f15942e58a4cfc0606b33bd5..0e858147bd29387337c1657021d5f5d2499ef1bf 100644 --- a/polkadot/node/core/pvf/src/lib.rs +++ b/polkadot/node/core/pvf/src/lib.rs @@ -113,6 +113,7 @@ pub use pvf::Pvf; pub use host::{start, Config, ValidationHost}; pub use metrics::Metrics; +pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR; pub use execute::worker_entrypoint as execute_worker_entrypoint; pub use prepare::worker_entrypoint as prepare_worker_entrypoint; diff --git a/polkadot/node/core/pvf/src/prepare/pool.rs b/polkadot/node/core/pvf/src/prepare/pool.rs index 3319d44e7fb4841fcf9a95c9eb0ec58a62e2e457..0d39623c99db643475a208ef6ab5f3ec37301a4b 100644 --- a/polkadot/node/core/pvf/src/prepare/pool.rs +++ b/polkadot/node/core/pvf/src/prepare/pool.rs @@ -22,12 +22,17 @@ use crate::{ LOG_TARGET, }; use always_assert::never; -use async_std::path::{Path, PathBuf}; use futures::{ channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt, }; use slotmap::HopSlotMap; -use std::{fmt, sync::Arc, task::Poll, time::Duration}; +use std::{ + fmt, + path::{Path, PathBuf}, + sync::Arc, + task::Poll, + time::Duration, +}; slotmap::new_key_type! { pub struct Worker; } @@ -322,14 +327,14 @@ fn handle_mux( Ok(()) }, - Outcome::IoErr => { + Outcome::IoErr(err) => { if attempt_retire(metrics, spawned, worker) { reply( from_pool, FromPool::Concluded { worker, rip: true, - result: Err(PrepareError::IoErr), + result: Err(PrepareError::IoErr(err)), }, )?; } diff --git a/polkadot/node/core/pvf/src/prepare/queue.rs b/polkadot/node/core/pvf/src/prepare/queue.rs index e78351af9839c5eaf2b87e98864d59ccd63e01e8..c44301c7427b56cd246289d96a81c08ecaa69f61 100644 --- a/polkadot/node/core/pvf/src/prepare/queue.rs +++ b/polkadot/node/core/pvf/src/prepare/queue.rs @@ -19,10 +19,10 @@ use super::pool::{self, Worker}; use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET}; use always_assert::{always, never}; -use async_std::path::PathBuf; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; use std::{ collections::{HashMap, VecDeque}, + path::PathBuf, time::Duration, }; @@ -603,7 +603,7 @@ mod tests { } } - #[async_std::test] + #[tokio::test] async fn properly_concludes() { let mut test = Test::new(2, 2); @@ -625,7 +625,7 @@ mod tests { assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } - #[async_std::test] + #[tokio::test] async fn dont_spawn_over_soft_limit_unless_critical() { let mut test = Test::new(2, 3); let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; @@ -669,7 +669,7 @@ mod tests { assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); } - #[async_std::test] + #[tokio::test] async fn cull_unwanted() { let mut test = Test::new(1, 2); let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; @@ -707,7 +707,7 @@ mod tests { assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1)); } - #[async_std::test] + #[tokio::test] async fn worker_mass_die_out_doesnt_stall_queue() { let mut test = Test::new(2, 2); @@ -741,7 +741,7 @@ mod tests { assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } - #[async_std::test] + #[tokio::test] async fn doesnt_resurrect_ripped_worker_if_no_work() { let mut test = Test::new(2, 2); @@ -761,12 +761,12 @@ mod tests { test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, - result: Err(PrepareError::IoErr), + result: Err(PrepareError::IoErr("test".into())), }); test.poll_ensure_to_pool_is_empty().await; } - #[async_std::test] + #[tokio::test] async fn rip_for_start_work() { let mut test = Test::new(2, 2); diff --git a/polkadot/node/core/pvf/src/prepare/worker.rs b/polkadot/node/core/pvf/src/prepare/worker.rs index 5b4212e1e313418acef39fe3a11b1a28faf7f1bb..d3550fe3afe66e919d99fbc63786f1bba4508a4e 100644 --- a/polkadot/node/core/pvf/src/prepare/worker.rs +++ b/polkadot/node/core/pvf/src/prepare/worker.rs @@ -19,29 +19,22 @@ use crate::{ error::{PrepareError, PrepareResult}, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, - spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, JobKind, SpawnErr, - WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, + spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, + JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, }; -use async_std::{ - io, - os::unix::net::UnixStream, - path::{Path, PathBuf}, - task, -}; use cpu_time::ProcessTime; +use futures::{pin_mut, select_biased, FutureExt}; use parity_scale_codec::{Decode, Encode}; use sp_core::hexdisplay::HexDisplay; use std::{ panic, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, + path::{Path, PathBuf}, + sync::{mpsc::channel, Arc}, time::Duration, }; +use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// @@ -71,7 +64,7 @@ pub enum Outcome { /// An IO error occurred while receiving the result from the worker process. /// /// This doesn't return an idle worker instance, thus this worker is no longer usable. - IoErr, + IoErr(String), } /// Given the idle token of a worker and parameters of work, communicates with the worker and @@ -86,7 +79,7 @@ pub async fn start_work( artifact_path: PathBuf, preparation_timeout: Duration, ) -> Outcome { - let IdleWorker { mut stream, pid } = worker; + let IdleWorker { stream, pid } = worker; gum::debug!( target: LOG_TARGET, @@ -95,7 +88,7 @@ pub async fn start_work( artifact_path.display(), ); - with_tmp_file(stream.clone(), pid, cache_path, |tmp_file| async move { + with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await { gum::warn!( target: LOG_TARGET, @@ -116,7 +109,7 @@ pub async fn start_work( // load, but the CPU resources of the child can only be measured from the parent after the // child process terminates. let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR; - let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await; + let result = tokio::time::timeout(timeout, framed_recv(&mut stream)).await; match result { // Received bytes from worker within the time limit. @@ -138,7 +131,7 @@ pub async fn start_work( "failed to recv a prepare response: {:?}", err, ); - Outcome::IoErr + Outcome::IoErr(err.to_string()) }, Err(_) => { // Timed out here on the host. @@ -169,7 +162,7 @@ async fn handle_response_bytes( // By convention we expect encoded `PrepareResult`. let result = match PrepareResult::decode(&mut response_bytes.as_slice()) { Ok(result) => result, - Err(_) => { + Err(err) => { // We received invalid bytes from the worker. let bound_bytes = &response_bytes[..response_bytes.len().min(4)]; gum::warn!( @@ -178,7 +171,7 @@ async fn handle_response_bytes( "received unexpected response from the prepare worker: {}", HexDisplay::from(&bound_bytes), ); - return Outcome::IoErr + return Outcome::IoErr(err.to_string()) }, }; let cpu_time_elapsed = match result { @@ -198,11 +191,6 @@ async fn handle_response_bytes( preparation_timeout.as_millis(), tmp_file.display(), ); - - // Return a timeout error. - // - // NOTE: The artifact exists, but is located in a temporary file which - // will be cleared by `with_tmp_file`. return Outcome::TimedOut } @@ -214,8 +202,8 @@ async fn handle_response_bytes( artifact_path.display(), ); - match async_std::fs::rename(&tmp_file, &artifact_path).await { - Ok(_) => Outcome::Concluded { worker, result }, + match tokio::fs::rename(&tmp_file, &artifact_path).await { + Ok(()) => Outcome::Concluded { worker, result }, Err(err) => { gum::warn!( target: LOG_TARGET, @@ -237,7 +225,7 @@ async fn handle_response_bytes( async fn with_tmp_file<F, Fut>(stream: UnixStream, pid: u32, cache_path: &Path, f: F) -> Outcome where Fut: futures::Future<Output = Outcome>, - F: FnOnce(PathBuf) -> Fut, + F: FnOnce(PathBuf, UnixStream) -> Fut, { let tmp_file = match tmpfile_in("prepare-artifact-", cache_path).await { Ok(f) => f, @@ -255,14 +243,14 @@ where }, }; - let outcome = f(tmp_file.clone()).await; + let outcome = f(tmp_file.clone(), stream).await; // The function called above is expected to move `tmp_file` to a new location upon success. However, // the function may as well fail and in that case we should remove the tmp file here. // // In any case, we try to remove the file here so that there are no leftovers. We only report // errors that are different from the `NotFound`. - match async_std::fs::remove_file(tmp_file).await { + match tokio::fs::remove_file(tmp_file).await { Ok(()) => (), Err(err) if err.kind() == std::io::ErrorKind::NotFound => (), Err(err) => { @@ -312,74 +300,78 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf, /// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies /// the path to the socket used to communicate with the host. pub fn worker_entrypoint(socket_path: &str) { - worker_event_loop("prepare", socket_path, |mut stream| async move { + worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { loop { let (code, dest, preparation_timeout) = recv_request(&mut stream).await?; - gum::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), "worker: preparing artifact", ); - // Create a lock flag. We set it when either thread finishes. - let lock = Arc::new(AtomicBool::new(false)); + // Used to signal to the cpu time monitor thread that it can finish. + let (finished_tx, finished_rx) = channel::<()>(); let cpu_time_start = ProcessTime::now(); - // Spawn a new thread that runs the CPU time monitor. Continuously wakes up from - // sleeping and then either sleeps for the remaining CPU time, or kills the process if - // we exceed the CPU timeout. - let (stream_2, cpu_time_start_2, preparation_timeout_2, lock_2) = - (stream.clone(), cpu_time_start, preparation_timeout, lock.clone()); - let handle = - thread::Builder::new().name("CPU time monitor".into()).spawn(move || { - task::block_on(async { - cpu_time_monitor_loop( - JobKind::Prepare, - stream_2, - cpu_time_start_2, - preparation_timeout_2, - lock_2, - ) - .await; - }) - })?; - - // Prepares the artifact in a separate thread. - let result = match prepare_artifact(&code).await { - Err(err) => { - // Serialized error will be written into the socket. - Err(err) + // Spawn a new thread that runs the CPU time monitor. + let thread_fut = rt_handle + .spawn_blocking(move || { + cpu_time_monitor_loop(cpu_time_start, preparation_timeout, finished_rx) + }) + .fuse(); + let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse(); + + pin_mut!(thread_fut); + pin_mut!(prepare_fut); + + let result = select_biased! { + // If this future is not selected, the join handle is dropped and the thread will + // finish in the background. + join_res = thread_fut => { + match join_res { + Ok(Some(cpu_time_elapsed)) => { + // Log if we exceed the timeout and the other thread hasn't finished. + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", + cpu_time_elapsed.as_millis(), + preparation_timeout.as_millis(), + ); + Err(PrepareError::TimedOut) + }, + Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())), + Err(err) => Err(PrepareError::IoErr(err.to_string())), + } }, - Ok(compiled_artifact) => { + compilation_res = prepare_fut => { let cpu_time_elapsed = cpu_time_start.elapsed(); - - let lock_result = - lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); - if lock_result.is_err() { - // The other thread is still sending an error response over the socket. Wait on it and - // return. - let _ = handle.join(); - // Monitor thread detected timeout and likely already terminated the - // process, nothing to do. - continue + let _ = finished_tx.send(()); + + match compilation_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) { + Err(err) => { + // Serialized error will be written into the socket. + Err(err) + }, + Ok(compiled_artifact) => { + // Write the serialized artifact into a temp file. + // + // PVF host only keeps artifacts statuses in its memory, successfully + // compiled code gets stored on the disk (and consequently deserialized + // by execute-workers). The prepare worker is only required to send `Ok` + // to the pool to indicate the success. + + gum::debug!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: writing artifact to {}", + dest.display(), + ); + tokio::fs::write(&dest, &compiled_artifact).await?; + + Ok(cpu_time_elapsed) + }, } - - // Write the serialized artifact into a temp file. - // - // PVF host only keeps artifacts statuses in its memory, successfully compiled code gets stored - // on the disk (and consequently deserialized by execute-workers). The prepare worker is only - // required to send `Ok` to the pool to indicate the success. - - gum::debug!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "worker: writing artifact to {}", - dest.display(), - ); - async_std::fs::write(&dest, &compiled_artifact).await?; - - Ok(cpu_time_elapsed) }, }; @@ -388,7 +380,7 @@ pub fn worker_entrypoint(socket_path: &str) { }); } -async fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> { +fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> { panic::catch_unwind(|| { let blob = match crate::executor_intf::prevalidate(code) { Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), diff --git a/polkadot/node/core/pvf/src/worker_common.rs b/polkadot/node/core/pvf/src/worker_common.rs index e052bd77ed066c33a2e8bce84348af546a128963..9cda5f8cd0b74f4d00dd02772069b7ea2377209e 100644 --- a/polkadot/node/core/pvf/src/worker_common.rs +++ b/polkadot/node/core/pvf/src/worker_common.rs @@ -16,31 +16,26 @@ //! Common logic for implementation of worker processes. -use crate::{execute::ExecuteResponse, PrepareError, LOG_TARGET}; -use async_std::{ - io, - net::Shutdown, - os::unix::net::{UnixListener, UnixStream}, - path::{Path, PathBuf}, -}; +use crate::LOG_TARGET; use cpu_time::ProcessTime; -use futures::{ - never::Never, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, FutureExt as _, -}; +use futures::{never::Never, FutureExt as _}; use futures_timer::Delay; -use parity_scale_codec::Encode; use pin_project::pin_project; use rand::Rng; use std::{ fmt, mem, + path::{Path, PathBuf}, pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::mpsc::{Receiver, RecvTimeoutError}, task::{Context, Poll}, time::Duration, }; +use tokio::{ + io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf}, + net::{UnixListener, UnixStream}, + process, + runtime::{Handle, Runtime}, +}; /// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in /// wall clock time). This is lenient because CPU time may go slower than wall clock time. @@ -50,21 +45,6 @@ pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4; /// child process. pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50); -#[derive(Copy, Clone, Debug)] -pub enum JobKind { - Prepare, - Execute, -} - -impl fmt::Display for JobKind { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Prepare => write!(f, "prepare"), - Self::Execute => write!(f, "execute"), - } - } -} - /// This is publicly exposed only for integration tests. #[doc(hidden)] pub async fn spawn_with_program_path( @@ -77,7 +57,7 @@ pub async fn spawn_with_program_path( with_transient_socket_path(debug_id, |socket_path| { let socket_path = socket_path.to_owned(); async move { - let listener = UnixListener::bind(&socket_path).await.map_err(|err| { + let listener = UnixListener::bind(&socket_path).map_err(|err| { gum::warn!( target: LOG_TARGET, %debug_id, @@ -132,7 +112,7 @@ where // Best effort to remove the socket file. Under normal circumstances the socket will be removed // by the worker. We make sure that it is removed here, just in case a failed rendezvous. - let _ = async_std::fs::remove_file(socket_path).await; + let _ = tokio::fs::remove_file(socket_path).await; result } @@ -163,7 +143,7 @@ pub async fn tmpfile_in(prefix: &str, dir: &Path) -> io::Result<PathBuf> { for _ in 0..NUM_RETRIES { let candidate_path = tmppath(prefix, dir); - if !candidate_path.exists().await { + if !candidate_path.exists() { return Ok(candidate_path) } } @@ -179,28 +159,22 @@ pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> { pub fn worker_event_loop<F, Fut>(debug_id: &'static str, socket_path: &str, mut event_loop: F) where - F: FnMut(UnixStream) -> Fut, + F: FnMut(Handle, UnixStream) -> Fut, Fut: futures::Future<Output = io::Result<Never>>, { - let err = async_std::task::block_on::<_, io::Result<Never>>(async move { - let stream = UnixStream::connect(socket_path).await?; - let _ = async_std::fs::remove_file(socket_path).await; - - let result = event_loop(stream.clone()).await; - - if let Err(err) = stream.shutdown(Shutdown::Both) { - // Log, but don't return error here, as it may shadow any error from `event_loop`. - gum::debug!( - target: LOG_TARGET, - "error shutting down stream at path {}: {}", - socket_path, - err - ); - } + let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); + let handle = rt.handle(); + let err = rt + .block_on(async move { + let stream = UnixStream::connect(socket_path).await?; + let _ = tokio::fs::remove_file(socket_path).await; - result - }) - .unwrap_err(); // it's never `Ok` because it's `Ok(Never)` + let result = event_loop(handle.clone(), stream).await; + + result + }) + // It's never `Ok` because it's `Ok(Never)`. + .unwrap_err(); gum::debug!( target: LOG_TARGET, @@ -209,74 +183,45 @@ where debug_id, err, ); + + // We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast + // as possible and not wait for stalled validation to finish. This isn't strictly necessary now, + // but may be in the future. + rt.shutdown_background(); } /// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up -/// from sleeping and then either sleeps for the remaining CPU time, or sends back a timeout error -/// if we exceed the CPU timeout. +/// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout. /// -/// NOTE: If the job completes and this thread is still sleeping, it will continue sleeping in the -/// background. When it wakes, it will see that the flag has been set and return. -pub async fn cpu_time_monitor_loop( - job_kind: JobKind, - mut stream: UnixStream, +/// Returning `Some` indicates that we should send a `TimedOut` error to the host. Will return +/// `None` if the other thread finishes first, without us timing out. +/// +/// NOTE: Sending a `TimedOut` error to the host will cause the worker, whether preparation or +/// execution, to be killed by the host. We do not kill the process here because it would interfere +/// with the proper handling of this error. +pub fn cpu_time_monitor_loop( cpu_time_start: ProcessTime, timeout: Duration, - lock: Arc<AtomicBool>, -) { + finished_rx: Receiver<()>, +) -> Option<Duration> { loop { let cpu_time_elapsed = cpu_time_start.elapsed(); // Treat the timeout as CPU time, which is less subject to variance due to load. - if cpu_time_elapsed > timeout { - let result = lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); - if result.is_err() { - // Hit the job-completed case first, return from this thread. - return - } - - // Log if we exceed the timeout. - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "{job_kind} job took {}ms cpu time, exceeded {job_kind} timeout {}ms", - cpu_time_elapsed.as_millis(), - timeout.as_millis(), - ); - - // Send back a `TimedOut` error. - // - // NOTE: This will cause the worker, whether preparation or execution, to be killed by - // the host. We do not kill the process here because it would interfere with the proper - // handling of this error. - let encoded_result = match job_kind { - JobKind::Prepare => { - let result: Result<(), PrepareError> = Err(PrepareError::TimedOut); - result.encode() - }, - JobKind::Execute => { - let result = ExecuteResponse::TimedOut; - result.encode() - }, - }; - // If we error here there is nothing we can do apart from log it. The receiving side - // will just have to time out. - if let Err(err) = framed_send(&mut stream, encoded_result.as_slice()).await { - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "{job_kind} worker -> pvf host: error sending result over the socket: {:?}", - err - ); + if cpu_time_elapsed <= timeout { + // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep + // is wall clock time. The CPU clock may be slower than the wall clock. + let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD; + match finished_rx.recv_timeout(sleep_interval) { + // Received finish signal. + Ok(()) => return None, + // Timed out, restart loop. + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => return None, } - - return } - // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep - // is wall clock time. The CPU clock may be slower than the wall clock. - let sleep_interval = timeout - cpu_time_elapsed + JOB_TIMEOUT_OVERHEAD; - std::thread::sleep(sleep_interval); + return Some(cpu_time_elapsed) } } @@ -317,9 +262,10 @@ pub enum SpawnErr { /// This future relies on the fact that a child process's stdout `fd` is closed upon it's termination. #[pin_project] pub struct WorkerHandle { - child: async_process::Child, + child: process::Child, + child_id: u32, #[pin] - stdout: async_process::ChildStdout, + stdout: process::ChildStdout, program: PathBuf, drop_box: Box<[u8]>, } @@ -330,13 +276,16 @@ impl WorkerHandle { extra_args: &[&str], socket_path: impl AsRef<Path>, ) -> io::Result<Self> { - let mut child = async_process::Command::new(program.as_ref()) + let mut child = process::Command::new(program.as_ref()) .args(extra_args) .arg(socket_path.as_ref().as_os_str()) - .stdout(async_process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) .kill_on_drop(true) .spawn()?; + let child_id = child + .id() + .ok_or(io::Error::new(io::ErrorKind::Other, "could not get id of spawned process"))?; let stdout = child .stdout .take() @@ -344,6 +293,7 @@ impl WorkerHandle { Ok(WorkerHandle { child, + child_id, stdout, program: program.as_ref().to_path_buf(), // We don't expect the bytes to be ever read. But in case we do, we should not use a buffer @@ -361,7 +311,7 @@ impl WorkerHandle { /// Returns the process id of this worker. pub fn id(&self) -> u32 { - self.child.id() + self.child_id } } @@ -370,15 +320,20 @@ impl futures::Future for WorkerHandle { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let me = self.project(); - match futures::ready!(AsyncRead::poll_read(me.stdout, cx, &mut *me.drop_box)) { - Ok(0) => { - // 0 means `EOF` means the child was terminated. Resolve. - Poll::Ready(()) - }, - Ok(_bytes_read) => { - // weird, we've read something. Pretend that never happened and reschedule ourselves. - cx.waker().wake_by_ref(); - Poll::Pending + // Create a `ReadBuf` here instead of storing it in `WorkerHandle` to avoid a lifetime + // parameter on `WorkerHandle`. Creating the `ReadBuf` is fairly cheap. + let mut read_buf = ReadBuf::new(&mut *me.drop_box); + match futures::ready!(AsyncRead::poll_read(me.stdout, cx, &mut read_buf)) { + Ok(()) => { + if read_buf.filled().len() > 0 { + // weird, we've read something. Pretend that never happened and reschedule + // ourselves. + cx.waker().wake_by_ref(); + Poll::Pending + } else { + // Nothing read means `EOF` means the child was terminated. Resolve. + Poll::Ready(()) + } }, Err(err) => { // The implementation is guaranteed to not to return `WouldBlock` and Interrupted. This @@ -387,8 +342,8 @@ impl futures::Future for WorkerHandle { // Log the status code. gum::debug!( target: LOG_TARGET, - worker_pid = %me.child.id(), - status_code = ?me.child.try_status(), + worker_pid = %me.child_id, + status_code = ?me.child.try_wait().ok().flatten().map(|c| c.to_string()), "pvf worker ({}): {:?}", me.program.display(), err, diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/tests/it/adder.rs index 69b6b7d21979a21aaf72c76fd4925d2b4aa6e0f1..8eb57e4d90269a22850be9b70f400feeadf8b57e 100644 --- a/polkadot/node/core/pvf/tests/it/adder.rs +++ b/polkadot/node/core/pvf/tests/it/adder.rs @@ -22,7 +22,7 @@ use polkadot_parachain::primitives::{ ValidationParams, }; -#[async_std::test] +#[tokio::test] async fn execute_good_block_on_parent() { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; @@ -50,7 +50,7 @@ async fn execute_good_block_on_parent() { assert_eq!(new_head.post_state, hash_state(512)); } -#[async_std::test] +#[tokio::test] async fn execute_good_chain_on_parent() { let mut number = 0; let mut parent_hash = [0; 32]; @@ -88,7 +88,7 @@ async fn execute_good_chain_on_parent() { } } -#[async_std::test] +#[tokio::test] async fn execute_bad_block_on_parent() { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; @@ -113,7 +113,7 @@ async fn execute_bad_block_on_parent() { .unwrap_err(); } -#[async_std::test] +#[tokio::test] async fn stress_spawn() { let host = std::sync::Arc::new(TestHost::new()); diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index a6aaf5d369d46ce95ba88f01b98d55c141119e4b..07754ef8693d2da329dd16e7703cfd585495d46f 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -14,13 +14,15 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see <http://www.gnu.org/licenses/>. -use async_std::sync::Mutex; +use assert_matches::assert_matches; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost, + JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; use std::time::Duration; +use tokio::sync::Mutex; mod adder; mod worker_common; @@ -47,7 +49,7 @@ impl TestHost { let mut config = Config::new(cache_dir.path().to_owned(), program_path); f(&mut config); let (host, task) = start(config, Metrics::default()); - let _ = async_std::task::spawn(task); + let _ = tokio::task::spawn(task); Self { _cache_dir: cache_dir, host: Mutex::new(host) } } @@ -77,10 +79,11 @@ impl TestHost { } } -#[async_std::test] +#[tokio::test] async fn terminates_on_timeout() { let host = TestHost::new(); + let start = std::time::Instant::now(); let result = host .validate_candidate( halt::wasm_binary_unwrap(), @@ -97,10 +100,14 @@ async fn terminates_on_timeout() { Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) => {}, r => panic!("{:?}", r), } + + let duration = std::time::Instant::now().duration_since(start); + assert!(duration >= TEST_EXECUTION_TIMEOUT); + assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR); } -#[async_std::test] -async fn parallel_execution() { +#[tokio::test] +async fn ensure_parallel_execution() { // Run some jobs that do not complete, thus timing out. let host = TestHost::new(); let execute_pvf_future_1 = host.validate_candidate( @@ -123,7 +130,14 @@ async fn parallel_execution() { ); let start = std::time::Instant::now(); - let (_, _) = futures::join!(execute_pvf_future_1, execute_pvf_future_2); + let (res1, res2) = futures::join!(execute_pvf_future_1, execute_pvf_future_2); + assert_matches!( + (res1, res2), + ( + Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)), + Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) + ) + ); // Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel). let duration = std::time::Instant::now().duration_since(start); @@ -136,7 +150,7 @@ async fn parallel_execution() { ); } -#[async_std::test] +#[tokio::test] async fn execute_queue_doesnt_stall_if_workers_died() { let host = TestHost::new_with_config(|cfg| { cfg.execute_workers_max_num = 5; diff --git a/polkadot/node/core/pvf/tests/it/worker_common.rs b/polkadot/node/core/pvf/tests/it/worker_common.rs index 464b80a9fe5855c97544e96ea8b3b9aed484af9a..7e00d005df196a7064f1079024eccbca6b2fae20 100644 --- a/polkadot/node/core/pvf/tests/it/worker_common.rs +++ b/polkadot/node/core/pvf/tests/it/worker_common.rs @@ -18,7 +18,7 @@ use crate::PUPPET_EXE; use polkadot_node_core_pvf::testing::worker_common::{spawn_with_program_path, SpawnErr}; use std::time::Duration; -#[async_std::test] +#[tokio::test] async fn spawn_timeout() { let result = spawn_with_program_path("integration-test", PUPPET_EXE, &["sleep"], Duration::from_secs(2)) @@ -26,7 +26,7 @@ async fn spawn_timeout() { assert!(matches!(result, Err(SpawnErr::AcceptTimeout))); } -#[async_std::test] +#[tokio::test] async fn should_connect() { let _ = spawn_with_program_path( "integration-test", diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs index da0a0eca80beb9d3e8a7b9cc34c3b9dfa54abe0d..9af65b3d601e32859158b5af467eb4c9a81dc288 100644 --- a/polkadot/node/primitives/src/lib.rs +++ b/polkadot/node/primitives/src/lib.rs @@ -227,7 +227,7 @@ pub type UncheckedSignedFullStatement = UncheckedSigned<Statement, CompactStatem /// Candidate invalidity details #[derive(Debug)] pub enum InvalidCandidate { - /// Failed to execute.`validate_block`. This includes function panicking. + /// Failed to execute `validate_block`. This includes function panicking. ExecutionError(String), /// Validation outputs check doesn't pass. InvalidOutputs,