Commit 5893fa7c authored by asynchronous rob's avatar asynchronous rob Committed by Gav Wood
Browse files

Parachain execution yields messages to send (#96)

* read head-data directly out of WASM memory

* implement ext_post_message for parachain WASM

* further refactoring of the parachain module

* add externalities error type

* accumulate posted messages when validating parachain candidate

* define Extrinsic type in primitives

* availability-store: store extrinsic data

* compute extrinsic and check against candidate

* add some egress queue tests

* grumbles & substrate update

* ensure everything builds
parent 3ae32004
Pipeline #28883 passed with stages
in 12 minutes and 58 seconds
This diff is collapsed.
......@@ -128,11 +128,11 @@ impl Store {
data.block_data.encode()
);
if let Some(_extrinsic) = data.extrinsic {
if let Some(extrinsic) = data.extrinsic {
tx.put_vec(
columns::DATA,
extrinsic_key(&data.relay_parent, &data.candidate_hash).as_slice(),
vec![],
extrinsic.encode(),
);
}
......@@ -182,7 +182,9 @@ impl Store {
pub fn extrinsic(&self, relay_parent: Hash, candidate_hash: Hash) -> Option<Extrinsic> {
let encoded_key = extrinsic_key(&relay_parent, &candidate_hash);
match self.inner.get(columns::DATA, &encoded_key[..]) {
Ok(Some(_raw)) => Some(Extrinsic),
Ok(Some(raw)) => Some(
Extrinsic::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed")
),
Ok(None) => None,
Err(e) => {
warn!(target: "availability", "Error reading from availability store: {:?}", e);
......@@ -215,7 +217,7 @@ mod tests {
parachain_id: para_id_1,
candidate_hash: candidate_1,
block_data: block_data_1.clone(),
extrinsic: Some(Extrinsic),
extrinsic: Some(Extrinsic { outgoing_messages: Vec::new() }),
}).unwrap();
store.make_available(Data {
......@@ -223,7 +225,7 @@ mod tests {
parachain_id: para_id_2,
candidate_hash: candidate_2,
block_data: block_data_2.clone(),
extrinsic: Some(Extrinsic),
extrinsic: Some(Extrinsic { outgoing_messages: Vec::new() }),
}).unwrap();
assert_eq!(store.block_data(relay_parent, candidate_1).unwrap(), block_data_1);
......
......@@ -23,6 +23,7 @@ substrate-primitives = { git = "https://github.com/paritytech/substrate" }
substrate-transaction-pool = { git = "https://github.com/paritytech/substrate" }
srml-support = { git = "https://github.com/paritytech/substrate" }
substrate-client = { git = "https://github.com/paritytech/substrate" }
substrate-trie = { git = "https://github.com/paritytech/substrate" }
sr-primitives = { git = "https://github.com/paritytech/substrate" }
[dev-dependencies]
......
......@@ -22,9 +22,10 @@
use std::sync::Arc;
use polkadot_primitives::{Block, Hash, AccountId, BlockId};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic};
use polkadot_primitives::parachain::ParachainHost;
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, OutgoingMessage};
use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost};
use runtime_primitives::traits::ProvideRuntimeApi;
use parachain::{wasm_executor::{self, ExternalitiesError}, MessageRef};
use futures::prelude::*;
......@@ -100,7 +101,9 @@ impl<C: Collators, P: ProvideRuntimeApi> Future for CollationFetch<C, P>
};
match validate_collation(&*self.client, &self.relay_parent, &x) {
Ok(e) => return Ok(Async::Ready((x, e))),
Ok(e) => {
return Ok(Async::Ready((x, e)))
}
Err(e) => {
debug!("Failed to validate parachain due to API error: {}", e);
......@@ -117,36 +120,137 @@ impl<C: Collators, P: ProvideRuntimeApi> Future for CollationFetch<C, P>
error_chain! {
types { Error, ErrorKind, ResultExt; }
links {
Client(::client::error::Error, ::client::error::ErrorKind);
WasmValidation(wasm_executor::Error, wasm_executor::ErrorKind);
}
errors {
InactiveParachain(id: ParaId) {
description("Collated for inactive parachain"),
display("Collated for inactive parachain: {:?}", id),
}
ValidationFailure {
description("Parachain candidate failed validation."),
display("Parachain candidate failed validation."),
EgressRootMismatch(id: ParaId, expected: Hash, got: Hash) {
description("Got unexpected egress route."),
display(
"Got unexpected egress route to {:?}. (expected: {:?}, got {:?})",
id, expected, got
),
}
MissingEgressRoute(expected: Option<ParaId>, got: Option<ParaId>) {
description("Missing or extra egress route."),
display("Missing or extra egress route. (expected: {:?}, got {:?})", expected, got),
}
WrongHeadData(expected: Vec<u8>, got: Vec<u8>) {
description("Parachain validation produced wrong head data."),
display("Parachain validation produced wrong head data (expected: {:?}, got {:?}", expected, got),
}
}
}
links {
Client(::client::error::Error, ::client::error::ErrorKind);
/// Compute the egress trie root for a set of messages.
pub fn egress_trie_root<A, I: IntoIterator<Item=A>>(messages: I) -> Hash
where A: AsRef<[u8]>
{
::trie::ordered_trie_root::<primitives::Blake2Hasher, _, _>(messages)
}
fn check_and_compute_extrinsic(
mut outgoing: Vec<OutgoingMessage>,
expected_egress_roots: &[(ParaId, Hash)],
) -> Result<Extrinsic, Error> {
// stable sort messages by parachain ID.
outgoing.sort_by_key(|msg| ParaId::from(msg.target));
{
let mut messages_iter = outgoing.iter().peekable();
let mut expected_egress_roots = expected_egress_roots.iter();
while let Some(batch_target) = messages_iter.peek().map(|o| o.target) {
let expected_root = match expected_egress_roots.next() {
None => return Err(ErrorKind::MissingEgressRoute(Some(batch_target), None).into()),
Some(&(id, ref root)) => if id == batch_target {
root
} else {
return Err(ErrorKind::MissingEgressRoute(Some(batch_target), Some(id)).into());
}
};
// we borrow the iterator mutably to ensure it advances so the
// next iteration of the loop starts with `messages_iter` pointing to
// the next batch.
let messages_to = messages_iter
.clone()
.take_while(|o| o.target == batch_target)
.map(|o| { let _ = messages_iter.next(); &o.data[..] });
let computed_root = egress_trie_root(messages_to);
if &computed_root != expected_root {
return Err(ErrorKind::EgressRootMismatch(
batch_target,
expected_root.clone(),
computed_root,
).into());
}
}
// also check that there are no more additional expected roots.
if let Some((next_target, _)) = expected_egress_roots.next() {
return Err(ErrorKind::MissingEgressRoute(None, Some(*next_target)).into());
}
}
Ok(Extrinsic { outgoing_messages: outgoing })
}
struct Externalities {
parachain_index: ParaId,
outgoing: Vec<OutgoingMessage>,
}
impl wasm_executor::Externalities for Externalities {
fn post_message(&mut self, message: MessageRef) -> Result<(), ExternalitiesError> {
// TODO: https://github.com/paritytech/polkadot/issues/92
// check per-message and per-byte fees for the parachain.
let target: ParaId = message.target.into();
if target == self.parachain_index {
return Err(ExternalitiesError::CannotPostMessage("posted message to self"));
}
self.outgoing.push(OutgoingMessage {
target,
data: message.data.to_vec(),
});
Ok(())
}
}
impl Externalities {
// Performs final checks of validity, producing the extrinsic data.
fn final_checks(
self,
candidate: &CandidateReceipt,
) -> Result<Extrinsic, Error> {
check_and_compute_extrinsic(
self.outgoing,
&candidate.egress_queue_roots[..],
)
}
}
/// Check whether a given collation is valid. Returns `Ok` on success, error otherwise.
///
/// This assumes that basic validity checks have been done:
/// - Block data hash is the same as linked in candidate receipt.
pub fn validate_collation<P>(
client: &P,
relay_parent: &BlockId,
collation: &Collation
) -> Result<Extrinsic, Error> where
P: ProvideRuntimeApi,
P::Api: ParachainHost<Block>
P::Api: ParachainHost<Block>,
{
use parachain::{self, ValidationParams};
use parachain::ValidationParams;
let api = client.runtime_api();
let para_id = collation.receipt.parachain_index;
......@@ -161,10 +265,15 @@ pub fn validate_collation<P>(
block_data: collation.block_data.0.clone(),
};
match parachain::wasm::validate_candidate(&validation_code, params) {
let mut ext = Externalities {
parachain_index: collation.receipt.parachain_index.clone(),
outgoing: Vec::new(),
};
match wasm_executor::validate_candidate(&validation_code, params, &mut ext) {
Ok(result) => {
if result.head_data == collation.receipt.head_data.0 {
Ok(Extrinsic)
ext.final_checks(&collation.receipt)
} else {
Err(ErrorKind::WrongHeadData(
collation.receipt.head_data.0.clone(),
......@@ -172,6 +281,60 @@ pub fn validate_collation<P>(
).into())
}
}
Err(_) => Err(ErrorKind::ValidationFailure.into())
Err(e) => Err(e.into())
}
}
#[cfg(test)]
mod tests {
use super::*;
use parachain::wasm_executor::Externalities as ExternalitiesTrait;
#[test]
fn egress_roots() {
let messages = vec![
OutgoingMessage { target: 3.into(), data: vec![1, 1, 1] },
OutgoingMessage { target: 1.into(), data: vec![1, 2, 3] },
OutgoingMessage { target: 2.into(), data: vec![4, 5, 6] },
OutgoingMessage { target: 1.into(), data: vec![7, 8, 9] },
];
let root_1 = egress_trie_root(&[vec![1, 2, 3], vec![7, 8, 9]]);
let root_2 = egress_trie_root(&[vec![4, 5, 6]]);
let root_3 = egress_trie_root(&[vec![1, 1, 1]]);
assert!(check_and_compute_extrinsic(
messages.clone(),
&[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3)],
).is_ok());
// missing root.
assert!(check_and_compute_extrinsic(
messages.clone(),
&[(1.into(), root_1), (3.into(), root_3)],
).is_err());
// extra root.
assert!(check_and_compute_extrinsic(
messages.clone(),
&[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3), (4.into(), Default::default())],
).is_err());
// root mismatch.
assert!(check_and_compute_extrinsic(
messages.clone(),
&[(1.into(), root_2), (2.into(), root_1), (3.into(), root_3)],
).is_err());
}
#[test]
fn ext_rejects_local_message() {
let mut ext = Externalities {
parachain_index: 5.into(),
outgoing: Vec::new(),
};
assert!(ext.post_message(MessageRef { target: 1, data: &[] }).is_ok());
assert!(ext.post_message(MessageRef { target: 5, data: &[] }).is_err());
}
}
......@@ -41,6 +41,7 @@ extern crate substrate_primitives as primitives;
extern crate srml_support as runtime_support;
extern crate sr_primitives as runtime_primitives;
extern crate substrate_client as client;
extern crate substrate_trie as trie;
extern crate exit_future;
extern crate tokio;
......@@ -90,7 +91,7 @@ use futures::future::{self, Either};
use collation::CollationFetch;
use dynamic_inclusion::DynamicInclusion;
pub use self::collation::{validate_collation, Collators};
pub use self::collation::{validate_collation, egress_trie_root, Collators};
pub use self::error::{ErrorKind, Error};
pub use self::shared_table::{SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement, GenericStatement};
......@@ -113,8 +114,6 @@ pub trait TableRouter: Clone {
type Error;
/// Future that resolves when candidate data is fetched.
type FetchCandidate: IntoFuture<Item=BlockData,Error=Self::Error>;
/// Future that resolves when extrinsic candidate data is fetched.
type FetchExtrinsic: IntoFuture<Item=ParachainExtrinsic,Error=Self::Error>;
/// Call with local candidate data. This will make the data available on the network,
/// and sign, import, and broadcast a statement about the candidate.
......@@ -122,9 +121,6 @@ pub trait TableRouter: Clone {
/// Fetch block data for a specific candidate.
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate;
/// Fetch extrinsic data for a specific candidate.
fn fetch_extrinsic_data(&self, candidate: &CandidateReceipt) -> Self::FetchExtrinsic;
}
/// A long-lived network which can create parachain statement and BFT message routing processes on demand.
......@@ -229,7 +225,6 @@ struct ParachainConsensus<C, N, P> {
extrinsic_store: ExtrinsicStore,
/// Live agreements.
live_instances: Mutex<HashMap<Hash, Arc<AttestationTracker>>>,
}
impl<C, N, P> ParachainConsensus<C, N, P> where
......
......@@ -156,7 +156,7 @@ pub struct Validated {
/// Block data to ensure availability of.
pub block_data: BlockData,
/// Extrinsic data to ensure availability of.
pub extrinsic: Extrinsic,
pub extrinsic: Option<Extrinsic>,
}
/// Future that performs parachain validation work.
......@@ -172,7 +172,7 @@ impl<D: Future> ParachainWork<D> {
pub fn prime<P: ProvideRuntimeApi>(self, api: Arc<P>)
-> PrimedParachainWork<
D,
impl Send + FnMut(&BlockId, &Collation) -> bool,
impl Send + FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()>,
>
where
P: Send + Sync + 'static,
......@@ -186,10 +186,10 @@ impl<D: Future> ParachainWork<D> {
);
match res {
Ok(_) => true,
Ok(e) => Ok(e),
Err(e) => {
debug!(target: "consensus", "Encountered bad collation: {}", e);
false
Err(())
}
}
};
......@@ -199,7 +199,7 @@ impl<D: Future> ParachainWork<D> {
/// Prime the parachain work with a custom validation function.
pub fn prime_with<F>(self, validate: F) -> PrimedParachainWork<D, F>
where F: FnMut(&BlockId, &Collation) -> bool
where F: FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()>
{
PrimedParachainWork { inner: self, validate }
}
......@@ -219,7 +219,7 @@ pub struct PrimedParachainWork<D: Future, F> {
impl<D, F, Err> Future for PrimedParachainWork<D, F>
where
D: Future<Item=BlockData,Error=Err>,
F: FnMut(&BlockId, &Collation) -> bool,
F: FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()>,
Err: From<::std::io::Error>,
{
type Item = Validated;
......@@ -230,28 +230,31 @@ impl<D, F, Err> Future for PrimedParachainWork<D, F>
let candidate = &work.candidate_receipt;
let block = try_ready!(work.fetch_block_data.poll());
let is_good = (self.validate)(
let validation_res = (self.validate)(
&BlockId::hash(self.inner.relay_parent),
&Collation { block_data: block.clone(), receipt: candidate.clone() },
);
let candidate_hash = candidate.hash();
debug!(target: "consensus", "Making validity statement about candidate {}: is_good? {:?}", candidate_hash, is_good);
let validity_statement = match is_good {
true => GenericStatement::Valid(candidate_hash),
false => GenericStatement::Invalid(candidate_hash),
debug!(target: "consensus", "Making validity statement about candidate {}: is_good? {:?}",
candidate_hash, validation_res.is_ok());
let (extrinsic, validity_statement) = match validation_res {
Err(()) => (None, GenericStatement::Invalid(candidate_hash)),
Ok(extrinsic) => {
self.inner.extrinsic_store.make_available(Data {
relay_parent: self.inner.relay_parent,
parachain_id: work.candidate_receipt.parachain_index,
candidate_hash,
block_data: block.clone(),
extrinsic: Some(extrinsic.clone()),
})?;
(Some(extrinsic), GenericStatement::Valid(candidate_hash))
}
};
let extrinsic = Extrinsic;
self.inner.extrinsic_store.make_available(Data {
relay_parent: self.inner.relay_parent,
parachain_id: work.candidate_receipt.parachain_index,
candidate_hash,
block_data: block.clone(),
extrinsic: Some(extrinsic.clone()),
})?;
Ok(Async::Ready(Validated {
validity: validity_statement,
block_data: block,
......@@ -444,7 +447,6 @@ mod tests {
impl TableRouter for DummyRouter {
type Error = ::std::io::Error;
type FetchCandidate = ::futures::future::FutureResult<BlockData,Self::Error>;
type FetchExtrinsic = ::futures::future::FutureResult<Extrinsic,Self::Error>;
fn local_candidate(&self, _candidate: CandidateReceipt, _block_data: BlockData, _extrinsic: Extrinsic) {
......@@ -452,9 +454,6 @@ mod tests {
fn fetch_block_data(&self, _candidate: &CandidateReceipt) -> Self::FetchCandidate {
future::ok(BlockData(vec![1, 2, 3, 4, 5]))
}
fn fetch_extrinsic_data(&self, _candidate: &CandidateReceipt) -> Self::FetchExtrinsic {
future::ok(Extrinsic)
}
}
#[test]
......@@ -586,7 +585,9 @@ mod tests {
extrinsic_store: store.clone(),
};
let produced = producer.prime_with(|_, _| true).wait().unwrap();
let produced = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() }))
.wait()
.unwrap();
assert_eq!(produced.block_data, block_data);
assert_eq!(produced.validity, GenericStatement::Valid(hash));
......@@ -624,7 +625,9 @@ mod tests {
extrinsic_store: store.clone(),
};
let produced = producer.prime_with(|_, _| true).wait().unwrap();
let produced = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() }))
.wait()
.unwrap();
assert_eq!(produced.block_data, block_data);
......
......@@ -231,13 +231,21 @@ impl Knowledge {
/// Note a statement seen from another validator.
pub(crate) fn note_statement(&mut self, from: SessionKey, statement: &Statement) {
// those proposing the candidate or declaring it valid know everything.
// those claiming it invalid do not have the extrinsic data as it is
// generated by valid execution.
match *statement {
GenericStatement::Candidate(ref c) => {
let mut entry = self.candidates.entry(c.hash()).or_insert_with(Default::default);
entry.knows_block_data.push(from);
entry.knows_extrinsic.push(from);
}
GenericStatement::Valid(ref hash) | GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash)
GenericStatement::Valid(ref hash) => {
let mut entry = self.candidates.entry(*hash).or_insert_with(Default::default);
entry.knows_block_data.push(from);
entry.knows_extrinsic.push(from);
}
GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash)
.or_insert_with(Default::default)
.knows_block_data
.push(from),
......
......@@ -167,7 +167,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P>
knowledge.lock().note_candidate(
candidate_hash,
Some(produced.block_data),
Some(produced.extrinsic),
produced.extrinsic,
);
let mut gossip = network.consensus_gossip().write();
......@@ -188,7 +188,6 @@ impl<P: ProvideRuntimeApi + Send> TableRouter for Router<P>
{
type Error = io::Error;
type FetchCandidate = BlockDataReceiver;
type FetchExtrinsic = Result<Extrinsic, Self::Error>;
fn local_candidate(&self, receipt: CandidateReceipt, block_data: BlockData, extrinsic: Extrinsic) {
// give to network to make available.
......@@ -207,10 +206,6 @@ impl<P: ProvideRuntimeApi + Send> TableRouter for Router<P>
let rx = self.network.with_spec(|spec, ctx| { spec.fetch_block_data(ctx, candidate, parent_hash) });
BlockDataReceiver { inner: rx }
}
fn fetch_extrinsic_data(&self, _candidate: &CandidateReceipt) -> Self::FetchExtrinsic {
Ok(Extrinsic)
}
}
impl<P> Drop for Router<P> {
......
......@@ -15,4 +15,5 @@ tiny-keccak = "1.4"
[features]
default = ["std"]
wasm-api = []
std = ["parity-codec/std", "wasmi", "error-chain"]
......@@ -37,8 +37,8 @@
//! ^~~returned pointer
//! ```
//!
//! The `load_params` and `write_result` functions provide utilities for setting up
//! a parachain WASM module in Rust.
//! The `wasm_api` module (enabled only with the wasm-api feature) provides utilities
//! for setting up a parachain WASM module in Rust.
#![cfg_attr(not(feature = "std"), no_std)]
#![cfg_attr(not(feature = "std"), feature(alloc))]
......@@ -64,15 +64,17 @@ extern crate error_chain;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use codec::{Encode, Decode};
#[cfg(feature = "std")]
pub mod wasm;
pub mod wasm_executor;
#[cfg(feature = "wasm-api")]
pub mod wasm_api;
/// Validation parameters for evaluating the parachain validity function.
// TODO: consolidated ingress and balance downloads
#[derive(PartialEq, Eq, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Debug))]
#[derive(PartialEq, Eq, Decode)]
#[cfg_attr(feature = "std", derive(Debug, Encode))]
pub struct ValidationParams {
/// The collation body.
pub block_data: Vec<u8>,
......@@ -82,38 +84,19 @@ pub struct ValidationParams {
/// The result of parachain validation.
// TODO: egress and balance uploads
#[derive(PartialEq, Eq, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Debug))]
#[derive(PartialEq, Eq, Encode)]
#[cfg_attr(feature = "std", derive(Debug, Decode))]
pub struct ValidationResult {
/// New head data that should be included in the relay chain state.
pub head_data: Vec<u8>,
}
/// Load the validation params from memory when implementing a Rust parachain.
///
/// Offset and length must have been provided by the validation
/// function's entry point.
pub unsafe fn load_params(offset: usize, len: usize) -> ValidationParams {
let mut slice = ::core::slice::from_raw_parts(offset as *const u8, len);
ValidationParams::decode(&mut slice).expect("Invalid input data")
/// A reference to a message.
#[cfg(feature = "std")]
pub struct MessageRef<'a> {
/// The target parachain.
pub target: u32,
/// Underlying data of the message.
pub data: &'a [u8],
}
/// Allocate the validation result in memory, getting the return-pointer back.
///
/// As described in the crate docs, this is a pointer to the appended length
/// of the vector.
pub fn write_result(result: ValidationResult) -> usize {
let mut encoded = result.encode();
let len = encoded.len();
assert!(len <= u32::max_value() as usize, "Len too large for parachain-WASM abi");
(len as u32).using_encoded(|s| encoded.extend(