Commit 0ba8e4c7 authored by Pierre Krieger's avatar Pierre Krieger Committed by Bastian Köcher
Browse files

Make polkadot-parachain compile for WASM (#350)

parent d297bbac
Pipeline #48146 passed with stages
in 10 minutes and 38 seconds
......@@ -12,11 +12,13 @@ derive_more = { version = "0.14", optional = true }
serde = { version = "1.0", default-features = false, features = [ "derive" ] }
rstd = { package = "sr-std", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", default-features = false }
shared_memory = { version = "0.8", optional = true }
lazy_static = { version = "1.3.0", optional = true }
parking_lot = { version = "0.7.1", optional = true }
log = { version = "0.4.6", optional = true }
[target.'cfg(not(target_os = "unknown"))'.dependencies]
shared_memory = { version = "0.8", optional = true }
[dev-dependencies]
tiny-keccak = "1.4"
adder = { path = "../test-parachains/adder" }
......
......@@ -20,8 +20,7 @@
//! Assuming the parameters are correct, this module provides a wrapper around
//! a WASM VM for re-execution of a parachain candidate.
use std::{cell::RefCell, fmt, convert::TryInto, process, env};
use std::sync::{Arc, atomic};
use std::{cell::RefCell, fmt, convert::TryInto};
use crate::codec::{Decode, Encode};
use wasmi::{
self, Module, ModuleInstance, Trap, MemoryInstance, MemoryDescriptor, MemoryRef,
......@@ -29,30 +28,15 @@ use wasmi::{
memory_units::{self, Bytes, Pages, RoundUpTo}
};
use super::{ValidationParams, ValidationResult, MessageRef, UpwardMessageRef, UpwardMessage, IncomingMessage};
use shared_memory::{SharedMem, SharedMemConf, EventState, WriteLockable, EventWait, EventSet};
use parking_lot::Mutex;
use log::{trace, debug};
#[cfg(not(target_os = "unknown"))]
pub use validation_host::run_worker;
mod validation_host;
// maximum memory in bytes
const MAX_RUNTIME_MEM: usize = 1024 * 1024 * 1024; // 1 GiB
const MAX_CODE_MEM: usize = 16 * 1024 * 1024; // 16 MiB
// Message data limit
const MAX_MESSAGE_MEM: usize = 16 * 1024 * 1024; // 16 MiB
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
/// CLI Argument to start in validation worker mode.
const WORKER_ARG: &'static str = "validation-worker";
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
enum Event {
CandidateReady = 0,
ResultReady = 1,
WorkerReady = 2,
}
lazy_static::lazy_static! {
static ref HOST: Mutex<ValidationHost> = Mutex::new(ValidationHost::new());
}
mod ids {
/// Post a message to another parachain.
......@@ -63,6 +47,8 @@ mod ids {
}
/// WASM code execution mode.
///
/// > Note: When compiling for WASM, the `Remote` variants are not available.
pub enum ExecutionMode {
/// Execute in-process. The execution can not be interrupted or aborted.
Local,
......@@ -334,248 +320,6 @@ impl Externalities for WorkerExternalities {
}
}
/// Validation worker process entry point. Runs a loop waiting for canidates to validate
/// and sends back results via shared memory.
pub fn run_worker(mem_id: &str) -> Result<(), String> {
let mut memory = match SharedMem::open(mem_id) {
Ok(memory) => memory,
Err(e) => {
debug!("Error opening shared memory: {:?}", e);
return Err(format!("Error opening shared memory: {:?}", e));
}
};
let mut externalities = WorkerExternalities::default();
let exit = Arc::new(atomic::AtomicBool::new(false));
// spawn parent monitor thread
let watch_exit = exit.clone();
std::thread::spawn(move || {
use std::io::Read;
let mut in_data = Vec::new();
std::io::stdin().read_to_end(&mut in_data).ok(); // pipe terminates when parent process exits
debug!("Parent process is dead. Exiting");
exit.store(true, atomic::Ordering::Relaxed);
});
memory.set(Event::WorkerReady as usize, EventState::Signaled)
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
loop {
if watch_exit.load(atomic::Ordering::Relaxed) {
break;
}
debug!("Waiting for candidate");
match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(1)) {
Err(e) => {
// Timeout
trace!("Timeout waiting for candidate: {:?}", e);
continue;
}
Ok(()) => {}
}
{
debug!("Processing candidate");
// we have candidate data
let mut slice = memory.wlock_as_slice(0)
.map_err(|e| format!("Error locking shared memory: {:?}", e))?;
let result = {
let data: &mut[u8] = &mut **slice;
let (header_buf, rest) = data.split_at_mut(1024);
let mut header_buf: &[u8] = header_buf;
let header = ValidationHeader::decode(&mut header_buf)
.ok_or_else(|| format!("Error decoding validation request."))?;
debug!("Candidate header: {:?}", header);
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
let (code, _) = code.split_at_mut(header.code_size as usize);
let (call_data, rest) = rest.split_at_mut(MAX_RUNTIME_MEM);
let (call_data, _) = call_data.split_at_mut(header.params_size as usize);
let message_data = rest;
let result = validate_candidate_internal(code, call_data, &mut externalities);
debug!("Candidate validated: {:?}", result);
match result {
Ok(r) => {
if externalities.egress_data.len() + externalities.up_data.len() > MAX_MESSAGE_MEM {
ValidationResultHeader::Error("Message data is too large".into())
} else {
let e_len = externalities.egress_data.len();
let up_len = externalities.up_data.len();
message_data[0..e_len].copy_from_slice(&externalities.egress_data);
message_data[e_len..(e_len + up_len)].copy_from_slice(&externalities.up_data);
ValidationResultHeader::Ok {
result: r,
egress_message_count: externalities.egress_message_count as u64,
up_message_count: externalities.up_message_count as u64,
}
}
},
Err(e) => ValidationResultHeader::Error(e.to_string()),
}
};
let mut data: &mut[u8] = &mut **slice;
result.encode_to(&mut data);
}
debug!("Signaling result");
memory.set(Event::ResultReady as usize, EventState::Signaled)
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
}
Ok(())
}
unsafe impl Send for ValidationHost {}
struct ValidationHost {
worker: Option<process::Child>,
memory: Option<SharedMem>,
}
impl Drop for ValidationHost {
fn drop(&mut self) {
if let Some(ref mut worker) = &mut self.worker {
worker.kill().ok();
}
}
}
impl ValidationHost {
fn create_memory() -> Result<SharedMem, Error> {
let mem_size = MAX_RUNTIME_MEM + MAX_CODE_MEM + MAX_MESSAGE_MEM + 1024;
let mem_config = SharedMemConf::new()
.set_size(mem_size)
.add_lock(shared_memory::LockType::Mutex, 0, mem_size)?
.add_event(shared_memory::EventType::Auto)? // Event::CandidateReady
.add_event(shared_memory::EventType::Auto)? // Event::ResultReady
.add_event(shared_memory::EventType::Auto)?; // Evebt::WorkerReady
Ok(mem_config.create()?)
}
fn new() -> ValidationHost {
ValidationHost {
worker: None,
memory: None,
}
}
fn start_worker(&mut self, test_mode: bool) -> Result<(), Error> {
if let Some(ref mut worker) = self.worker {
// Check if still alive
if let Ok(None) = worker.try_wait() {
// Still running
return Ok(());
}
}
let memory = Self::create_memory()?;
let self_path = env::current_exe()?;
debug!("Starting worker at {:?}", self_path);
let mut args = if test_mode { WORKER_ARGS_TEST.to_vec() } else { WORKER_ARGS.to_vec() };
args.push(memory.get_os_path());
let worker = process::Command::new(self_path)
.args(args)
.stdin(process::Stdio::piped())
.spawn()?;
self.worker = Some(worker);
memory.wait(Event::WorkerReady as usize, shared_memory::Timeout::Sec(5))?;
self.memory = Some(memory);
Ok(())
}
/// Validate a candidate under the given validation code.
///
/// This will fail if the validation code is not a proper parachain validation module.
fn validate_candidate<E: Externalities>(
&mut self,
validation_code: &[u8],
params: ValidationParams,
externalities: &mut E,
test_mode: bool,
) -> Result<ValidationResult, Error>
{
if validation_code.len() > MAX_CODE_MEM {
return Err(Error::CodeTooLarge(validation_code.len()));
}
// First, check if need to spawn the child process
self.start_worker(test_mode)?;
let memory = self.memory.as_mut().expect("memory is always `Some` after `start_worker` completes successfully");
{
// Put data in shared mem
let data: &mut[u8] = &mut **memory.wlock_as_slice(0)?;
let (mut header_buf, rest) = data.split_at_mut(1024);
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
let (code, _) = code.split_at_mut(validation_code.len());
let (call_data, _) = rest.split_at_mut(MAX_RUNTIME_MEM);
code[..validation_code.len()].copy_from_slice(validation_code);
let encoded_params = params.encode();
if encoded_params.len() >= MAX_RUNTIME_MEM {
return Err(Error::ParamsTooLarge(MAX_RUNTIME_MEM));
}
call_data[..encoded_params.len()].copy_from_slice(&encoded_params);
let header = ValidationHeader {
code_size: validation_code.len() as u64,
params_size: encoded_params.len() as u64,
};
header.encode_to(&mut header_buf);
}
debug!("Signaling candidate");
memory.set(Event::CandidateReady as usize, EventState::Signaled)?;
debug!("Waiting for results");
match memory.wait(Event::ResultReady as usize, shared_memory::Timeout::Sec(5)) {
Err(e) => {
debug!("Worker timeout: {:?}", e);
if let Some(mut worker) = self.worker.take() {
worker.kill().ok();
}
return Err(Error::Timeout.into());
}
Ok(()) => {}
}
{
let data: &[u8] = &**memory.wlock_as_slice(0)?;
let (header_buf, rest) = data.split_at(1024);
let (_, rest) = rest.split_at(MAX_CODE_MEM);
let (_, message_data) = rest.split_at(MAX_RUNTIME_MEM);
let mut header_buf: &[u8] = header_buf;
let mut message_data: &[u8] = message_data;
let header = ValidationResultHeader::decode(&mut header_buf).unwrap();
match header {
ValidationResultHeader::Ok { result, egress_message_count, up_message_count } => {
for _ in 0 .. egress_message_count {
let message = IncomingMessage::decode(&mut message_data).unwrap();
let message_ref = MessageRef {
target: message.source,
data: &message.data,
};
externalities.post_message(message_ref)?;
}
for _ in 0 .. up_message_count {
let message = UpwardMessage::decode(&mut message_data).unwrap();
let message_ref = UpwardMessageRef {
origin: message.origin,
data: &message.data,
};
externalities.post_upward_message(message_ref)?;
}
Ok(result)
}
ValidationResultHeader::Error(message) => {
Err(Error::External(message).into())
}
}
}
}
}
/// Validate a candidate under the given validation code.
///
/// This will fail if the validation code is not a proper parachain validation module.
......@@ -590,10 +334,18 @@ pub fn validate_candidate<E: Externalities>(
ExecutionMode::Local => {
validate_candidate_internal(validation_code, &params.encode(), externalities)
},
#[cfg(not(target_os = "unknown"))]
ExecutionMode::Remote =>
validation_host::HOST.lock().validate_candidate(validation_code, params, externalities, false),
#[cfg(not(target_os = "unknown"))]
ExecutionMode::RemoteTest =>
validation_host::HOST.lock().validate_candidate(validation_code, params, externalities, true),
#[cfg(target_os = "unknown")]
ExecutionMode::Remote =>
HOST.lock().validate_candidate(validation_code, params, externalities, false),
Err(Error::System("Remote validator not available".to_string().into())),
#[cfg(target_os = "unknown")]
ExecutionMode::RemoteTest =>
HOST.lock().validate_candidate(validation_code, params, externalities, true),
Err(Error::System("Remote validator not available".to_string().into())),
}
}
......
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
#![cfg(not(target_os = "unknown"))]
use std::{process, env, sync::Arc, sync::atomic};
use crate::codec::{Decode, Encode};
use crate::{ValidationParams, ValidationResult, MessageRef, UpwardMessageRef, UpwardMessage, IncomingMessage};
use super::{validate_candidate_internal, Error, Externalities, WorkerExternalities};
use super::{MAX_CODE_MEM, MAX_RUNTIME_MEM};
use shared_memory::{SharedMem, SharedMemConf, EventState, WriteLockable, EventWait, EventSet};
use parking_lot::Mutex;
use log::{debug, trace};
// Message data limit
const MAX_MESSAGE_MEM: usize = 16 * 1024 * 1024; // 16 MiB
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
/// CLI Argument to start in validation worker mode.
const WORKER_ARG: &'static str = "validation-worker";
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
enum Event {
CandidateReady = 0,
ResultReady = 1,
WorkerReady = 2,
}
lazy_static::lazy_static! {
pub static ref HOST: Mutex<ValidationHost> = Mutex::new(ValidationHost::new());
}
/// Validation worker process entry point. Runs a loop waiting for canidates to validate
/// and sends back results via shared memory.
pub fn run_worker(mem_id: &str) -> Result<(), String> {
let mut memory = match SharedMem::open(mem_id) {
Ok(memory) => memory,
Err(e) => {
debug!("Error opening shared memory: {:?}", e);
return Err(format!("Error opening shared memory: {:?}", e));
}
};
let mut externalities = WorkerExternalities::default();
let exit = Arc::new(atomic::AtomicBool::new(false));
// spawn parent monitor thread
let watch_exit = exit.clone();
std::thread::spawn(move || {
use std::io::Read;
let mut in_data = Vec::new();
std::io::stdin().read_to_end(&mut in_data).ok(); // pipe terminates when parent process exits
debug!("Parent process is dead. Exiting");
exit.store(true, atomic::Ordering::Relaxed);
});
memory.set(Event::WorkerReady as usize, EventState::Signaled)
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
loop {
if watch_exit.load(atomic::Ordering::Relaxed) {
break;
}
debug!("Waiting for candidate");
match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(1)) {
Err(e) => {
// Timeout
trace!("Timeout waiting for candidate: {:?}", e);
continue;
}
Ok(()) => {}
}
{
debug!("Processing candidate");
// we have candidate data
let mut slice = memory.wlock_as_slice(0)
.map_err(|e| format!("Error locking shared memory: {:?}", e))?;
let result = {
let data: &mut[u8] = &mut **slice;
let (header_buf, rest) = data.split_at_mut(1024);
let mut header_buf: &[u8] = header_buf;
let header = ValidationHeader::decode(&mut header_buf)
.ok_or_else(|| format!("Error decoding validation request."))?;
debug!("Candidate header: {:?}", header);
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
let (code, _) = code.split_at_mut(header.code_size as usize);
let (call_data, rest) = rest.split_at_mut(MAX_RUNTIME_MEM);
let (call_data, _) = call_data.split_at_mut(header.params_size as usize);
let message_data = rest;
let result = validate_candidate_internal(code, call_data, &mut externalities);
debug!("Candidate validated: {:?}", result);
match result {
Ok(r) => {
if externalities.egress_data.len() + externalities.up_data.len() > MAX_MESSAGE_MEM {
ValidationResultHeader::Error("Message data is too large".into())
} else {
let e_len = externalities.egress_data.len();
let up_len = externalities.up_data.len();
message_data[0..e_len].copy_from_slice(&externalities.egress_data);
message_data[e_len..(e_len + up_len)].copy_from_slice(&externalities.up_data);
ValidationResultHeader::Ok {
result: r,
egress_message_count: externalities.egress_message_count as u64,
up_message_count: externalities.up_message_count as u64,
}
}
},
Err(e) => ValidationResultHeader::Error(e.to_string()),
}
};
let mut data: &mut[u8] = &mut **slice;
result.encode_to(&mut data);
}
debug!("Signaling result");
memory.set(Event::ResultReady as usize, EventState::Signaled)
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
}
Ok(())
}
/// Params header in shared memory. All offsets should be aligned to WASM page size.
#[derive(Encode, Decode, Debug)]
struct ValidationHeader {
code_size: u64,
params_size: u64,
}
#[derive(Encode, Decode, Debug)]
pub enum ValidationResultHeader {
Ok {
result: ValidationResult,
egress_message_count: u64,
up_message_count: u64,
},
Error(String),
}
unsafe impl Send for ValidationHost {}
pub struct ValidationHost {
worker: Option<process::Child>,
memory: Option<SharedMem>,
}
impl Drop for ValidationHost {
fn drop(&mut self) {
if let Some(ref mut worker) = &mut self.worker {
worker.kill().ok();
}
}
}
impl ValidationHost {
fn create_memory() -> Result<SharedMem, Error> {
let mem_size = MAX_RUNTIME_MEM + MAX_CODE_MEM + MAX_MESSAGE_MEM + 1024;
let mem_config = SharedMemConf::new()
.set_size(mem_size)
.add_lock(shared_memory::LockType::Mutex, 0, mem_size)?
.add_event(shared_memory::EventType::Auto)? // Event::CandidateReady
.add_event(shared_memory::EventType::Auto)? // Event::ResultReady
.add_event(shared_memory::EventType::Auto)?; // Evebt::WorkerReady
Ok(mem_config.create()?)
}
fn new() -> ValidationHost {
ValidationHost {
worker: None,
memory: None,
}
}
fn start_worker(&mut self, test_mode: bool) -> Result<(), Error> {
if let Some(ref mut worker) = self.worker {
// Check if still alive
if let Ok(None) = worker.try_wait() {
// Still running
return Ok(());
}
}
let memory = Self::create_memory()?;
let self_path = env::current_exe()?;
debug!("Starting worker at {:?}", self_path);
let mut args = if test_mode { WORKER_ARGS_TEST.to_vec() } else { WORKER_ARGS.to_vec() };
args.push(memory.get_os_path());
let worker = process::Command::new(self_path)
.args(args)
.stdin(process::Stdio::piped())
.spawn()?;
self.worker = Some(worker);
memory.wait(Event::WorkerReady as usize, shared_memory::Timeout::Sec(5))?;
self.memory = Some(memory);
Ok(())
}
/// Validate a candidate under the given validation code.
///
/// This will fail if the validation code is not a proper parachain validation module.
pub fn validate_candidate<E: Externalities>(
&mut self,
validation_code: &[u8],
params: ValidationParams,
externalities: &mut E,
test_mode: bool,
) -> Result<ValidationResult, Error>
{
if validation_code.len() > MAX_CODE_MEM {
return Err(Error::CodeTooLarge(validation_code.len()));
}
// First, check if need to spawn the child process
self.start_worker(test_mode)?;
let memory = self.memory.as_mut().expect("memory is always `Some` after `start_worker` completes successfully");
{
// Put data in shared mem
let data: &mut[u8] = &mut **memory.wlock_as_slice(0)?;
let (mut header_buf, rest) = data.split_at_mut(1024);
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
let (code, _) = code.split_at_mut(validation_code.len());
let (call_data, _) = rest.split_at_mut(MAX_RUNTIME_MEM);
code[..validation_code.len()].copy_from_slice(validation_code);
let encoded_params = params.encode();
if encoded_params.len() >= MAX_RUNTIME_MEM {
return Err(Error::ParamsTooLarge(MAX_RUNTIME_MEM));
}
call_data[..encoded_params.len()].copy_from_slice(&encoded_params);
let header = ValidationHeader {
code_size: validation_code.len() as u64,
params_size: encoded_params.len() as u64,
};
header.encode_to(&mut header_buf);
}
debug!("Signaling candidate");