Newer
Older
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
//! (what we might call a witness or block data).
//!
//! One of collators' other roles is to route messages between chains.
//! Each parachain produces a list of "egress" posts of messages for each other
//! parachain on each block, for a total of N^2 lists all together.
//!
//! We will refer to the egress list at relay chain block X of parachain A with
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain will be intended to route messages from some
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
//! up to the last point in history that messages were successfully routed
//! from that parachain, accounting for relay chain blocks where no candidate
//! from the collator's parachain was produced.
//!
//! In the case that all parachains route to each other and a candidate for the
//! collator's parachain was included in the last relay chain block, the collator
//! only has to gather egress posts from other parachains one block back in relay
//! chain history.
//!
//! This crate defines traits which provide context necessary for collation logic
//! to be performed, as the collation logic itself.
use std::collections::HashSet;
asynchronous rob
committed
use std::fmt;
use futures::{future, Stream, Future, IntoFuture};
use polkadot_primitives::{
BlockId, SessionKey, Hash, Block,
parachain::{
self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic,
PoVBlock, Status as ParachainStatus,
}
};
use polkadot_cli::{
Worker, IntoExit, ProvideRuntimeApi, TaskExecutor, PolkadotService, CustomConfiguration,
ParachainHost,
asynchronous rob
committed
};
use polkadot_network::validation::{SessionParams, ValidationNetwork};
use polkadot_network::NetworkService;
use aura::AuraApi;
asynchronous rob
committed
pub use polkadot_cli::VersionInfo;
pub use polkadot_network::validation::Incoming;
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::parachain::CollatorId;
pub use substrate_network::PeerId;
asynchronous rob
committed
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
/// An abstraction over the `Network` with useful functions for a `Collator`.
pub trait Network: Send + Sync {
/// Convert the given `CollatorId` to a `PeerId`.
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Box<dyn Future<Item=Option<PeerId>, Error=()> + Send>;
/// Create a `Stream` of checked statements for the given `relay_parent`.
///
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement, Error=()>>;
}
impl<P, E> Network for ValidationNetwork<P, E, NetworkService, TaskExecutor> where
P: 'static + Send + Sync,
E: 'static + Send + Sync,
{
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Box<dyn Future<Item=Option<PeerId>, Error=()> + Send>
{
Box::new(Self::collator_id_to_peer_id(self, collator_id))
}
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement, Error=()>> {
Box::new(Self::checked_statements(self, relay_parent))
}
}
asynchronous rob
committed
/// Error to return when the head data was invalid.
#[derive(Clone, Copy, Debug)]
pub struct InvalidHead;
/// Collation errors.
#[derive(Debug)]
pub enum Error<R> {
/// Error on the relay-chain side of things.
Polkadot(R),
/// Error on the collator side of things.
Collator(InvalidHead),
}
impl<R: fmt::Display> fmt::Display for Error<R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
Error::Collator(_) => write!(f, "Collator node error: Invalid head data"),
}
}
}
/// Something that can build a `ParachainContext`.
pub trait BuildParachainContext {
/// The parachain context produced by the `build` function.
type ParachainContext: self::ParachainContext;
/// Build the `ParachainContext`.
fn build(self, network: Arc<dyn Network>) -> Result<Self::ParachainContext, ()>;
}
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
type ProduceCandidate: IntoFuture<Item=(BlockData, HeadData, Extrinsic), Error=InvalidHead>;
/// Produce a candidate, given the relay parent hash, the latest ingress queue information
/// and the last parachain head.
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
&self,
relay_parent: Hash,
}
/// Relay chain context needed to collate.
/// This encapsulates a network and local database which may store
/// some of the input.
pub trait RelayChainContext {
type Error: std::fmt::Debug;
/// Future that resolves to the un-routed egress queues of a parachain.
/// The first item is the oldest.
type FutureEgress: IntoFuture<Item=ConsolidatedIngress, Error=Self::Error>;
/// Get un-routed egress queues from a parachain to the local parachain.
asynchronous rob
committed
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress;
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
pub fn collate<'a, R, P>(
relay_parent: Hash,
parachain_status: ParachainStatus,
relay_context: R,
para_context: P,
key: Arc<ed25519::Pair>,
)
asynchronous rob
committed
-> impl Future<Item=parachain::Collation, Error=Error<R::Error>> + 'a
R: RelayChainContext,
R::FutureEgress: 'a,
P: ParachainContext + 'a,
<P::ProduceCandidate as IntoFuture>::Future: Send,
let ingress = relay_context.unrouted_egress(local_id).into_future().map_err(Error::Polkadot);
ingress
.and_then(move |ingress| {
para_context.produce_candidate(
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
)
.into_future()
.map(move |x| (ingress, x))
.map_err(Error::Collator)
})
.and_then(move |(ingress, (block_data, head_data, mut extrinsic))| {
let block_data_hash = block_data.hash();
let signature = key.sign(block_data_hash.as_ref()).into();
let egress_queue_roots =
polkadot_validation::egress_roots(&mut extrinsic.outgoing_messages);
let receipt = parachain::CandidateReceipt {
parachain_index: local_id,
collator: key.public(),
signature,
head_data,
egress_queue_roots,
fees: 0,
block_data_hash,
upward_messages: Vec::new(),
};
Ok(parachain::Collation {
receipt,
pov: PoVBlock {
block_data,
ingress,
},
})
asynchronous rob
committed
})
struct ApiContext<P, E> {
network: Arc<ValidationNetwork<P, E, NetworkService, TaskExecutor>>,
parent_hash: Hash,
authorities: Vec<SessionKey>,
}
impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
P: ProvideRuntimeApi + Send + Sync,
P::Api: ParachainHost<Block>,
E: Future<Item=(),Error=()> + Clone + Send + Sync + 'static,
{
type Error = String;
type FutureEgress = Box<dyn Future<Item=ConsolidatedIngress, Error=String> + Send>;
asynchronous rob
committed
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
// TODO: https://github.com/paritytech/polkadot/issues/253
//
// Fetch ingress and accumulate all unrounted egress
let _session = self.network.instantiate_session(SessionParams {
local_session_key: None,
parent_hash: self.parent_hash,
authorities: self.authorities.clone(),
}).map_err(|e| format!("unable to instantiate validation session: {:?}", e));
asynchronous rob
committed
Box::new(future::ok(ConsolidatedIngress(Vec::new())))
}
}
struct CollationNode<P, E> {
build_parachain_context: P,
exit: E,
para_id: ParaId,
key: Arc<ed25519::Pair>,
}
impl<P, E> IntoExit for CollationNode<P, E> where
E: Future<Item=(),Error=()> + Send + 'static
{
type Exit = E;
fn into_exit(self) -> Self::Exit {
self.exit
}
}
impl<P, E> Worker for CollationNode<P, E> where
P: BuildParachainContext + Send + 'static,
P::ParachainContext: Send + 'static,
<<P::ParachainContext as ParachainContext>::ProduceCandidate as IntoFuture>::Future: Send + 'static,
E: Future<Item=(), Error=()> + Clone + Send + Sync + 'static,
type Work = Box<dyn Future<Item=(), Error=()> + Send>;
fn configuration(&self) -> CustomConfiguration {
let mut config = CustomConfiguration::default();
config.collating_for = Some((
self.para_id.clone(),
));
config
}
fn work<S>(self, service: &S, task_executor: TaskExecutor) -> Self::Work where
S: PolkadotService,
let CollationNode { build_parachain_context, exit, para_id, key } = self;
let network = service.network();
let known_oracle = client.clone();
let select_chain = if let Some(select_chain) = service.select_chain() {
select_chain
} else {
info!("The node cannot work because it can't select chain.");
return Box::new(future::err(()));
};
let message_validator = polkadot_network::gossip::register_validator(
asynchronous rob
committed
network.clone(),
move |block_hash: &Hash| {
use polkadot_network::gossip::Known;
match known_oracle.block_status(&BlockId::hash(*block_hash)) {
Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
Ok(BlockStatus::KnownBad) => Some(Known::Bad),
Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
Err(_) => None,
Ok(leaves) => if leaves.contains(block_hash) {
Some(Known::Leaf)
} else {
Some(Known::Old)
},
}
let validation_network = Arc::new(ValidationNetwork::new(
network.clone(),
exit.clone(),
message_validator,
client.clone(),
task_executor,
let parachain_context = build_parachain_context.build(validation_network.clone()).unwrap();
let inner_exit = exit.clone();
let work = client.import_notification_stream()
.for_each(move |notification| {
macro_rules! try_fr {
($e:expr) => {
match $e {
Ok(x) => x,
Err(e) => return future::Either::A(future::err(Error::Polkadot(
format!("{:?}", e)
))),
}
let relay_parent = notification.hash;
let id = BlockId::hash(relay_parent);
let network = network.clone();
let key = key.clone();
let parachain_context = parachain_context.clone();
let validation_network = validation_network.clone();
let work = future::lazy(move || {
let status = match try_fr!(api.parachain_status(&id, para_id)) {
Some(status) => status,
None => return future::Either::A(future::ok(())),
};
let authorities = try_fr!(api.authorities(&id));
let targets = compute_targets(
para_id,
authorities.as_slice(),
try_fr!(api.duty_roster(&id)),
);
let context = ApiContext {
network: validation_network,
parent_hash: relay_parent,
authorities,
};
let collation_work = collate(
parachain_context,
key,
).map(move |collation| {
network.with_spec(move |spec, ctx| spec.add_local_collation(
ctx,
relay_parent,
targets,
collation,
));
});
future::Either::B(collation_work)
});
let deadlined = Timeout::new(work, COLLATION_TIMEOUT);
let silenced = deadlined.then(|res| match res {
Ok(()) => Ok(()),
Err(_) => {
warn!("Collation failure: timeout");
tokio::spawn(silenced.select(inner_exit.clone()).then(|_| Ok(())));
Ok(())
});
let work_and_exit = work.select(exit).then(|_| Ok(()));
Box::new(work_and_exit) as Box<_>
}
}
fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRoster) -> HashSet<SessionKey> {
use polkadot_primitives::parachain::Chain;
roster.validator_duty.iter().enumerate()
.filter(|&(_, c)| c == &Chain::Parachain(para_id))
.filter_map(|(i, _)| session_keys.get(i))
.cloned()
.collect()
}
/// Run a collator node with the given `RelayChainContext` and `ParachainContext`
/// build by the given `BuildParachainContext` and arguments to the underlying polkadot node.
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
asynchronous rob
committed
pub fn run_collator<P, E, I, ArgT>(
build_parachain_context: P,
para_id: ParaId,
exit: E,
key: Arc<ed25519::Pair>,
asynchronous rob
committed
args: I,
) -> polkadot_cli::error::Result<()> where
P: BuildParachainContext + Send + 'static,
P::ParachainContext: Send + 'static,
<<P::ParachainContext as ParachainContext>::ProduceCandidate as IntoFuture>::Future: Send + 'static,
E: IntoFuture<Item=(), Error=()>,
E::Future: Send + Clone + Sync + 'static,
asynchronous rob
committed
I: IntoIterator<Item=ArgT>,
ArgT: Into<std::ffi::OsString> + Clone,
let node_logic = CollationNode { build_parachain_context, exit: exit.into_future(), para_id, key };
use std::collections::HashMap;
use polkadot_primitives::parachain::{OutgoingMessage, FeeSchedule};
use keyring::AuthorityKeyring;
#[derive(Default, Clone)]
struct DummyRelayChainContext {
ingress: HashMap<ParaId, ConsolidatedIngress>
impl RelayChainContext for DummyRelayChainContext {
type FutureEgress = Box<dyn Future<Item=ConsolidatedIngress,Error=()>>;
fn unrouted_egress(&self, para_id: ParaId) -> Self::FutureEgress {
match self.ingress.get(¶_id) {
Some(ingress) => Box::new(future::ok(ingress.clone())),
None => Box::new(future::empty()),
}
#[derive(Clone)]
struct DummyParachainContext;
impl ParachainContext for DummyParachainContext {
type ProduceCandidate = Result<(BlockData, HeadData, Extrinsic), InvalidHead>;
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
&self,
_relay_parent: Hash,
ingress: I,
) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> {
// send messages right back.
Ok((
BlockData(vec![1, 2, 3, 4, 5,]),
HeadData(vec![9, 9, 9]),
Extrinsic {
outgoing_messages: ingress.into_iter().map(|(id, msg)| OutgoingMessage {
target: id,
data: msg.0,
}).collect(),
}
))
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
fn collates_correct_queue_roots() {
let mut context = DummyRelayChainContext::default();
let id = ParaId::from(100);
let a = ParaId::from(123);
let b = ParaId::from(456);
let messages_from_a = vec![
Message(vec![1, 1, 1]),
Message(b"helloworld".to_vec()),
];
let messages_from_b = vec![
Message(b"dogglesworth".to_vec()),
Message(b"buy_1_chili_con_carne_here_is_my_cash".to_vec()),
];
let root_a = ::polkadot_validation::message_queue_root(
messages_from_a.iter().map(|msg| &msg.0)
);
let root_b = ::polkadot_validation::message_queue_root(
messages_from_b.iter().map(|msg| &msg.0)
);
context.ingress.insert(id, ConsolidatedIngress(vec![
(b, messages_from_b),
(a, messages_from_a),
]));
let collation = collate(
Default::default(),
ParachainStatus {
head_data: HeadData(vec![5]),
balance: 10,
fee_schedule: FeeSchedule {
base: 0,
per_byte: 1,
},
},
context.clone(),
DummyParachainContext,
AuthorityKeyring::Alice.pair().into(),
).wait().unwrap();
// ascending order by root.
assert_eq!(collation.receipt.egress_queue_roots, vec![(a, root_a), (b, root_b)]);