diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index e1d377bfb54aa48e6959a78b98b6a284fc923438..ea5d53d81438726398f0abaa81d521a7fbc40705 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6428,6 +6428,7 @@ dependencies = [ "assert_matches", "async-trait", "futures", + "futures-timer", "parity-scale-codec", "polkadot-node-core-pvf", "polkadot-node-primitives", diff --git a/polkadot/node/core/candidate-validation/Cargo.toml b/polkadot/node/core/candidate-validation/Cargo.toml index 8634cfe5a75efc9f0be867274c3605906e63967c..105d7c1a21dc8420a84e446e53d62ef90b32bf1e 100644 --- a/polkadot/node/core/candidate-validation/Cargo.toml +++ b/polkadot/node/core/candidate-validation/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] async-trait = "0.1.57" futures = "0.3.21" +futures-timer = "3.0.2" gum = { package = "tracing-gum", path = "../../gum" } sp-maybe-compressed-blob = { package = "sp-maybe-compressed-blob", git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index c3775ba1c453086897b267d36e6231034611045f..a82a0feb78a048d8b3186977307f271f410559eb 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -60,6 +60,12 @@ mod tests; const LOG_TARGET: &'static str = "parachain::candidate-validation"; +/// The amount of time to wait before retrying after an AmbiguousWorkerDeath validation error. +#[cfg(not(test))] +const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3); +#[cfg(test)] +const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200); + /// Configuration for the candidate validation subsystem #[derive(Clone)] pub struct Config { @@ -490,7 +496,7 @@ where } async fn validate_candidate_exhaustive( - mut validation_backend: impl ValidationBackend, + mut validation_backend: impl ValidationBackend + Send, persisted_validation_data: PersistedValidationData, validation_code: ValidationCode, candidate_receipt: CandidateReceipt, @@ -551,7 +557,7 @@ async fn validate_candidate_exhaustive( }; let result = validation_backend - .validate_candidate(raw_validation_code.to_vec(), timeout, params) + .validate_candidate_with_retry(raw_validation_code.to_vec(), timeout, params) .await; if let Err(ref error) = result { @@ -604,45 +610,63 @@ async fn validate_candidate_exhaustive( #[async_trait] trait ValidationBackend { async fn validate_candidate( + &mut self, + pvf: Pvf, + timeout: Duration, + encoded_params: Vec<u8>, + ) -> Result<WasmValidationResult, ValidationError>; + + async fn validate_candidate_with_retry( &mut self, raw_validation_code: Vec<u8>, timeout: Duration, params: ValidationParams, - ) -> Result<WasmValidationResult, ValidationError>; + ) -> Result<WasmValidationResult, ValidationError> { + // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. + let pvf = Pvf::from_code(raw_validation_code); + + let validation_result = + self.validate_candidate(pvf.clone(), timeout, params.encode()).await; + + // If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the + // assumption that the conditions that caused this error may have been transient. + if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) = + validation_result + { + // Wait a brief delay before retrying. + futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await; + // Encode the params again when re-trying. We expect the retry case to be relatively + // rare, and we want to avoid unconditionally cloning data. + self.validate_candidate(pvf, timeout, params.encode()).await + } else { + validation_result + } + } async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError>; } #[async_trait] impl ValidationBackend for ValidationHost { + /// Tries executing a PVF a single time (no retries). async fn validate_candidate( &mut self, - raw_validation_code: Vec<u8>, + pvf: Pvf, timeout: Duration, - params: ValidationParams, + encoded_params: Vec<u8>, ) -> Result<WasmValidationResult, ValidationError> { + let priority = polkadot_node_core_pvf::Priority::Normal; + let (tx, rx) = oneshot::channel(); - if let Err(err) = self - .execute_pvf( - Pvf::from_code(raw_validation_code), - timeout, - params.encode(), - polkadot_node_core_pvf::Priority::Normal, - tx, - ) - .await - { + if let Err(err) = self.execute_pvf(pvf, timeout, encoded_params, priority, tx).await { return Err(ValidationError::InternalError(format!( "cannot send pvf to the validation host: {:?}", err ))) } - let validation_result = rx - .await - .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?; - - validation_result + rx.await + .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))? } async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError> { diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index ecac13d1440db3eaafbbc6bed7f5712b1d9bebaf..cf467cd5c0570270466994ccda45e5af9826281f 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -345,12 +345,19 @@ fn check_does_not_match() { } struct MockValidateCandidateBackend { - result: Result<WasmValidationResult, ValidationError>, + result_list: Vec<Result<WasmValidationResult, ValidationError>>, + num_times_called: usize, } impl MockValidateCandidateBackend { fn with_hardcoded_result(result: Result<WasmValidationResult, ValidationError>) -> Self { - Self { result } + Self { result_list: vec![result], num_times_called: 0 } + } + + fn with_hardcoded_result_list( + result_list: Vec<Result<WasmValidationResult, ValidationError>>, + ) -> Self { + Self { result_list, num_times_called: 0 } } } @@ -358,11 +365,16 @@ impl MockValidateCandidateBackend { impl ValidationBackend for MockValidateCandidateBackend { async fn validate_candidate( &mut self, - _raw_validation_code: Vec<u8>, + _pvf: Pvf, _timeout: Duration, - _params: ValidationParams, + _encoded_params: Vec<u8>, ) -> Result<WasmValidationResult, ValidationError> { - self.result.clone() + // This is expected to panic if called more times than expected, indicating an error in the + // test. + let result = self.result_list[self.num_times_called].clone(); + self.num_times_called += 1; + + result } async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> { @@ -468,7 +480,7 @@ fn candidate_validation_bad_return_is_invalid() { let v = executor::block_on(validate_candidate_exhaustive( MockValidateCandidateBackend::with_hardcoded_result(Err( - ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath), + ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), )), validation_data, validation_code, @@ -479,6 +491,122 @@ fn candidate_validation_bad_return_is_invalid() { )) .unwrap(); + assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::Timeout)); +} + +#[test] +fn candidate_validation_one_ambiguous_error_is_valid() { + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; + + let pov = PoV { block_data: BlockData(vec![1; 32]) }; + let head_data = HeadData(vec![1, 1, 1]); + let validation_code = ValidationCode(vec![2; 16]); + + let descriptor = make_valid_candidate_descriptor( + ParaId::from(1_u32), + dummy_hash(), + validation_data.hash(), + pov.hash(), + validation_code.hash(), + head_data.hash(), + dummy_hash(), + Sr25519Keyring::Alice, + ); + + let check = perform_basic_checks( + &descriptor, + validation_data.max_pov_size, + &pov, + &validation_code.hash(), + ); + assert!(check.is_ok()); + + let validation_result = WasmValidationResult { + head_data, + new_validation_code: Some(vec![2, 2, 2].into()), + upward_messages: Vec::new(), + horizontal_messages: Vec::new(), + processed_downward_messages: 0, + hrmp_watermark: 0, + }; + + let commitments = CandidateCommitments { + head_data: validation_result.head_data.clone(), + upward_messages: validation_result.upward_messages.clone(), + horizontal_messages: validation_result.horizontal_messages.clone(), + new_validation_code: validation_result.new_validation_code.clone(), + processed_downward_messages: validation_result.processed_downward_messages, + hrmp_watermark: validation_result.hrmp_watermark, + }; + + let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; + + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result_list(vec![ + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + Ok(validation_result), + ]), + validation_data.clone(), + validation_code, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &Default::default(), + )) + .unwrap(); + + assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => { + assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1])); + assert_eq!(outputs.upward_messages, Vec::<UpwardMessage>::new()); + assert_eq!(outputs.horizontal_messages, Vec::new()); + assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into())); + assert_eq!(outputs.hrmp_watermark, 0); + assert_eq!(used_validation_data, validation_data); + }); +} + +#[test] +fn candidate_validation_multiple_ambiguous_errors_is_invalid() { + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; + + let pov = PoV { block_data: BlockData(vec![1; 32]) }; + let validation_code = ValidationCode(vec![2; 16]); + + let descriptor = make_valid_candidate_descriptor( + ParaId::from(1_u32), + dummy_hash(), + validation_data.hash(), + pov.hash(), + validation_code.hash(), + dummy_hash(), + dummy_hash(), + Sr25519Keyring::Alice, + ); + + let check = perform_basic_checks( + &descriptor, + validation_data.max_pov_size, + &pov, + &validation_code.hash(), + ); + assert!(check.is_ok()); + + let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; + + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result_list(vec![ + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + ]), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &Default::default(), + )) + .unwrap(); + assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::ExecutionError(_))); } @@ -779,9 +907,9 @@ impl MockPreCheckBackend { impl ValidationBackend for MockPreCheckBackend { async fn validate_candidate( &mut self, - _raw_validation_code: Vec<u8>, + _pvf: Pvf, _timeout: Duration, - _params: ValidationParams, + _encoded_params: Vec<u8>, ) -> Result<WasmValidationResult, ValidationError> { unreachable!() } diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index 9b240e02df17a02b900a5a5620b648200cec5265..b4c6a66b77199b105f9c362ce4324cce6c13df87 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -252,8 +252,8 @@ fn handle_job_finish( "execute worker concluded", ); - // First we send the result. It may fail due the other end of the channel being dropped, that's - // legitimate and we don't treat that as an error. + // First we send the result. It may fail due to the other end of the channel being dropped, + // that's legitimate and we don't treat that as an error. let _ = result_tx.send(result); // Then, we should deal with the worker: @@ -305,7 +305,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Qu Err(err) => { gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err); - // Assume that the failure intermittent and retry after a delay. + // Assume that the failure is intermittent and retry after a delay. Delay::new(Duration::from_secs(3)).await; }, }