Unverified Commit 61dd388b authored by Cecile Tonglet's avatar Cecile Tonglet Committed by GitHub
Browse files

Make ValidationPool accepts execution mode to run custom command or in process validation (#1622)

* Initial commit

Forked at: bf7ccb84
Parent branch: origin/master

* Propagate test mode all the way down to ValidationPool

* Update validation/src/validation_service/mod.rs

* Fix test

* WIP

Forked at: bf7ccb84


Parent branch: origin/master

* Update service/src/lib.rs

Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>

* Adapt code to review suggestions

* Run validation inside the same process

* Add test

* CLEANUP

Forked at: bf7ccb84


Parent branch: origin/master

Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>
parent c94bcca6
Pipeline #106032 passed with stages
in 21 minutes and 5 seconds
...@@ -35,8 +35,10 @@ use polkadot_primitives::v1::{ ...@@ -35,8 +35,10 @@ use polkadot_primitives::v1::{
ValidationCode, PoV, CandidateDescriptor, ValidationData, PersistedValidationData, ValidationCode, PoV, CandidateDescriptor, ValidationData, PersistedValidationData,
TransientValidationData, OccupiedCoreAssumption, Hash, TransientValidationData, OccupiedCoreAssumption, Hash,
}; };
use polkadot_parachain::wasm_executor::{self, ValidationPool, ExecutionMode, ValidationError, use polkadot_parachain::wasm_executor::{
InvalidCandidate as WasmInvalidCandidate}; self, ValidationPool, ExecutionMode, ValidationError,
InvalidCandidate as WasmInvalidCandidate, ValidationExecutionMode,
};
use polkadot_parachain::primitives::{ValidationResult as WasmValidationResult, ValidationParams}; use polkadot_parachain::primitives::{ValidationResult as WasmValidationResult, ValidationParams};
use parity_scale_codec::Encode; use parity_scale_codec::Encode;
...@@ -128,7 +130,7 @@ async fn run( ...@@ -128,7 +130,7 @@ async fn run(
) )
-> SubsystemResult<()> -> SubsystemResult<()>
{ {
let pool = ValidationPool::new(); let pool = ValidationPool::new(ValidationExecutionMode::ExternalProcessSelfHost);
loop { loop {
match ctx.recv().await? { match ctx.recv().await? {
......
...@@ -28,7 +28,7 @@ use sp_externalities::Extensions; ...@@ -28,7 +28,7 @@ use sp_externalities::Extensions;
use sp_wasm_interface::HostFunctions as _; use sp_wasm_interface::HostFunctions as _;
#[cfg(not(any(target_os = "android", target_os = "unknown")))] #[cfg(not(any(target_os = "android", target_os = "unknown")))]
pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC}; pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode};
mod validation_host; mod validation_host;
...@@ -66,8 +66,6 @@ pub enum ExecutionMode<'a> { ...@@ -66,8 +66,6 @@ pub enum ExecutionMode<'a> {
Local, Local,
/// Remote execution in a spawned process. /// Remote execution in a spawned process.
Remote(&'a ValidationPool), Remote(&'a ValidationPool),
/// Remote execution in a spawned test runner.
RemoteTest(&'a ValidationPool),
} }
#[derive(Debug, derive_more::Display, derive_more::From)] #[derive(Debug, derive_more::Display, derive_more::From)]
...@@ -143,11 +141,7 @@ pub fn validate_candidate( ...@@ -143,11 +141,7 @@ pub fn validate_candidate(
}, },
#[cfg(not(any(target_os = "android", target_os = "unknown")))] #[cfg(not(any(target_os = "android", target_os = "unknown")))]
ExecutionMode::Remote(pool) => { ExecutionMode::Remote(pool) => {
pool.validate_candidate(validation_code, params, false) pool.validate_candidate(validation_code, params)
},
#[cfg(not(any(target_os = "android", target_os = "unknown")))]
ExecutionMode::RemoteTest(pool) => {
pool.validate_candidate(validation_code, params, true)
}, },
#[cfg(any(target_os = "android", target_os = "unknown"))] #[cfg(any(target_os = "android", target_os = "unknown"))]
ExecutionMode::Remote(_pool) => ExecutionMode::Remote(_pool) =>
...@@ -156,13 +150,6 @@ pub fn validate_candidate( ...@@ -156,13 +150,6 @@ pub fn validate_candidate(
"Remote validator not available".to_string() "Remote validator not available".to_string()
) as Box<_> ) as Box<_>
))), ))),
#[cfg(any(target_os = "android", target_os = "unknown"))]
ExecutionMode::RemoteTest(_pool) =>
Err(ValidationError::Internal(InternalError::System(
Box::<dyn std::error::Error + Send + Sync>::from(
"Remote validator not available".to_string()
) as Box<_>
))),
} }
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#![cfg(not(any(target_os = "android", target_os = "unknown")))] #![cfg(not(any(target_os = "android", target_os = "unknown")))]
use std::{process, env, sync::Arc, sync::atomic}; use std::{process, env, sync::Arc, sync::atomic, path::PathBuf};
use codec::{Decode, Encode}; use codec::{Decode, Encode};
use crate::primitives::{ValidationParams, ValidationResult}; use crate::primitives::{ValidationParams, ValidationResult};
use super::{ use super::{
...@@ -29,7 +29,6 @@ use log::{debug, trace}; ...@@ -29,7 +29,6 @@ use log::{debug, trace};
use futures::executor::ThreadPool; use futures::executor::ThreadPool;
use sp_core::traits::SpawnNamed; use sp_core::traits::SpawnNamed;
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
/// CLI Argument to start in validation worker mode. /// CLI Argument to start in validation worker mode.
const WORKER_ARG: &'static str = "validation-worker"; const WORKER_ARG: &'static str = "validation-worker";
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG]; const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
...@@ -66,19 +65,40 @@ impl SpawnNamed for TaskExecutor { ...@@ -66,19 +65,40 @@ impl SpawnNamed for TaskExecutor {
} }
} }
/// The execution mode for the `ValidationPool`.
#[derive(Debug, Clone)]
pub enum ValidationExecutionMode {
/// The validation worker is ran in a thread inside the same process.
InProcess,
/// The validation worker is ran using the process' executable and the subcommand `validation-worker` is passed
/// following by the address of the shared memory.
ExternalProcessSelfHost,
/// The validation worker is ran using the command provided and the argument provided. The address of the shared
/// memory is added at the end of the arguments.
ExternalProcessCustomHost {
/// Path to the validation worker. The file must exists and be executable.
binary: PathBuf,
/// List of arguments passed to the validation worker. The address of the shared memory will be automatically
/// added after the arguments.
args: Vec<String>,
},
}
/// A pool of hosts. /// A pool of hosts.
#[derive(Clone)] #[derive(Clone)]
pub struct ValidationPool { pub struct ValidationPool {
hosts: Arc<Vec<Mutex<ValidationHost>>>, hosts: Arc<Vec<Mutex<ValidationHost>>>,
execution_mode: ValidationExecutionMode,
} }
const DEFAULT_NUM_HOSTS: usize = 8; const DEFAULT_NUM_HOSTS: usize = 8;
impl ValidationPool { impl ValidationPool {
/// Creates a validation pool with the default configuration. /// Creates a validation pool with the default configuration.
pub fn new() -> ValidationPool { pub fn new(execution_mode: ValidationExecutionMode) -> ValidationPool {
ValidationPool { ValidationPool {
hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()), hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()),
execution_mode,
} }
} }
...@@ -90,16 +110,15 @@ impl ValidationPool { ...@@ -90,16 +110,15 @@ impl ValidationPool {
&self, &self,
validation_code: &[u8], validation_code: &[u8],
params: ValidationParams, params: ValidationParams,
test_mode: bool,
) -> Result<ValidationResult, ValidationError> { ) -> Result<ValidationResult, ValidationError> {
for host in self.hosts.iter() { for host in self.hosts.iter() {
if let Some(mut host) = host.try_lock() { if let Some(mut host) = host.try_lock() {
return host.validate_candidate(validation_code, params, test_mode); return host.validate_candidate(validation_code, params, self.execution_mode.clone());
} }
} }
// all workers are busy, just wait for the first one // all workers are busy, just wait for the first one
self.hosts[0].lock().validate_candidate(validation_code, params, test_mode) self.hosts[0].lock().validate_candidate(validation_code, params, self.execution_mode.clone())
} }
} }
...@@ -208,6 +227,7 @@ unsafe impl Send for ValidationHost {} ...@@ -208,6 +227,7 @@ unsafe impl Send for ValidationHost {}
#[derive(Default)] #[derive(Default)]
struct ValidationHost { struct ValidationHost {
worker: Option<process::Child>, worker: Option<process::Child>,
worker_thread: Option<std::thread::JoinHandle<Result<(), String>>>,
memory: Option<SharedMem>, memory: Option<SharedMem>,
id: u32, id: u32,
} }
...@@ -233,7 +253,7 @@ impl ValidationHost { ...@@ -233,7 +253,7 @@ impl ValidationHost {
Ok(mem_config.create()?) Ok(mem_config.create()?)
} }
fn start_worker(&mut self, test_mode: bool) -> Result<(), InternalError> { fn start_worker(&mut self, execution_mode: ValidationExecutionMode) -> Result<(), InternalError> {
if let Some(ref mut worker) = self.worker { if let Some(ref mut worker) = self.worker {
// Check if still alive // Check if still alive
if let Ok(None) = worker.try_wait() { if let Ok(None) = worker.try_wait() {
...@@ -241,17 +261,38 @@ impl ValidationHost { ...@@ -241,17 +261,38 @@ impl ValidationHost {
return Ok(()); return Ok(());
} }
} }
if self.worker_thread.is_some() {
return Ok(());
}
let memory = Self::create_memory()?; let memory = Self::create_memory()?;
let self_path = env::current_exe()?;
debug!("Starting worker at {:?}", self_path); let mut run_worker_process = |cmd: PathBuf, args: Vec<String>| -> Result<(), std::io::Error> {
let mut args = if test_mode { WORKER_ARGS_TEST.to_vec() } else { WORKER_ARGS.to_vec() }; debug!("Starting worker at {:?} with arguments: {:?} and {:?}", cmd, args, memory.get_os_path());
args.push(memory.get_os_path()); let worker = process::Command::new(cmd)
let worker = process::Command::new(self_path) .args(args)
.args(args) .arg(memory.get_os_path())
.stdin(process::Stdio::piped()) .stdin(process::Stdio::piped())
.spawn()?; .spawn()?;
self.id = worker.id(); self.id = worker.id();
self.worker = Some(worker); self.worker = Some(worker);
Ok(())
};
match execution_mode {
ValidationExecutionMode::InProcess => {
let mem_id = memory.get_os_path().to_string();
self.worker_thread = Some(std::thread::spawn(move || run_worker(mem_id.as_str())));
},
ValidationExecutionMode::ExternalProcessSelfHost => run_worker_process(
env::current_exe()?,
WORKER_ARGS.iter().map(|x| x.to_string()).collect(),
)?,
ValidationExecutionMode::ExternalProcessCustomHost { binary, args } => run_worker_process(
binary,
args,
)?,
};
memory.wait( memory.wait(
Event::WorkerReady as usize, Event::WorkerReady as usize,
...@@ -268,13 +309,13 @@ impl ValidationHost { ...@@ -268,13 +309,13 @@ impl ValidationHost {
&mut self, &mut self,
validation_code: &[u8], validation_code: &[u8],
params: ValidationParams, params: ValidationParams,
test_mode: bool, execution_mode: ValidationExecutionMode,
) -> Result<ValidationResult, ValidationError> { ) -> Result<ValidationResult, ValidationError> {
if validation_code.len() > MAX_CODE_MEM { if validation_code.len() > MAX_CODE_MEM {
return Err(ValidationError::InvalidCandidate(InvalidCandidate::CodeTooLarge(validation_code.len()))); return Err(ValidationError::InvalidCandidate(InvalidCandidate::CodeTooLarge(validation_code.len())));
} }
// First, check if need to spawn the child process // First, check if need to spawn the child process
self.start_worker(test_mode)?; self.start_worker(execution_mode)?;
let memory = self.memory.as_mut() let memory = self.memory.as_mut()
.expect("memory is always `Some` after `start_worker` completes successfully"); .expect("memory is always `Some` after `start_worker` completes successfully");
{ {
......
...@@ -16,11 +16,16 @@ ...@@ -16,11 +16,16 @@
//! Basic parachain that adds a number as part of its state. //! Basic parachain that adds a number as part of its state.
use parachain::primitives::{ const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
RelayChainBlockNumber,
BlockData as GenericBlockData, use parachain::{
HeadData as GenericHeadData, primitives::{
ValidationParams, RelayChainBlockNumber,
BlockData as GenericBlockData,
HeadData as GenericHeadData,
ValidationParams,
},
wasm_executor::{ValidationPool, ValidationExecutionMode}
}; };
use codec::{Decode, Encode}; use codec::{Decode, Encode};
...@@ -52,8 +57,28 @@ fn hash_head(head: &HeadData) -> [u8; 32] { ...@@ -52,8 +57,28 @@ fn hash_head(head: &HeadData) -> [u8; 32] {
tiny_keccak::keccak256(head.encode().as_slice()) tiny_keccak::keccak256(head.encode().as_slice())
} }
fn validation_pool() -> ValidationPool {
let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost {
binary: std::env::current_exe().unwrap(),
args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(),
};
ValidationPool::new(execution_mode)
}
#[test] #[test]
pub fn execute_good_on_parent() { fn execute_good_on_parent_with_inprocess_validation() {
let pool = ValidationPool::new(ValidationExecutionMode::InProcess);
execute_good_on_parent(pool);
}
#[test]
pub fn execute_good_on_parent_with_external_process_validation() {
let pool = validation_pool();
execute_good_on_parent(pool);
}
fn execute_good_on_parent(pool: ValidationPool) {
let parent_head = HeadData { let parent_head = HeadData {
number: 0, number: 0,
parent_hash: [0; 32], parent_hash: [0; 32],
...@@ -65,7 +90,6 @@ pub fn execute_good_on_parent() { ...@@ -65,7 +90,6 @@ pub fn execute_good_on_parent() {
add: 512, add: 512,
}; };
let pool = parachain::wasm_executor::ValidationPool::new();
let ret = parachain::wasm_executor::validate_candidate( let ret = parachain::wasm_executor::validate_candidate(
adder::wasm_binary_unwrap(), adder::wasm_binary_unwrap(),
...@@ -75,7 +99,7 @@ pub fn execute_good_on_parent() { ...@@ -75,7 +99,7 @@ pub fn execute_good_on_parent() {
relay_chain_height: 1, relay_chain_height: 1,
hrmp_mqc_heads: Vec::new(), hrmp_mqc_heads: Vec::new(),
}, },
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), parachain::wasm_executor::ExecutionMode::Remote(&pool),
sp_core::testing::TaskExecutor::new(), sp_core::testing::TaskExecutor::new(),
).unwrap(); ).unwrap();
...@@ -91,7 +115,7 @@ fn execute_good_chain_on_parent() { ...@@ -91,7 +115,7 @@ fn execute_good_chain_on_parent() {
let mut number = 0; let mut number = 0;
let mut parent_hash = [0; 32]; let mut parent_hash = [0; 32];
let mut last_state = 0; let mut last_state = 0;
let pool = parachain::wasm_executor::ValidationPool::new(); let pool = validation_pool();
for add in 0..10 { for add in 0..10 {
let parent_head = HeadData { let parent_head = HeadData {
...@@ -113,7 +137,7 @@ fn execute_good_chain_on_parent() { ...@@ -113,7 +137,7 @@ fn execute_good_chain_on_parent() {
relay_chain_height: number as RelayChainBlockNumber + 1, relay_chain_height: number as RelayChainBlockNumber + 1,
hrmp_mqc_heads: Vec::new(), hrmp_mqc_heads: Vec::new(),
}, },
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), parachain::wasm_executor::ExecutionMode::Remote(&pool),
sp_core::testing::TaskExecutor::new(), sp_core::testing::TaskExecutor::new(),
).unwrap(); ).unwrap();
...@@ -131,7 +155,7 @@ fn execute_good_chain_on_parent() { ...@@ -131,7 +155,7 @@ fn execute_good_chain_on_parent() {
#[test] #[test]
fn execute_bad_on_parent() { fn execute_bad_on_parent() {
let pool = parachain::wasm_executor::ValidationPool::new(); let pool = validation_pool();
let parent_head = HeadData { let parent_head = HeadData {
number: 0, number: 0,
...@@ -152,7 +176,7 @@ fn execute_bad_on_parent() { ...@@ -152,7 +176,7 @@ fn execute_bad_on_parent() {
relay_chain_height: 1, relay_chain_height: 1,
hrmp_mqc_heads: Vec::new(), hrmp_mqc_heads: Vec::new(),
}, },
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), parachain::wasm_executor::ExecutionMode::Remote(&pool),
sp_core::testing::TaskExecutor::new(), sp_core::testing::TaskExecutor::new(),
).unwrap_err(); ).unwrap_err();
} }
...@@ -16,15 +16,26 @@ ...@@ -16,15 +16,26 @@
//! Basic parachain that adds a number as part of its state. //! Basic parachain that adds a number as part of its state.
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
use crate::adder; use crate::adder;
use parachain::{ use parachain::{
primitives::{BlockData, ValidationParams}, primitives::{BlockData, ValidationParams},
wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC}, wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode, ValidationPool},
}; };
fn validation_pool() -> ValidationPool {
let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost {
binary: std::env::current_exe().unwrap(),
args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(),
};
ValidationPool::new(execution_mode)
}
#[test] #[test]
fn terminates_on_timeout() { fn terminates_on_timeout() {
let pool = parachain::wasm_executor::ValidationPool::new(); let pool = validation_pool();
let result = parachain::wasm_executor::validate_candidate( let result = parachain::wasm_executor::validate_candidate(
halt::wasm_binary_unwrap(), halt::wasm_binary_unwrap(),
...@@ -34,7 +45,7 @@ fn terminates_on_timeout() { ...@@ -34,7 +45,7 @@ fn terminates_on_timeout() {
relay_chain_height: 1, relay_chain_height: 1,
hrmp_mqc_heads: Vec::new(), hrmp_mqc_heads: Vec::new(),
}, },
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), parachain::wasm_executor::ExecutionMode::Remote(&pool),
sp_core::testing::TaskExecutor::new(), sp_core::testing::TaskExecutor::new(),
); );
match result { match result {
...@@ -43,12 +54,12 @@ fn terminates_on_timeout() { ...@@ -43,12 +54,12 @@ fn terminates_on_timeout() {
} }
// check that another parachain can validate normaly // check that another parachain can validate normaly
adder::execute_good_on_parent(); adder::execute_good_on_parent_with_external_process_validation();
} }
#[test] #[test]
fn parallel_execution() { fn parallel_execution() {
let pool = parachain::wasm_executor::ValidationPool::new(); let pool = validation_pool();
let start = std::time::Instant::now(); let start = std::time::Instant::now();
...@@ -62,7 +73,7 @@ fn parallel_execution() { ...@@ -62,7 +73,7 @@ fn parallel_execution() {
relay_chain_height: 1, relay_chain_height: 1,
hrmp_mqc_heads: Vec::new(), hrmp_mqc_heads: Vec::new(),
}, },
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool2), parachain::wasm_executor::ExecutionMode::Remote(&pool2),
sp_core::testing::TaskExecutor::new(), sp_core::testing::TaskExecutor::new(),
).ok()); ).ok());
let _ = parachain::wasm_executor::validate_candidate( let _ = parachain::wasm_executor::validate_candidate(
...@@ -73,7 +84,7 @@ fn parallel_execution() { ...@@ -73,7 +84,7 @@ fn parallel_execution() {
relay_chain_height: 1, relay_chain_height: 1,
hrmp_mqc_heads: Vec::new(), hrmp_mqc_heads: Vec::new(),
}, },
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), parachain::wasm_executor::ExecutionMode::Remote(&pool),
sp_core::testing::TaskExecutor::new(), sp_core::testing::TaskExecutor::new(),
); );
thread.join().unwrap(); thread.join().unwrap();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment