Commit 79990c9e authored by Arkadiy Paronyan's avatar Arkadiy Paronyan Committed by asynchronous rob
Browse files

Allow parallel parachain validation (#370)



* Allow parallel parachain validation

* Fixed test interference

* Switch to pooled implementation

* Apply suggestions from code review
Co-Authored-By: default avatarBastian Köcher <bkchr@users.noreply.github.com>

* Update validation_host.rs

* Minor cleanups

* Fixed build
parent 00769242
Pipeline #50064 passed with stages
in 19 minutes and 42 seconds
......@@ -27,10 +27,13 @@ use wasmi::{
ModuleImportResolver, RuntimeValue, Externals, Error as WasmError, ValueType,
memory_units::{self, Bytes, Pages, RoundUpTo}
};
use super::{ValidationParams, ValidationResult, MessageRef, UpwardMessageRef, UpwardMessage, IncomingMessage};
use super::{
ValidationParams, ValidationResult, MessageRef, UpwardMessageRef,
UpwardMessage, IncomingMessage};
#[cfg(not(target_os = "unknown"))]
pub use validation_host::run_worker;
pub use validation_host::EXECUTION_TIMEOUT_SEC;
mod validation_host;
......@@ -336,10 +339,10 @@ pub fn validate_candidate<E: Externalities>(
},
#[cfg(not(target_os = "unknown"))]
ExecutionMode::Remote =>
validation_host::HOST.lock().validate_candidate(validation_code, params, externalities, false),
validation_host::validate_candidate(validation_code, params, externalities, false),
#[cfg(not(target_os = "unknown"))]
ExecutionMode::RemoteTest =>
validation_host::HOST.lock().validate_candidate(validation_code, params, externalities, true),
validation_host::validate_candidate(validation_code, params, externalities, true),
#[cfg(target_os = "unknown")]
ExecutionMode::Remote =>
Err(Error::System("Remote validator not available".to_string().into())),
......
......@@ -18,7 +18,8 @@
use std::{process, env, sync::Arc, sync::atomic};
use crate::codec::{Decode, Encode};
use crate::{ValidationParams, ValidationResult, MessageRef, UpwardMessageRef, UpwardMessage, IncomingMessage};
use crate::{ValidationParams, ValidationResult, MessageRef,
UpwardMessageRef, UpwardMessage, IncomingMessage};
use super::{validate_candidate_internal, Error, Externalities, WorkerExternalities};
use super::{MAX_CODE_MEM, MAX_RUNTIME_MEM};
use shared_memory::{SharedMem, SharedMemConf, EventState, WriteLockable, EventWait, EventSet};
......@@ -33,6 +34,11 @@ const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
const WORKER_ARG: &'static str = "validation-worker";
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
const NUM_HOSTS: usize = 8;
/// Execution timeout in seconds;
pub const EXECUTION_TIMEOUT_SEC: u64 = 5;
enum Event {
CandidateReady = 0,
ResultReady = 1,
......@@ -40,7 +46,7 @@ enum Event {
}
lazy_static::lazy_static! {
pub static ref HOST: Mutex<ValidationHost> = Mutex::new(ValidationHost::new());
static ref HOSTS: [Mutex<ValidationHost>; NUM_HOSTS] = Default::default();
}
/// Validation worker process entry point. Runs a loop waiting for canidates to validate
......@@ -154,12 +160,32 @@ pub enum ValidationResultHeader {
unsafe impl Send for ValidationHost {}
pub struct ValidationHost {
#[derive(Default)]
struct ValidationHost {
worker: Option<process::Child>,
memory: Option<SharedMem>,
}
/// Validate a candidate under the given validation code.
///
/// This will fail if the validation code is not a proper parachain validation module.
pub fn validate_candidate<E: Externalities>(
validation_code: &[u8],
params: ValidationParams,
externalities: &mut E,
test_mode: bool,
) -> Result<ValidationResult, Error> {
for host in HOSTS.iter() {
if let Some(mut host) = host.try_lock() {
return host.validate_candidate(validation_code, params, externalities, test_mode);
}
}
// all workers are busy, just wait for the first one
HOSTS[0].lock().validate_candidate(validation_code, params, externalities, test_mode)
}
impl Drop for ValidationHost {
fn drop(&mut self) {
if let Some(ref mut worker) = &mut self.worker {
......@@ -176,18 +202,11 @@ impl ValidationHost {
.add_lock(shared_memory::LockType::Mutex, 0, mem_size)?
.add_event(shared_memory::EventType::Auto)? // Event::CandidateReady
.add_event(shared_memory::EventType::Auto)? // Event::ResultReady
.add_event(shared_memory::EventType::Auto)?; // Evebt::WorkerReady
.add_event(shared_memory::EventType::Auto)?; // Event::WorkerReady
Ok(mem_config.create()?)
}
fn new() -> ValidationHost {
ValidationHost {
worker: None,
memory: None,
}
}
fn start_worker(&mut self, test_mode: bool) -> Result<(), Error> {
if let Some(ref mut worker) = self.worker {
// Check if still alive
......@@ -207,7 +226,7 @@ impl ValidationHost {
.spawn()?;
self.worker = Some(worker);
memory.wait(Event::WorkerReady as usize, shared_memory::Timeout::Sec(5))?;
memory.wait(Event::WorkerReady as usize, shared_memory::Timeout::Sec(EXECUTION_TIMEOUT_SEC as usize))?;
self.memory = Some(memory);
Ok(())
}
......@@ -221,8 +240,7 @@ impl ValidationHost {
params: ValidationParams,
externalities: &mut E,
test_mode: bool,
) -> Result<ValidationResult, Error>
{
) -> Result<ValidationResult, Error> {
if validation_code.len() > MAX_CODE_MEM {
return Err(Error::CodeTooLarge(validation_code.len()));
}
......
......@@ -18,7 +18,7 @@
use polkadot_parachain as parachain;
use crate::{adder, DummyExt};
use crate::parachain::ValidationParams;
use crate::parachain::{ValidationParams, wasm_executor::EXECUTION_TIMEOUT_SEC};
// Code that exposes `validate_block` and loops infinitely
const INFINITE_LOOP_CODE: &[u8] = halt::WASM_BINARY;
......@@ -43,3 +43,35 @@ fn terminates_on_timeout() {
// check that another parachain can validate normaly
adder::execute_good_on_parent();
}
#[test]
fn parallel_execution() {
let start = std::time::Instant::now();
let thread = std::thread::spawn(move ||
parachain::wasm_executor::validate_candidate(
INFINITE_LOOP_CODE,
ValidationParams {
parent_head: Default::default(),
block_data: Vec::new(),
ingress: Vec::new(),
},
&mut DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest,
).ok());
let _ = parachain::wasm_executor::validate_candidate(
INFINITE_LOOP_CODE,
ValidationParams {
parent_head: Default::default(),
block_data: Vec::new(),
ingress: Vec::new(),
},
&mut DummyExt,
parachain::wasm_executor::ExecutionMode::RemoteTest,
);
thread.join().unwrap();
// total time should be < 2 x EXECUTION_TIMEOUT_SEC
assert!(
std::time::Instant::now().duration_since(start)
< std::time::Duration::from_secs(EXECUTION_TIMEOUT_SEC * 2)
);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment