Newer
Older
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
//! (what we might call a witness or block data).
//!
//! One of collators' other roles is to route messages between chains.
//! Each parachain produces a list of "egress" posts of messages for each other
//! parachain on each block, for a total of N^2 lists all together.
//!
//! We will refer to the egress list at relay chain block X of parachain A with
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain will be intended to route messages from some
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
//! up to the last point in history that messages were successfully routed
//! from that parachain, accounting for relay chain blocks where no candidate
//! from the collator's parachain was produced.
//!
//! In the case that all parachains route to each other and a candidate for the
//! collator's parachain was included in the last relay chain block, the collator
//! only has to gather egress posts from other parachains one block back in relay
//! chain history.
//!
//! This crate defines traits which provide context necessary for collation logic
//! to be performed, as the collation logic itself.
extern crate futures;
extern crate substrate_client as client;
extern crate parity_codec as codec;
extern crate substrate_primitives as primitives;
use std::collections::{BTreeSet, BTreeMap, HashSet};
asynchronous rob
committed
use std::fmt;
use futures::{future, stream, Stream, Future, IntoFuture};
use client::BlockchainEvents;
use primitives::ed25519;
use polkadot_primitives::{AccountId, BlockId, SessionKey};
use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId};
use polkadot_cli::{PolkadotService, CustomConfiguration, CoreApi, ParachainHost};
use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi};
use tokio::timer::Timeout;
asynchronous rob
committed
pub use polkadot_cli::VersionInfo;
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
asynchronous rob
committed
/// Error to return when the head data was invalid.
#[derive(Clone, Copy, Debug)]
pub struct InvalidHead;
/// Collation errors.
#[derive(Debug)]
pub enum Error<R> {
/// Error on the relay-chain side of things.
Polkadot(R),
/// Error on the collator side of things.
Collator(InvalidHead),
}
impl<R: fmt::Display> fmt::Display for Error<R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
Error::Collator(_) => write!(f, "Collator node error: Invalid head data"),
}
}
}
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
/// Produce a candidate, given the latest ingress queue information and the last parachain head.
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
&self,
asynchronous rob
committed
) -> Result<(BlockData, HeadData), InvalidHead>;
}
/// Relay chain context needed to collate.
/// This encapsulates a network and local database which may store
/// some of the input.
pub trait RelayChainContext {
type Error;
/// Future that resolves to the un-routed egress queues of a parachain.
/// The first item is the oldest.
type FutureEgress: IntoFuture<Item=Vec<Vec<Message>>, Error=Self::Error>;
/// Provide a set of all parachains meant to be routed to at a block.
fn routing_parachains(&self) -> BTreeSet<ParaId>;
/// Get un-routed egress queues from a parachain to the local parachain.
fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress;
}
fn key_to_account_id(key: &ed25519::Pair) -> AccountId {
let pubkey_bytes: [u8; 32] = key.public().into();
pubkey_bytes.into()
}
/// Collate the necessary ingress queue using the given context.
pub fn collate_ingress<'a, R>(relay_context: R)
-> impl Future<Item=ConsolidatedIngress, Error=R::Error> + 'a
where
R: RelayChainContext,
R::Error: 'a,
R::FutureEgress: 'a,
{
let mut egress_fetch = Vec::new();
for routing_parachain in relay_context.routing_parachains() {
let fetch = relay_context
.unrouted_egress(routing_parachain)
.into_future()
.map(move |egresses| (routing_parachain, egresses));
egress_fetch.push(fetch);
}
// create a map ordered first by the depth of the egress queue
// and then by the parachain ID.
//
// then transform that into the consolidated egress queue.
stream::futures_unordered(egress_fetch)
.fold(BTreeMap::new(), |mut map, (routing_id, egresses)| {
for (depth, egress) in egresses.into_iter().rev().enumerate() {
let depth = -(depth as i64);
map.insert((depth, routing_id), egress);
}
Ok(map)
})
.map(|ordered| ordered.into_iter().map(|((_, id), egress)| (id, egress)))
.map(|i| i.collect::<Vec<_>>())
.map(ConsolidatedIngress)
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
pub fn collate<'a, R, P>(
local_id: ParaId,
last_head: HeadData,
relay_context: R,
para_context: P,
key: Arc<ed25519::Pair>,
)
asynchronous rob
committed
-> impl Future<Item=parachain::Collation, Error=Error<R::Error>> + 'a
R: RelayChainContext + 'a,
R::Error: 'a,
R::FutureEgress: 'a,
P: ParachainContext + 'a,
{
asynchronous rob
committed
collate_ingress(relay_context).map_err(Error::Polkadot).and_then(move |ingress| {
let (block_data, head_data) = para_context.produce_candidate(
last_head,
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
asynchronous rob
committed
).map_err(Error::Collator)?;
let block_data_hash = block_data.hash();
let signature = key.sign(block_data_hash.as_ref()).into();
let receipt = parachain::CandidateReceipt {
collator: key_to_account_id(&*key),
signature,
head_data,
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 0,
asynchronous rob
committed
Ok(parachain::Collation {
asynchronous rob
committed
})
})
/// Polkadot-api context.
struct ApiContext;
impl RelayChainContext for ApiContext {
type FutureEgress = Result<Vec<Vec<Message>>, Self::Error>;
fn routing_parachains(&self) -> BTreeSet<ParaId> {
BTreeSet::new()
}
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
Ok(Vec::new())
}
}
struct CollationNode<P, E> {
parachain_context: P,
exit: E,
para_id: ParaId,
key: Arc<ed25519::Pair>,
}
impl<P, E> IntoExit for CollationNode<P, E> where
P: ParachainContext + Send + 'static,
E: Future<Item=(),Error=()> + Send + 'static
{
type Exit = E;
fn into_exit(self) -> Self::Exit {
self.exit
}
}
impl<P, E> Worker for CollationNode<P, E> where
P: ParachainContext + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static
type Work = Box<Future<Item=(),Error=()> + Send>;
fn configuration(&self) -> CustomConfiguration {
let mut config = CustomConfiguration::default();
config.collating_for = Some((
key_to_account_id(&*self.key),
self.para_id.clone(),
));
config
}
fn work<S>(self, service: &S) -> Self::Work
where S: PolkadotService,
{
let CollationNode { parachain_context, exit, para_id, key } = self;
let client = service.client();
let network = service.network();
let inner_exit = exit.clone();
let work = client.import_notification_stream()
.for_each(move |notification| {
macro_rules! try_fr {
($e:expr) => {
match $e {
Ok(x) => x,
asynchronous rob
committed
Err(e) => return future::Either::A(future::err(Error::Polkadot(e))),
}
let relay_parent = notification.hash;
let id = BlockId::hash(relay_parent);
let network = network.clone();
let key = key.clone();
let parachain_context = parachain_context.clone();
let work = future::lazy(move || {
let last_head = match try_fr!(api.parachain_head(&id, para_id)) {
Some(last_head) => last_head,
None => return future::Either::A(future::ok(())),
};
let targets = compute_targets(
para_id,
try_fr!(api.authorities(&id)).as_slice(),
try_fr!(api.duty_roster(&id)),
);
let collation_work = collate(
para_id,
HeadData(last_head),
ApiContext,
parachain_context,
key,
).map(move |collation| {
network.with_spec(|spec, ctx| spec.add_local_collation(
ctx,
relay_parent,
targets,
collation,
));
});
future::Either::B(collation_work)
});
let deadlined = Timeout::new(work, COLLATION_TIMEOUT);
let silenced = deadlined.then(|res| match res {
Ok(()) => Ok(()),
Err(_) => {
warn!("Collation failure: timeout");
tokio::spawn(silenced.select(inner_exit.clone()).then(|_| Ok(())));
Ok(())
});
let work_and_exit = work.select(exit).then(|_| Ok(()));
Box::new(work_and_exit) as Box<_>
}
}
fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRoster) -> HashSet<SessionKey> {
use polkadot_primitives::parachain::Chain;
roster.validator_duty.iter().enumerate()
.filter(|&(_, c)| c == &Chain::Parachain(para_id))
.filter_map(|(i, _)| session_keys.get(i))
.cloned()
.collect()
}
/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and
/// arguments to the underlying polkadot node.
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
asynchronous rob
committed
pub fn run_collator<P, E, I, ArgT>(
parachain_context: P,
para_id: ParaId,
exit: E,
key: Arc<ed25519::Pair>,
asynchronous rob
committed
args: I,
) -> polkadot_cli::error::Result<()> where
P: ParachainContext + Send + 'static,
asynchronous rob
committed
I: IntoIterator<Item=ArgT>,
ArgT: Into<std::ffi::OsString> + Clone,
{
let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id, key };
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
#[cfg(test)]
mod tests {
use super::*;
use std::collections::{HashMap, BTreeSet};
use futures::Future;
use polkadot_primitives::parachain::{Message, Id as ParaId};
pub struct DummyRelayChainCtx {
egresses: HashMap<ParaId, Vec<Vec<Message>>>,
currently_routing: BTreeSet<ParaId>,
}
impl RelayChainContext for DummyRelayChainCtx {
type Error = ();
type FutureEgress = Result<Vec<Vec<Message>>, ()>;
fn routing_parachains(&self) -> BTreeSet<ParaId> {
self.currently_routing.clone()
}
fn unrouted_egress(&self, id: ParaId) -> Result<Vec<Vec<Message>>, ()> {
Ok(self.egresses.get(&id).cloned().unwrap_or_default())
}
}
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
fn collates_ingress() {
let route_from = |x: &[ParaId]| {
let mut set = BTreeSet::new();
set.extend(x.iter().cloned());
set
};
let message = |x: Vec<u8>| vec![Message(x)];
let dummy_ctx = DummyRelayChainCtx {
currently_routing: route_from(&[2.into(), 3.into()]),
egresses: vec![
// egresses for `2`: last routed successfully 5 blocks ago.
(2.into(), vec![
message(vec![1, 2, 3]),
message(vec![4, 5, 6]),
message(vec![7, 8]),
message(vec![10]),
message(vec![12]),
]),
// egresses for `3`: last routed successfully 3 blocks ago.
(3.into(), vec![
message(vec![9]),
message(vec![11]),
message(vec![13]),
]),
].into_iter().collect(),
};
assert_eq!(
collate_ingress(dummy_ctx).wait().unwrap(),
ConsolidatedIngress(vec![
(2.into(), message(vec![1, 2, 3])),
(2.into(), message(vec![4, 5, 6])),
(2.into(), message(vec![7, 8])),
(3.into(), message(vec![9])),
(2.into(), message(vec![10])),
(3.into(), message(vec![11])),
(2.into(), message(vec![12])),
(3.into(), message(vec![13])),
]
))
}
}