Commit 4711e1e0 authored by asynchronous rob, committed by GitHub

Collator for the "adder" (formerly basic-add) parachain and various small fixes (#438)

* update basic_add wasm

* wasm feature and collator feature

* move test parachains around a little

* fix wasm build for basic_add

* move basic_add to adder, introduce README

* minimal basic_add collator

* ensure collator messages are sent in the right order

* more logging

* route consensus statements to all peers

* minor bugfixes for parachains

* genesis builder accounts for parachain heads

* fix parachains tests

* targets for txpool

* tweak runtime + collator

* fix version in adder-collator

* consistency for overflowing

* adjust comment

* fix stable test run

* remove dummy registration test

* final grumbles
parent f40ef3b7
......@@ -60,6 +60,7 @@ extern crate polkadot_primitives;
extern crate log;
use std::collections::{BTreeSet, BTreeMap, HashSet};
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
......@@ -68,12 +69,36 @@ use client::BlockchainEvents;
use polkadot_api::PolkadotApi;
use polkadot_primitives::{AccountId, BlockId, SessionKey};
use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId};
use polkadot_cli::{ServiceComponents, Service, CustomConfiguration, VersionInfo};
use polkadot_cli::{ServiceComponents, Service, CustomConfiguration};
use polkadot_cli::{Worker, IntoExit};
use tokio::timer::Deadline;
pub use polkadot_cli::VersionInfo;
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
/// Error to return when the head data was invalid.
#[derive(Clone, Copy, Debug)]
pub struct InvalidHead;
/// Collation errors.
#[derive(Debug)]
pub enum Error<R> {
/// Error on the relay-chain side of things.
Polkadot(R),
/// Error on the collator side of things.
Collator(InvalidHead),
}
impl<R: fmt::Display> fmt::Display for Error<R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
Error::Collator(_) => write!(f, "Collator node error: Invalid head data"),
}
}
}
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
......@@ -84,7 +109,7 @@ pub trait ParachainContext: Clone {
&self,
last_head: HeadData,
ingress: I,
) -> (BlockData, HeadData);
) -> Result<(BlockData, HeadData), InvalidHead>;
}
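// A minimal sketch, not part of this commit: with the new return type, a
// `ParachainContext` reports bad head data via `Err(InvalidHead)` instead of
// panicking. The `EchoContext` name and the `I: IntoIterator<Item=(ParaId,
// Message)>` bound (elided in the hunk above) are assumptions for illustration.
#[derive(Clone)]
struct EchoContext;

impl ParachainContext for EchoContext {
    fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
        &self,
        last_head: HeadData,
        _ingress: I,
    ) -> Result<(BlockData, HeadData), InvalidHead> {
        // refuse to build on malformed head data rather than producing a bogus candidate.
        if last_head.0.is_empty() {
            return Err(InvalidHead);
        }
        // trivially re-propose the same head with empty block data.
        Ok((BlockData(Vec::new()), last_head))
    }
}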
/// Relay chain context needed to collate.
......@@ -154,18 +179,18 @@ pub fn collate<'a, R, P>(
para_context: P,
key: Arc<ed25519::Pair>,
)
-> impl Future<Item=parachain::Collation, Error=R::Error> + 'a
-> impl Future<Item=parachain::Collation, Error=Error<R::Error>> + 'a
where
R: RelayChainContext + 'a,
R::Error: 'a,
R::FutureEgress: 'a,
P: ParachainContext + 'a,
{
collate_ingress(relay_context).map(move |ingress| {
collate_ingress(relay_context).map_err(Error::Polkadot).and_then(move |ingress| {
let (block_data, head_data) = para_context.produce_candidate(
last_head,
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
);
).map_err(Error::Collator)?;
let block_data_hash = block_data.hash();
let signature = key.sign(&block_data_hash.0[..]).into();
......@@ -181,10 +206,10 @@ pub fn collate<'a, R, P>(
block_data_hash,
};
parachain::Collation {
Ok(parachain::Collation {
receipt,
block_data,
}
})
})
}
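// A sketch, not part of this commit, of how a caller might consume the new
// error type; `log_result` is hypothetical.
fn log_result<R: fmt::Display>(res: Result<parachain::Collation, Error<R>>) {
    match res {
        Ok(collation) =>
            info!("produced collation with receipt {:?}", collation.receipt.hash()),
        // the relay-chain side failed (e.g. ingress could not be fetched).
        Err(Error::Polkadot(e)) => warn!("Polkadot node error: {}", e),
        // the parachain context rejected the head data.
        Err(Error::Collator(InvalidHead)) => warn!("refusing to collate on invalid head data"),
    }
}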
......@@ -248,7 +273,7 @@ impl<P, E> Worker for CollationNode<P, E> where
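// (This hunk is inside a try-style helper macro: it unwraps an `Ok` value or
// short-circuits the returned future; failures are now wrapped as relay-chain
// errors.)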
($e:expr) => {
match $e {
Ok(x) => x,
Err(e) => return future::Either::A(future::err(e)),
Err(e) => return future::Either::A(future::err(Error::Polkadot(e))),
}
}
}
......@@ -323,17 +348,19 @@ fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRos
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
pub fn run_collator<P, E>(
pub fn run_collator<P, E, I, ArgT>(
parachain_context: P,
para_id: ParaId,
exit: E,
key: Arc<ed25519::Pair>,
args: Vec<::std::ffi::OsString>,
args: I,
version: VersionInfo,
) -> polkadot_cli::error::Result<()> where
P: ParachainContext + Send + 'static,
E: IntoFuture<Item=(),Error=()>,
E::Future: Send + Clone + 'static,
I: IntoIterator<Item=ArgT>,
ArgT: Into<std::ffi::OsString> + Clone,
{
let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id, key };
polkadot_cli::run(args, node_logic, version)
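// With the relaxed bounds, callers can now pass `::std::env::args()` directly
// rather than first collecting into a `Vec<OsString>`, e.g. (sketch):
//
//     run_collator(parachain_context, para_id, exit, key, ::std::env::args(), version)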
......
......@@ -273,8 +273,13 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
sign_with.public().into(),
)?;
info!("Starting consensus session on top of parent {:?}. Local parachain duty is {:?}",
parent_hash, local_duty.validation);
let active_parachains = self.client.active_parachains(&id)?;
debug!(target: "consensus", "Active parachains: {:?}", active_parachains);
let n_parachains = active_parachains.len();
let table = Arc::new(SharedTable::new(group_info, sign_with.clone(), parent_hash));
let (router, input, output) = self.network.communication_for(
......
......@@ -143,6 +143,7 @@ impl SharedTableInner {
fetch_block_data,
fetch_extrinsic,
evaluate: checking_validity,
ensure_available: checking_availability,
})
}
}
......@@ -206,6 +207,7 @@ struct Work<D: Future, E: Future> {
fetch_block_data: future::Fuse<D>,
fetch_extrinsic: Option<future::Fuse<E>>,
evaluate: bool,
ensure_available: bool,
}
/// Primed statement producer.
......@@ -235,31 +237,35 @@ impl<D, E, C, Err> Future for PrimedStatementProducer<D, E, C>
});
let hash = work.candidate_receipt.hash();
debug!(target: "consensus", "Making validity statement about candidate {}: is_good? {:?}", hash, is_good);
self.inner.produced_statements.validity = match is_good {
Some(true) => Some(GenericStatement::Valid(hash)),
Some(false) => Some(GenericStatement::Invalid(hash)),
None => None,
};
}
}
if let Some(ref mut fetch_extrinsic) = work.fetch_extrinsic {
if let Async::Ready(extrinsic) = fetch_extrinsic.poll()? {
self.inner.produced_statements.extrinsic = Some(extrinsic);
work.evaluate = false;
}
}
let done = self.inner.produced_statements.block_data.is_some() && {
if work.evaluate {
true
} else if self.inner.produced_statements.extrinsic.is_some() {
if let Async::Ready(Some(extrinsic)) = work.fetch_extrinsic.poll()? {
if work.ensure_available {
let hash = work.candidate_receipt.hash();
debug!(target: "consensus", "Claiming candidate {} available.", hash);
// TODO: actually wait for block data and then ensure availability.
self.inner.produced_statements.extrinsic = Some(extrinsic);
self.inner.produced_statements.availability =
Some(GenericStatement::Available(work.candidate_receipt.hash()));
Some(GenericStatement::Available(hash));
true
} else {
false
work.ensure_available = false;
}
}
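// all requested work has resolved: neither validity evaluation nor an
// availability claim remains pending.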
let done = match (work.evaluate, work.ensure_available) {
(false, false) => true,
_ => false,
};
if done {
......@@ -356,10 +362,25 @@ impl SharedTable {
}
/// Sign and import a local statement.
pub fn sign_and_import(&self, statement: table::Statement) -> SignedStatement {
let proposed_digest = match statement {
GenericStatement::Candidate(ref c) => Some(c.hash()),
_ => None,
///
/// For candidate statements, this may also produce a second signed statement
/// concerning the availability of the candidate data.
pub fn sign_and_import(&self, statement: table::Statement)
-> (SignedStatement, Option<SignedStatement>)
{
let (proposed_digest, availability) = match statement {
GenericStatement::Candidate(ref c) => {
let mut availability = None;
let hash = c.hash();
// TODO: actually store the data in an availability store of some kind.
if self.context.is_availability_guarantor_of(&self.context.local_id(), &c.parachain_index) {
availability = Some(self.context.sign_statement(GenericStatement::Available(hash)));
}
(Some(hash), availability)
}
_ => (None, None),
};
let signed_statement = self.context.sign_statement(statement);
......@@ -370,7 +391,13 @@ impl SharedTable {
}
inner.table.import_statement(&*self.context, signed_statement.clone());
signed_statement
// ensure the availability statement is imported after the candidate.
if let Some(a) = availability.clone() {
inner.table.import_statement(&*self.context, a);
}
(signed_statement, availability)
}
/// Execute a closure using a specific candidate.
......@@ -543,5 +570,6 @@ mod tests {
assert!(producer.work.fetch_extrinsic.is_some(), "should fetch extrinsic when guaranteeing availability");
assert!(!producer.work.evaluate, "should not evaluate validity");
assert!(producer.work.ensure_available);
}
}
......@@ -176,6 +176,7 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> MessageProcessTask<P> {
}
}
ConsensusMessage::ChainSpecific(msg, _) => {
debug!(target: "consensus", "Processing consensus statement for live consensus");
if let Some(Message::Statement(parent_hash, statement)) = Decode::decode(&mut msg.as_slice()) {
if ::polkadot_consensus::check_statement(&statement.statement, &statement.signature, statement.sender, &parent_hash) {
self.table_router.import_statement(statement);
......
......@@ -111,10 +111,36 @@ struct BlockDataRequest {
sender: oneshot::Sender<BlockData>,
}
// Ensures collator-protocol messages are sent in the correct order:
// the session key must be sent before the collator role.
enum CollatorState {
Fresh,
RolePending(Role),
Primed,
}
impl CollatorState {
fn send_key<F: FnMut(Message)>(&mut self, key: SessionKey, mut f: F) {
f(Message::SessionKey(key));
if let CollatorState::RolePending(role) = ::std::mem::replace(self, CollatorState::Primed) {
f(Message::CollatorRole(role));
}
}
fn set_role<F: FnMut(Message)>(&mut self, role: Role, mut f: F) {
if let CollatorState::Primed = *self {
f(Message::CollatorRole(role));
} else {
*self = CollatorState::RolePending(role);
}
}
}
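// A sketch, not part of this commit, of the ordering invariant enforced above
// (assuming `key: SessionKey` is in scope):
//
// let mut sent = Vec::new();
// let mut state = CollatorState::Fresh;
// state.set_role(Role::Primary, |msg| sent.push(msg)); // buffered; nothing sent yet
// state.send_key(key, |msg| sent.push(msg)); // sends SessionKey, then the pending CollatorRole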
struct PeerInfo {
collating_for: Option<(AccountId, ParaId)>,
validator_key: Option<SessionKey>,
claimed_validator: bool,
collator_state: CollatorState,
}
#[derive(Default)]
......@@ -281,8 +307,8 @@ impl PolkadotProtocol {
}
}
/// Send a statement to a validator.
fn send_statement(&mut self, ctx: &mut Context<Block>, _val: SessionKey, parent_hash: Hash, statement: SignedStatement) {
/// Gossip a consensus statement.
fn gossip_statement(&mut self, ctx: &mut Context<Block>, parent_hash: Hash, statement: SignedStatement) {
// TODO: something more targeted than gossip.
let raw = Message::Statement(parent_hash, statement).encode();
self.consensus_gossip.multicast_chain_specific(ctx, raw, parent_hash);
......@@ -309,14 +335,14 @@ impl PolkadotProtocol {
let old_data = self.live_consensus.as_ref().map(|c| (c.parent_hash, c.local_session_key));
if Some(&consensus.local_session_key) != old_data.as_ref().map(|&(_, ref key)| key) {
for (id, _) in self.peers.iter()
for (id, peer_data) in self.peers.iter_mut()
.filter(|&(_, ref info)| info.claimed_validator || info.collating_for.is_some())
{
send_polkadot_message(
peer_data.collator_state.send_key(consensus.local_session_key, |msg| send_polkadot_message(
ctx,
*id,
Message::SessionKey(consensus.local_session_key)
);
msg
));
}
}
......@@ -452,12 +478,15 @@ impl PolkadotProtocol {
}
};
debug!(target: "p_net", "New collator role {:?} from {}", role, who);
match info.validator_key {
None => ctx.report_peer(
who,
Severity::Bad("Sent collator role without registering first as validator"),
),
Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) {
debug!(target: "p_net", "Broadcasting collation on relay parent {:?}", relay_parent);
send_polkadot_message(
ctx,
who,
......@@ -481,38 +510,41 @@ impl Specialization<Block> for PolkadotProtocol {
}
};
let validator = status.roles.contains(substrate_network::Roles::AUTHORITY);
let send_key = validator || local_status.collating_for.is_some();
let mut peer_info = PeerInfo {
collating_for: local_status.collating_for,
validator_key: None,
claimed_validator: validator,
collator_state: CollatorState::Fresh,
};
if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
if self.collator_peer_id(acc_id.clone()).is_some() {
if self.collator_peer(acc_id.clone()).is_some() {
ctx.report_peer(who, Severity::Useless("Unknown Polkadot-specific reason"));
return
}
let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone());
send_polkadot_message(
peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message(
ctx,
who,
Message::CollatorRole(collator_role),
);
msg,
));
}
let validator = status.roles.contains(substrate_network::Roles::AUTHORITY);
let send_key = validator || local_status.collating_for.is_some();
self.peers.insert(who, PeerInfo {
collating_for: local_status.collating_for,
validator_key: None,
claimed_validator: validator,
});
self.consensus_gossip.new_peer(ctx, who, status.roles);
if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) {
send_polkadot_message(
peer_info.collator_state.send_key(consensus.local_session_key, |msg| send_polkadot_message(
ctx,
who,
Message::SessionKey(consensus.local_session_key)
);
msg,
));
}
self.peers.insert(who, peer_info);
self.consensus_gossip.new_peer(ctx, who, status.roles);
self.dispatch_pending_requests(ctx);
}
......@@ -520,14 +552,14 @@ impl Specialization<Block> for PolkadotProtocol {
if let Some(info) = self.peers.remove(&who) {
if let Some((acc_id, _)) = info.collating_for {
let new_primary = self.collators.on_disconnect(acc_id)
.and_then(|new_primary| self.collator_peer_id(new_primary));
.and_then(|new_primary| self.collator_peer(new_primary));
if let Some(new_primary) = new_primary {
send_polkadot_message(
if let Some((new_primary, primary_info)) = new_primary {
primary_info.collator_state.set_role(Role::Primary, |msg| send_polkadot_message(
ctx,
new_primary,
Message::CollatorRole(Role::Primary),
)
msg,
));
}
}
......@@ -592,12 +624,12 @@ impl Specialization<Block> for PolkadotProtocol {
for collator_action in self.collators.maintain_peers() {
match collator_action {
Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator),
Action::NewRole(account_id, role) => if let Some(collator) = self.collator_peer_id(account_id) {
send_polkadot_message(
Action::NewRole(account_id, role) => if let Some((collator, info)) = self.collator_peer(account_id) {
info.collator_state.set_role(role, |msg| send_polkadot_message(
ctx,
collator,
Message::CollatorRole(role),
)
msg,
))
},
}
}
......@@ -622,6 +654,7 @@ impl PolkadotProtocol {
Some((ref acc_id, ref para_id)) => {
let structurally_valid = para_id == &collation_para && acc_id == &collated_acc;
if structurally_valid && collation.receipt.check_signature().is_ok() {
debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", para_id, from);
self.collators.on_collation(acc_id.clone(), relay_parent, collation)
} else {
ctx.report_peer(from, Severity::Bad("Sent malformed collation"))
......@@ -633,27 +666,28 @@ impl PolkadotProtocol {
fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver<Collation> {
let (tx, rx) = oneshot::channel();
debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent);
self.collators.await_collation(relay_parent, para_id, tx);
rx
}
// get connected peer with given account ID for collation.
fn collator_peer_id(&self, account_id: AccountId) -> Option<NodeIndex> {
fn collator_peer(&mut self, account_id: AccountId) -> Option<(NodeIndex, &mut PeerInfo)> {
let check_info = |info: &PeerInfo| info
.collating_for
.as_ref()
.map_or(false, |&(ref acc_id, _)| acc_id == &account_id);
self.peers
.iter()
.filter(|&(_, info)| check_info(info))
.map(|(who, _)| *who)
.iter_mut()
.filter(|&(_, ref info)| check_info(&**info))
.map(|(who, info)| (*who, info))
.next()
}
// disconnect a collator by account-id.
fn disconnect_bad_collator(&self, ctx: &mut Context<Block>, account_id: AccountId) {
if let Some(who) = self.collator_peer_id(account_id) {
fn disconnect_bad_collator(&mut self, ctx: &mut Context<Block>, account_id: AccountId) {
if let Some((who, _)) = self.collator_peer(account_id) {
ctx.report_peer(who, Severity::Bad("Consensus layer determined the given collator misbehaved"))
}
}
......@@ -668,13 +702,19 @@ impl PolkadotProtocol {
targets: HashSet<SessionKey>,
collation: Collation,
) {
debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
relay_parent, collation.receipt.parachain_index);
for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
match self.validators.get(&primary) {
Some(who) => send_polkadot_message(
ctx,
*who,
Message::Collation(relay_parent, cloned_collation),
),
Some(who) => {
debug!(target: "p_net", "Sending local collation to {:?}", primary);
send_polkadot_message(
ctx,
*who,
Message::Collation(relay_parent, cloned_collation),
)
},
None =>
warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary),
}
......
......@@ -25,7 +25,7 @@
use polkadot_api::{PolkadotApi, LocalPolkadotApi};
use polkadot_consensus::{SharedTable, TableRouter, SignedStatement, GenericStatement, StatementProducer};
use polkadot_primitives::{Hash, BlockId, SessionKey};
use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt, Id as ParaId};
use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt};
use futures::prelude::*;
use tokio::runtime::TaskExecutor;
......@@ -89,14 +89,16 @@ impl<P: PolkadotApi> Clone for Router<P> {
impl<P: LocalPolkadotApi + Send + Sync + 'static> Router<P> {
/// Import a statement whose signature has been checked already.
pub(crate) fn import_statement(&self, statement: SignedStatement) {
trace!(target: "p_net", "importing consensus statement {:?}", statement.statement);
// defer any statements for which we haven't imported the candidate yet
let (c_hash, parachain_index) = {
let c_hash = {
let candidate_data = match statement.statement {
GenericStatement::Candidate(ref c) => Some((c.hash(), c.parachain_index)),
GenericStatement::Candidate(ref c) => Some(c.hash()),
GenericStatement::Valid(ref hash)
| GenericStatement::Invalid(ref hash)
| GenericStatement::Available(ref hash)
=> self.table.with_candidate(hash, |c| c.map(|c| (*hash, c.parachain_index))),
=> self.table.with_candidate(hash, |c| c.map(|_| *hash)),
};
match candidate_data {
Some(x) => x,
......@@ -115,6 +117,7 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Router<P> {
};
// prepend the candidate statement.
debug!(target: "consensus", "Importing statements about candidate {:?}", c_hash);
statements.insert(0, statement);
let producers: Vec<_> = self.table.import_remote_statements(
self,
......@@ -122,17 +125,16 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Router<P> {
);
// dispatch future work as necessary.
for (producer, statement) in producers.into_iter().zip(statements) {
let producer = match producer {
Some(p) => p,
None => continue, // statement redundant
};
self.knowledge.lock().note_statement(statement.sender, &statement.statement);
self.dispatch_work(c_hash, producer, parachain_index);
if let Some(producer) = producer {
trace!(target: "consensus", "driving statement work to completion");
self.dispatch_work(c_hash, producer);
}
}
}
fn dispatch_work<D, E>(&self, candidate_hash: Hash, producer: StatementProducer<D, E>, parachain: ParaId) where
fn dispatch_work<D, E>(&self, candidate_hash: Hash, producer: StatementProducer<D, E>) where
D: Future<Item=BlockData,Error=()> + Send + 'static,
E: Future<Item=Extrinsic,Error=()> + Send + 'static,
{
......@@ -160,13 +162,13 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Router<P> {
// propagate the statements
if let Some(validity) = produced.validity {
let signed = table.sign_and_import(validity.clone());
route_statement(&*network, &*table, parachain, parent_hash, signed);
let signed = table.sign_and_import(validity.clone()).0;
network.with_spec(|spec, ctx| spec.gossip_statement(ctx, parent_hash, signed));
}
if let Some(availability) = produced.availability {
let signed = table.sign_and_import(availability);
route_statement(&*network, &*table, parachain, parent_hash, signed);
let signed = table.sign_and_import(availability).0;
network.with_spec(|spec, ctx| spec.gossip_statement(ctx, parent_hash, signed));
}
});
......@@ -182,11 +184,15 @@ impl<P: LocalPolkadotApi + Send> TableRouter for Router<P> {
fn local_candidate(&self, receipt: CandidateReceipt, block_data: BlockData, extrinsic: Extrinsic) {
// give to network to make available.
let hash = receipt.hash();
let para_id = receipt.parachain_index;
let signed = self.table.sign_and_import(GenericStatement::Candidate(receipt));
let (candidate, availability) = self.table.sign_and_import(GenericStatement::Candidate(receipt));
self.knowledge.lock().note_candidate(hash, Some(block_data), Some(extrinsic));
route_statement(&*self.network, &*self.table, para_id, self.parent_hash, signed);
self.network.with_spec(|spec, ctx| {
spec.gossip_statement(ctx, self.parent_hash, candidate);
if let Some(availability) = availability {
spec.gossip_statement(ctx, self.parent_hash, availability);
}
});
}
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> BlockDataReceiver {
......@@ -217,32 +223,6 @@ impl Future for BlockDataReceiver {
}
}
// get statement to relevant validators.
fn route_statement(network: &NetworkService, table: &SharedTable, para_id: ParaId, parent_hash: Hash, statement: SignedStatement) {
let broadcast = |i: &mut Iterator<Item=&SessionKey>| {