diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 98d8ffc01638d12017b0500748275852a4d2d486..20321502cf97990e128aec6c5d02c126c7de2596 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -1027,6 +1027,16 @@ dependencies = [
  "cfg-if",
 ]

+[[package]]
+name = "cpu-time"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e9e393a7668fe1fad3075085b86c781883000b4ede868f43627b34a87c8b7ded"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
 [[package]]
 name = "cpufeatures"
 version = "0.2.1"
@@ -6487,6 +6497,7 @@ dependencies = [
  "assert_matches",
  "async-process",
  "async-std",
+ "cpu-time",
  "futures",
  "futures-timer",
  "hex-literal",
diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs
index f21f1be2f1bfde9aaa1880abeea2f3b6fcf05765..74610bc113ece96c831524ef9ba45dada695aed5 100644
--- a/polkadot/node/core/candidate-validation/src/lib.rs
+++ b/polkadot/node/core/candidate-validation/src/lib.rs
@@ -638,7 +638,7 @@ trait ValidationBackend {
         }
     }

-    async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError>;
+    async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError>;
 }

 #[async_trait]
@@ -664,7 +664,7 @@ impl ValidationBackend for ValidationHost {
             .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
     }

-    async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError> {
+    async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> {
         let (tx, rx) = oneshot::channel();
         if let Err(_) = self.precheck_pvf(pvf, tx).await {
             return Err(PrepareError::DidNotMakeIt)
diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs
index cf467cd5c0570270466994ccda45e5af9826281f..5ac93bc7d1f4191b6dc981d83bb0023d48771a97 100644
--- a/polkadot/node/core/candidate-validation/src/tests.rs
+++ b/polkadot/node/core/candidate-validation/src/tests.rs
@@ -377,7 +377,7 @@ impl ValidationBackend for MockValidateCandidateBackend {
         result
     }

-    async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> {
+    async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
         unreachable!()
     }
 }
@@ -894,11 +894,11 @@ fn pov_decompression_failure_is_invalid() {
 }

 struct MockPreCheckBackend {
-    result: Result<(), PrepareError>,
+    result: Result<Duration, PrepareError>,
 }

 impl MockPreCheckBackend {
-    fn with_hardcoded_result(result: Result<(), PrepareError>) -> Self {
+    fn with_hardcoded_result(result: Result<Duration, PrepareError>) -> Self {
         Self { result }
     }
 }
@@ -914,7 +914,7 @@ impl ValidationBackend for MockPreCheckBackend {
         unreachable!()
     }

-    async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> {
+    async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
         self.result.clone()
     }
 }
@@ -931,7 +931,7 @@ fn precheck_works() {

     let (check_fut, check_result) = precheck_pvf(
         ctx.sender(),
-        MockPreCheckBackend::with_hardcoded_result(Ok(())),
+        MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
         relay_parent,
         validation_code_hash,
     )
@@ -977,7 +977,7 @@ fn precheck_invalid_pvf_blob_compression() {

     let (check_fut, check_result) = precheck_pvf(
         ctx.sender(),
-        MockPreCheckBackend::with_hardcoded_result(Ok(())),
+        MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
         relay_parent,
         validation_code_hash,
     )
diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml
index b69d96c0cfefaaba1b3376d76d4b76eb59ff2822..b88837e0833e7285ba9045c372547059f1a97eaa 100644
--- a/polkadot/node/core/pvf/Cargo.toml
+++ b/polkadot/node/core/pvf/Cargo.toml
@@ -13,6 +13,7 @@ always-assert = "0.1"
 async-std = { version = "1.11.0", features = ["attributes"] }
 async-process = "1.3.0"
 assert_matches = "1.4.0"
+cpu-time = "1.0.0"
 futures = "0.3.21"
 futures-timer = "3.0.2"
 slotmap = "1.0"
@@ -21,10 +22,13 @@ pin-project = "1.0.9"
 rand = "0.8.5"
 tempfile = "3.3.0"
 rayon = "1.5.1"
+
 parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] }
+
 polkadot-parachain = { path = "../../../parachain" }
 polkadot-core-primitives = { path = "../../../core-primitives" }
 polkadot-node-metrics = { path = "../../metrics"}
+
 sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
diff --git a/polkadot/node/core/pvf/src/artifacts.rs b/polkadot/node/core/pvf/src/artifacts.rs
index 038d8e803299e721b8014d0e10ddddfc0ff3218e..413d73b4c558bfccb9018a806fa977eae1d978ff 100644
--- a/polkadot/node/core/pvf/src/artifacts.rs
+++ b/polkadot/node/core/pvf/src/artifacts.rs
@@ -101,6 +101,8 @@ pub enum ArtifactState {
         /// This is updated when we get the heads up for this artifact or when we just discover
         /// this file.
         last_time_needed: SystemTime,
+        /// The CPU time that was taken preparing this artifact.
+        cpu_time_elapsed: Duration,
     },
     /// A task to prepare this artifact is scheduled.
     Preparing {
@@ -171,11 +173,16 @@ impl Artifacts {
     /// This function must be used only for brand-new artifacts and should never be used for
     /// replacing existing ones.
     #[cfg(test)]
-    pub fn insert_prepared(&mut self, artifact_id: ArtifactId, last_time_needed: SystemTime) {
+    pub fn insert_prepared(
+        &mut self,
+        artifact_id: ArtifactId,
+        last_time_needed: SystemTime,
+        cpu_time_elapsed: Duration,
+    ) {
         // See the precondition.
         always!(self
             .artifacts
-            .insert(artifact_id, ArtifactState::Prepared { last_time_needed })
+            .insert(artifact_id, ArtifactState::Prepared { last_time_needed, cpu_time_elapsed })
             .is_none());
     }
diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs
index 4aca2da4b3ba0c82272d82ebf2b02c10f90a58f9..ddcdb2561cfde6d90292487e7e086518db2a7630 100644
--- a/polkadot/node/core/pvf/src/error.rs
+++ b/polkadot/node/core/pvf/src/error.rs
@@ -15,10 +15,11 @@
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

 use parity_scale_codec::{Decode, Encode};
-use std::any::Any;
+use std::{any::Any, time::Duration};

-/// Result of PVF preparation performed by the validation host.
-pub type PrepareResult = Result<(), PrepareError>;
+/// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if
+/// successful.
+pub type PrepareResult = Result<Duration, PrepareError>;

 /// An error that occurred during the prepare part of the PVF pipeline.
 #[derive(Debug, Clone, Encode, Decode)]
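The alias change above is the heart of the PR: a successful preparation now reports how much CPU time it consumed. A minimal, self-contained sketch of consuming the new shape (the `PrepareError` here is a one-variant stand-in, not the crate's real enum):

```rust
use std::time::Duration;

// Stand-in for the crate's `PrepareError`; only the shape matters here.
#[derive(Debug)]
enum PrepareError {
    TimedOut,
}

// Mirrors the new alias: success now reports the elapsed CPU time.
type PrepareResult = Result<Duration, PrepareError>;

fn main() {
    let results: Vec<PrepareResult> =
        vec![Ok(Duration::from_millis(1500)), Err(PrepareError::TimedOut)];
    for result in results {
        match result {
            Ok(cpu_time) => println!("prepared in {}ms of CPU time", cpu_time.as_millis()),
            Err(err) => println!("preparation failed: {:?}", err),
        }
    }
}
```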
diff --git a/polkadot/node/core/pvf/src/execute/mod.rs b/polkadot/node/core/pvf/src/execute/mod.rs
index 86e1d79fc95186796afe37007939c8254e127c0b..bc7f035a8b40b79989a31cea7e1a846aa68ab0f1 100644
--- a/polkadot/node/core/pvf/src/execute/mod.rs
+++ b/polkadot/node/core/pvf/src/execute/mod.rs
@@ -24,4 +24,4 @@ mod queue;
 mod worker;

 pub use queue::{start, ToQueue};
-pub use worker::worker_entrypoint;
+pub use worker::{worker_entrypoint, Response as ExecuteResponse};
diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs
index 17fb5765f7d38b4c9cb530c7b0fc49e7e3e3e1dc..72b6e450351b33257cc5beb8a26363b30134c480 100644
--- a/polkadot/node/core/pvf/src/execute/queue.rs
+++ b/polkadot/node/core/pvf/src/execute/queue.rs
@@ -225,8 +225,9 @@ fn handle_job_finish(
     result_tx: ResultSender,
 ) {
     let (idle_worker, result) = match outcome {
-        Outcome::Ok { result_descriptor, duration_ms: _, idle_worker } => {
+        Outcome::Ok { result_descriptor, duration: _, idle_worker } => {
             // TODO: propagate the soft timeout
+
             (Some(idle_worker), Ok(result_descriptor))
         },
         Outcome::InvalidCandidate { err, idle_worker } => (
diff --git a/polkadot/node/core/pvf/src/execute/worker.rs b/polkadot/node/core/pvf/src/execute/worker.rs
index a0b8337ddc4ab0fa2ffb684e405bd736cd3f5f18..46226a159c26ab1331a3a8cd0543d5ca2558364c 100644
--- a/polkadot/node/core/pvf/src/execute/worker.rs
+++ b/polkadot/node/core/pvf/src/execute/worker.rs
@@ -18,8 +18,9 @@ use crate::{
     artifacts::ArtifactPathId,
     executor_intf::Executor,
     worker_common::{
-        bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
-        worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
+        bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
+        spawn_with_program_path, worker_event_loop, IdleWorker, JobKind, SpawnErr, WorkerHandle,
+        JOB_TIMEOUT_WALL_CLOCK_FACTOR,
     },
     LOG_TARGET,
 };
@@ -27,12 +28,21 @@ use async_std::{
     io,
     os::unix::net::UnixStream,
     path::{Path, PathBuf},
+    task,
 };
+use cpu_time::ProcessTime;
 use futures::FutureExt;
 use futures_timer::Delay;
 use parity_scale_codec::{Decode, Encode};
 use polkadot_parachain::primitives::ValidationResult;
-use std::time::{Duration, Instant};
+use std::{
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+    thread,
+    time::Duration,
+};

 /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
 ///
@@ -48,7 +58,7 @@ pub async fn spawn(
 pub enum Outcome {
     /// PVF execution completed successfully and the result is returned. The worker is ready for
     /// another job.
-    Ok { result_descriptor: ValidationResult, duration_ms: u64, idle_worker: IdleWorker },
+    Ok { result_descriptor: ValidationResult, duration: Duration, idle_worker: IdleWorker },
     /// The candidate validation failed. It may be for example because the wasm execution triggered a trap.
     /// Errors related to the preparation process are not expected to be encountered by the execution workers.
     InvalidCandidate { err: String, idle_worker: IdleWorker },
@@ -80,7 +90,9 @@ pub async fn start_work(
         artifact.path.display(),
     );

-    if let Err(error) = send_request(&mut stream, &artifact.path, &validation_params).await {
+    if let Err(error) =
+        send_request(&mut stream, &artifact.path, &validation_params, execution_timeout).await
+    {
         gum::warn!(
             target: LOG_TARGET,
             worker_pid = %pid,
@@ -91,6 +103,12 @@ pub async fn start_work(
         return Outcome::IoErr
     }

+    // We use a generous timeout here. This is in addition to the one in the child process, in
+    // case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
+    // in the child. We want to use CPU time because it varies less than wall clock time under
+    // load, but the CPU resources of the child can only be measured from the parent after the
+    // child process terminates.
+    let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
     let response = futures::select! {
         response = recv_response(&mut stream).fuse() => {
             match response {
@@ -104,25 +122,47 @@ pub async fn start_work(
                     );
                     return Outcome::IoErr
                 },
-                Ok(response) => response,
+                Ok(response) => {
+                    if let Response::Ok{duration, ..} = response {
+                        if duration > execution_timeout {
+                            // The job didn't complete within the timeout.
+                            gum::warn!(
+                                target: LOG_TARGET,
+                                worker_pid = %pid,
+                                "execute job took {}ms cpu time, exceeded execution timeout {}ms.",
+                                duration.as_millis(),
+                                execution_timeout.as_millis(),
+                            );
+
+                            // Return a timeout error.
+                            return Outcome::HardTimeout;
+                        }
+                    }
+
+                    response
+                },
             }
         },
-        _ = Delay::new(execution_timeout).fuse() => {
+        _ = Delay::new(timeout).fuse() => {
             gum::warn!(
                 target: LOG_TARGET,
                 worker_pid = %pid,
                 validation_code_hash = ?artifact.id.code_hash,
                 "execution worker exceeded alloted time for execution",
             );
-            return Outcome::HardTimeout;
+            // TODO: This case is not really a hard timeout as the timeout here in the host is
+            // lenient. Should fix this as part of
+            // https://github.com/paritytech/polkadot/issues/3754.
+            Response::TimedOut
         },
     };

     match response {
-        Response::Ok { result_descriptor, duration_ms } =>
-            Outcome::Ok { result_descriptor, duration_ms, idle_worker: IdleWorker { stream, pid } },
+        Response::Ok { result_descriptor, duration } =>
+            Outcome::Ok { result_descriptor, duration, idle_worker: IdleWorker { stream, pid } },
         Response::InvalidCandidate(err) =>
             Outcome::InvalidCandidate { err, idle_worker: IdleWorker { stream, pid } },
+        Response::TimedOut => Outcome::HardTimeout,
         Response::InternalError(err) =>
             Outcome::InternalError { err, idle_worker: IdleWorker { stream, pid } },
     }
@@ -132,12 +172,14 @@ async fn send_request(
     stream: &mut UnixStream,
     artifact_path: &Path,
     validation_params: &[u8],
+    execution_timeout: Duration,
 ) -> io::Result<()> {
     framed_send(stream, path_to_bytes(artifact_path)).await?;
-    framed_send(stream, validation_params).await
+    framed_send(stream, validation_params).await?;
+    framed_send(stream, &execution_timeout.encode()).await
 }

-async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec<u8>)> {
+async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec<u8>, Duration)> {
     let artifact_path = framed_recv(stream).await?;
     let artifact_path = bytes_to_path(&artifact_path).ok_or_else(|| {
         io::Error::new(
@@ -146,7 +188,14 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec<u8>)>
         )
     })?;
     let params = framed_recv(stream).await?;
-    Ok((artifact_path, params))
+    let execution_timeout = framed_recv(stream).await?;
+    let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| {
+        io::Error::new(
+            io::ErrorKind::Other,
+            "execute pvf recv_request: failed to decode duration".to_string(),
+        )
+    })?;
+    Ok((artifact_path, params, execution_timeout))
 }

 async fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> {
@@ -164,9 +213,10 @@ async fn recv_response(stream: &mut UnixStream) -> io::Result<Response> {
 }

 #[derive(Encode, Decode)]
-enum Response {
-    Ok { result_descriptor: ValidationResult, duration_ms: u64 },
+pub enum Response {
+    Ok { result_descriptor: ValidationResult, duration: Duration },
     InvalidCandidate(String),
+    TimedOut,
     InternalError(String),
 }

@@ -187,15 +237,53 @@ pub fn worker_entrypoint(socket_path: &str) {
         let executor = Executor::new().map_err(|e| {
             io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
         })?;
+
         loop {
-            let (artifact_path, params) = recv_request(&mut stream).await?;
+            let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
             gum::debug!(
                 target: LOG_TARGET,
                 worker_pid = %std::process::id(),
                 "worker: validating artifact {}",
                 artifact_path.display(),
             );
-            let response = validate_using_artifact(&artifact_path, &params, &executor).await;
+
+            // Create a lock flag. We set it when either thread finishes.
+            let lock = Arc::new(AtomicBool::new(false));
+            let cpu_time_start = ProcessTime::now();
+
+            // Spawn a new thread that runs the CPU time monitor. Continuously wakes up from
+            // sleeping and then either sleeps for the remaining CPU time, or kills the process if
+            // we exceed the CPU timeout.
+            let (stream_2, cpu_time_start_2, execution_timeout_2, lock_2) =
+                (stream.clone(), cpu_time_start, execution_timeout, lock.clone());
+            let handle =
+                thread::Builder::new().name("CPU time monitor".into()).spawn(move || {
+                    task::block_on(async {
+                        cpu_time_monitor_loop(
+                            JobKind::Execute,
+                            stream_2,
+                            cpu_time_start_2,
+                            execution_timeout_2,
+                            lock_2,
+                        )
+                        .await;
+                    })
+                })?;
+
+            let response =
+                validate_using_artifact(&artifact_path, &params, &executor, cpu_time_start).await;
+
+            let lock_result =
+                lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
+            if lock_result.is_err() {
+                // The other thread is still sending an error response over the socket. Wait on
+                // it, then move on to the next job.
+                let _ = handle.join();
+                // Monitor thread detected timeout and likely already terminated the process;
+                // nothing else to do here.
+                continue
+            }
+
             send_response(&mut stream, response).await?;
         }
     });
@@ -205,19 +293,19 @@ async fn validate_using_artifact(
     artifact_path: &Path,
     params: &[u8],
     executor: &Executor,
+    cpu_time_start: ProcessTime,
 ) -> Response {
-    let validation_started_at = Instant::now();
     let descriptor_bytes = match unsafe {
         // SAFETY: this should be safe since the compiled artifact passed here comes from the
         //         file created by the prepare workers. These files are obtained by calling
         //         [`executor_intf::prepare`].
         executor.execute(artifact_path.as_ref(), params)
     } {
-        Err(err) => return Response::format_invalid("execute", &err.to_string()),
+        Err(err) => return Response::format_invalid("execute", &err),
         Ok(d) => d,
     };

-    let duration_ms = validation_started_at.elapsed().as_millis() as u64;
+    let duration = cpu_time_start.elapsed();

     let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
         Err(err) =>
@@ -225,5 +313,5 @@ async fn validate_using_artifact(
         Ok(r) => r,
     };

-    Response::Ok { result_descriptor, duration_ms }
+    Response::Ok { result_descriptor, duration }
 }
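The execute path above now has two layers of protection: the child kills itself once the job's CPU time passes `execution_timeout`, while the host waits up to `execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR` in wall-clock time and additionally rejects any response whose reported CPU time exceeds the budget. A minimal sketch of that host-side check, with stand-in `Response`/`Outcome` types rather than the crate's:

```rust
use std::time::Duration;

// Mirrors the constant introduced in worker_common.rs.
const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;

// Stand-ins for the worker's response and the queue's outcome.
enum Response {
    Ok { duration: Duration },
    TimedOut,
}

#[derive(Debug)]
enum Outcome {
    Ok { duration: Duration },
    HardTimeout,
}

// Even if a response arrives within the lenient wall-clock window, reject it
// when the CPU time it reports exceeds the actual budget.
fn check(response: Response, execution_timeout: Duration) -> Outcome {
    match response {
        Response::Ok { duration } if duration > execution_timeout => Outcome::HardTimeout,
        Response::Ok { duration } => Outcome::Ok { duration },
        Response::TimedOut => Outcome::HardTimeout,
    }
}

fn main() {
    let execution_timeout = Duration::from_secs(2);
    // The host-side select! waits up to this much wall-clock time.
    let wall_clock_limit = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
    assert_eq!(wall_clock_limit, Duration::from_secs(8));

    let outcome = check(Response::Ok { duration: Duration::from_secs(3) }, execution_timeout);
    // HardTimeout: the reported CPU time exceeded the budget.
    println!("{:?}", outcome);
}
```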
diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs
index 5c29072da1c36aed98784ae98acf4578faa04035..4834194094487467cf7de1833562a1fb1a3b7376 100644
--- a/polkadot/node/core/pvf/src/host.rs
+++ b/polkadot/node/core/pvf/src/host.rs
@@ -218,7 +218,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
     );

     let (to_execute_queue_tx, run_execute_queue) = execute::start(
-        metrics.clone(),
+        metrics,
         config.execute_worker_program_path.to_owned(),
         config.execute_workers_max_num,
         config.execute_worker_spawn_timeout,
@@ -443,7 +443,7 @@ async fn handle_to_host(

 /// Handles PVF prechecking requests.
 ///
-/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_COMPILATION_TIMEOUT`]).
+/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_PREPARATION_TIMEOUT`]).
 ///
 /// If the prepare job failed previously, we may retry it under certain conditions.
 async fn handle_precheck_pvf(
@@ -456,9 +456,9 @@ async fn handle_precheck_pvf(

     if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
         match state {
-            ArtifactState::Prepared { last_time_needed } => {
+            ArtifactState::Prepared { last_time_needed, cpu_time_elapsed } => {
                 *last_time_needed = SystemTime::now();
-                let _ = result_sender.send(Ok(()));
+                let _ = result_sender.send(Ok(*cpu_time_elapsed));
             },
             ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
                 waiting_for_response.push(result_sender),
@@ -490,7 +490,7 @@ async fn handle_precheck_pvf(
 ///
 /// If the prepare job failed previously, we may retry it under certain conditions.
 ///
-/// When preparing for execution, we use a more lenient timeout ([`EXECUTE_COMPILATION_TIMEOUT`])
+/// When preparing for execution, we use a more lenient timeout ([`EXECUTE_PREPARATION_TIMEOUT`])
 /// than when prechecking.
 async fn handle_execute_pvf(
     cache_path: &Path,
@@ -505,7 +505,7 @@ async fn handle_execute_pvf(

     if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
         match state {
-            ArtifactState::Prepared { last_time_needed } => {
+            ArtifactState::Prepared { last_time_needed, .. } => {
                 *last_time_needed = SystemTime::now();

                 // This artifact has already been prepared, send it to the execute queue.
@@ -563,7 +563,7 @@ async fn handle_execute_pvf(
         awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
     }

-    return Ok(())
+    Ok(())
 }

 async fn handle_heads_up(
@@ -701,11 +701,12 @@ async fn handle_prepare_done(
     }

     *state = match result {
-        Ok(()) => ArtifactState::Prepared { last_time_needed: SystemTime::now() },
+        Ok(cpu_time_elapsed) =>
+            ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed },
         Err(error) => ArtifactState::FailedToProcess {
             last_time_failed: SystemTime::now(),
             num_failures: *num_failures + 1,
-            error: error.clone(),
+            error,
         },
     };

@@ -780,7 +781,7 @@ fn can_retry_prepare_after_failure(
         // Gracefully returned an error, so it will probably be reproducible. Don't retry.
         Prevalidation(_) | Preparation(_) => false,
         // Retry if the retry cooldown has elapsed and if we have already retried less than
-        // `NUM_PREPARE_RETRIES` times.
+        // `NUM_PREPARE_RETRIES` times. IO errors may resolve themselves.
         Panic(_) | TimedOut | DidNotMakeIt =>
             SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
                 num_failures <= NUM_PREPARE_RETRIES,
@@ -1016,8 +1017,8 @@ mod tests {
         let mut builder = Builder::default();
         builder.cleanup_pulse_interval = Duration::from_millis(100);
         builder.artifact_ttl = Duration::from_millis(500);
-        builder.artifacts.insert_prepared(artifact_id(1), mock_now);
-        builder.artifacts.insert_prepared(artifact_id(2), mock_now);
+        builder.artifacts.insert_prepared(artifact_id(1), mock_now, Duration::default());
+        builder.artifacts.insert_prepared(artifact_id(2), mock_now, Duration::default());
         let mut test = builder.build();
         let mut host = test.host_handle();
@@ -1087,7 +1088,10 @@ mod tests {
         );

         test.from_prepare_queue_tx
-            .send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
+            .send(prepare::FromQueue {
+                artifact_id: artifact_id(1),
+                result: Ok(Duration::default()),
+            })
             .await
             .unwrap();
         let result_tx_pvf_1_1 = assert_matches!(
@@ -1100,7 +1104,10 @@ mod tests {
         );

         test.from_prepare_queue_tx
-            .send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) })
+            .send(prepare::FromQueue {
+                artifact_id: artifact_id(2),
+                result: Ok(Duration::default()),
+            })
             .await
             .unwrap();
         let result_tx_pvf_2 = assert_matches!(
@@ -1149,13 +1156,16 @@ mod tests {
         );
         // Send `Ok` right away and poll the host.
         test.from_prepare_queue_tx
-            .send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
+            .send(prepare::FromQueue {
+                artifact_id: artifact_id(1),
+                result: Ok(Duration::default()),
+            })
             .await
             .unwrap();
         // No pending execute requests.
         test.poll_ensure_to_execute_queue_is_empty().await;
         // Received the precheck result.
-        assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(()));
+        assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));

         // Send multiple requests for the same PVF.
         let mut precheck_receivers = Vec::new();
@@ -1253,7 +1263,10 @@ mod tests {
             prepare::ToQueue::Enqueue { .. }
         );
         test.from_prepare_queue_tx
-            .send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) })
+            .send(prepare::FromQueue {
+                artifact_id: artifact_id(2),
+                result: Ok(Duration::default()),
+            })
             .await
             .unwrap();
         // The execute queue receives new request, preckecking is finished and we can
@@ -1263,7 +1276,7 @@ mod tests {
             execute::ToQueue::Enqueue { .. }
         );
         for result_rx in precheck_receivers {
-            assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(()));
+            assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
         }
     }
@@ -1511,7 +1524,10 @@ mod tests {
         );

         test.from_prepare_queue_tx
-            .send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
+            .send(prepare::FromQueue {
+                artifact_id: artifact_id(1),
+                result: Ok(Duration::default()),
+            })
             .await
             .unwrap();
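`ArtifactState::Prepared` now caches `cpu_time_elapsed`, so a repeated precheck of an already-prepared PVF in `handle_precheck_pvf` is answered from memory while refreshing `last_time_needed`. A rough sketch of that bookkeeping, using hypothetical `u64` artifact IDs in place of the real `ArtifactId`:

```rust
use std::collections::HashMap;
use std::time::{Duration, SystemTime};

// Simplified stand-in for the host's artifact table.
struct Prepared {
    last_time_needed: SystemTime,
    cpu_time_elapsed: Duration,
}

struct Artifacts {
    table: HashMap<u64, Prepared>,
}

impl Artifacts {
    // A repeat precheck refreshes the usage timestamp and replies with the
    // CPU time recorded when the artifact was first prepared.
    fn precheck(&mut self, id: u64) -> Option<Duration> {
        self.table.get_mut(&id).map(|state| {
            state.last_time_needed = SystemTime::now();
            state.cpu_time_elapsed
        })
    }
}

fn main() {
    let mut artifacts = Artifacts { table: HashMap::new() };
    artifacts.table.insert(
        1,
        Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed: Duration::from_secs(3) },
    );
    assert_eq!(artifacts.precheck(1), Some(Duration::from_secs(3)));
    assert_eq!(artifacts.precheck(2), None);
}
```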
diff --git a/polkadot/node/core/pvf/src/prepare/queue.rs b/polkadot/node/core/pvf/src/prepare/queue.rs
index ae0757d8046197dfdd6f88a35d7f87e483d1c8d3..df0a8ec41883c86c1dc748fca68afff907bdbb60 100644
--- a/polkadot/node/core/pvf/src/prepare/queue.rs
+++ b/polkadot/node/core/pvf/src/prepare/queue.rs
@@ -364,16 +364,14 @@ async fn handle_worker_concluded(
             // the pool up to the hard cap.
             spawn_extra_worker(queue, false).await?;
         }
+    } else if queue.limits.should_cull(queue.workers.len() + queue.spawn_inflight) {
+        // We no longer need services of this worker. Kill it.
+        queue.workers.remove(worker);
+        send_pool(&mut queue.to_pool_tx, pool::ToPool::Kill(worker)).await?;
     } else {
-        if queue.limits.should_cull(queue.workers.len() + queue.spawn_inflight) {
-            // We no longer need services of this worker. Kill it.
-            queue.workers.remove(worker);
-            send_pool(&mut queue.to_pool_tx, pool::ToPool::Kill(worker)).await?;
-        } else {
-            // see if there are more work available and schedule it.
-            if let Some(job) = queue.unscheduled.next() {
-                assign(queue, worker, job).await?;
-            }
+        // See if there is more work available and schedule it.
+        if let Some(job) = queue.unscheduled.next() {
+            assign(queue, worker, job).await?;
         }
     }

@@ -618,7 +616,11 @@ mod tests {
         let w = test.workers.insert(());
         test.send_from_pool(pool::FromPool::Spawned(w));

-        test.send_from_pool(pool::FromPool::Concluded { worker: w, rip: false, result: Ok(()) });
+        test.send_from_pool(pool::FromPool::Concluded {
+            worker: w,
+            rip: false,
+            result: Ok(Duration::default()),
+        });

         assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
     }
@@ -647,7 +649,11 @@ mod tests {
         assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
         assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

-        test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });
+        test.send_from_pool(pool::FromPool::Concluded {
+            worker: w1,
+            rip: false,
+            result: Ok(Duration::default()),
+        });

         assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

@@ -693,7 +699,11 @@ mod tests {
         // That's a bit silly in this context, but in production there will be an entire pool up
         // to the `soft_capacity` of workers and it doesn't matter which one to cull. Either way,
         // we just check that edge case of an edge case works.
-        test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });
+        test.send_from_pool(pool::FromPool::Concluded {
+            worker: w1,
+            rip: false,
+            result: Ok(Duration::default()),
+        });

         assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
     }
@@ -719,7 +729,11 @@ mod tests {
         assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

         // Conclude worker 1 and rip it.
-        test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) });
+        test.send_from_pool(pool::FromPool::Concluded {
+            worker: w1,
+            rip: true,
+            result: Ok(Duration::default()),
+        });

         // Since there is still work, the queue requested one extra worker to spawn to handle the
         // remaining enqueued work items.
diff --git a/polkadot/node/core/pvf/src/prepare/worker.rs b/polkadot/node/core/pvf/src/prepare/worker.rs
index a16b9b94176e54c358e53d74e3fb4f52f20796cf..4e0c411e45de24bcc6e35976af22e028d53e475f 100644
--- a/polkadot/node/core/pvf/src/prepare/worker.rs
+++ b/polkadot/node/core/pvf/src/prepare/worker.rs
@@ -18,8 +18,9 @@ use crate::{
     artifacts::CompiledArtifact,
     error::{PrepareError, PrepareResult},
     worker_common::{
-        bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
-        tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
+        bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
+        spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, JobKind, SpawnErr,
+        WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
     },
     LOG_TARGET,
 };
@@ -27,10 +28,20 @@ use async_std::{
     io,
     os::unix::net::UnixStream,
     path::{Path, PathBuf},
+    task,
 };
+use cpu_time::ProcessTime;
 use parity_scale_codec::{Decode, Encode};
 use sp_core::hexdisplay::HexDisplay;
-use std::{panic, sync::Arc, time::Duration};
+use std::{
+    panic,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+    thread,
+    time::Duration,
+};

 /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
 ///
@@ -58,6 +69,13 @@ pub enum Outcome {
     DidNotMakeIt,
 }

+#[derive(Debug)]
+enum Selected {
+    Done(PrepareResult),
+    IoErr,
+    Deadline,
+}
+
 /// Given the idle token of a worker and parameters of work, communicates with the worker and
 /// returns the outcome.
 pub async fn start_work(
@@ -77,7 +95,7 @@ pub async fn start_work(
     );

     with_tmp_file(pid, cache_path, |tmp_file| async move {
-        if let Err(err) = send_request(&mut stream, code, &tmp_file).await {
+        if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await {
             gum::warn!(
                 target: LOG_TARGET,
                 worker_pid = %pid,
@@ -88,78 +106,52 @@ pub async fn start_work(
         }

         // Wait for the result from the worker, keeping in mind that there may be a timeout, the
-        // worker may get killed, or something along these lines.
+        // worker may get killed, or something along these lines. In that case we should propagate
+        // the error to the pool.
         //
-        // In that case we should propagate the error to the pool.
+        // We use a generous timeout here. This is in addition to the one in the child process, in
+        // case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
+        // in the child. We want to use CPU time because it varies less than wall clock time under
+        // load, but the CPU resources of the child can only be measured from the parent after the
+        // child process terminates.
+        let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
+        let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await;

-        #[derive(Debug)]
-        enum Selected {
-            Done(PrepareResult),
-            IoErr,
-            Deadline,
-        }
-
-        let selected =
-            match async_std::future::timeout(preparation_timeout, framed_recv(&mut stream)).await {
-                Ok(Ok(response_bytes)) => {
-                    // Received bytes from worker within the time limit.
-                    // By convention we expect encoded `PrepareResult`.
-                    if let Ok(result) = PrepareResult::decode(&mut response_bytes.as_slice()) {
-                        if result.is_ok() {
-                            gum::debug!(
-                                target: LOG_TARGET,
-                                worker_pid = %pid,
-                                "promoting WIP artifact {} to {}",
-                                tmp_file.display(),
-                                artifact_path.display(),
-                            );
-
-                            async_std::fs::rename(&tmp_file, &artifact_path)
-                                .await
-                                .map(|_| Selected::Done(result))
-                                .unwrap_or_else(|err| {
-                                    gum::warn!(
-                                        target: LOG_TARGET,
-                                        worker_pid = %pid,
-                                        "failed to rename the artifact from {} to {}: {:?}",
-                                        tmp_file.display(),
-                                        artifact_path.display(),
-                                        err,
-                                    );
-                                    Selected::IoErr
-                                })
-                        } else {
-                            Selected::Done(result)
-                        }
-                    } else {
-                        // We received invalid bytes from the worker.
-                        let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
-                        gum::warn!(
-                            target: LOG_TARGET,
-                            worker_pid = %pid,
-                            "received unexpected response from the prepare worker: {}",
-                            HexDisplay::from(&bound_bytes),
-                        );
-                        Selected::IoErr
-                    }
-                },
-                Ok(Err(err)) => {
-                    // Communication error within the time limit.
-                    gum::warn!(
-                        target: LOG_TARGET,
-                        worker_pid = %pid,
-                        "failed to recv a prepare response: {:?}",
-                        err,
-                    );
-                    Selected::IoErr
-                },
-                Err(_) => {
-                    // Timed out.
-                    Selected::Deadline
-                },
-            };
+        let selected = match result {
+            // Received bytes from worker within the time limit.
+            Ok(Ok(response_bytes)) =>
+                handle_response_bytes(
+                    response_bytes,
+                    pid,
+                    tmp_file,
+                    artifact_path,
+                    preparation_timeout,
+                )
+                .await,
+            Ok(Err(err)) => {
+                // Communication error within the time limit.
+                gum::warn!(
+                    target: LOG_TARGET,
+                    worker_pid = %pid,
+                    "failed to recv a prepare response: {:?}",
+                    err,
+                );
+                Selected::IoErr
+            },
+            Err(_) => {
+                // Timed out here on the host.
+                gum::warn!(
+                    target: LOG_TARGET,
+                    worker_pid = %pid,
+                    "did not recv a prepare response within the time limit",
+                );
+                Selected::Deadline
+            },
+        };

         match selected {
+            // Timed out on the child. This should already be logged by the child.
+            Selected::Done(Err(PrepareError::TimedOut)) => Outcome::TimedOut,
             Selected::Done(result) =>
                 Outcome::Concluded { worker: IdleWorker { stream, pid }, result },
             Selected::Deadline => Outcome::TimedOut,
@@ -169,6 +161,76 @@ pub async fn start_work(
     .await
 }

+/// Handles the case where we successfully received response bytes on the host from the child.
+async fn handle_response_bytes(
+    response_bytes: Vec<u8>,
+    pid: u32,
+    tmp_file: PathBuf,
+    artifact_path: PathBuf,
+    preparation_timeout: Duration,
+) -> Selected {
+    // By convention we expect encoded `PrepareResult`.
+    let result = match PrepareResult::decode(&mut response_bytes.as_slice()) {
+        Ok(result) => result,
+        Err(_) => {
+            // We received invalid bytes from the worker.
+            let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
+            gum::warn!(
+                target: LOG_TARGET,
+                worker_pid = %pid,
+                "received unexpected response from the prepare worker: {}",
+                HexDisplay::from(&bound_bytes),
+            );
+            return Selected::IoErr
+        },
+    };
+    let cpu_time_elapsed = match result {
+        Ok(result) => result,
+        Err(_) => return Selected::Done(result),
+    };
+
+    if cpu_time_elapsed > preparation_timeout {
+        // The job didn't complete within the timeout.
+        gum::warn!(
+            target: LOG_TARGET,
+            worker_pid = %pid,
+            "prepare job took {}ms cpu time, exceeded preparation timeout {}ms. Clearing WIP artifact {}",
+            cpu_time_elapsed.as_millis(),
+            preparation_timeout.as_millis(),
+            tmp_file.display(),
+        );
+
+        // Return a timeout error.
+        //
+        // NOTE: The artifact exists, but is located in a temporary file which
+        // will be cleared by `with_tmp_file`.
+        return Selected::Deadline
+    }
+
+    gum::debug!(
+        target: LOG_TARGET,
+        worker_pid = %pid,
+        "promoting WIP artifact {} to {}",
+        tmp_file.display(),
+        artifact_path.display(),
+    );
+
+    async_std::fs::rename(&tmp_file, &artifact_path)
+        .await
+        .map(|_| Selected::Done(result))
+        .unwrap_or_else(|err| {
+            gum::warn!(
+                target: LOG_TARGET,
+                worker_pid = %pid,
+                "failed to rename the artifact from {} to {}: {:?}",
+                tmp_file.display(),
+                artifact_path.display(),
+                err,
+            );
+            Selected::IoErr
+        })
+}
+
 /// Create a temporary file for an artifact at the given cache path and execute the given
 /// future/closure passing the file path in.
 ///
@@ -218,13 +280,15 @@ async fn send_request(
     stream: &mut UnixStream,
     code: Arc<Vec<u8>>,
     tmp_file: &Path,
+    preparation_timeout: Duration,
 ) -> io::Result<()> {
     framed_send(stream, &code).await?;
     framed_send(stream, path_to_bytes(tmp_file)).await?;
+    framed_send(stream, &preparation_timeout.encode()).await?;
     Ok(())
 }

-async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)> {
+async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf, Duration)> {
     let code = framed_recv(stream).await?;
     let tmp_file = framed_recv(stream).await?;
     let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
@@ -233,7 +297,14 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)>
             "prepare pvf recv_request: non utf-8 artifact path".to_string(),
         )
     })?;
-    Ok((code, tmp_file))
+    let preparation_timeout = framed_recv(stream).await?;
+    let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|_| {
+        io::Error::new(
+            io::ErrorKind::Other,
+            "prepare pvf recv_request: failed to decode duration".to_string(),
+        )
+    })?;
+    Ok((code, tmp_file, preparation_timeout))
 }

 /// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
@@ -241,7 +312,7 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)>
 pub fn worker_entrypoint(socket_path: &str) {
     worker_event_loop("prepare", socket_path, |mut stream| async move {
         loop {
-            let (code, dest) = recv_request(&mut stream).await?;
+            let (code, dest, preparation_timeout) = recv_request(&mut stream).await?;

             gum::debug!(
                 target: LOG_TARGET,
@@ -249,18 +320,54 @@ pub fn worker_entrypoint(socket_path: &str) {
                 "worker: preparing artifact",
             );

-            let result = match prepare_artifact(&code) {
+            // Create a lock flag. We set it when either thread finishes.
+            let lock = Arc::new(AtomicBool::new(false));
+            let cpu_time_start = ProcessTime::now();
+
+            // Spawn a new thread that runs the CPU time monitor. Continuously wakes up from
+            // sleeping and then either sleeps for the remaining CPU time, or kills the process if
+            // we exceed the CPU timeout.
+            let (stream_2, cpu_time_start_2, preparation_timeout_2, lock_2) =
+                (stream.clone(), cpu_time_start, preparation_timeout, lock.clone());
+            let handle =
+                thread::Builder::new().name("CPU time monitor".into()).spawn(move || {
+                    task::block_on(async {
+                        cpu_time_monitor_loop(
+                            JobKind::Prepare,
+                            stream_2,
+                            cpu_time_start_2,
+                            preparation_timeout_2,
+                            lock_2,
+                        )
+                        .await;
+                    })
+                })?;
+
+            // Prepare the artifact while the monitor thread above watches our CPU time.
+            let result = match prepare_artifact(&code).await {
                 Err(err) => {
                     // Serialized error will be written into the socket.
                     Err(err)
                 },
                 Ok(compiled_artifact) => {
+                    let cpu_time_elapsed = cpu_time_start.elapsed();
+
+                    let lock_result =
+                        lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
+                    if lock_result.is_err() {
+                        // The other thread is still sending an error response over the socket.
+                        // Wait on it, then move on to the next job.
+                        let _ = handle.join();
+                        // Monitor thread detected timeout and likely already terminated the
+                        // process; nothing else to do here.
+                        continue
+                    }
+
                     // Write the serialized artifact into a temp file.
-                    // PVF host only keeps artifacts statuses in its memory,
-                    // successfully compiled code gets stored on the disk (and
-                    // consequently deserialized by execute-workers). The prepare
-                    // worker is only required to send an empty `Ok` to the pool
-                    // to indicate the success.
+                    //
+                    // PVF host only keeps artifacts statuses in its memory, successfully compiled code gets stored
+                    // on the disk (and consequently deserialized by execute-workers). The prepare worker is only
+                    // required to send `Ok` to the pool to indicate the success.

                     gum::debug!(
                         target: LOG_TARGET,
@@ -270,7 +377,7 @@ pub fn worker_entrypoint(socket_path: &str) {
                     );
                     async_std::fs::write(&dest, &compiled_artifact).await?;

-                    Ok(())
+                    Ok(cpu_time_elapsed)
                 },
             };

@@ -279,7 +386,7 @@ pub fn worker_entrypoint(socket_path: &str) {
     });
 }

-fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> {
+async fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> {
     panic::catch_unwind(|| {
         let blob = match crate::executor_intf::prevalidate(code) {
             Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
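Both worker entrypoints coordinate the job thread and the monitor thread through a shared `AtomicBool`: whichever side wins the `compare_exchange` from `false` to `true` owns the socket and reports the result, and the loser backs off. A self-contained sketch of that handshake (plain threads and sleeps, no socket):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    // false = job still running; the first thread to flip it to true "wins".
    let finished = Arc::new(AtomicBool::new(false));

    let monitor = {
        let finished = finished.clone();
        thread::spawn(move || {
            // Pretend the CPU timeout fired after 10ms.
            thread::sleep(Duration::from_millis(10));
            finished
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        })
    };

    // Pretend the job itself took 50ms, so the monitor should win the race.
    thread::sleep(Duration::from_millis(50));
    let job_won = finished
        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
        .is_ok();

    let monitor_won = monitor.join().unwrap();
    // Exactly one side claims the flag; the loser backs off. In the worker, the
    // losing job thread joins the monitor and skips sending its own response.
    assert!(job_won != monitor_won);
    println!("job won: {job_won}, monitor won: {monitor_won}");
}
```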
                    Err(err)
                },
diff --git a/polkadot/node/core/pvf/src/worker_common.rs b/polkadot/node/core/pvf/src/worker_common.rs
index 572e3717832b58a8ed9c9d43bdd8aa9fe8c25ff6..55c91a64424dcb3352508fd320dec0c10c7c6fb3 100644
--- a/polkadot/node/core/pvf/src/worker_common.rs
+++ b/polkadot/node/core/pvf/src/worker_common.rs
@@ -16,25 +16,54 @@

 //! Common logic for implementation of worker processes.

-use crate::LOG_TARGET;
+use crate::{execute::ExecuteResponse, PrepareError, LOG_TARGET};
 use async_std::{
     io,
     os::unix::net::{UnixListener, UnixStream},
     path::{Path, PathBuf},
 };
+use cpu_time::ProcessTime;
 use futures::{
     never::Never, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, FutureExt as _,
 };
 use futures_timer::Delay;
+use parity_scale_codec::Encode;
 use pin_project::pin_project;
 use rand::Rng;
 use std::{
     fmt, mem,
     pin::Pin,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
     task::{Context, Poll},
     time::Duration,
 };

+/// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in
+/// wall clock time). This is lenient because CPU time may go slower than wall clock time.
+pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
+
+/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
+/// child process.
+pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);
+
+#[derive(Copy, Clone, Debug)]
+pub enum JobKind {
+    Prepare,
+    Execute,
+}
+
+impl fmt::Display for JobKind {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Prepare => write!(f, "prepare"),
+            Self::Execute => write!(f, "execute"),
+        }
+    }
+}
+
 /// This is publicly exposed only for integration tests.
 #[doc(hidden)]
 pub async fn spawn_with_program_path(
@@ -169,6 +198,74 @@ where
     );
 }

+/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes
+/// up from sleeping and then either sleeps for the remaining CPU time, or kills the process if we
+/// exceed the CPU timeout.
+///
+/// NOTE: Killed processes are detected and cleaned up in `purge_dead`.
+///
+/// NOTE: If the job completes and this thread is still sleeping, it will continue sleeping in the
+/// background. When it wakes, it will see that the flag has been set and return.
+pub async fn cpu_time_monitor_loop(
+    job_kind: JobKind,
+    mut stream: UnixStream,
+    cpu_time_start: ProcessTime,
+    timeout: Duration,
+    lock: Arc<AtomicBool>,
+) {
+    loop {
+        let cpu_time_elapsed = cpu_time_start.elapsed();
+
+        // Treat the timeout as CPU time, which is less subject to variance due to load.
+        if cpu_time_elapsed > timeout {
+            let result = lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
+            if result.is_err() {
+                // Hit the job-completed case first, return from this thread.
+                return
+            }
+
+            // Log if we exceed the timeout.
+            gum::warn!(
+                target: LOG_TARGET,
+                worker_pid = %std::process::id(),
+                "{job_kind} job took {}ms cpu time, exceeded {job_kind} timeout {}ms",
+                cpu_time_elapsed.as_millis(),
+                timeout.as_millis(),
+            );
+
+            // Send back a TimedOut error on timeout.
+            let encoded_result = match job_kind {
+                JobKind::Prepare => {
+                    let result: Result<(), PrepareError> = Err(PrepareError::TimedOut);
+                    result.encode()
+                },
+                JobKind::Execute => {
+                    let result = ExecuteResponse::TimedOut;
+                    result.encode()
+                },
+            };
+            // If we error there is nothing else we can do here, and we are killing the process,
+            // anyway. The receiving side will just have to time out.
+            if let Err(err) = framed_send(&mut stream, encoded_result.as_slice()).await {
+                gum::warn!(
+                    target: LOG_TARGET,
+                    worker_pid = %std::process::id(),
+                    "{job_kind} worker -> pvf host: error sending result over the socket: {:?}",
+                    err
+                );
+            }
+
+            // Kill the process.
+            std::process::exit(1);
+        }
+
+        // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the
+        // sleep is wall clock time. The CPU clock may be slower than the wall clock.
+        let sleep_interval = timeout - cpu_time_elapsed + JOB_TIMEOUT_OVERHEAD;
+        std::thread::sleep(sleep_interval);
+    }
+}
+
 /// A struct that represents an idle worker.
 ///
 /// This struct is supposed to be used as a token that is passed by move into a subroutine that
@@ -200,8 +297,8 @@ pub enum SpawnErr {
 /// This is a representation of a potentially running worker. Drop it and the process will be killed.
 ///
 /// A worker's handle is also a future that resolves when it's detected that the worker's process
-/// has been terminated. Since the worker is running in another process it is obviously not necessary
-/// to poll this future to make the worker run, it's only for termination detection.
+/// has been terminated. Since the worker is running in another process it is obviously not
+/// necessary to poll this future to make the worker run, it's only for termination detection.
 ///
 /// This future relies on the fact that a child process's stdout `fd` is closed upon it's termination.
 #[pin_project]
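The key property behind `cpu_time_monitor_loop` is that a process's CPU clock can never advance faster than the wall clock, so sleeping (in wall-clock time) for the remaining CPU budget plus `JOB_TIMEOUT_OVERHEAD` and then re-checking can only undershoot. A stripped-down sketch of the loop, omitting the socket and the `AtomicBool` handshake:

```rust
use cpu_time::ProcessTime;
use std::time::Duration;

const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);

fn cpu_time_monitor(cpu_time_start: ProcessTime, timeout: Duration) {
    loop {
        let cpu_time_elapsed = cpu_time_start.elapsed();
        if cpu_time_elapsed > timeout {
            eprintln!(
                "job took {}ms cpu time, exceeded timeout {}ms",
                cpu_time_elapsed.as_millis(),
                timeout.as_millis()
            );
            // In the real loop the worker also sends a `TimedOut` response
            // over the socket before exiting.
            std::process::exit(1);
        }
        // Wall-clock sleep for the remaining CPU budget; it can only undershoot,
        // so the loop re-checks after waking.
        std::thread::sleep(timeout - cpu_time_elapsed + JOB_TIMEOUT_OVERHEAD);
    }
}

fn main() {
    let start = ProcessTime::now();
    std::thread::spawn(move || cpu_time_monitor(start, Duration::from_millis(100)));
    // Burn CPU until the monitor trips and exits the process.
    let mut x: u64 = 0;
    loop {
        x = x.wrapping_add(1);
        if x == u64::MAX {
            break
        }
    }
}
```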
diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/tests/it/adder.rs
index 83cbd27b6ed52dc706cad5ab91ff4ee3344afa45..69b6b7d21979a21aaf72c76fd4925d2b4aa6e0f1 100644
--- a/polkadot/node/core/pvf/tests/it/adder.rs
+++ b/polkadot/node/core/pvf/tests/it/adder.rs
@@ -23,7 +23,7 @@ use polkadot_parachain::primitives::{
 };

 #[async_std::test]
-async fn execute_good_on_parent() {
+async fn execute_good_block_on_parent() {
     let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };

     let block_data = BlockData { state: 0, add: 512 };
@@ -89,7 +89,7 @@ async fn execute_good_chain_on_parent() {
 }

 #[async_std::test]
-async fn execute_bad_on_parent() {
+async fn execute_bad_block_on_parent() {
     let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };

     let block_data = BlockData {
diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs
index bf0983d50874819c830e704fde71c189fa1febb8..a6aaf5d369d46ce95ba88f01b98d55c141119e4b 100644
--- a/polkadot/node/core/pvf/tests/it/main.rs
+++ b/polkadot/node/core/pvf/tests/it/main.rs
@@ -101,6 +101,7 @@ async fn terminates_on_timeout() {

 #[async_std::test]
 async fn parallel_execution() {
+    // Run some jobs that do not complete, thus timing out.
     let host = TestHost::new();
     let execute_pvf_future_1 = host.validate_candidate(
         halt::wasm_binary_unwrap(),
@@ -124,11 +125,14 @@ async fn parallel_execution() {
     let start = std::time::Instant::now();
     let (_, _) = futures::join!(execute_pvf_future_1, execute_pvf_future_2);

-    // total time should be < 2 x EXECUTION_TIMEOUT_SEC
-    const EXECUTION_TIMEOUT_SEC: u64 = 3;
+    // Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel).
+    let duration = std::time::Instant::now().duration_since(start);
+    let max_duration = 2 * TEST_EXECUTION_TIMEOUT;
     assert!(
-        std::time::Instant::now().duration_since(start) <
-            std::time::Duration::from_secs(EXECUTION_TIMEOUT_SEC * 2)
+        duration < max_duration,
+        "Expected duration {}ms to be less than {}ms",
+        duration.as_millis(),
+        max_duration.as_millis()
     );
 }

@@ -141,6 +145,7 @@ async fn execute_queue_doesnt_stall_if_workers_died() {
     // Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The
     // first five jobs should timeout and the workers killed. For the next 3 jobs a new batch of
     // workers should be spun up.
+    let start = std::time::Instant::now();
     futures::future::join_all((0u8..=8).map(|_| {
         host.validate_candidate(
             halt::wasm_binary_unwrap(),
@@ -153,4 +158,15 @@ async fn execute_queue_doesnt_stall_if_workers_died() {
         )
     }))
     .await;
+
+    // Total time should be >= 2 x TEST_EXECUTION_TIMEOUT (two separate sets of workers that
+    // should both timeout).
+    let duration = std::time::Instant::now().duration_since(start);
+    let max_duration = 2 * TEST_EXECUTION_TIMEOUT;
+    assert!(
+        duration >= max_duration,
+        "Expected duration {}ms to be greater than or equal to {}ms",
+        duration.as_millis(),
+        max_duration.as_millis()
+    );
 }
diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/candidate-validation.md b/polkadot/roadmap/implementers-guide/src/node/utility/candidate-validation.md
index 4e67be0691551bf35bf3eb2e14ea0670e933c8a5..6e7a5f3d0c8f8e5534766212f6332cf80b2ed3b5 100644
--- a/polkadot/roadmap/implementers-guide/src/node/utility/candidate-validation.md
+++ b/polkadot/roadmap/implementers-guide/src/node/utility/candidate-validation.md
@@ -77,10 +77,18 @@ time they can take.
 As the time for a job can vary depending on the machine and load on the
 machine, this can potentially lead to disputes where some validators
 successfuly execute a PVF and others don't.

-One mitigation we have in place is a more lenient timeout for preparation during
-execution than during pre-checking. The rationale is that the PVF has already
-passed pre-checking, so we know it should be valid, and we allow it to take
-longer than expected, as this is likely due to an issue with the machine and not
-the PVF.
+One dispute mitigation we have in place is a more lenient timeout for
+preparation during execution than during pre-checking. The rationale is that the
+PVF has already passed pre-checking, so we know it should be valid, and we allow
+it to take longer than expected, as this is likely due to an issue with the
+machine and not the PVF.
+
+#### CPU clock timeouts
+
+Another timeout-related mitigation we employ is to measure the time taken by
+jobs using CPU time, rather than wall clock time. This is because the CPU time
+of a process is less variable under different system conditions. When the
+overall system is under heavy load, the wall clock time of a job is affected
+more than the CPU time.

 [CVM]: ../../types/overseer-protocol.md#validationrequesttype
diff --git a/polkadot/rpc/src/lib.rs b/polkadot/rpc/src/lib.rs
index 0f54840f2ca553916f4cf564e9539141e6fd181d..43efefcae15bcad3e36e83e77b70403e6ff2987d 100644
--- a/polkadot/rpc/src/lib.rs
+++ b/polkadot/rpc/src/lib.rs
@@ -108,11 +108,7 @@ where
         + Sync
         + 'static,
     C::Api: frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>,
-    C::Api: mmr_rpc::MmrRuntimeApi<
-        Block,
-        <Block as sp_runtime::traits::Block>::Hash,
-        BlockNumber,
-    >,
+    C::Api: mmr_rpc::MmrRuntimeApi<Block, <Block as sp_runtime::traits::Block>::Hash, BlockNumber>,
     C::Api: pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance>,
     C::Api: BabeApi<Block>,
     C::Api: BlockBuilder<Block>,
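To make the guide's new "CPU clock timeouts" section concrete: the difference between the two clocks is easy to observe with the `cpu-time` crate this PR adds. Under heavy system load the wall-clock figure for a fixed workload inflates, while the CPU-time figure stays close to the work actually done:

```rust
use cpu_time::ProcessTime;
use std::time::Instant;

fn main() {
    let wall_start = Instant::now();
    let cpu_start = ProcessTime::now();

    // The same fixed amount of work regardless of system load.
    let mut acc: u64 = 0;
    for i in 0..50_000_000u64 {
        acc = acc.wrapping_add(i);
    }

    println!("checksum: {acc}");
    println!("wall clock: {}ms", wall_start.elapsed().as_millis());
    println!("cpu time:   {}ms", cpu_start.elapsed().as_millis());
}
```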