Commit 6d8720ac authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Minimal parachains part 2: Parachain statement and data routing (#173)

* dynamic inclusion threshold calculator

* collators interface

* collation helpers

* initial proposal-creation future

* create proposer when asked to propose

* remove local_availability duty

* statement table tracks includable parachain count

* beginnings of timing future

* finish proposal logic

* remove stray println

* extract shared table to separate module

* change ordering

* includability tracking

* fix doc

* initial changes to parachains module

* initialise dummy block before API calls

* give polkadot control over round proposer based on random seed

* propose only after enough candidates

* flesh out parachains module a bit more

* set_heads

* actually introduce set_heads to runtime

* update block_builder to accept parachains

* split block validity errors from real errors in evaluation

* update WASM runtimes

* polkadot-api methods for parachains additions

* delay evaluation until candidates are ready

* comments

* fix dynamic inclusion with zero initial

* test for includability tracker

* wasm validation of parachain candidates

* move primitives to primitives crate

* remove runtime-std dependency from codec

* adjust doc

* polkadot-parachain-primitives

* kill legacy polkadot-validator crate

* basic-add test chain

* test for basic_add parachain

* move to test-chains dir

* use wasm-build

* new wasm directory layout

* reorganize a bit more

* Fix for rh-minimal-parachain (#141)

* Remove extern "C"

We already encountered such behavior (bug?) in pwasm-std, I believe.

* Fix `panic_fmt` signature by adding `_col`

Wrong `panic_fmt` signature can inhibit some optimizations in LTO mode.

* Add linker flags and use wasm-gc in build script

Pass --import-memory to LLD to emit wasm binary with imported memory.

Also use wasm-gc instead of wasm-build.

* Fix effective_max.

I'm not sure why it was the way it was actually.

* Recompile wasm.

* Fix indent

* more basic_add tests

* validate parachain WASM

* produce statements on receiving statements

* tests for reactive statement production

* fix build

* add OOM lang item to runtime-io

* use dynamic_inclusion when evaluating as well

* fix update_includable_count

* remove dead code

* grumbles

* actually defer round_proposer logic

* update wasm

* address a few more grumbles

* schedule collation work as soon as BFT is started

* impl future in collator

* fix comment

* governance proposals for adding and removing parachains

* bump protocol version

* tear out polkadot-specific pieces of substrate-network

* extract out polkadot-specific stuff from substrate-network

* begin polkadot network subsystem

* grumbles

* update WASM checkins

* parse status from polkadot peer

* allow invoke of network specialization

* begin statement router implementation

* remove dependency on tokio-timer

* fix sanity check and have proposer factory create communication streams

* pull out statement routing from consensus library

* fix comments

* adjust typedefs

* extract consensus_gossip out of main network protocol handler

* port substrate-bft to new tokio

* port polkadot-consensus to new tokio

* fix typo

* start message processing task

* initial consensus network implementation

* remove known tracking from statement-table crate

* extract router into separate module

* defer statements until later

* double signature is invalid

* propagating statements

* grumbles

* request block data

* fix compilation

* embed new consensus network into service

* port demo CLI to tokio

* all test crates compile

* some tests for fetching block data

* whitespace

* adjusting some tokio stuff

* update exit-future

* remove overly noisy warning

* clean up collation work a bit

* address review grumbles

* fix lock order in protocol handler

* rebuild wasm artifacts

* tag AuthorityId::from_slice for std only

* address formatting grumbles

* rename event_loop to executor

* some more docs for polkadot-network crate
parent a828ffcc
......@@ -18,13 +18,14 @@ lazy_static = "1.0"
triehash = "0.1"
ed25519 = { path = "../../substrate/ed25519" }
app_dirs = "1.2"
tokio-core = "0.1.12"
tokio = "0.1.7"
futures = "0.1.17"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
fdlimit = "0.1"
parking_lot = "0.4"
serde_json = "1.0"
serde = "1.0"
exit-future = "0.1"
substrate-client = { path = "../../substrate/client" }
substrate-codec = { path = "../../substrate/codec" }
substrate-network = { path = "../../substrate/network" }
......
......@@ -17,9 +17,10 @@
//! Console informant. Prints sync progress and block events. Runs on the calling thread.
use std::time::{Duration, Instant};
use futures::stream::Stream;
use futures::{Future, Stream};
use service::{Service, Components};
use tokio_core::reactor;
use tokio::runtime::TaskExecutor;
use tokio::timer::Interval;
use network::{SyncState, SyncProvider};
use polkadot_primitives::Block;
use state_machine;
......@@ -28,13 +29,12 @@ use client::{self, BlockchainEvents};
const TIMER_INTERVAL_MS: u64 = 5000;
/// Spawn informant on the event loop
pub fn start<C>(service: &Service<C>, handle: reactor::Handle)
pub fn start<C>(service: &Service<C>, exit: ::exit_future::Exit, handle: TaskExecutor)
where
C: Components,
client::error::Error: From<<<<C as Components>::Backend as client::backend::Backend<Block>>::State as state_machine::Backend>::Error>,
{
let interval = reactor::Interval::new_at(Instant::now(), Duration::from_millis(TIMER_INTERVAL_MS), &handle)
.expect("Error creating informant timer");
let interval = Interval::new(Instant::now(), Duration::from_millis(TIMER_INTERVAL_MS));
let network = service.network();
let client = service.client();
......@@ -73,8 +73,8 @@ pub fn start<C>(service: &Service<C>, handle: reactor::Handle)
telemetry!("txpool.import"; "mem_usage" => status.mem_usage, "count" => status.transaction_count, "sender" => status.senders);
Ok(())
});
handle.spawn(display_notifications);
handle.spawn(display_block_import);
handle.spawn(display_txpool_import);
let informant_work = display_notifications.join3(display_block_import, display_txpool_import);
handle.spawn(exit.until(informant_work).map(|_| ()));
}
......@@ -25,7 +25,7 @@ extern crate ansi_term;
extern crate regex;
extern crate time;
extern crate futures;
extern crate tokio_core;
extern crate tokio;
extern crate ctrlc;
extern crate fdlimit;
extern crate ed25519;
......@@ -50,6 +50,7 @@ extern crate slog; // needed until we can reexport `slog_info` from `substrate_t
#[macro_use]
extern crate substrate_telemetry;
extern crate polkadot_transaction_pool as txpool;
extern crate exit_future;
#[macro_use]
extern crate lazy_static;
......@@ -76,9 +77,8 @@ use codec::Slicable;
use client::BlockOrigin;
use runtime_primitives::generic::SignedBlock;
use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
use tokio_core::reactor;
use futures::Future;
use tokio::runtime::Runtime;
use service::PruningMode;
const DEFAULT_TELEMETRY_URL: &str = "ws://telemetry.polkadot.io:1024";
......@@ -188,13 +188,14 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
let role =
if matches.is_present("collator") {
info!("Starting collator");
service::Role::COLLATOR
// TODO [rob]: collation node implementation
service::Role::FULL
} else if matches.is_present("light") {
info!("Starting (light)");
service::Role::LIGHT
} else if matches.is_present("validator") || matches.is_present("dev") {
info!("Starting validator");
service::Role::VALIDATOR
service::Role::AUTHORITY
} else {
info!("Starting (heavy)");
service::Role::FULL
......@@ -231,6 +232,9 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
chain_name: config.chain_spec.name().to_owned(),
};
let mut runtime = Runtime::new()?;
let executor = runtime.executor();
let _guard = if matches.is_present("telemetry") || matches.value_of("telemetry-url").is_some() {
let name = config.name.clone();
let chain_name = config.chain_spec.name().to_owned();
......@@ -250,11 +254,14 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
None
};
let core = reactor::Core::new().expect("tokio::Core could not be created");
match role == service::Role::LIGHT {
true => run_until_exit(core, service::new_light(config)?, &matches, sys_conf),
false => run_until_exit(core, service::new_full(config)?, &matches, sys_conf),
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf)?,
}
// TODO: hard exit if this stalls?
runtime.shutdown_on_idle().wait().expect("failed to shut down event loop");
Ok(())
}
fn build_spec(matches: &clap::ArgMatches) -> error::Result<()> {
......@@ -370,29 +377,37 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}
fn run_until_exit<C>(mut core: reactor::Core, service: service::Service<C>, matches: &clap::ArgMatches, sys_conf: SystemConfiguration) -> error::Result<()>
fn run_until_exit<C>(runtime: &mut Runtime, service: service::Service<C>, matches: &clap::ArgMatches, sys_conf: SystemConfiguration) -> error::Result<()>
where
C: service::Components,
client::error::Error: From<<<<C as service::Components>::Backend as client::backend::Backend<Block>>::State as state_machine::Backend>::Error>,
{
let exit = {
// can't use signal directly here because CtrlC takes only `Fn`.
let (exit_send, exit) = mpsc::channel(1);
let (exit_send, exit) = exit_future::signal();
let exit_send = ::std::cell::RefCell::new(Some(exit_send));
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).wait().expect("Error sending exit notification");
let exit_send = exit_send
.try_borrow_mut()
.expect("only borrowed in non-reetrant signal handler; qed")
.take();
if let Some(signal) = exit_send {
signal.fire();
}
});
exit
};
informant::start(&service, core.handle());
let executor = runtime.executor();
informant::start(&service, exit.clone(), executor.clone());
let _rpc_servers = {
let http_address = parse_address("127.0.0.1:9933", "rpc-port", matches)?;
let ws_address = parse_address("127.0.0.1:9944", "ws-port", matches)?;
let handler = || {
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
let chain = rpc::apis::chain::Chain::new(service.client(), executor.clone());
let author = rpc::apis::author::Author::new(service.client(), service.transaction_pool());
rpc::rpc_handler::<Block, _, _, _, _>(
service.client(),
......@@ -407,7 +422,7 @@ fn run_until_exit<C>(mut core: reactor::Core, service: service::Service<C>, matc
)
};
core.run(exit.into_future()).expect("Error running informant event loop");
let _ = exit.wait();
Ok(())
}
......
......@@ -9,5 +9,5 @@ futures = "0.1.17"
substrate-codec = { path = "../../substrate/codec", version = "0.1" }
substrate-primitives = { path = "../../substrate/primitives", version = "0.1" }
polkadot-runtime = { path = "../runtime", version = "0.1" }
polkadot-parachain = { path = "../parachain", version = "0.1" }
polkadot-primitives = { path = "../primitives", version = "0.1" }
polkadot-parachain = { path = "../parachain", version = "0.1" }
......@@ -85,7 +85,7 @@ pub trait RelayChainContext {
/// Collate the necessary ingress queue using the given context.
pub fn collate_ingress<'a, R>(relay_context: R)
-> Box<Future<Item=ConsolidatedIngress, Error=R::Error> + 'a>
-> impl Future<Item=ConsolidatedIngress, Error=R::Error> + 'a
where
R: RelayChainContext,
R::Error: 'a,
......@@ -106,7 +106,7 @@ pub fn collate_ingress<'a, R>(relay_context: R)
// and then by the parachain ID.
//
// then transform that into the consolidated egress queue.
Box::new(stream::futures_unordered(egress_fetch)
stream::futures_unordered(egress_fetch)
.fold(BTreeMap::new(), |mut map, (routing_id, egresses)| {
for (depth, egress) in egresses.into_iter().rev().enumerate() {
let depth = -(depth as i64);
......@@ -117,19 +117,19 @@ pub fn collate_ingress<'a, R>(relay_context: R)
})
.map(|ordered| ordered.into_iter().map(|((_, id), egress)| (id, egress)))
.map(|i| i.collect::<Vec<_>>())
.map(ConsolidatedIngress))
.map(ConsolidatedIngress)
}
/// Produce a candidate for the parachain.
pub fn collate<'a, R, P>(local_id: ParaId, relay_context: R, para_context: P)
-> Box<Future<Item=parachain::Candidate, Error=R::Error> + 'a>
pub fn collate<'a, R: 'a, P>(local_id: ParaId, relay_context: R, para_context: P)
-> impl Future<Item=parachain::Candidate, Error=R::Error> + 'a
where
R: RelayChainContext,
R::Error: 'a,
R::FutureEgress: 'a,
P: ParachainContext + 'a,
{
Box::new(collate_ingress(relay_context).map(move |ingress| {
collate_ingress(relay_context).map(move |ingress| {
let (block_data, _, signature) = para_context.produce_candidate(
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
);
......@@ -140,7 +140,7 @@ pub fn collate<'a, R, P>(local_id: ParaId, relay_context: R, para_context: P)
block: block_data,
unprocessed_ingress: ingress,
}
}))
})
}
#[cfg(test)]
......
......@@ -6,7 +6,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
futures = "0.1.17"
parking_lot = "0.4"
tokio-core = "0.1.12"
tokio = "0.1.7"
ed25519 = { path = "../../substrate/ed25519" }
error-chain = "0.12"
log = "0.3"
......@@ -22,7 +22,8 @@ substrate-bft = { path = "../../substrate/bft" }
substrate-codec = { path = "../../substrate/codec" }
substrate-primitives = { path = "../../substrate/primitives" }
substrate-runtime-support = { path = "../../substrate/runtime-support" }
substrate-network = { path = "../../substrate/network" }
substrate-keyring = { path = "../../substrate/keyring" }
substrate-client = { path = "../../substrate/client" }
substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" }
[dev-dependencies]
substrate-keyring = { path = "../../substrate/keyring" }
......@@ -23,7 +23,7 @@ use std::sync::Arc;
use polkadot_api::PolkadotApi;
use polkadot_primitives::{Hash, AccountId, BlockId};
use polkadot_primitives::parachain::{Id as ParaId, Chain, BlockData, Extrinsic, CandidateReceipt};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
use futures::prelude::*;
......@@ -55,7 +55,7 @@ pub trait Collators: Clone {
///
/// This future is fused.
pub struct CollationFetch<C: Collators, P: PolkadotApi> {
parachain: Option<ParaId>,
parachain: ParaId,
relay_parent_hash: Hash,
relay_parent: BlockId,
collators: C,
......@@ -65,16 +65,13 @@ pub struct CollationFetch<C: Collators, P: PolkadotApi> {
impl<C: Collators, P: PolkadotApi> CollationFetch<C, P> {
/// Create a new collation fetcher for the given chain.
pub fn new(parachain: Chain, relay_parent: BlockId, relay_parent_hash: Hash, collators: C, client: Arc<P>) -> Self {
pub fn new(parachain: ParaId, relay_parent: BlockId, relay_parent_hash: Hash, collators: C, client: Arc<P>) -> Self {
CollationFetch {
relay_parent_hash,
relay_parent,
collators,
client,
parachain: match parachain {
Chain::Parachain(id) => Some(id),
Chain::Relay => None,
},
parachain,
live_fetch: None,
}
}
......@@ -85,26 +82,19 @@ impl<C: Collators, P: PolkadotApi> Future for CollationFetch<C, P> {
type Error = C::Error;
fn poll(&mut self) -> Poll<(Collation, Extrinsic), C::Error> {
let parachain = match self.parachain.as_ref() {
Some(p) => p.clone(),
None => return Ok(Async::NotReady),
};
loop {
let x = {
let parachain = self.parachain.clone();
let (r, c) = (self.relay_parent_hash, &self.collators);
let poll = self.live_fetch
.get_or_insert_with(move || c.collate(parachain, r).into_future())
.poll();
if let Err(_) = poll { self.parachain = None }
try_ready!(poll)
};
match validate_collation(&*self.client, &self.relay_parent, &x) {
Ok(()) => {
self.parachain = None;
// TODO: generate extrinsic while verifying.
return Ok(Async::Ready((x, Extrinsic)));
}
......
......@@ -61,7 +61,7 @@ impl DynamicInclusion {
/// would be enough, or `None` if it is sufficient now.
///
/// Panics if `now` is earlier than the `start`.
pub fn acceptable_in(&self, now: Instant, included: usize) -> Option<Duration> {
pub fn acceptable_in(&self, now: Instant, included: usize) -> Option<Instant> {
let elapsed = now.duration_since(self.start);
let elapsed = duration_to_micros(&elapsed);
......@@ -70,7 +70,8 @@ impl DynamicInclusion {
if elapsed >= valid_after {
None
} else {
Some(Duration::from_millis((valid_after - elapsed) as u64 / 1000))
let until = Duration::from_millis((valid_after - elapsed) as u64 / 1000);
Some(now + until)
}
}
}
......@@ -104,7 +105,7 @@ mod tests {
Duration::from_millis(4000),
);
assert_eq!(dynamic.acceptable_in(now, 5), Some(Duration::from_millis(2000)));
assert_eq!(dynamic.acceptable_in(now, 5), Some(now + Duration::from_millis(2000)));
assert!(dynamic.acceptable_in(now + Duration::from_millis(2000), 5).is_none());
assert!(dynamic.acceptable_in(now + Duration::from_millis(3000), 5).is_none());
assert!(dynamic.acceptable_in(now + Duration::from_millis(4000), 5).is_none());
......
......@@ -37,7 +37,7 @@ error_chain! {
description("Proposer destroyed before finishing proposing or evaluating"),
display("Proposer destroyed before finishing proposing or evaluating"),
}
Timer(e: String) {
Timer(e: ::tokio::timer::Error) {
description("Failed to register or resolve async timer."),
display("Timer failed: {}", e),
}
......
This diff is collapsed.
......@@ -18,6 +18,10 @@
/// Consensus service. A long runnung service that manages BFT agreement and parachain
/// candidate agreement over the network.
///
/// This uses a handle to an underlying thread pool to dispatch heavy work
/// such as candidate verification while performing event-driven work
/// on a local event loop.
use std::thread;
use std::time::{Duration, Instant};
......@@ -27,197 +31,37 @@ use bft::{self, BftService};
use client::{BlockchainEvents, ChainHead};
use ed25519;
use futures::prelude::*;
use futures::{future, Canceled};
use polkadot_api::LocalPolkadotApi;
use polkadot_primitives::{BlockId, Block, Header, Hash, AccountId};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
use primitives::AuthorityId;
use runtime_support::Hashable;
use substrate_network as net;
use tokio_core::reactor;
use polkadot_primitives::{Block, Header};
use transaction_pool::TransactionPool;
use super::{TableRouter, SharedTable, ProposerFactory};
use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle;
use tokio::runtime::TaskExecutor as ThreadPoolHandle;
use tokio::runtime::current_thread::Runtime as LocalRuntime;
use tokio::timer::Interval;
use super::{Network, Collators, ProposerFactory};
use error;
const TIMER_DELAY_MS: u64 = 5000;
const TIMER_INTERVAL_MS: u64 = 500;
struct BftSink<E> {
network: Arc<net::ConsensusService<Block>>,
parent_hash: Hash,
_e: ::std::marker::PhantomData<E>,
}
struct Messages {
network_stream: net::BftMessageStream<Block>,
local_id: AuthorityId,
authorities: Vec<AuthorityId>,
}
impl Stream for Messages {
type Item = bft::Communication<Block>;
type Error = bft::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// check the network
loop {
match self.network_stream.poll() {
Err(_) => return Err(bft::InputStreamConcluded.into()),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
Ok(Async::Ready(Some(message))) => {
match process_message(message, &self.local_id, &self.authorities) {
Ok(Some(message)) => return Ok(Async::Ready(Some(message))),
Ok(None) => {} // ignored local message.
Err(e) => {
debug!("Message validation failed: {:?}", e);
}
}
}
}
}
}
}
fn process_message(msg: net::LocalizedBftMessage<Block>, local_id: &AuthorityId, authorities: &[AuthorityId]) -> Result<Option<bft::Communication<Block>>, bft::Error> {
Ok(Some(match msg.message {
net::generic_message::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
net::generic_message::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
if &proposal.sender == local_id { return Ok(None) }
let proposal = bft::generic::LocalizedProposal {
round_number: proposal.round_number as usize,
proposal: proposal.proposal,
digest: proposal.digest,
sender: proposal.sender,
digest_signature: ed25519::LocalizedSignature {
signature: proposal.digest_signature,
signer: proposal.sender.into(),
},
full_signature: ed25519::LocalizedSignature {
signature: proposal.full_signature,
signer: proposal.sender.into(),
}
};
bft::check_proposal(authorities, &msg.parent_hash, &proposal)?;
trace!(target: "bft", "importing proposal message for round {} from {}", proposal.round_number, proposal.sender);
proposal
}),
net::generic_message::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
if &vote.sender == local_id { return Ok(None) }
let vote = bft::generic::LocalizedVote {
sender: vote.sender,
signature: ed25519::LocalizedSignature {
signature: vote.signature,
signer: vote.sender.into(),
},
vote: match vote.vote {
net::generic_message::ConsensusVote::Prepare(r, h) => bft::generic::Vote::Prepare(r as usize, h),
net::generic_message::ConsensusVote::Commit(r, h) => bft::generic::Vote::Commit(r as usize, h),
net::generic_message::ConsensusVote::AdvanceRound(r) => bft::generic::Vote::AdvanceRound(r as usize),
}
};
bft::check_vote::<Block>(authorities, &msg.parent_hash, &vote)?;
trace!(target: "bft", "importing vote {:?} from {}", vote.vote, vote.sender);
vote
}),
}),
net::generic_message::BftMessage::Auxiliary(a) => {
let justification = bft::UncheckedJustification::<Hash>::from(a);
// TODO: get proper error
let justification: Result<_, bft::Error> = bft::check_prepare_justification::<Block>(authorities, msg.parent_hash, justification)
.map_err(|_| bft::ErrorKind::InvalidJustification.into());
bft::generic::Communication::Auxiliary(justification?)
},
}))
}
impl<E> Sink for BftSink<E> {
type SinkItem = bft::Communication<Block>;
// TODO: replace this with the ! type when that's stabilized
type SinkError = E;
fn start_send(&mut self, message: bft::Communication<Block>) -> ::futures::StartSend<bft::Communication<Block>, E> {
let network_message = net::generic_message::LocalizedBftMessage {
message: match message {
bft::generic::Communication::Consensus(c) => net::generic_message::BftMessage::Consensus(match c {
bft::generic::LocalizedMessage::Propose(proposal) => net::generic_message::SignedConsensusMessage::Propose(net::generic_message::SignedConsensusProposal {
round_number: proposal.round_number as u32,
proposal: proposal.proposal,
digest: proposal.digest,
sender: proposal.sender,
digest_signature: proposal.digest_signature.signature,
full_signature: proposal.full_signature.signature,
}),
bft::generic::LocalizedMessage::Vote(vote) => net::generic_message::SignedConsensusMessage::Vote(net::generic_message::SignedConsensusVote {
sender: vote.sender,
signature: vote.signature.signature,
vote: match vote.vote {
bft::generic::Vote::Prepare(r, h) => net::generic_message::ConsensusVote::Prepare(r as u32, h),
bft::generic::Vote::Commit(r, h) => net::generic_message::ConsensusVote::Commit(r as u32, h),
bft::generic::Vote::AdvanceRound(r) => net::generic_message::ConsensusVote::AdvanceRound(r as u32),
}
}),
}),
bft::generic::Communication::Auxiliary(justification) => net::generic_message::BftMessage::Auxiliary(justification.uncheck().into()),
},
parent_hash: self.parent_hash,
};
self.network.send_bft_message(network_message);
Ok(::futures::AsyncSink::Ready)
}
fn poll_complete(&mut self) -> ::futures::Poll<(), E> {
Ok(Async::Ready(()))
}
}
struct Network(Arc<net::ConsensusService<Block>>);
impl super::Network for Network {
type TableRouter = Router;
fn table_router(&self, _table: Arc<SharedTable>) -> Self::TableRouter {
Router {
network: self.0.clone()
}
}
}
// spin up an instance of BFT agreement on the current thread's executor.
// panics if there is no current thread executor.
fn start_bft<F, C>(
header: &Header,
handle: reactor::Handle,
client: &bft::Authorities<Block>,
network: Arc<net::ConsensusService<Block>>,
bft_service: &BftService<Block, F, C>,
) where
F: bft::ProposerFactory<Block> + 'static,
F: bft::Environment<Block> + 'static,
C: bft::BlockImport<Block> + bft::Authorities<Block> + 'static,
<F as bft::ProposerFactory<Block>>::Error: ::std::fmt::Debug,
F::Error: ::std::fmt::Debug,
<F::Proposer as bft::Proposer<Block>>::Error: ::std::fmt::Display + Into<error::Error>,
{
let parent_hash = header.hash();
if bft_service.live_agreement().map_or(false, |h| h == parent_hash) {
return;
}
let authorities = match client.authorities(&BlockId::hash(parent_hash)) {
Ok(authorities) => authorities,
Err(e) => {
debug!("Error reading authorities: {:?}", e);
return;
}
};
let input = Messages {
network_stream: network.bft_messages(parent_hash),
local_id: bft_service.local_id(),
authorities,
};
let output = BftSink { network: network, parent_hash: parent_hash, _e: Default::default() };
match bft_service.build_upon(&header, input.map_err(Into::into), output) {
Ok(Some(bft)) => handle.spawn(bft),
let mut handle = LocalThreadHandle::current();
match bft_service.build_upon(&header) {
Ok(Some(bft)) => if let Err(e) = handle.spawn_local(Box::new(bft)) {
debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
},
Ok(None) => {},
Err(e) => debug!(target: "bft", "BFT agreement error: {:?}", e),
}
......@@ -231,54 +75,56 @@ pub struct Service {