Unverified commit 0eb7905a, authored by Sergey Pepyakin, committed by GitHub
Browse files

New PVF validation host (#2710)



* Implement PVF validation host

* WIP: Diener

* Increase the allotted compilation time

* Add more comments

* Minor clean up

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Fix pruning artifact removal

* Fix formatting and newlines

* Fix the thread pool

* Update node/core/pvf/src/executor_intf.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Remove redundant test declaration

* Don't convert the path into an intermediate string

* Try to work around the test failure

* Use the puppet_worker trick again

* Fix a blip

* Move `ensure_wasmtime_version` under the tests mod

* Add a macro for puppet_workers

* fix build for not real-overseer

* Rename the puppet worker for adder collator

* play it safe with the name of adder puppet worker

* Typo: triggered

* Add more comments

* Do not kill exec worker on every error

* Plumb Duration for timeouts

* typo: critical

* Add proofs

* Clean unused imports

* Revert "WIP: Diener"

This reverts commit ff2d3ff2.

* Sync version of wasmtime

* Update cargo.lock

* Update Substrate

* Merge fixes still

* Update wasmtime version in test

* bastifmt

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Squash spaces

* Trailing new line for testing.rs

* Remove controversial code

* comment about biasing

* Fix suggestion

* Add comments

* make it more clear why unwrap_err

* tmpfile retry

* proper proofs for claim_idle

* Remove mutex from ValidationHost

* Add some more logging

* Extract exec timeout into a constant

* Add some clarifying logging

* Use blake2_256

* Clean up the merge

Specifically the leftovers after removing real-overseer

* Update parachain/test-parachains/adder/collator/Cargo.toml

Co-authored-by: Andronik Ordian <write@reusable.software>

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Andronik Ordian <write@reusable.software>
parent e5bab572
Pipeline #133477 failed with stages
in 13 minutes and 43 seconds
......@@ -120,9 +120,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.34"
version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf8dcb5b4bbaa28653b647d8c77bd4ed40183b48882e130c1f1ffb73de069fd7"
checksum = "81cddc5f91628367664cc7c69714ff08deee8a3efc54623011c772544d7b2767"
[[package]]
name = "approx"
......@@ -204,6 +204,16 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-attributes"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "async-channel"
version = "1.5.1"
......@@ -293,6 +303,7 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f9f84f1280a2b436a2c77c2582602732b6c2f4321d5494d6e799e6c367859a8"
dependencies = [
"async-attributes",
"async-channel",
"async-global-executor",
"async-io",
......@@ -5579,7 +5590,7 @@ dependencies = [
"frame-benchmarking-cli",
"futures 0.3.13",
"log",
"polkadot-parachain",
"polkadot-node-core-pvf",
"polkadot-service",
"sc-cli",
"sc-service",
......@@ -5823,8 +5834,10 @@ name = "polkadot-node-core-candidate-validation"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"futures 0.3.13",
"parity-scale-codec",
"polkadot-node-core-pvf",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
......@@ -5893,6 +5906,38 @@ dependencies = [
"tracing",
]
[[package]]
name = "polkadot-node-core-pvf"
version = "0.1.0"
dependencies = [
"always-assert",
"assert_matches",
"async-process",
"async-std",
"futures 0.3.13",
"futures-timer 3.0.2",
"hex-literal",
"libc",
"parity-scale-codec",
"pin-project 1.0.4",
"polkadot-core-primitives",
"polkadot-parachain",
"rand 0.8.3",
"sc-executor",
"sc-executor-common",
"sc-executor-wasmtime",
"slotmap",
"sp-core",
"sp-externalities",
"sp-io",
"sp-wasm-interface",
"tempfile",
"test-parachain-adder",
"test-parachain-halt",
"tracing",
"wasmtime-jit",
]
[[package]]
name = "polkadot-node-core-runtime-api"
version = "0.1.0"
......@@ -6074,25 +6119,13 @@ name = "polkadot-parachain"
version = "0.8.30"
dependencies = [
"derive_more",
"futures 0.3.13",
"libc",
"log",
"parity-scale-codec",
"parity-util-mem",
"parking_lot 0.11.1",
"polkadot-core-primitives",
"raw_sync",
"sc-executor",
"serde",
"shared_memory",
"sp-core",
"sp-externalities",
"sp-io",
"sp-runtime",
"sp-std",
"sp-wasm-interface",
"static_assertions",
"thiserror",
]
[[package]]
......@@ -7031,19 +7064,6 @@ dependencies = [
"rand_core 0.5.1",
]
[[package]]
name = "raw_sync"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a34bde3561f980a51c70495164200569a11662644fe5af017f0b5d7015688cc"
dependencies = [
"cfg-if 0.1.10",
"libc",
"nix",
"rand 0.8.3",
"winapi 0.3.9",
]
[[package]]
name = "rawpointer"
version = "0.2.1"
......@@ -8704,20 +8724,6 @@ dependencies = [
"loom",
]
[[package]]
name = "shared_memory"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b854a362375dfe8ab12ea8a98228040d37293c988f85fbac9fa0f83336387966"
dependencies = [
"cfg-if 0.1.10",
"libc",
"nix",
"quick-error 2.0.0",
"rand 0.8.3",
"winapi 0.3.9",
]
[[package]]
name = "shlex"
version = "0.1.1"
......@@ -8778,6 +8784,15 @@ dependencies = [
"sp-std",
]
[[package]]
name = "slotmap"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab3003725ae562cf995f3dc82bb99e70926e09000396816765bb6d7adbe740b1"
dependencies = [
"version_check",
]
[[package]]
name = "smallvec"
version = "0.6.13"
......@@ -9868,6 +9883,7 @@ dependencies = [
"log",
"parity-scale-codec",
"polkadot-cli",
"polkadot-node-core-pvf",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-parachain",
......
......@@ -55,6 +55,7 @@ members = [
"node/core/chain-api",
"node/core/proposer",
"node/core/provisioner",
"node/core/pvf",
"node/core/runtime-api",
"node/network/approval-distribution",
"node/network/bridge",
......
......@@ -22,7 +22,7 @@ wasm-bindgen-futures = { version = "0.4.19", optional = true }
futures = "0.3.12"
service = { package = "polkadot-service", path = "../node/service", default-features = false, optional = true }
polkadot-parachain = { path = "../parachain", optional = true }
polkadot-node-core-pvf = { path = "../node/core/pvf", optional = true }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
frame-benchmarking-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
......@@ -39,8 +39,8 @@ sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master",
substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }
[features]
default = [ "wasmtime", "db", "cli", "full-node", "trie-memory-tracker", "polkadot-parachain" ]
wasmtime = [ "sc-cli/wasmtime", "polkadot-parachain/wasmtime" ]
default = [ "wasmtime", "db", "cli", "full-node", "trie-memory-tracker" ]
wasmtime = [ "sc-cli/wasmtime" ]
db = [ "service/db" ]
cli = [
"structopt",
......@@ -48,6 +48,7 @@ cli = [
"sc-service",
"frame-benchmarking-cli",
"try-runtime-cli",
"polkadot-node-core-pvf",
]
browser = [
"wasm-bindgen",
......
......@@ -43,8 +43,12 @@ pub enum Subcommand {
Revert(sc_cli::RevertCmd),
#[allow(missing_docs)]
#[structopt(name = "validation-worker", setting = structopt::clap::AppSettings::Hidden)]
ValidationWorker(ValidationWorkerCommand),
#[structopt(name = "prepare-worker", setting = structopt::clap::AppSettings::Hidden)]
PvfPrepareWorker(ValidationWorkerCommand),
#[allow(missing_docs)]
#[structopt(name = "execute-worker", setting = structopt::clap::AppSettings::Hidden)]
PvfExecuteWorker(ValidationWorkerCommand),
/// The custom benchmark subcommand benchmarking runtime pallets.
#[structopt(
......@@ -64,11 +68,8 @@ pub enum Subcommand {
#[allow(missing_docs)]
#[derive(Debug, StructOpt)]
pub struct ValidationWorkerCommand {
/// The path that the executor can use for its caching purposes.
pub cache_base_path: std::path::PathBuf,
#[allow(missing_docs)]
pub mem_id: String,
/// The path to the validation host's socket.
pub socket_path: String,
}
#[allow(missing_docs)]
......
......@@ -256,19 +256,39 @@ pub fn run() -> Result<()> {
Ok((cmd.run(client, backend).map_err(Error::SubstrateCli), task_manager))
})?)
},
Some(Subcommand::ValidationWorker(cmd)) => {
Some(Subcommand::PvfPrepareWorker(cmd)) => {
let mut builder = sc_cli::LoggerBuilder::new("");
builder.with_colors(false);
let _ = builder.init();
if cfg!(feature = "browser") || cfg!(target_os = "android") {
Err(sc_cli::Error::Input("Cannot run validation worker in browser".into()).into())
} else {
#[cfg(not(any(target_os = "android", feature = "browser")))]
polkadot_parachain::wasm_executor::run_worker(
&cmd.mem_id,
Some(cmd.cache_base_path.clone()),
)?;
#[cfg(any(target_os = "android", feature = "browser"))]
{
return Err(
sc_cli::Error::Input("PVF preparation workers are not supported under this platform".into()).into()
);
}
#[cfg(not(any(target_os = "android", feature = "browser")))]
{
polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path);
Ok(())
}
},
Some(Subcommand::PvfExecuteWorker(cmd)) => {
let mut builder = sc_cli::LoggerBuilder::new("");
builder.with_colors(false);
let _ = builder.init();
#[cfg(any(target_os = "android", feature = "browser"))]
{
return Err(
sc_cli::Error::Input("PVF execution workers are not supported under this platform".into()).into()
);
}
#[cfg(not(any(target_os = "android", feature = "browser")))]
{
polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path);
Ok(())
}
},
......
......@@ -5,10 +5,10 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
async-trait = "0.1.42"
futures = "0.3.12"
tracing = "0.1.25"
sp-core = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" }
sp-maybe-compressed-blob = { package = "sp-maybe-compressed-blob", git = "https://github.com/paritytech/substrate", branch = "master" }
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["bit-vec", "derive"] }
......@@ -18,8 +18,12 @@ polkadot-node-primitives = { path = "../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
[target.'cfg(not(any(target_os = "android", target_os = "unknown")))'.dependencies]
polkadot-node-core-pvf = { path = "../pvf" }
[dev-dependencies]
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.12", features = ["thread-pool"] }
assert_matches = "1.4.0"
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
......@@ -40,44 +40,51 @@ use polkadot_primitives::v1::{
ValidationCode, CandidateDescriptor, PersistedValidationData,
OccupiedCoreAssumption, Hash, CandidateCommitments,
};
use polkadot_parachain::wasm_executor::{
self, IsolationStrategy, ValidationError, InvalidCandidate as WasmInvalidCandidate
};
use polkadot_parachain::primitives::{ValidationResult as WasmValidationResult, ValidationParams};
use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult};
use polkadot_node_core_pvf::{Pvf, ValidationHost, ValidationError, InvalidCandidate as WasmInvalidCandidate};
use parity_scale_codec::Encode;
use sp_core::traits::SpawnNamed;
use futures::channel::oneshot;
use futures::prelude::*;
use std::sync::Arc;
use std::path::PathBuf;
use async_trait::async_trait;
/// Target / subsystem label used for this module's `tracing` output.
///
/// The `'static` lifetime is implied for constants, so it is not spelled out.
const LOG_TARGET: &str = "parachain::candidate-validation";
/// Configuration for the candidate validation subsystem.
///
/// Both fields are plain paths, so the type derives `Debug` and `Clone` to make it
/// easy to log and to pass around.
#[derive(Debug, Clone)]
pub struct Config {
    /// The path where candidate validation can store compiled artifacts for PVFs.
    pub artifacts_cache_path: PathBuf,
    /// The path to the executable which can be used for spawning PVF compilation & validation
    /// workers.
    pub program_path: PathBuf,
}
/// The candidate validation subsystem.
pub struct CandidateValidationSubsystem<S> {
spawn: S,
pub struct CandidateValidationSubsystem {
metrics: Metrics,
isolation_strategy: IsolationStrategy,
config: Config,
}
impl<S> CandidateValidationSubsystem<S> {
impl CandidateValidationSubsystem {
/// Create a new `CandidateValidationSubsystem` with the given config and metrics.
///
/// Check out [`Config`] to get more details.
pub fn new(spawn: S, metrics: Metrics, isolation_strategy: IsolationStrategy) -> Self {
CandidateValidationSubsystem { spawn, metrics, isolation_strategy }
pub fn with_config(config: Config, metrics: Metrics) -> Self {
CandidateValidationSubsystem { config, metrics, }
}
}
impl<S, C> Subsystem<C> for CandidateValidationSubsystem<S> where
impl<C> Subsystem<C> for CandidateValidationSubsystem where
C: SubsystemContext<Message = CandidateValidationMessage>,
S: SpawnNamed + Clone + 'static,
{
fn start(self, ctx: C) -> SpawnedSubsystem {
let future = run(ctx, self.spawn, self.metrics, self.isolation_strategy)
let future = run(ctx, self.metrics, self.config.artifacts_cache_path, self.config.program_path)
.map_err(|e| SubsystemError::with_origin("candidate-validation", e))
.boxed();
SpawnedSubsystem {
......@@ -87,13 +94,18 @@ impl<S, C> Subsystem<C> for CandidateValidationSubsystem<S> where
}
}
#[tracing::instrument(skip(ctx, spawn, metrics), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn run(
mut ctx: impl SubsystemContext<Message = CandidateValidationMessage>,
spawn: impl SpawnNamed + Clone + 'static,
metrics: Metrics,
isolation_strategy: IsolationStrategy,
cache_path: PathBuf,
program_path: PathBuf,
) -> SubsystemResult<()> {
let (mut validation_host, task) = polkadot_node_core_pvf::start(
polkadot_node_core_pvf::Config::new(cache_path, program_path),
);
ctx.spawn_blocking("pvf-validation-host", task.boxed()).await?;
loop {
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {}
......@@ -109,10 +121,9 @@ async fn run(
let res = spawn_validate_from_chain_state(
&mut ctx,
isolation_strategy.clone(),
&mut validation_host,
descriptor,
pov,
spawn.clone(),
&metrics,
).await;
......@@ -133,14 +144,12 @@ async fn run(
) => {
let _timer = metrics.time_validate_from_exhaustive();
let res = spawn_validate_exhaustive(
&mut ctx,
isolation_strategy.clone(),
let res = validate_candidate_exhaustive(
&mut validation_host,
persisted_validation_data,
validation_code,
descriptor,
pov,
spawn.clone(),
&metrics,
).await;
......@@ -268,13 +277,16 @@ async fn find_assumed_validation_data(
Ok(AssumptionCheckOutcome::DoesNotMatch)
}
#[tracing::instrument(level = "trace", skip(ctx, pov, spawn, metrics), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(
level = "trace",
skip(ctx, validation_host, pov, metrics),
fields(subsystem = LOG_TARGET),
)]
async fn spawn_validate_from_chain_state(
ctx: &mut impl SubsystemContext<Message = CandidateValidationMessage>,
isolation_strategy: IsolationStrategy,
validation_host: &mut ValidationHost,
descriptor: CandidateDescriptor,
pov: Arc<PoV>,
spawn: impl SpawnNamed + 'static,
metrics: &Metrics,
) -> SubsystemResult<Result<ValidationResult, ValidationFailed>> {
let (validation_data, validation_code) =
......@@ -293,14 +305,12 @@ async fn spawn_validate_from_chain_state(
}
};
let validation_result = spawn_validate_exhaustive(
ctx,
isolation_strategy,
let validation_result = validate_candidate_exhaustive(
validation_host,
validation_data,
validation_code,
descriptor.clone(),
pov,
spawn,
metrics,
)
.await;
......@@ -330,113 +340,19 @@ async fn spawn_validate_from_chain_state(
validation_result
}
#[tracing::instrument(level = "trace", skip(ctx, validation_code, pov, spawn, metrics), fields(subsystem = LOG_TARGET))]
async fn spawn_validate_exhaustive(
ctx: &mut impl SubsystemContext<Message = CandidateValidationMessage>,
isolation_strategy: IsolationStrategy,
#[tracing::instrument(
level = "trace",
skip(validation_backend, validation_code, pov, metrics),
fields(subsystem = LOG_TARGET),
)]
async fn validate_candidate_exhaustive(
mut validation_backend: impl ValidationBackend,
persisted_validation_data: PersistedValidationData,
validation_code: ValidationCode,
descriptor: CandidateDescriptor,
pov: Arc<PoV>,
spawn: impl SpawnNamed + 'static,
metrics: &Metrics,
) -> SubsystemResult<Result<ValidationResult, ValidationFailed>> {
let (tx, rx) = oneshot::channel();
let metrics = metrics.clone();
let fut = async move {
let res = validate_candidate_exhaustive::<RealValidationBackend, _>(
isolation_strategy,
persisted_validation_data,
validation_code,
descriptor,
pov,
spawn,
&metrics,
);
let _ = tx.send(res);
};
ctx.spawn_blocking("blocking-candidate-validation-task", fut.boxed()).await?;
rx.await.map_err(Into::into)
}
/// Performs the basic checks of a candidate against its descriptor.
///
/// The checks run in this order, and the first failing one determines the error:
/// the encoded PoV size against `max_pov_size`, the PoV hash, the validation code
/// hash, and finally the collator signature. Returns `Ok(())` if all of them pass.
#[tracing::instrument(level = "trace", skip(pov, validation_code), fields(subsystem = LOG_TARGET))]
fn perform_basic_checks(
    candidate: &CandidateDescriptor,
    max_pov_size: u32,
    pov: &PoV,
    validation_code: &ValidationCode,
) -> Result<(), InvalidCandidate> {
    let actual_pov_hash = pov.hash();
    let actual_code_hash = validation_code.hash();
    let pov_len = pov.encoded_size();

    if pov_len > max_pov_size as usize {
        Err(InvalidCandidate::ParamsTooLarge(pov_len as u64))
    } else if actual_pov_hash != candidate.pov_hash {
        Err(InvalidCandidate::PoVHashMismatch)
    } else if actual_code_hash != candidate.validation_code_hash {
        Err(InvalidCandidate::CodeHashMismatch)
    } else if candidate.check_collator_signature().is_err() {
        Err(InvalidCandidate::BadSignature)
    } else {
        Ok(())
    }
}
/// Abstraction over the component that executes validation code for a candidate.
///
/// NOTE(review): presumably this indirection exists so tests can substitute a mock
/// backend for the real executor — confirm against the test module.
trait ValidationBackend {
/// Backend-specific argument that is passed by value to [`ValidationBackend::validate`].
type Arg;
/// Runs `raw_validation_code` with the given `params`.
///
/// `spawn` is a task spawner made available to the backend. Returns the Wasm
/// validation result on success and a `ValidationError` otherwise.
fn validate<S: SpawnNamed + 'static>(
arg: Self::Arg,
raw_validation_code: &[u8],
params: ValidationParams,
spawn: S,
) -> Result<WasmValidationResult, ValidationError>;
}
/// [`ValidationBackend`] implementation that delegates to `wasm_executor::validate_candidate`.
struct RealValidationBackend;
impl ValidationBackend for RealValidationBackend {
// The backend argument is the isolation strategy handed on to the executor.
type Arg = IsolationStrategy;
/// Forwards the validation code, parameters, isolation strategy and spawner to
/// `wasm_executor::validate_candidate` and returns its result unchanged.
fn validate<S: SpawnNamed + 'static>(
isolation_strategy: IsolationStrategy,
raw_validation_code: &[u8],
params: ValidationParams,
spawn: S,
) -> Result<WasmValidationResult, ValidationError> {
wasm_executor::validate_candidate(
&raw_validation_code,
params,
&isolation_strategy,
spawn,
)
}
}
/// Validates the candidate from exhaustive parameters.
///
/// Sends the result of validation on the channel once complete.
#[tracing::instrument(level = "trace", skip(backend_arg, validation_code, pov, spawn, metrics), fields(subsystem = LOG_TARGET))]
fn validate_candidate_exhaustive<B: ValidationBackend, S: SpawnNamed + 'static>(
backend_arg: B::Arg,
persisted_validation_data: PersistedValidationData,
validation_code: ValidationCode,
descriptor: CandidateDescriptor,
pov: Arc<PoV>,
spawn: S,
metrics: &Metrics,
) -> Result<ValidationResult, ValidationFailed> {
let _timer = metrics.time_validate_candidate_exhaustive();
if let Err(e) = perform_basic_checks(
......@@ -445,7 +361,7 @@ fn validate_candidate_exhaustive<B: ValidationBackend, S: SpawnNamed + 'static>(
&*pov,
&validation_code,
) {
return Ok(ValidationResult::Invalid(e))
return Ok(Ok(ValidationResult::Invalid(e)));
}
let raw_validation_code = match sp_maybe_compressed_blob::decompress(
......@@ -457,7 +373,7 @@ fn validate_candidate_exhaustive<B: ValidationBackend, S: SpawnNamed + 'static>(
tracing::debug!(target: LOG_TARGET, err=?e, "Invalid validation code");
// If the validation code is invalid, the candidate certainly is.
return Ok(ValidationResult::Invalid(InvalidCandidate::CodeDecompressionFailure));
return Ok(Ok(ValidationResult::Invalid(InvalidCandidate::CodeDecompressionFailure)));
}
};
......@@ -470,7 +386,7 @@ fn validate_candidate_exhaustive<B: ValidationBackend, S: SpawnNamed + 'static>(
tracing::debug!(target: LOG_TARGET, err=?e, "Invalid PoV code");
// If the PoV is invalid, the candidate certainly is.
return Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure));
return Ok(Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)));
}