Commit c5836f3e authored by Arkadiy Paronyan's avatar Arkadiy Paronyan Committed by Bastian Köcher
Browse files

Parachain validation moved to external process (#325)



* Improved execution & tests

* Style

* Made CLI arg const

* Moved Upwards message

* CLI subcommand for validation worker

* Build halting parachain

* Build halting parachain

* Made stuff private

* Reorganized parachain tests

* Comment

* Whitespace

* Apply suggestions from code review
Co-Authored-By: default avatarBastian Köcher <bkchr@users.noreply.github.com>

* Fixed call data size check and introduced an enum

* Apply suggestions from code review
Co-Authored-By: default avatarBastian Köcher <bkchr@users.noreply.github.com>
parent 79d8c409
Pipeline #44110 passed with stages
in 17 minutes and 1 second
This diff is collapsed.
......@@ -10,5 +10,6 @@ log = "0.4.6"
tokio = "0.1.7"
futures = "0.1.17"
exit-future = "0.1"
structopt = "0.2"
cli = { package = "substrate-cli", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
service = { package = "polkadot-service", path = "../service" }
......@@ -28,6 +28,7 @@ use tokio::runtime::Runtime;
use service::Service as BareService;
use std::sync::Arc;
use log::info;
use structopt::StructOpt;
pub use service::{
Components as ServiceComponents, PolkadotService, CustomConfiguration, ServiceFactory, Factory,
......@@ -65,21 +66,28 @@ pub trait Worker: IntoExit {
fn work<S: PolkadotService>(self, service: &S, executor: TaskExecutor) -> Self::Work;
}
/// Parse command line arguments into service configuration.
///
/// IANA unassigned port ranges that we could use:
/// 6717-6766 Unassigned
/// 8504-8553 Unassigned
/// 9556-9591 Unassigned
/// 9803-9874 Unassigned
/// 9926-9949 Unassigned
pub fn run<I, T, W>(args: I, worker: W, version: cli::VersionInfo) -> error::Result<()> where
I: IntoIterator<Item = T>,
T: Into<std::ffi::OsString> + Clone,
// NOTE: the comments on these CLI types are deliberately plain `//` comments.
// structopt turns `///` doc comments into help/about text, so using them here
// would change the generated command line interface.

// Polkadot-specific subcommands, parsed in addition to the generic CLI.
#[derive(Debug, StructOpt, Clone)]
enum PolkadotSubCommands {
    // Hidden from `--help`: the `validation-worker` subcommand is dispatched to
    // `service::run_validation_worker` by `run`.
    #[structopt(name = "validation-worker", raw(setting = "structopt::clap::AppSettings::Hidden"))]
    ValidationWorker(ValidationWorkerCommand),
}

impl cli::GetLogFilter for PolkadotSubCommands {
    // No subcommand supplies its own log filter.
    fn get_log_filter(&self) -> Option<String> { None }
}

// Arguments of the hidden `validation-worker` subcommand.
#[derive(Debug, StructOpt, Clone)]
struct ValidationWorkerCommand {
    // Positional argument handed to `service::run_validation_worker`.
    // NOTE(review): presumably the OS path of the shared memory region created
    // by the validation host -- confirm against the service crate.
    #[structopt()]
    pub mem_id: String,
}
/// Parses polkadot specific CLI arguments and run the service.
pub fn run<W>(worker: W, version: cli::VersionInfo) -> error::Result<()> where
W: Worker,
{
cli::parse_and_execute::<service::Factory, NoCustom, NoCustom, _, _, _, _, _>(
load_spec, &version, "parity-polkadot", args, worker,
let command = cli::parse_and_execute::<service::Factory, PolkadotSubCommands, NoCustom, _, _, _, _, _>(
load_spec, &version, "parity-polkadot", std::env::args(), worker,
|worker, _cli_args, _custom_args, mut config| {
info!("{}", version.name);
info!(" version {}", config.full_version());
......@@ -103,7 +111,14 @@ pub fn run<I, T, W>(args: I, worker: W, version: cli::VersionInfo) -> error::Res
),
}.map_err(|e| format!("{:?}", e))
}
).map_err(Into::into).map(|_| ())
)?;
match command {
Some(PolkadotSubCommands::ValidationWorker(args)) => {
service::run_validation_worker(&args.mem_id).map_err(Into::into)
}
_ => Ok(())
}
}
fn run_until_exit<T, C, W>(
......
......@@ -429,12 +429,11 @@ fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRos
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
pub fn run_collator<P, E, I, ArgT>(
pub fn run_collator<P, E>(
build_parachain_context: P,
para_id: ParaId,
exit: E,
key: Arc<ed25519::Pair>,
args: I,
version: VersionInfo,
) -> polkadot_cli::error::Result<()> where
P: BuildParachainContext + Send + 'static,
......@@ -442,11 +441,9 @@ pub fn run_collator<P, E, I, ArgT>(
<<P::ParachainContext as ParachainContext>::ProduceCandidate as IntoFuture>::Future: Send + 'static,
E: IntoFuture<Item=(), Error=()>,
E::Future: Send + Clone + Sync + 'static,
I: IntoIterator<Item=ArgT>,
ArgT: Into<std::ffi::OsString> + Clone,
{
let node_logic = CollationNode { build_parachain_context, exit: exit.into_future(), para_id, key };
polkadot_cli::run(args, node_logic, version)
polkadot_cli::run(node_logic, version)
}
#[cfg(test)]
......
......@@ -12,12 +12,17 @@ derive_more = { version = "0.14", optional = true }
serde = { version = "1.0", default-features = false, features = [ "derive" ] }
rstd = { package = "sr-std", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", default-features = false }
shared_memory = { version = "0.8", optional = true }
lazy_static = { version = "1.3.0", optional = true }
parking_lot = { version = "0.7.1", optional = true }
log = { version = "0.4.6", optional = true }
[dev-dependencies]
tiny-keccak = "1.4"
adder = { path = "../test-parachains/adder" }
halt = { path = "../test-parachains/halt" }
[features]
default = ["std"]
wasm-api = []
std = [ "codec/std", "wasmi", "derive_more", "serde/std", "rstd/std" ]
std = [ "codec/std", "wasmi", "derive_more", "serde/std", "rstd/std", "shared_memory", "lazy_static", "parking_lot", "log" ]
......@@ -207,3 +207,13 @@ pub struct UpwardMessageRef<'a> {
/// Underlying data of the message.
pub data: &'a [u8],
}
/// A message from a parachain to its Relay Chain.
///
/// This is the owned counterpart of `UpwardMessageRef`: it carries the same
/// `origin` but owns its payload instead of borrowing it.
#[derive(Clone, PartialEq, Eq, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Debug))]
pub struct UpwardMessage {
/// The origin for the message to be sent from.
pub origin: ParachainDispatchOrigin,
/// The message data.
pub data: Vec<u8>,
}
......@@ -20,14 +20,39 @@
//! Assuming the parameters are correct, this module provides a wrapper around
//! a WASM VM for re-execution of a parachain candidate.
use std::{cell::RefCell, fmt, convert::TryInto};
use std::{cell::RefCell, fmt, convert::TryInto, process, env};
use std::sync::{Arc, atomic};
use crate::codec::{Decode, Encode};
use wasmi::{
self, Module, ModuleInstance, Trap, MemoryInstance, MemoryDescriptor, MemoryRef,
ModuleImportResolver, RuntimeValue, Externals, Error as WasmError, ValueType,
memory_units::{self, Bytes, Pages, RoundUpTo}
};
use super::{ValidationParams, ValidationResult, MessageRef, UpwardMessageRef};
use super::{ValidationParams, ValidationResult, MessageRef, UpwardMessageRef, UpwardMessage, IncomingMessage};
use shared_memory::{SharedMem, SharedMemConf, EventState, WriteLockable, EventWait, EventSet};
use parking_lot::Mutex;
use log::{trace, debug};
// Sizes, in bytes, of the regions that make up the shared memory block
// (see `ValidationHost::create_memory`, which adds them up together with a
// 1024-byte header).
//
// Maximum memory in bytes. Used both as the size of the call-data region and
// to derive the WASM linear memory limit.
const MAX_RUNTIME_MEM: usize = 1024 * 1024 * 1024; // 1 GiB
// Size of the region holding the validation code.
const MAX_CODE_MEM: usize = 16 * 1024 * 1024; // 16 MiB
// Message data limit: size of the region the worker writes posted messages to.
const MAX_MESSAGE_MEM: usize = 16 * 1024 * 1024; // 16 MiB
// Arguments used to spawn the worker when running in test mode.
// NOTE(review): presumably these select a `validation_worker` test in the test
// harness binary -- confirm against the test suite.
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
/// CLI Argument to start in validation worker mode.
const WORKER_ARG: &'static str = "validation-worker";
// Arguments used to spawn the worker outside of test mode.
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
// Indices of the shared memory events used for host <-> worker signalling.
//
// The discriminants are passed as `usize` event indices to `set`/`wait`, so
// they must match the order in which `ValidationHost::create_memory` adds the
// events.
enum Event {
// Set by the host once a candidate has been written to shared memory.
CandidateReady = 0,
// Set by the worker once the validation result has been written back.
ResultReady = 1,
// Set by the worker after it has opened the shared memory.
WorkerReady = 2,
}
lazy_static::lazy_static! {
// Process-wide validation host, created lazily on first use. The mutex
// serialises remote validations, which all go through this single instance.
static ref HOST: Mutex<ValidationHost> = Mutex::new(ValidationHost::new());
}
mod ids {
/// Post a message to another parachain.
......@@ -37,6 +62,16 @@ mod ids {
pub const POST_UPWARDS_MESSAGE: usize = 2;
}
/// WASM code execution mode.
///
/// Selects where `validate_candidate` runs the validation function. The enum
/// is a plain tag, so it is cheap to copy, compare and print.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    /// Execute in-process. The execution can not be interrupted or aborted.
    Local,
    /// Remote execution in a spawned process.
    Remote,
    /// Remote execution in a spawned test runner.
    RemoteTest,
}
/// Error type for the wasm executor
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
......@@ -44,12 +79,23 @@ pub enum Error {
Wasm(WasmError),
/// Externalities error
Externalities(ExternalitiesError),
/// Call data too big. WASM32 only has a 32-bit address space.
#[display(fmt = "Validation parameters took up {} bytes, max allowed by WASM is {}", _0, i32::max_value())]
/// Code size is too large.
#[display(fmt = "WASM code is {} bytes, max allowed is {}", _0, MAX_CODE_MEM)]
CodeTooLarge(usize),
/// Call data is too large.
#[display(fmt = "Validation parameters are {} bytes, max allowed is {}", _0, MAX_RUNTIME_MEM)]
ParamsTooLarge(usize),
/// Bad return data or type.
#[display(fmt = "Validation function returned invalid data.")]
BadReturn,
#[display(fmt = "Validation function timeout.")]
Timeout,
#[display(fmt = "IO error: {}", _0)]
Io(std::io::Error),
#[display(fmt = "System error: {}", _0)]
System(Box<dyn std::error::Error>),
#[display(fmt = "WASM worker error: {}", _0)]
External(String),
}
impl std::error::Error for Error {
......@@ -57,6 +103,8 @@ impl std::error::Error for Error {
match self {
Error::Wasm(ref err) => Some(err),
Error::Externalities(ref err) => Some(err),
Error::Io(ref err) => Some(err),
Error::System(ref err) => Some(&**err),
_ => None,
}
}
......@@ -238,6 +286,296 @@ impl<'a, E: 'a + Externalities> Externals for ValidationExternals<'a, E> {
}
}
/// Params header in shared memory. All offsets should be aligned to WASM page size.
///
/// Written by the host at the start of the shared memory block and decoded by
/// the worker to learn how much of the code and call-data regions is in use.
#[derive(Encode, Decode, Debug)]
struct ValidationHeader {
/// Length in bytes of the validation code stored in the code region.
code_size: u64,
/// Length in bytes of the encoded validation parameters in the call-data region.
params_size: u64,
}
/// Result header the worker encodes at the start of the shared memory block
/// once it has processed a candidate.
#[derive(Encode, Decode, Debug)]
pub enum ValidationResultHeader {
/// Validation succeeded. Any posted messages are encoded back to back in
/// the message region: egress messages first, then upward messages.
Ok {
/// Result returned by the validation function.
result: ValidationResult,
/// Number of egress messages stored in the message region.
egress_message_count: u64,
/// Number of upward messages stored after the egress messages.
up_message_count: u64,
},
/// Validation failed; carries the error rendered as text.
Error(String),
}
/// Externalities used inside the worker process. Instead of acting on posted
/// messages it accumulates them in encoded form so they can be copied into
/// shared memory and replayed by the host.
#[derive(Default)]
struct WorkerExternalities {
/// Concatenated encoded egress messages.
egress_data: Vec<u8>,
/// Number of messages encoded into `egress_data`.
egress_message_count: usize,
/// Concatenated encoded upward messages.
up_data: Vec<u8>,
/// Number of messages encoded into `up_data`.
up_message_count: usize,
}
impl Externalities for WorkerExternalities {
    /// Records an egress message by appending its encoding to `egress_data`.
    ///
    /// The message is stored as an `IncomingMessage` whose `source` field
    /// carries the *target*; the host maps it back when replaying the message.
    fn post_message(&mut self, message: MessageRef) -> Result<(), ExternalitiesError> {
        let recorded = IncomingMessage {
            source: message.target,
            data: Vec::from(message.data),
        };
        recorded.encode_to(&mut self.egress_data);
        self.egress_message_count += 1;
        Ok(())
    }

    /// Records an upward message by appending its encoding to `up_data`.
    fn post_upward_message(&mut self, message: UpwardMessageRef) -> Result<(), ExternalitiesError> {
        let recorded = UpwardMessage {
            origin: message.origin,
            data: Vec::from(message.data),
        };
        recorded.encode_to(&mut self.up_data);
        self.up_message_count += 1;
        Ok(())
    }
}
/// Validation worker process entry point. Runs a loop waiting for canidates to validate
/// and sends back results via shared memory.
pub fn run_worker(mem_id: &str) -> Result<(), String> {
let mut memory = match SharedMem::open(mem_id) {
Ok(memory) => memory,
Err(e) => {
debug!("Error opening shared memory: {:?}", e);
return Err(format!("Error opening shared memory: {:?}", e));
}
};
let mut externalities = WorkerExternalities::default();
let exit = Arc::new(atomic::AtomicBool::new(false));
// spawn parent monitor thread
let watch_exit = exit.clone();
std::thread::spawn(move || {
use std::io::Read;
let mut in_data = Vec::new();
std::io::stdin().read_to_end(&mut in_data).ok(); // pipe terminates when parent process exits
debug!("Parent process is dead. Exiting");
exit.store(true, atomic::Ordering::Relaxed);
});
memory.set(Event::WorkerReady as usize, EventState::Signaled)
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
loop {
if watch_exit.load(atomic::Ordering::Relaxed) {
break;
}
debug!("Waiting for candidate");
match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(1)) {
Err(e) => {
// Timeout
trace!("Timeout waiting for candidate: {:?}", e);
continue;
}
Ok(()) => {}
}
{
debug!("Processing candidate");
// we have candidate data
let mut slice = memory.wlock_as_slice(0)
.map_err(|e| format!("Error locking shared memory: {:?}", e))?;
let result = {
let data: &mut[u8] = &mut **slice;
let (header_buf, rest) = data.split_at_mut(1024);
let mut header_buf: &[u8] = header_buf;
let header = ValidationHeader::decode(&mut header_buf)
.ok_or_else(|| format!("Error decoding validation request."))?;
debug!("Candidate header: {:?}", header);
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
let (code, _) = code.split_at_mut(header.code_size as usize);
let (call_data, rest) = rest.split_at_mut(MAX_RUNTIME_MEM);
let (call_data, _) = call_data.split_at_mut(header.params_size as usize);
let message_data = rest;
let result = validate_candidate_internal(code, call_data, &mut externalities);
debug!("Candidate validated: {:?}", result);
match result {
Ok(r) => {
if externalities.egress_data.len() + externalities.up_data.len() > MAX_MESSAGE_MEM {
ValidationResultHeader::Error("Message data is too large".into())
} else {
let e_len = externalities.egress_data.len();
let up_len = externalities.up_data.len();
message_data[0..e_len].copy_from_slice(&externalities.egress_data);
message_data[e_len..(e_len + up_len)].copy_from_slice(&externalities.up_data);
ValidationResultHeader::Ok {
result: r,
egress_message_count: externalities.egress_message_count as u64,
up_message_count: externalities.up_message_count as u64,
}
}
},
Err(e) => ValidationResultHeader::Error(e.to_string()),
}
};
let mut data: &mut[u8] = &mut **slice;
result.encode_to(&mut data);
}
debug!("Signaling result");
memory.set(Event::ResultReady as usize, EventState::Signaled)
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
}
Ok(())
}
// `HOST` is a `static` holding `Mutex<ValidationHost>`, which requires
// `ValidationHost: Send`.
// SAFETY: NOTE(review): presumably `SharedMem` is what prevents the automatic
// `Send` impl; the host is only ever accessed through the `HOST` mutex, i.e. by
// one thread at a time -- confirm that `SharedMem` tolerates being used from
// different threads under that constraint.
unsafe impl Send for ValidationHost {}
/// Host side of out-of-process validation: owns the worker child process and
/// the shared memory block used to exchange candidates and results with it.
struct ValidationHost {
/// Handle of the spawned worker process, if one has been started.
worker: Option<process::Child>,
/// Shared memory block handed to the current worker.
memory: Option<SharedMem>,
}
impl Drop for ValidationHost {
fn drop(&mut self) {
if let Some(ref mut worker) = &mut self.worker {
worker.kill().ok();
}
}
}
impl ValidationHost {
    /// Creates the shared memory block used to talk to the worker.
    ///
    /// The block consists of a 1024-byte header followed by the code, call-data
    /// and message regions. It carries one mutex covering the whole block and
    /// three auto-reset events, added in the order of the `Event` discriminants.
    fn create_memory() -> Result<SharedMem, Error> {
        let mem_size = MAX_RUNTIME_MEM + MAX_CODE_MEM + MAX_MESSAGE_MEM + 1024;
        let mem_config = SharedMemConf::new()
            .set_size(mem_size)
            .add_lock(shared_memory::LockType::Mutex, 0, mem_size)?
            .add_event(shared_memory::EventType::Auto)? // Event::CandidateReady
            .add_event(shared_memory::EventType::Auto)? // Event::ResultReady
            .add_event(shared_memory::EventType::Auto)?; // Event::WorkerReady

        Ok(mem_config.create()?)
    }

    /// Creates a host with no worker running; the worker is started on demand.
    fn new() -> ValidationHost {
        ValidationHost {
            worker: None,
            memory: None,
        }
    }

    /// Makes sure a worker process is running, spawning one if necessary.
    ///
    /// A new worker is the current executable started with the worker
    /// arguments (`WORKER_ARGS_TEST` when `test_mode` is set) plus the OS path
    /// of a freshly created shared memory block. `self.worker` and
    /// `self.memory` are only updated after the worker has signalled
    /// `Event::WorkerReady`, so a stored worker always has matching memory.
    ///
    /// # Errors
    ///
    /// Fails if the memory cannot be created, the process cannot be spawned, or
    /// the worker does not become ready within 5 seconds (in which case the
    /// spawned process is killed).
    fn start_worker(&mut self, test_mode: bool) -> Result<(), Error> {
        if let Some(ref mut worker) = self.worker {
            // Check if still alive
            if let Ok(None) = worker.try_wait() {
                // Still running
                return Ok(());
            }
        }
        let memory = Self::create_memory()?;
        let self_path = env::current_exe()?;
        debug!("Starting worker at {:?}", self_path);
        let mut args = if test_mode { WORKER_ARGS_TEST.to_vec() } else { WORKER_ARGS.to_vec() };
        args.push(memory.get_os_path());
        let mut worker = process::Command::new(self_path)
            .args(args)
            .stdin(process::Stdio::piped())
            .spawn()?;

        if let Err(e) = memory.wait(Event::WorkerReady as usize, shared_memory::Timeout::Sec(5)) {
            // Do not keep a worker that never became ready: storing it would
            // make the next call return early without usable shared memory.
            worker.kill().ok();
            worker.wait().ok();
            return Err(e.into());
        }

        self.worker = Some(worker);
        self.memory = Some(memory);
        Ok(())
    }

    /// Validate a candidate under the given validation code.
    ///
    /// This will fail if the validation code is not a proper parachain validation module.
    ///
    /// The code and encoded `params` are copied into shared memory, the worker
    /// is signalled, and the result is read back. Messages the worker recorded
    /// are replayed on `externalities`, egress messages first.
    ///
    /// # Errors
    ///
    /// - `Error::CodeTooLarge` / `Error::ParamsTooLarge` if an input does not
    ///   fit its shared memory region (checked before a worker is started).
    /// - `Error::Timeout` if no result arrives within 5 seconds; the worker is
    ///   killed and a new one is spawned on the next call.
    /// - `Error::External` if the worker reports an error or its response
    ///   cannot be decoded.
    /// - Any error from starting the worker, the shared memory, or `externalities`.
    fn validate_candidate<E: Externalities>(
        &mut self,
        validation_code: &[u8],
        params: ValidationParams,
        externalities: &mut E,
        test_mode: bool,
    ) -> Result<ValidationResult, Error>
    {
        if validation_code.len() > MAX_CODE_MEM {
            return Err(Error::CodeTooLarge(validation_code.len()));
        }
        let encoded_params = params.encode();
        if encoded_params.len() >= MAX_RUNTIME_MEM {
            // Report the offending size; the limit is part of the error message.
            return Err(Error::ParamsTooLarge(encoded_params.len()));
        }

        // First, check if need to spawn the child process
        self.start_worker(test_mode)?;
        let memory = self.memory.as_mut()
            .expect("memory is always `Some` after `start_worker` completes successfully");
        {
            // Put data in shared mem
            let data: &mut[u8] = &mut **memory.wlock_as_slice(0)?;
            let (mut header_buf, rest) = data.split_at_mut(1024);
            let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
            let (code, _) = code.split_at_mut(validation_code.len());
            let (call_data, _) = rest.split_at_mut(MAX_RUNTIME_MEM);
            code[..validation_code.len()].copy_from_slice(validation_code);
            call_data[..encoded_params.len()].copy_from_slice(&encoded_params);

            let header = ValidationHeader {
                code_size: validation_code.len() as u64,
                params_size: encoded_params.len() as u64,
            };

            header.encode_to(&mut header_buf);
        }

        debug!("Signaling candidate");
        memory.set(Event::CandidateReady as usize, EventState::Signaled)?;

        debug!("Waiting for results");
        match memory.wait(Event::ResultReady as usize, shared_memory::Timeout::Sec(5)) {
            Err(e) => {
                debug!("Worker timeout: {:?}", e);
                if let Some(mut worker) = self.worker.take() {
                    worker.kill().ok();
                    // Reap the killed process so it does not linger as a zombie.
                    worker.wait().ok();
                }
                return Err(Error::Timeout);
            }
            Ok(()) => {}
        }

        {
            let data: &[u8] = &**memory.wlock_as_slice(0)?;
            let (header_buf, rest) = data.split_at(1024);
            let (_, rest) = rest.split_at(MAX_CODE_MEM);
            let (_, message_data) = rest.split_at(MAX_RUNTIME_MEM);
            let mut header_buf: &[u8] = header_buf;
            let mut message_data: &[u8] = message_data;
            // Everything read below was written by another process, so decode
            // failures are reported as errors rather than panicking the host.
            let header = ValidationResultHeader::decode(&mut header_buf)
                .ok_or_else(|| Error::External("Error decoding validation result header".into()))?;
            match header {
                ValidationResultHeader::Ok { result, egress_message_count, up_message_count } => {
                    for _ in 0 .. egress_message_count {
                        let message = IncomingMessage::decode(&mut message_data)
                            .ok_or_else(|| Error::External("Error decoding egress message".into()))?;
                        // The worker stores the target in the `source` field.
                        let message_ref = MessageRef {
                            target: message.source,
                            data: &message.data,
                        };
                        externalities.post_message(message_ref)?;
                    }
                    for _ in 0 .. up_message_count {
                        let message = UpwardMessage::decode(&mut message_data)
                            .ok_or_else(|| Error::External("Error decoding upward message".into()))?;
                        let message_ref = UpwardMessageRef {
                            origin: message.origin,
                            data: &message.data,
                        };
                        externalities.post_upward_message(message_ref)?;
                    }
                    Ok(result)
                }
                ValidationResultHeader::Error(message) => {
                    Err(Error::External(message))
                }
            }
        }
    }
}
/// Validate a candidate under the given validation code.
///
/// This will fail if the validation code is not a proper parachain validation module.
......@@ -245,11 +583,29 @@ pub fn validate_candidate<E: Externalities>(
validation_code: &[u8],
params: ValidationParams,
externalities: &mut E,
) -> Result<ValidationResult, Error> {
use wasmi::LINEAR_MEMORY_PAGE_SIZE;
options: ExecutionMode,
) -> Result<ValidationResult, Error>
{
match options {
ExecutionMode::Local => {
validate_candidate_internal(validation_code, &params.encode(), externalities)
},
ExecutionMode::Remote =>
HOST.lock().validate_candidate(validation_code, params, externalities, false),
ExecutionMode::RemoteTest =>
HOST.lock().validate_candidate(validation_code, params, externalities, true),
}
}
// maximum memory in bytes
const MAX_MEM: u32 = 1024 * 1024 * 1024; // 1 GiB
/// Validate a candidate under the given validation code.
///
/// This will fail if the validation code is not a proper parachain validation module.
pub fn validate_candidate_internal<E: Externalities>(
validation_code: &[u8],
encoded_call_data: &[u8],
externalities: &mut E,
) -> Result<ValidationResult, Error> {
use wasmi::LINEAR_MEMORY_PAGE_SIZE;
// instantiate the module.
let memory;
......@@ -258,7 +614,7 @@ pub fn validate_candidate<E: Externalities>(
let module = Module::from_buffer(validation_code)?;
let module_resolver = Resolver {
max_memory: MAX_MEM / LINEAR_MEMORY_PAGE_SIZE.0 as u32,
max_memory: (MAX_RUNTIME_MEM / LINEAR_MEMORY_PAGE_SIZE.0) as u32,
memory: RefCell::new(None),
};
......@@ -285,8 +641,6 @@ pub fn validate_candidate<E: Externalities>(
// - `offset` has alignment at least of 8,
// - `len` is not zero.
let (offset, len) = {
let encoded_call_data = params.encode();
// hard limit from WASM.
if encoded_call_data.len() > i32::max_value() as usize {
return Err(Error::ParamsTooLarge(encoded_call_data.len()));
......
......@@ -18,10 +18,8 @@
use polkadot_parachain as parachain;
use crate::parachain::{
MessageRef, UpwardMessageRef, IncomingMessage, ValidationParams,
wasm_executor::{Externalities, ExternalitiesError},
};
use crate::parachain::{IncomingMessage, ValidationParams};
use crate::DummyExt;
use codec::{Decode, Encode};
/// Head data for this parachain.
......@@ -50,16 +48,6 @@ struct AddMessage {
amount: u64,
}
struct DummyExt;
impl Externalities for DummyExt {
fn post_message(&mut self, _message: MessageRef) -> Result<(), ExternalitiesError> {
Ok(())
}
fn post_upward_message(&mut self, _message: UpwardMessageRef) -> Result<(), ExternalitiesError> {