Unverified Commit 66c9580c authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Interchain message-passing (#117)

* compute ingress and routing in polkadot runtime

* extract parent candidates from block when beginning consensus

* fetch incoming messages when validating

* fix consensus tests

* parachain wasm execution uses messages

* update parachain tests to check if messages are executed

* abstract out network service to make room for network tests

* skeleton for incoming data fetch

* collate ingress from consensus-gossip

* keep track of validated candidates in the shared-table

* add some shared_table tests for new behavior

* broadcast egress messages on gossip

* test compute_ingress

* move network tests to module folder

* dummy network for consensus-network tests

* make consensus network generic over executor

* test egress broadcast and ingress fetch

* fix test compilation

* address some grumbles

* address grumbles and fix parachain shuffle

* remove broadcast parameter from consensus network trait
parent c30e0000
Pipeline #31107 passed with stages
in 17 minutes and 51 seconds
......@@ -2257,8 +2257,10 @@ dependencies = [
"polkadot-availability-store 0.1.0",
"polkadot-consensus 0.1.0",
"polkadot-primitives 0.1.0",
"rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"slice-group-by 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"sr-primitives 0.1.0 (git+https://github.com/paritytech/substrate)",
"substrate-client 0.1.0 (git+https://github.com/paritytech/substrate)",
"substrate-keyring 0.1.0 (git+https://github.com/paritytech/substrate)",
"substrate-network 0.1.0 (git+https://github.com/paritytech/substrate)",
"substrate-primitives 0.1.0 (git+https://github.com/paritytech/substrate)",
"tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -2681,16 +2683,6 @@ dependencies = [
"winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rhododendron"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ring"
version = "0.14.5"
......@@ -2819,6 +2811,11 @@ name = "scopeguard"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "sdset"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "secp256k1"
version = "0.12.2"
......@@ -2952,6 +2949,14 @@ name = "slab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "slice-group-by"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"sdset 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "slog"
version = "2.4.1"
......@@ -4963,7 +4968,6 @@ dependencies = [
"checksum regex 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "37e7cbbd370869ce2e8dff25c7018702d10b21a20ef7135316f8daecd6c25b7f"
"checksum regex-syntax 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8c2f35eedad5295fdf00a63d7d4b238135723f92b434ec06774dad15c7ab0861"
"checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5"
"checksum rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e20523445e693f394c0e487113ae656071311c9ee4c1e914441bece8c929b21d"
"checksum ring 0.14.5 (registry+https://github.com/rust-lang/crates.io-index)" = "148fc853f6d85f53f5f315d46701eaacc565cdfb3cb1959730c96e81e7e49999"
"checksum rocksdb 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "39be726e556e6f21d54d21cdf1be9f6df30c0411a5856c1abf3f4bb12498f2ed"
"checksum rocksdb 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f1651697fefd273bfb4fd69466cc2a9d20de557a0213b97233b22b5e95924b5e"
......@@ -4980,6 +4984,7 @@ dependencies = [
"checksum schnorrkel 0.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fe554f318830b48e5da8ab1ccb1ffd02b79228364dac7766b7cd1ec461ca5116"
"checksum scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "332ffa32bf586782a3efaeb58f127980944bbc8c4d6913a86107ac2a5ab24b28"
"checksum scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27"
"checksum sdset 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c6959a7341a17cbff280a619c3a3c0001d2d6b54661e6d04c3741c3af07cc2c5"
"checksum secp256k1 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bfaccd3a23619349e0878d9a241f34b1982343cdf67367058cd7d078d326b63e"
"checksum security-framework 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bfab8dda0e7a327c696d893df9ffa19cadc4bd195797997f5223cf5831beaf05"
"checksum security-framework-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3d6696852716b589dff9e886ff83778bb635150168e83afa8ac6b8a78cb82abc"
......@@ -4995,6 +5000,7 @@ dependencies = [
"checksum sha3 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "34a5e54083ce2b934bf059fdf38e7330a154177e029ab6c4e18638f2f624053a"
"checksum shell32-sys 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9ee04b46101f57121c9da2b151988283b6beb79b34f5bb29a58ee48cb695122c"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
"checksum slice-group-by 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "35538e9d2853b9f041156791bf1b871f27d45f0a2fc816fd90ebea6c63bb3f93"
"checksum slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1e1a2eec401952cd7b12a84ea120e2d57281329940c3f93c2bf04f462539508e"
"checksum slog-async 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e544d16c6b230d84c866662fe55e31aacfca6ae71e6fc49ae9a311cb379bfc2f"
"checksum slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ddc0d2aff1f8f325ef660d9a0eb6e6dcd20b30b3f581a5897f58bf42d061c37a"
......
......@@ -27,22 +27,41 @@ use std::thread;
use std::time::{Duration, Instant};
use std::sync::Arc;
use client::{BlockchainEvents, ChainHead, BlockBody};
use client::{error::Result as ClientResult, BlockchainEvents, ChainHead, BlockBody};
use client::block_builder::api::BlockBuilder;
use client::blockchain::HeaderBackend;
use client::runtime_api::Core;
use primitives::ed25519;
use futures::prelude::*;
use polkadot_primitives::{Block, BlockId};
use polkadot_primitives::parachain::ParachainHost;
use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost};
use extrinsic_store::Store as ExtrinsicStore;
use runtime_primitives::traits::ProvideRuntimeApi;
use runtime_primitives::traits::{ProvideRuntimeApi, Header as HeaderT};
use tokio::runtime::TaskExecutor;
use tokio::runtime::current_thread::Runtime as LocalRuntime;
use tokio::timer::Interval;
use super::{Network, Collators};
use super::{Network, Collators, TableRouter};
/// Gets a list of the candidates in a block.
pub(crate) fn fetch_candidates<P: BlockBody<Block>>(client: &P, block: &BlockId)
-> ClientResult<Option<impl Iterator<Item=CandidateReceipt>>>
{
use codec::{Encode, Decode};
use polkadot_runtime::{Call, ParachainsCall, UncheckedExtrinsic as RuntimeExtrinsic};
let extrinsics = client.block_body(block)?;
Ok(extrinsics
.into_iter()
.filter_map(|ex| RuntimeExtrinsic::decode(&mut ex.encode().as_slice()))
.filter_map(|ex| match ex.function {
Call::Parachains(ParachainsCall::set_heads(heads)) =>
Some(heads.into_iter().map(|c| c.candidate)),
_ => None,
})
.next())
}
// creates a task to prune redundant entries in availability store upon block finalization
//
......@@ -52,47 +71,20 @@ fn prune_unneeded_availability<P>(client: Arc<P>, extrinsic_store: ExtrinsicStor
-> impl Future<Item=(),Error=()> + Send
where P: Send + Sync + BlockchainEvents<Block> + BlockBody<Block> + 'static
{
use codec::{Encode, Decode};
use polkadot_primitives::BlockId;
enum NotifyError {
BodyFetch(::client::error::Error),
}
impl NotifyError {
fn log(&self, hash: &::polkadot_primitives::Hash) {
match *self {
NotifyError::BodyFetch(ref err) => warn!("Failed to fetch block body for imported block {:?}: {:?}", hash, err),
}
}
}
client.finality_notification_stream()
.for_each(move |notification| {
use polkadot_runtime::{Call, ParachainsCall, UncheckedExtrinsic as RuntimeExtrinsic};
let hash = notification.hash;
let parent_hash = notification.header.parent_hash;
let extrinsics = client.block_body(&BlockId::hash(hash))
.map_err(NotifyError::BodyFetch);
let extrinsics = match extrinsics {
Ok(r) => r,
Err(e) => { e.log(&hash); return Ok(()) }
};
let candidate_hashes = match extrinsics
.iter()
.filter_map(|ex| RuntimeExtrinsic::decode(&mut ex.encode().as_slice()))
.filter_map(|ex| match ex.function {
Call::Parachains(ParachainsCall::set_heads(ref heads)) =>
Some(heads.iter().map(|c| c.candidate.hash()).collect()),
_ => None,
})
.next()
{
Some(x) => x,
None => return Ok(()),
let candidate_hashes = match fetch_candidates(&*client, &BlockId::hash(hash)) {
Ok(Some(candidates)) => candidates.map(|c| c.hash()).collect(),
Ok(None) => {
warn!("Could not extract candidates from block body of imported block {:?}", hash);
return Ok(())
}
Err(e) => {
warn!("Failed to fetch block body for imported block {:?}: {:?}", hash, e);
return Ok(())
}
};
if let Err(e) = extrinsic_store.candidates_finalized(parent_hash, candidate_hashes) {
......@@ -125,6 +117,7 @@ pub(crate) fn start<C, N, P>(
P::Api: ParachainHost<Block> + Core<Block> + BlockBuilder<Block>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
<<N::TableRouter as TableRouter>::FetchIncoming as IntoFuture>::Future: Send + 'static,
{
const TIMER_DELAY: Duration = Duration::from_secs(5);
const TIMER_INTERVAL: Duration = Duration::from_secs(30);
......@@ -148,6 +141,7 @@ pub(crate) fn start<C, N, P>(
.and_then(|authorities| {
consensus.get_or_instantiate(
parent_hash,
notification.header.parent_hash().clone(),
&authorities,
key.clone(),
)
......
......@@ -26,6 +26,7 @@ use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, Outgoin
use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost};
use runtime_primitives::traits::ProvideRuntimeApi;
use parachain::{wasm_executor::{self, ExternalitiesError}, MessageRef};
use super::Incoming;
use futures::prelude::*;
......@@ -34,7 +35,7 @@ use futures::prelude::*;
/// This is expected to be a lightweight, shared type like an `Arc`.
pub trait Collators: Clone {
/// Errors when producing collations.
type Error;
type Error: std::fmt::Debug;
/// A full collation.
type Collation: IntoFuture<Item=Collation,Error=Self::Error>;
......@@ -54,25 +55,33 @@ pub trait Collators: Clone {
/// A future which resolves when a collation is available.
///
/// This future is fused.
pub struct CollationFetch<C: Collators, P: ProvideRuntimeApi> {
pub struct CollationFetch<C: Collators, P> {
parachain: ParaId,
relay_parent_hash: Hash,
relay_parent: BlockId,
collators: C,
incoming: Incoming,
live_fetch: Option<<C::Collation as IntoFuture>::Future>,
client: Arc<P>,
}
impl<C: Collators, P: ProvideRuntimeApi> CollationFetch<C, P> {
impl<C: Collators, P> CollationFetch<C, P> {
/// Create a new collation fetcher for the given chain.
pub fn new(parachain: ParaId, relay_parent: BlockId, relay_parent_hash: Hash, collators: C, client: Arc<P>) -> Self {
pub fn new(
parachain: ParaId,
relay_parent_hash: Hash,
collators: C,
client: Arc<P>,
incoming: Incoming,
) -> Self {
CollationFetch {
relay_parent: BlockId::hash(relay_parent_hash),
relay_parent_hash,
relay_parent,
collators,
client,
parachain,
live_fetch: None,
incoming,
}
}
......@@ -80,6 +89,11 @@ impl<C: Collators, P: ProvideRuntimeApi> CollationFetch<C, P> {
pub fn relay_parent(&self) -> Hash {
self.relay_parent_hash
}
/// Access the local parachain ID.
pub fn parachain(&self) -> ParaId {
self.parachain
}
}
impl<C: Collators, P: ProvideRuntimeApi> Future for CollationFetch<C, P>
......@@ -100,7 +114,7 @@ impl<C: Collators, P: ProvideRuntimeApi> Future for CollationFetch<C, P>
try_ready!(poll)
};
match validate_collation(&*self.client, &self.relay_parent, &x) {
match validate_collation(&*self.client, &self.relay_parent, &x, &self.incoming) {
Ok(e) => {
return Ok(Async::Ready((x, e)))
}
......@@ -148,14 +162,39 @@ error_chain! {
}
}
/// Compute the egress trie root for a set of messages.
pub fn egress_trie_root<A, I: IntoIterator<Item=A>>(messages: I) -> Hash
/// Compute a trie root for a set of messages.
pub fn message_queue_root<A, I: IntoIterator<Item=A>>(messages: I) -> Hash
where A: AsRef<[u8]>
{
::trie::ordered_trie_root::<primitives::Blake2Hasher, _, _>(messages)
}
fn check_and_compute_extrinsic(
/// Compute the set of egress roots for all given outgoing messages.
pub fn egress_roots(mut outgoing: Vec<OutgoingMessage>) -> Vec<(ParaId, Hash)> {
// stable sort messages by parachain ID.
outgoing.sort_by_key(|msg| ParaId::from(msg.target));
let mut egress_roots = Vec::new();
{
let mut messages_iter = outgoing.iter().peekable();
while let Some(batch_target) = messages_iter.peek().map(|o| o.target) {
// we borrow the iterator mutably to ensure it advances so the
// next iteration of the loop starts with `messages_iter` pointing to
// the next batch.
let messages_to = messages_iter
.clone()
.take_while(|o| o.target == batch_target)
.map(|o| { let _ = messages_iter.next(); &o.data[..] });
let computed_root = message_queue_root(messages_to);
egress_roots.push((batch_target, computed_root));
}
}
egress_roots
}
fn check_extrinsic(
mut outgoing: Vec<OutgoingMessage>,
expected_egress_roots: &[(ParaId, Hash)],
) -> Result<Extrinsic, Error> {
......@@ -183,7 +222,7 @@ fn check_and_compute_extrinsic(
.take_while(|o| o.target == batch_target)
.map(|o| { let _ = messages_iter.next(); &o.data[..] });
let computed_root = egress_trie_root(messages_to);
let computed_root = message_queue_root(messages_to);
if &computed_root != expected_root {
return Err(ErrorKind::EgressRootMismatch(
batch_target,
......@@ -231,7 +270,7 @@ impl Externalities {
self,
candidate: &CandidateReceipt,
) -> Result<Extrinsic, Error> {
check_and_compute_extrinsic(
check_extrinsic(
self.outgoing,
&candidate.egress_queue_roots[..],
)
......@@ -242,15 +281,17 @@ impl Externalities {
///
/// This assumes that basic validity checks have been done:
/// - Block data hash is the same as linked in candidate receipt.
/// - incoming messages have been validated against canonical ingress roots
pub fn validate_collation<P>(
client: &P,
relay_parent: &BlockId,
collation: &Collation
collation: &Collation,
incoming: &Incoming,
) -> Result<Extrinsic, Error> where
P: ProvideRuntimeApi,
P::Api: ParachainHost<Block>,
{
use parachain::ValidationParams;
use parachain::{IncomingMessage, ValidationParams};
let api = client.runtime_api();
let para_id = collation.receipt.parachain_index;
......@@ -263,6 +304,15 @@ pub fn validate_collation<P>(
let params = ValidationParams {
parent_head: chain_head,
block_data: collation.block_data.0.clone(),
ingress: incoming.iter()
.flat_map(|&(para_id, ref messages)| {
let source: u32 = para_id.into();
messages.iter().map(move |msg| IncomingMessage {
source,
data: msg.0.clone(),
})
})
.collect()
};
let mut ext = Externalities {
......@@ -291,7 +341,7 @@ mod tests {
use parachain::wasm_executor::Externalities as ExternalitiesTrait;
#[test]
fn egress_roots() {
fn compute_and_check_egress() {
let messages = vec![
OutgoingMessage { target: 3.into(), data: vec![1, 1, 1] },
OutgoingMessage { target: 1.into(), data: vec![1, 2, 3] },
......@@ -299,29 +349,36 @@ mod tests {
OutgoingMessage { target: 1.into(), data: vec![7, 8, 9] },
];
let root_1 = egress_trie_root(&[vec![1, 2, 3], vec![7, 8, 9]]);
let root_2 = egress_trie_root(&[vec![4, 5, 6]]);
let root_3 = egress_trie_root(&[vec![1, 1, 1]]);
let root_1 = message_queue_root(&[vec![1, 2, 3], vec![7, 8, 9]]);
let root_2 = message_queue_root(&[vec![4, 5, 6]]);
let root_3 = message_queue_root(&[vec![1, 1, 1]]);
assert!(check_and_compute_extrinsic(
assert!(check_extrinsic(
messages.clone(),
&[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3)],
).is_ok());
let egress_roots = egress_roots(messages.clone());
assert!(check_extrinsic(
messages.clone(),
&egress_roots[..],
).is_ok());
// missing root.
assert!(check_and_compute_extrinsic(
assert!(check_extrinsic(
messages.clone(),
&[(1.into(), root_1), (3.into(), root_3)],
).is_err());
// extra root.
assert!(check_and_compute_extrinsic(
assert!(check_extrinsic(
messages.clone(),
&[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3), (4.into(), Default::default())],
).is_err());
// root mismatch.
assert!(check_and_compute_extrinsic(
assert!(check_extrinsic(
messages.clone(),
&[(1.into(), root_2), (2.into(), root_1), (3.into(), root_3)],
).is_err());
......
<
......@@ -77,13 +77,11 @@ use parking_lot::Mutex;
use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header, SessionKey};
use polkadot_primitives::parachain::{
Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt,
CandidateSignature
};
use polkadot_primitives::parachain::{
AttestedCandidate, ParachainHost, Statement as PrimitiveStatement
CandidateSignature, ParachainHost, AttestedCandidate, Statement as PrimitiveStatement, Message,
OutgoingMessage,
};
use primitives::{Ed25519AuthorityId as AuthorityId, ed25519};
use runtime_primitives::{traits::ProvideRuntimeApi, ApplyError};
use runtime_primitives::{traits::{ProvideRuntimeApi, Header as HeaderT}, ApplyError};
use tokio::runtime::TaskExecutor;
use tokio::timer::{Delay, Interval};
use transaction_pool::txpool::{Pool, ChainApi as PoolChainApi};
......@@ -97,11 +95,11 @@ use inherents::InherentData;
use runtime_aura::timestamp::TimestampInherentData;
use aura::SlotDuration;
pub use self::collation::{validate_collation, egress_trie_root, Collators};
pub use self::collation::{validate_collation, message_queue_root, egress_roots, Collators};
pub use self::error::{ErrorKind, Error};
pub use self::shared_table::{
SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement,
GenericStatement
GenericStatement,
};
mod attestation_service;
......@@ -115,14 +113,40 @@ pub mod collation;
// block size limit.
const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
/// Incoming messages; a series of sorted (ParaId, Message) pairs.
pub type Incoming = Vec<(ParaId, Vec<Message>)>;
/// Outgoing messages from various candidates.
pub type Outgoing = Vec<MessagesFrom>;
/// Some messages from a parachain.
pub struct MessagesFrom {
/// The parachain originating the messages.
pub from: ParaId,
/// The messages themselves.
pub messages: ParachainExtrinsic,
}
impl MessagesFrom {
/// Construct from the raw messages.
pub fn from_messages(from: ParaId, messages: Vec<OutgoingMessage>) -> Self {
MessagesFrom {
from,
messages: ParachainExtrinsic { outgoing_messages: messages },
}
}
}
/// A handle to a statement table router.
///
/// This is expected to be a lightweight, shared type like an `Arc`.
pub trait TableRouter: Clone {
/// Errors when fetching data from the network.
type Error;
type Error: std::fmt::Debug;
/// Future that resolves when candidate data is fetched.
type FetchCandidate: IntoFuture<Item=BlockData,Error=Self::Error>;
/// Fetch incoming messages for a candidate.
type FetchIncoming: IntoFuture<Item=Incoming,Error=Self::Error>;
/// Call with local candidate data. This will make the data available on the network,
/// and sign, import, and broadcast a statement about the candidate.
......@@ -130,6 +154,14 @@ pub trait TableRouter: Clone {
/// Fetch block data for a specific candidate.
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate;
/// Fetches the incoming message data to a parachain from the network. Incoming data should be
/// checked.
///
/// The `ParachainHost::ingress` function can be used to fetch incoming roots,
/// and the `message_queue_root` function can be used to check that messages actually have
/// expected root.
fn fetch_incoming(&self, id: ParaId) -> Self::FetchIncoming;
}
/// A long-lived network which can create parachain statement and BFT message routing processes on demand.
......@@ -138,12 +170,12 @@ pub trait Network {
/// routing statements to peers, and driving completion of any `StatementProducers`.
type TableRouter: TableRouter;
/// Instantiate a table router using the given shared table and task executor.
/// Instantiate a table router using the given shared table.
/// Also pass through any outgoing messages to be broadcast to peers.
fn communication_for(
&self,
validators: &[SessionKey],
table: Arc<SharedTable>,
task_executor: TaskExecutor
outgoing: Outgoing,
) -> Self::TableRouter;
}
......@@ -179,7 +211,12 @@ pub fn check_statement(statement: &Statement, signature: &CandidateSignature, si
signature.verify(&encoded[..], &signer.into())
}
fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId], local_id: AuthorityId) -> Result<(HashMap<ParaId, GroupInfo>, LocalDuty), Error> {
/// Compute group info out of a duty roster and a local authority set.
pub fn make_group_info(
roster: DutyRoster,
authorities: &[AuthorityId],
local_id: AuthorityId,
) -> Result<(HashMap<ParaId, GroupInfo>, LocalDuty), Error> {
if roster.validator_duty.len() != authorities.len() {
bail!(ErrorKind::InvalidDutyRosterLength(authorities.len(), roster.validator_duty.len()))
}
......@@ -232,25 +269,31 @@ struct ParachainConsensus<C, N, P> {
handle: TaskExecutor,
/// Store for extrinsic data.
extrinsic_store: ExtrinsicStore,
/// Live agreements.
/// Live agreements. Maps relay chain parent hashes to attestation
/// instances.
live_instances: Mutex<HashMap<Hash, Arc<AttestationTracker>>>,
}
impl<C, N, P> ParachainConsensus<C, N, P> where
C: Collators + Send + 'static,
N: Network,
P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
P: ProvideRuntimeApi + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> + BlockBuilderApi<Block>,
<C::Collation as IntoFuture>::Future: Send + 'static,
N::TableRouter: Send + 'static,
<<N::TableRouter as TableRouter>::FetchIncoming as IntoFuture>::Future: Send + 'static,
{
/// Get an attestation table for given parent hash.
///
/// This starts a parachain agreement process for given parent hash if
/// This starts a parachain agreement process on top of the parent hash if
/// one has not already started.
///
/// Additionally, this will trigger broadcast of data to the new block's duty
/// roster.
fn get_or_instantiate(
&self,
parent_hash: Hash,
grandparent_hash: Hash,
authorities: &[AuthorityId],
sign_with: Arc<ed25519::Pair>,
)
......@@ -262,6 +305,29 @@ impl<C, N, P> ParachainConsensus<C, N, P> where
}
let id = BlockId::hash(parent_hash);
// compute the parent candidates, if we know of them.
// this will allow us to circulate outgoing messages to other peers as necessary.
let parent_candidates: Vec<_> = ::attestation_service::fetch_candidates(&*self.client, &id)
.ok()
.and_then(|x| x)
.map(|x| x.collect())
.unwrap_or_default();
let outgoing: Vec<_> = {
// extract all extrinsic data that we have and propagate to peers.
live_instances.get(&grandparent_hash).map(|parent_consensus| {
parent_candidates.iter().filter_map(|c| {
let para_id = c.parachain_index;
let hash = c.hash();
parent_consensus.table.extrinsic_data(&hash).map(|ex| MessagesFrom {
from: para_id,
messages: ex,
})
}).collect()
}).unwrap_or_default()
};
let duty_roster = self.client.runtime_api().duty_roster(&id)?;
let (group_info, local_duty) = make_group_info(
......@@ -279,31 +345,19 @@ impl<C, N, P> ParachainConsensus<C, N, P> where
let table = Arc::new(SharedTable::new(group_info, sign_with.clone(), parent_hash, self.extrinsic_store.clone()));
let router = self.network.communication_for(
authorities,
table.clone(),
self.handle.clone()
outgoing,
);
let validation_para = match local_duty.validation {
let drop_signal = match local_duty.validation {
Chain::Parachain(id) => Some(self.launch_work(
parent_hash,