Commit 465e2c3c authored by Arkadiy Paronyan's avatar Arkadiy Paronyan Committed by Gav Wood
Browse files

Consensus message buffering and more (#114)

* CLI options and keystore integration

* Replace multiqueue with future::mpsc

* BFT gossip

* Revert to app_dirs

* generate_from_seed commented

* Refactor event loop

* Start consensus by timer

* Message buffering

* Minor fixes

* Work around duty-roster issue.

* some more minor fixes

* fix compilation

* more consistent formatting

* make bft input stream never conclude

* Minor fixes

* add timestamp module to executive

* more cleanups and logging

* Fixed message propagation
parent b3f026f1
......@@ -10,6 +10,7 @@ polkadot-runtime = { path = "../runtime" }
polkadot-primitives = { path = "../primitives" }
substrate-codec = { path = "../../substrate/codec" }
substrate-runtime-io = { path = "../../substrate/runtime-io" }
substrate-runtime-executive = { path = "../../substrate/runtime/executive" }
substrate-client = { path = "../../substrate/client" }
substrate-primitives = { path = "../../substrate/primitives" }
substrate-executor = { path = "../../substrate/executor" }
......
......@@ -24,6 +24,7 @@ extern crate substrate_codec as codec;
extern crate substrate_runtime_io as runtime_io;
extern crate substrate_client as client;
extern crate substrate_executor as substrate_executor;
extern crate substrate_runtime_executive;
extern crate substrate_primitives;
extern crate substrate_state_machine as state_machine;
......@@ -323,16 +324,19 @@ impl<S: state_machine::Backend> BlockBuilder for ClientBlockBuilder<S>
}
fn bake(mut self) -> Block {
use substrate_runtime_executive::extrinsics_root;
let mut ext = state_machine::Ext {
overlay: &mut self.changes,
backend: &self.state,
};
let final_header = ::substrate_executor::with_native_environment(
let mut final_header = ::substrate_executor::with_native_environment(
&mut ext,
move || runtime::Executive::finalise_block()
).expect("all inherent extrinsics pushed; all other extrinsics executed correctly; qed");
final_header.extrinsics_root = extrinsics_root::<runtime_io::BlakeTwo256, _>(&self.extrinsics);
Block {
header: final_header,
extrinsics: self.extrinsics,
......@@ -404,6 +408,7 @@ mod tests {
let block = block_builder.bake();
assert_eq!(block.header.number, 1);
assert!(block.header.extrinsics_root != Default::default());
}
#[test]
......
......@@ -12,7 +12,7 @@ log = "0.3"
hex-literal = "0.1"
triehash = "0.1"
ed25519 = { path = "../../substrate/ed25519" }
app_dirs = "1.2.1"
app_dirs = "1.2"
substrate-client = { path = "../../substrate/client" }
substrate-codec = { path = "../../substrate/codec" }
substrate-runtime-io = { path = "../../substrate/runtime-io" }
......
......@@ -43,6 +43,7 @@ pub mod error;
use std::path::{Path, PathBuf};
use std::net::SocketAddr;
use std::sync::mpsc;
/// Parse command line arguments and start the node.
///
......@@ -52,7 +53,7 @@ use std::net::SocketAddr;
/// 9556-9591 Unassigned
/// 9803-9874 Unassigned
/// 9926-9949 Unassigned
pub fn run<I, T>(args: I) -> error::Result<()> where
pub fn run<I, T>(args: I, exit: mpsc::Receiver<()>) -> error::Result<()> where
I: IntoIterator<Item = T>,
T: Into<std::ffi::OsString> + Clone,
{
......@@ -116,9 +117,9 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
address.set_port(rpc_port);
}
let handler = rpc::rpc_handler(service.client());
let server = rpc::start_http(&address, handler)?;
let _server = rpc::start_http(&address, handler)?;
server.wait();
exit.recv().ok();
Ok(())
}
......
......@@ -6,7 +6,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
futures = "0.1.17"
parking_lot = "0.4"
tokio-timer = "0.1.2"
tokio-core = "0.1.12"
ed25519 = { path = "../../substrate/ed25519" }
error-chain = "0.11"
log = "0.4"
......@@ -21,6 +21,5 @@ substrate-codec = { path = "../../substrate/codec" }
substrate-primitives = { path = "../../substrate/primitives" }
substrate-runtime-support = { path = "../../substrate/runtime-support" }
substrate-network = { path = "../../substrate/network" }
tokio-core = "0.1.12"
substrate-keyring = { path = "../../substrate/keyring" }
substrate-client = { path = "../../substrate/client" }
......@@ -32,7 +32,6 @@
extern crate futures;
extern crate ed25519;
extern crate parking_lot;
extern crate tokio_timer;
extern crate polkadot_api;
extern crate polkadot_collator as collator;
extern crate polkadot_statement_table as table;
......@@ -532,6 +531,8 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
type Evaluate = Result<bool, Error>;
fn propose(&self) -> Result<SubstrateBlock, Error> {
debug!(target: "bft", "proposing block on top of parent ({}, {:?})", self.parent_number, self.parent_hash);
// TODO: handle case when current timestamp behind that in state.
let mut block_builder = self.client.build_block(
&self.parent_id,
......@@ -577,7 +578,18 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
// TODO: certain kinds of errors here should lead to a misbehavior report.
fn evaluate(&self, proposal: &SubstrateBlock) -> Result<bool, Error> {
evaluate_proposal(proposal, &*self.client, current_timestamp(), &self.parent_hash, &self.parent_id)
debug!(target: "bft", "evaluating block on top of parent ({}, {:?})", self.parent_number, self.parent_hash);
match evaluate_proposal(proposal, &*self.client, current_timestamp(), &self.parent_hash, &self.parent_id) {
Ok(x) => Ok(x),
Err(e) => match *e.kind() {
ErrorKind::PolkadotApi(polkadot_api::ErrorKind::Executor(_)) => Ok(false),
ErrorKind::ProposalNotForPolkadot => Ok(false),
ErrorKind::TimestampInFuture => Ok(false),
ErrorKind::WrongParentHash(_, _) => Ok(false),
ErrorKind::ProposalTooLarge(_) => Ok(false),
_ => Err(e),
}
}
}
fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) {
......
......@@ -20,12 +20,14 @@
/// candidate agreement over the network.
use std::thread;
use std::time::{Duration, Instant};
use std::sync::Arc;
use futures::{future, Future, Stream, Sink, Async, Canceled};
use std::collections::{HashMap, VecDeque};
use futures::{future, Future, Stream, Sink, Async, Canceled, Poll};
use parking_lot::Mutex;
use substrate_network as net;
use tokio_core::reactor;
use client::BlockchainEvents;
use client::{BlockchainEvents, ChainHead};
use runtime_support::Hashable;
use primitives::{Hash, AuthorityId};
use primitives::block::{Id as BlockId, HeaderHash, Header};
......@@ -35,15 +37,101 @@ use bft::{self, BftService};
use transaction_pool::TransactionPool;
use ed25519;
use super::{TableRouter, SharedTable, ProposerFactory};
use error::Error;
use error;
const TIMER_DELAY_MS: u64 = 5000;
const TIMER_INTERVAL_MS: u64 = 500;
const MESSAGE_LIFETIME_SEC: u64 = 10;
struct BftSink<E> {
network: Arc<net::ConsensusService>,
parent_hash: HeaderHash,
_e: ::std::marker::PhantomData<E>,
}
fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_hash: HeaderHash) -> Result<bft::Communication, bft::Error> {
Ok(match msg {
#[derive(Clone)]
struct SharedMessageCollection {
/// Messages for consensus over a block with known hash. Also holds timestamp of the first message.
messages: Arc<Mutex<HashMap<HeaderHash, (Instant, VecDeque<net::LocalizedBftMessage>)>>>,
}
impl SharedMessageCollection {
fn new() -> SharedMessageCollection {
SharedMessageCollection {
messages: Arc::new(Mutex::new(HashMap::new())),
}
}
fn select(&self, parent_hash: HeaderHash, stream: net::BftMessageStream, authorities: Vec<AuthorityId>) -> Messages {
Messages {
messages: self.messages.lock().remove(&parent_hash).map(|(_, m)| m).unwrap_or_else(VecDeque::new),
parent_hash,
network_stream: stream,
authorities: authorities,
collection: self.clone(),
}
}
fn push(&self, message: net::LocalizedBftMessage) {
self.messages.lock()
.entry(message.parent_hash)
.or_insert_with(|| (Instant::now(), VecDeque::new()))
.1.push_back(message);
}
fn collect_garbage(&self) {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SEC);
let now = Instant::now();
self.messages.lock().retain(|_, &mut (timestamp, _)| timestamp < now + expiration);
}
}
struct Messages {
parent_hash: HeaderHash,
messages: VecDeque<net::LocalizedBftMessage>,
network_stream: net::BftMessageStream,
authorities: Vec<AuthorityId>,
collection: SharedMessageCollection,
}
impl Stream for Messages {
type Item = bft::Communication;
type Error = bft::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// push buffered messages first
while let Some(message) = self.messages.pop_front() {
match process_message(message, &self.authorities) {
Ok(message) => return Ok(Async::Ready(Some(message))),
Err(e) => debug!("Message validation failed: {:?}", e),
}
}
// check the network
match self.network_stream.poll() {
Err(_) => Err(bft::InputStreamConcluded.into()),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
Ok(Async::Ready(Some(message))) => {
if message.parent_hash == self.parent_hash {
match process_message(message, &self.authorities) {
Ok(message) => Ok(Async::Ready(Some(message))),
Err(e) => {
debug!("Message validation failed: {:?}", e);
Ok(Async::NotReady)
}
}
} else {
self.collection.push(message);
Ok(Async::NotReady)
}
}
}
}
}
fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -> Result<bft::Communication, bft::Error> {
Ok(match msg.message {
net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
let proposal = bft::generic::LocalizedProposal {
......@@ -60,7 +148,7 @@ fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_has
signer: ed25519::Public(proposal.sender),
}
};
bft::check_proposal(authorities, &parent_hash, &proposal)?;
bft::check_proposal(authorities, &msg.parent_hash, &proposal)?;
proposal
}),
net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
......@@ -76,14 +164,14 @@ fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_has
net::ConsensusVote::AdvanceRound(r) => bft::generic::Vote::AdvanceRound(r as usize),
}
};
bft::check_vote(authorities, &parent_hash, &vote)?;
bft::check_vote(authorities, &msg.parent_hash, &vote)?;
vote
}),
}),
net::BftMessage::Auxiliary(a) => {
let justification = bft::UncheckedJustification::from(a);
// TODO: get proper error
let justification: Result<_, bft::Error> = bft::check_prepare_justification(authorities, parent_hash, justification)
let justification: Result<_, bft::Error> = bft::check_prepare_justification(authorities, msg.parent_hash, justification)
.map_err(|_| bft::ErrorKind::InvalidJustification.into());
bft::generic::Communication::Auxiliary(justification?)
},
......@@ -96,27 +184,30 @@ impl<E> Sink for BftSink<E> {
type SinkError = E;
fn start_send(&mut self, message: bft::Communication) -> ::futures::StartSend<bft::Communication, E> {
let network_message = match message {
bft::generic::Communication::Consensus(c) => net::BftMessage::Consensus(match c {
bft::generic::LocalizedMessage::Propose(proposal) => net::SignedConsensusMessage::Propose(net::SignedConsensusProposal {
round_number: proposal.round_number as u32,
proposal: proposal.proposal,
digest: proposal.digest,
sender: proposal.sender,
digest_signature: proposal.digest_signature.signature,
full_signature: proposal.full_signature.signature,
let network_message = net::LocalizedBftMessage {
message: match message {
bft::generic::Communication::Consensus(c) => net::BftMessage::Consensus(match c {
bft::generic::LocalizedMessage::Propose(proposal) => net::SignedConsensusMessage::Propose(net::SignedConsensusProposal {
round_number: proposal.round_number as u32,
proposal: proposal.proposal,
digest: proposal.digest,
sender: proposal.sender,
digest_signature: proposal.digest_signature.signature,
full_signature: proposal.full_signature.signature,
}),
bft::generic::LocalizedMessage::Vote(vote) => net::SignedConsensusMessage::Vote(net::SignedConsensusVote {
sender: vote.sender,
signature: vote.signature.signature,
vote: match vote.vote {
bft::generic::Vote::Prepare(r, h) => net::ConsensusVote::Prepare(r as u32, h),
bft::generic::Vote::Commit(r, h) => net::ConsensusVote::Commit(r as u32, h),
bft::generic::Vote::AdvanceRound(r) => net::ConsensusVote::AdvanceRound(r as u32),
}
}),
}),
bft::generic::LocalizedMessage::Vote(vote) => net::SignedConsensusMessage::Vote(net::SignedConsensusVote {
sender: vote.sender,
signature: vote.signature.signature,
vote: match vote.vote {
bft::generic::Vote::Prepare(r, h) => net::ConsensusVote::Prepare(r as u32, h),
bft::generic::Vote::Commit(r, h) => net::ConsensusVote::Commit(r as u32, h),
bft::generic::Vote::AdvanceRound(r) => net::ConsensusVote::AdvanceRound(r as u32),
}
}),
}),
bft::generic::Communication::Auxiliary(justification) => net::BftMessage::Auxiliary(justification.uncheck().into()),
bft::generic::Communication::Auxiliary(justification) => net::BftMessage::Auxiliary(justification.uncheck().into()),
},
parent_hash: self.parent_hash,
};
self.network.send_bft_message(network_message);
Ok(::futures::AsyncSink::Ready)
......@@ -134,17 +225,49 @@ pub struct Service {
struct Network(Arc<net::ConsensusService>);
fn start_bft<F, C>(
header: &Header,
handle: reactor::Handle,
client: &bft::Authorities,
network: Arc<net::ConsensusService>,
bft_service: &BftService<F, C>,
messages: SharedMessageCollection
) where
F: bft::ProposerFactory + 'static,
C: bft::BlockImport + bft::Authorities + 'static,
<F as bft::ProposerFactory>::Error: ::std::fmt::Debug,
<F::Proposer as bft::Proposer>::Error: ::std::fmt::Display + Into<error::Error>,
{
let hash = header.blake2_256().into();
if bft_service.live_agreement().map_or(false, |h| h == hash) {
return;
}
let authorities = match client.authorities(&BlockId::Hash(hash)) {
Ok(authorities) => authorities,
Err(e) => {
debug!("Error reading authorities: {:?}", e);
return;
}
};
let input = messages.select(hash, network.bft_messages(), authorities).map_err(|e| e.into());
let output = BftSink { network: network, parent_hash: hash.clone(), _e: Default::default() };
match bft_service.build_upon(&header, input, output) {
Ok(Some(bft)) => handle.spawn(bft),
Ok(None) => {},
Err(e) => debug!("BFT agreement error: {:?}", e),
}
}
impl Service {
/// Create and start a new instance.
pub fn new<C>(
client: Arc<C>,
network: Arc<net::ConsensusService>,
transaction_pool: Arc<Mutex<TransactionPool>>,
key: ed25519::Pair,
best_header: &Header) -> Service
where C: BlockchainEvents + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
key: ed25519::Pair
) -> Service
where C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
{
let best_header = best_header.clone();
let thread = thread::spawn(move || {
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
let key = Arc::new(key);
......@@ -153,31 +276,44 @@ impl Service {
transaction_pool: transaction_pool.clone(),
network: Network(network.clone()),
};
let bft_service = BftService::new(client.clone(), key, factory);
let build_bft = |header: &Header| -> Result<_, Error> {
let hash = header.blake2_256().into();
let authorities = client.authorities(&BlockId::Hash(hash))?;
let input = network.bft_messages()
.filter_map(move |message| {
process_message(message, &authorities, hash.clone())
.map_err(|e| debug!("Message validation failed: {:?}", e))
.ok()
})
.map_err(|_| bft::InputStreamConcluded.into());
let output = BftSink { network: network.clone(), _e: Default::default() };
Ok(bft_service.build_upon(&header, input, output)?)
let messages = SharedMessageCollection::new();
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
let handle = core.handle();
let notifications = client.import_notification_stream().for_each(|notification| {
if notification.is_new_best {
start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
}
Ok(())
});
let interval = reactor::Interval::new_at(Instant::now() + Duration::from_millis(TIMER_DELAY_MS), Duration::from_millis(TIMER_INTERVAL_MS), &handle).unwrap();
let mut prev_best = match client.best_block_header() {
Ok(header) => header.blake2_256(),
Err(e) => {
warn!("Cant's start consensus service. Error reading best block header: {:?}", e);
return;
}
};
// Kickstart BFT agreement on start.
if let Err(e) = build_bft(&best_header)
.map_err(|e| debug!("Error creating initial BFT agreement: {:?}", e))
.and_then(|bft| core.run(bft))
{
debug!("Error starting initial BFT agreement: {:?}", e);
}
let bft = client.import_notification_stream().and_then(|notification| {
build_bft(&notification.header).map_err(|e| debug!("BFT agreement error: {:?}", e))
}).for_each(|f| f);
if let Err(e) = core.run(bft) {
let c = client.clone();
let s = bft_service.clone();
let n = network.clone();
let m = messages.clone();
let handle = core.handle();
let timed = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
if let Ok(best_block) = c.best_block_header() {
let hash = best_block.blake2_256();
m.collect_garbage();
if hash == prev_best {
debug!("Starting consensus round after a timeout");
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
}
prev_best = hash;
}
Ok(())
});
core.handle().spawn(timed);
if let Err(e) = core.run(notifications) {
debug!("BFT event loop error {:?}", e);
}
});
......@@ -235,3 +371,4 @@ impl TableRouter for Router {
future::ok(Extrinsic)
}
}
......@@ -146,7 +146,7 @@ pub type UncheckedExtrinsic = generic::UncheckedExtrinsic<AccountId, Index, Call
pub type Extrinsic = generic::Extrinsic<AccountId, Index, Call>;
/// Executive: handles dispatch to the various modules.
pub type Executive = executive::Executive<Concrete, Block, Staking,
((((((), Parachains), Council), Democracy), Staking), Session)>;
(((((((), Parachains), Council), Democracy), Staking), Session), Timestamp)>;
impl_outer_config! {
pub struct GenesisConfig for Concrete {
......
......@@ -44,7 +44,7 @@ impl<T: Trait> Module<T> {
pub fn calculate_duty_roster() -> DutyRoster {
let parachain_count = Self::count();
let validator_count = <session::Module<T>>::validator_count();
let validators_per_parachain = (validator_count - 1) / parachain_count;
let validators_per_parachain = if parachain_count != 0 { (validator_count - 1) / parachain_count } else { 0 };
let mut roles_val = (0..validator_count).map(|i| match i {
i if i < parachain_count * validators_per_parachain =>
......
......@@ -7,7 +7,6 @@ authors = ["Parity Technologies <admin@parity.io>"]
futures = "0.1.17"
parking_lot = "0.4"
tokio-timer = "0.1.2"
hex-literal = "0.1"
error-chain = "0.11"
log = "0.4"
tokio-core = "0.1.12"
......
......@@ -37,8 +37,6 @@ extern crate substrate_executor;
extern crate tokio_core;
extern crate substrate_client as client;
#[macro_use]
extern crate hex_literal;
#[macro_use]
extern crate error_chain;
#[macro_use]
......@@ -141,8 +139,12 @@ impl Service {
}
let god_keys = vec![
hex!["f09c0d1467d6952c92c343672bfb06a24560f400af8cf98b93df7d40b4efe1b6"],
hex!["84718cd2894bcda83beeca3a7842caf269fe93cacde0bdee0e3cbce6de253f0e"]
ed25519::Pair::from_seed(b"Alice ").public().into(),
ed25519::Pair::from_seed(b"Bob ").public().into(),
// ed25519::Pair::from_seed(b"Charlie ").public().into(),
// ed25519::Pair::from_seed(b"Dave ").public().into(),
// ed25519::Pair::from_seed(b"Eve ").public().into(),
// ed25519::Pair::from_seed(b"Ferdie ").public().into(),
];
let genesis_config = GenesisConfig {
......@@ -190,15 +192,16 @@ impl Service {
let prepare_genesis = || {
storage = genesis_config.build_externalities();
let block = genesis::construct_genesis_block(&storage);
with_externalities(&mut storage, ||
with_externalities(&mut storage, || {
// TODO: use api.rs to dispatch instead
polkadot_runtime::System::initialise_genesis_state(&block.header)
);
polkadot_runtime::System::initialise_genesis_state(&block.header);
info!("Genesis header hash: {}", polkadot_runtime::System::block_hash(0));
});
(primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
};
let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?);
let best_header = client.header(&BlockId::Hash(client.info()?.chain.best_hash))?.expect("Best header always exists; qed");
let best_header = client.best_block_header()?;
info!("Starting Polkadot. Best block is #{}", best_header.number);
let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool)));
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
......@@ -220,7 +223,7 @@ impl Service {
// Load the first available key. Code above makes sure it exisis.
let key = keystore.load(&keystore.contents()?[0], "")?;
info!("Using authority key {:?}", key.public());
Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool.clone(), key, &best_header))
Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool.clone(), key))
} else {
None
};
......
......@@ -22,9 +22,16 @@ extern crate polkadot_cli as cli;
#[macro_use]
extern crate error_chain;
extern crate ctrlc;
use std::sync::mpsc;
quick_main!(run);
fn run() -> cli::error::Result<()> {
cli::run(::std::env::args())
let (exit_send, exit_receive) = mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.send(()).expect("Error sending exit notification");
});
cli::run(::std::env::args(), exit_receive)
}
......@@ -63,10 +63,10 @@ impl PolkadotBlock {
return Err(unchecked);
}
match unchecked.extrinsics[0].extrinsic.function {
Call::Timestamp(TimestampCall::set(_)) => return Err(unchecked),
_ => {}
Call::Timestamp(TimestampCall::set(_)) => {},
_ => return Err(unchecked),
}
// any further checks...
Ok(PolkadotBlock { block: unchecked, location: None })
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment