// Copyright 2020-2021 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! The Candidate Validation subsystem. //! //! This handles incoming requests from other subsystems to validate candidates //! according to a validation function. This delegates validation to an underlying //! pool of processes used for execution of the Wasm. #![deny(unused_crate_dependencies, unused_results)] #![warn(missing_docs)] use polkadot_node_core_pvf::{ InvalidCandidate as WasmInvalidCandidate, Pvf, ValidationError, ValidationHost, }; use polkadot_node_primitives::{ BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT, }; use polkadot_node_subsystem::{ errors::RuntimeApiError, messages::{ CandidateValidationMessage, RuntimeApiMessage, RuntimeApiRequest, ValidationFailed, }, overseer, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError, SubsystemResult, }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult}; use polkadot_primitives::v1::{ CandidateCommitments, CandidateDescriptor, Hash, OccupiedCoreAssumption, PersistedValidationData, ValidationCode, ValidationCodeHash, }; use parity_scale_codec::Encode; use futures::{channel::oneshot, prelude::*}; use std::{path::PathBuf, sync::Arc}; use async_trait::async_trait; #[cfg(test)] mod tests; const LOG_TARGET: &'static str = "parachain::candidate-validation"; /// Configuration for the candidate validation subsystem #[derive(Clone)] pub struct Config { /// The path where candidate validation can store compiled artifacts for PVFs. pub artifacts_cache_path: PathBuf, /// The path to the executable which can be used for spawning PVF compilation & validation /// workers. pub program_path: PathBuf, } /// The candidate validation subsystem. pub struct CandidateValidationSubsystem { metrics: Metrics, pvf_metrics: polkadot_node_core_pvf::Metrics, config: Config, } impl CandidateValidationSubsystem { /// Create a new `CandidateValidationSubsystem` with the given task spawner and isolation /// strategy. /// /// Check out [`IsolationStrategy`] to get more details. pub fn with_config( config: Config, metrics: Metrics, pvf_metrics: polkadot_node_core_pvf::Metrics, ) -> Self { CandidateValidationSubsystem { config, metrics, pvf_metrics } } } impl overseer::Subsystem for CandidateValidationSubsystem where Context: SubsystemContext, Context: overseer::SubsystemContext, { fn start(self, ctx: Context) -> SpawnedSubsystem { let future = run( ctx, self.metrics, self.pvf_metrics, self.config.artifacts_cache_path, self.config.program_path, ) .map_err(|e| SubsystemError::with_origin("candidate-validation", e)) .boxed(); SpawnedSubsystem { name: "candidate-validation-subsystem", future } } } async fn run( mut ctx: Context, metrics: Metrics, pvf_metrics: polkadot_node_core_pvf::Metrics, cache_path: PathBuf, program_path: PathBuf, ) -> SubsystemResult<()> where Context: SubsystemContext, Context: overseer::SubsystemContext, { let (mut validation_host, task) = polkadot_node_core_pvf::start( polkadot_node_core_pvf::Config::new(cache_path, program_path), pvf_metrics, ); ctx.spawn_blocking("pvf-validation-host", task.boxed())?; loop { match ctx.recv().await? { FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {}, FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}, FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()), FromOverseer::Communication { msg } => match msg { CandidateValidationMessage::ValidateFromChainState( descriptor, pov, response_sender, ) => { let _timer = metrics.time_validate_from_chain_state(); let res = spawn_validate_from_chain_state( &mut ctx, &mut validation_host, descriptor, pov, &metrics, ) .await; match res { Ok(x) => { metrics.on_validation_event(&x); let _ = response_sender.send(x); }, Err(e) => return Err(e), } }, CandidateValidationMessage::ValidateFromExhaustive( persisted_validation_data, validation_code, descriptor, pov, response_sender, ) => { let _timer = metrics.time_validate_from_exhaustive(); let res = validate_candidate_exhaustive( &mut validation_host, persisted_validation_data, validation_code, descriptor, pov, &metrics, ) .await; match res { Ok(x) => { metrics.on_validation_event(&x); if let Err(_e) = response_sender.send(x) { tracing::warn!( target: LOG_TARGET, "Requester of candidate validation dropped", ) } }, Err(e) => return Err(e), } }, }, } } } async fn runtime_api_request( ctx: &mut Context, relay_parent: Hash, request: RuntimeApiRequest, receiver: oneshot::Receiver>, ) -> SubsystemResult> where Context: SubsystemContext, Context: overseer::SubsystemContext, { ctx.send_message(RuntimeApiMessage::Request(relay_parent, request)).await; receiver.await.map_err(Into::into) } #[derive(Debug)] enum AssumptionCheckOutcome { Matches(PersistedValidationData, ValidationCode), DoesNotMatch, BadRequest, } async fn check_assumption_validation_data( ctx: &mut Context, descriptor: &CandidateDescriptor, assumption: OccupiedCoreAssumption, ) -> SubsystemResult where Context: SubsystemContext, Context: overseer::SubsystemContext, { let validation_data = { let (tx, rx) = oneshot::channel(); let d = runtime_api_request( ctx, descriptor.relay_parent, RuntimeApiRequest::PersistedValidationData(descriptor.para_id, assumption, tx), rx, ) .await?; match d { Ok(None) | Err(_) => return Ok(AssumptionCheckOutcome::BadRequest), Ok(Some(d)) => d, } }; let persisted_validation_data_hash = validation_data.hash(); SubsystemResult::Ok( if descriptor.persisted_validation_data_hash == persisted_validation_data_hash { let (code_tx, code_rx) = oneshot::channel(); let validation_code = runtime_api_request( ctx, descriptor.relay_parent, RuntimeApiRequest::ValidationCode(descriptor.para_id, assumption, code_tx), code_rx, ) .await?; match validation_code { Ok(None) | Err(_) => AssumptionCheckOutcome::BadRequest, Ok(Some(v)) => AssumptionCheckOutcome::Matches(validation_data, v), } } else { AssumptionCheckOutcome::DoesNotMatch }, ) } async fn find_assumed_validation_data( ctx: &mut Context, descriptor: &CandidateDescriptor, ) -> SubsystemResult where Context: SubsystemContext, Context: overseer::SubsystemContext, { // The candidate descriptor has a `persisted_validation_data_hash` which corresponds to // one of up to two possible values that we can derive from the state of the // relay-parent. We can fetch these values by getting the persisted validation data // based on the different `OccupiedCoreAssumption`s. const ASSUMPTIONS: &[OccupiedCoreAssumption] = &[ OccupiedCoreAssumption::Included, OccupiedCoreAssumption::TimedOut, // `TimedOut` and `Free` both don't perform any speculation and therefore should be the same // for our purposes here. In other words, if `TimedOut` matched then the `Free` must be // matched as well. ]; // Consider running these checks in parallel to reduce validation latency. for assumption in ASSUMPTIONS { let outcome = check_assumption_validation_data(ctx, descriptor, *assumption).await?; match outcome { AssumptionCheckOutcome::Matches(_, _) => return Ok(outcome), AssumptionCheckOutcome::BadRequest => return Ok(outcome), AssumptionCheckOutcome::DoesNotMatch => continue, } } Ok(AssumptionCheckOutcome::DoesNotMatch) } async fn spawn_validate_from_chain_state( ctx: &mut Context, validation_host: &mut ValidationHost, descriptor: CandidateDescriptor, pov: Arc, metrics: &Metrics, ) -> SubsystemResult> where Context: SubsystemContext, Context: overseer::SubsystemContext, { let (validation_data, validation_code) = match find_assumed_validation_data(ctx, &descriptor).await? { AssumptionCheckOutcome::Matches(validation_data, validation_code) => (validation_data, validation_code), AssumptionCheckOutcome::DoesNotMatch => { // If neither the assumption of the occupied core having the para included or the assumption // of the occupied core timing out are valid, then the persisted_validation_data_hash in the descriptor // is not based on the relay parent and is thus invalid. return Ok(Ok(ValidationResult::Invalid(InvalidCandidate::BadParent))) }, AssumptionCheckOutcome::BadRequest => return Ok(Err(ValidationFailed("Assumption Check: Bad request".into()))), }; let validation_result = validate_candidate_exhaustive( validation_host, validation_data, validation_code, descriptor.clone(), pov, metrics, ) .await; if let Ok(Ok(ValidationResult::Valid(ref outputs, _))) = validation_result { let (tx, rx) = oneshot::channel(); match runtime_api_request( ctx, descriptor.relay_parent, RuntimeApiRequest::CheckValidationOutputs(descriptor.para_id, outputs.clone(), tx), rx, ) .await? { Ok(true) => {}, Ok(false) => return Ok(Ok(ValidationResult::Invalid(InvalidCandidate::InvalidOutputs))), Err(_) => return Ok(Err(ValidationFailed("Check Validation Outputs: Bad request".into()))), } } validation_result } async fn validate_candidate_exhaustive( mut validation_backend: impl ValidationBackend, persisted_validation_data: PersistedValidationData, validation_code: ValidationCode, descriptor: CandidateDescriptor, pov: Arc, metrics: &Metrics, ) -> SubsystemResult> { let _timer = metrics.time_validate_candidate_exhaustive(); let validation_code_hash = validation_code.hash(); tracing::debug!( target: LOG_TARGET, ?validation_code_hash, para_id = ?descriptor.para_id, "About to validate a candidate.", ); if let Err(e) = perform_basic_checks( &descriptor, persisted_validation_data.max_pov_size, &*pov, &validation_code_hash, ) { return Ok(Ok(ValidationResult::Invalid(e))) } let raw_validation_code = match sp_maybe_compressed_blob::decompress( &validation_code.0, VALIDATION_CODE_BOMB_LIMIT, ) { Ok(code) => code, Err(e) => { tracing::debug!(target: LOG_TARGET, err=?e, "Invalid validation code"); // If the validation code is invalid, the candidate certainly is. return Ok(Ok(ValidationResult::Invalid(InvalidCandidate::CodeDecompressionFailure))) }, }; let raw_block_data = match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) { Ok(block_data) => BlockData(block_data.to_vec()), Err(e) => { tracing::debug!(target: LOG_TARGET, err=?e, "Invalid PoV code"); // If the PoV is invalid, the candidate certainly is. return Ok(Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure))) }, }; let params = ValidationParams { parent_head: persisted_validation_data.parent_head.clone(), block_data: raw_block_data, relay_parent_number: persisted_validation_data.relay_parent_number, relay_parent_storage_root: persisted_validation_data.relay_parent_storage_root, }; let result = validation_backend .validate_candidate(raw_validation_code.to_vec(), params) .await; if let Err(ref e) = result { tracing::debug!( target: LOG_TARGET, error = ?e, "Failed to validate candidate", ); } let result = match result { Err(ValidationError::InternalError(e)) => Err(ValidationFailed(e)), Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout)) => Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)), Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedError(e))) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))), Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbigiousWorkerDeath)) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError( "ambigious worker death".to_string(), ))), Ok(res) => if res.head_data.hash() != descriptor.para_head { Ok(ValidationResult::Invalid(InvalidCandidate::ParaHeadHashMismatch)) } else { let outputs = CandidateCommitments { head_data: res.head_data, upward_messages: res.upward_messages, horizontal_messages: res.horizontal_messages, new_validation_code: res.new_validation_code, processed_downward_messages: res.processed_downward_messages, hrmp_watermark: res.hrmp_watermark, }; Ok(ValidationResult::Valid(outputs, persisted_validation_data)) }, }; Ok(result) } #[async_trait] trait ValidationBackend { async fn validate_candidate( &mut self, raw_validation_code: Vec, params: ValidationParams, ) -> Result; } #[async_trait] impl ValidationBackend for &'_ mut ValidationHost { async fn validate_candidate( &mut self, raw_validation_code: Vec, params: ValidationParams, ) -> Result { let (tx, rx) = oneshot::channel(); if let Err(err) = self .execute_pvf( Pvf::from_code(raw_validation_code), params.encode(), polkadot_node_core_pvf::Priority::Normal, tx, ) .await { return Err(ValidationError::InternalError(format!( "cannot send pvf to the validation host: {:?}", err ))) } let validation_result = rx .await .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?; validation_result } } /// Does basic checks of a candidate. Provide the encoded PoV-block. Returns `Ok` if basic checks /// are passed, `Err` otherwise. fn perform_basic_checks( candidate: &CandidateDescriptor, max_pov_size: u32, pov: &PoV, validation_code_hash: &ValidationCodeHash, ) -> Result<(), InvalidCandidate> { let pov_hash = pov.hash(); let encoded_pov_size = pov.encoded_size(); if encoded_pov_size > max_pov_size as usize { return Err(InvalidCandidate::ParamsTooLarge(encoded_pov_size as u64)) } if pov_hash != candidate.pov_hash { return Err(InvalidCandidate::PoVHashMismatch) } if *validation_code_hash != candidate.validation_code_hash { return Err(InvalidCandidate::CodeHashMismatch) } if let Err(()) = candidate.check_collator_signature() { return Err(InvalidCandidate::BadSignature) } Ok(()) } #[derive(Clone)] struct MetricsInner { validation_requests: prometheus::CounterVec, validate_from_chain_state: prometheus::Histogram, validate_from_exhaustive: prometheus::Histogram, validate_candidate_exhaustive: prometheus::Histogram, } /// Candidate validation metrics. #[derive(Default, Clone)] pub struct Metrics(Option); impl Metrics { fn on_validation_event(&self, event: &Result) { if let Some(metrics) = &self.0 { match event { Ok(ValidationResult::Valid(_, _)) => { metrics.validation_requests.with_label_values(&["valid"]).inc(); }, Ok(ValidationResult::Invalid(_)) => { metrics.validation_requests.with_label_values(&["invalid"]).inc(); }, Err(_) => { metrics.validation_requests.with_label_values(&["validation failure"]).inc(); }, } } } /// Provide a timer for `validate_from_chain_state` which observes on drop. fn time_validate_from_chain_state( &self, ) -> Option { self.0.as_ref().map(|metrics| metrics.validate_from_chain_state.start_timer()) } /// Provide a timer for `validate_from_exhaustive` which observes on drop. fn time_validate_from_exhaustive( &self, ) -> Option { self.0.as_ref().map(|metrics| metrics.validate_from_exhaustive.start_timer()) } /// Provide a timer for `validate_candidate_exhaustive` which observes on drop. fn time_validate_candidate_exhaustive( &self, ) -> Option { self.0 .as_ref() .map(|metrics| metrics.validate_candidate_exhaustive.start_timer()) } } impl metrics::Metrics for Metrics { fn try_register(registry: &prometheus::Registry) -> Result { let metrics = MetricsInner { validation_requests: prometheus::register( prometheus::CounterVec::new( prometheus::Opts::new( "parachain_validation_requests_total", "Number of validation requests served.", ), &["validity"], )?, registry, )?, validate_from_chain_state: prometheus::register( prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( "parachain_candidate_validation_validate_from_chain_state", "Time spent within `candidate_validation::validate_from_chain_state`", ))?, registry, )?, validate_from_exhaustive: prometheus::register( prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( "parachain_candidate_validation_validate_from_exhaustive", "Time spent within `candidate_validation::validate_from_exhaustive`", ))?, registry, )?, validate_candidate_exhaustive: prometheus::register( prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( "parachain_candidate_validation_validate_candidate_exhaustive", "Time spent within `candidate_validation::validate_candidate_exhaustive`", ))?, registry, )?, }; Ok(Metrics(Some(metrics))) } }