Commit 87e7b006 authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Ensure all known BFT messages are imported when starting consensus (#147)

* a little more BFT tracing

* import cached BFT messages into the produced stream
parent 86505051
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::sync::Arc; use std::sync::Arc;
use std::collections::{HashMap, VecDeque};
use futures::{future, Future, Stream, Sink, Async, Canceled, Poll}; use futures::{future, Future, Stream, Sink, Async, Canceled, Poll};
use parking_lot::Mutex; use parking_lot::Mutex;
use substrate_network as net; use substrate_network as net;
...@@ -41,7 +40,6 @@ use error; ...@@ -41,7 +40,6 @@ use error;
const TIMER_DELAY_MS: u64 = 5000; const TIMER_DELAY_MS: u64 = 5000;
const TIMER_INTERVAL_MS: u64 = 500; const TIMER_INTERVAL_MS: u64 = 500;
const MESSAGE_LIFETIME_SEC: u64 = 10;
struct BftSink<E> { struct BftSink<E> {
network: Arc<net::ConsensusService>, network: Arc<net::ConsensusService>,
...@@ -49,49 +47,9 @@ struct BftSink<E> { ...@@ -49,49 +47,9 @@ struct BftSink<E> {
_e: ::std::marker::PhantomData<E>, _e: ::std::marker::PhantomData<E>,
} }
#[derive(Clone)]
struct SharedMessageCollection {
/// Messages for consensus over a block with known hash. Also holds timestamp of the first message.
messages: Arc<Mutex<HashMap<HeaderHash, (Instant, VecDeque<net::LocalizedBftMessage>)>>>,
}
impl SharedMessageCollection {
fn new() -> SharedMessageCollection {
SharedMessageCollection {
messages: Arc::new(Mutex::new(HashMap::new())),
}
}
fn select(&self, parent_hash: HeaderHash, stream: net::BftMessageStream, authorities: Vec<AuthorityId>) -> Messages {
Messages {
messages: self.messages.lock().remove(&parent_hash).map(|(_, m)| m).unwrap_or_else(VecDeque::new),
parent_hash,
network_stream: stream,
authorities: authorities,
collection: self.clone(),
}
}
fn push(&self, message: net::LocalizedBftMessage) {
self.messages.lock()
.entry(message.parent_hash)
.or_insert_with(|| (Instant::now(), VecDeque::new()))
.1.push_back(message);
}
fn collect_garbage(&self) {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SEC);
let now = Instant::now();
self.messages.lock().retain(|_, &mut (timestamp, _)| timestamp < now + expiration);
}
}
struct Messages { struct Messages {
parent_hash: HeaderHash,
messages: VecDeque<net::LocalizedBftMessage>,
network_stream: net::BftMessageStream, network_stream: net::BftMessageStream,
authorities: Vec<AuthorityId>, authorities: Vec<AuthorityId>,
collection: SharedMessageCollection,
} }
impl Stream for Messages { impl Stream for Messages {
...@@ -99,14 +57,6 @@ impl Stream for Messages { ...@@ -99,14 +57,6 @@ impl Stream for Messages {
type Error = bft::Error; type Error = bft::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// push buffered messages first
while let Some(message) = self.messages.pop_front() {
match process_message(message, &self.authorities) {
Ok(message) => return Ok(Async::Ready(Some(message))),
Err(e) => debug!("Message validation failed: {:?}", e),
}
}
// check the network // check the network
loop { loop {
match self.network_stream.poll() { match self.network_stream.poll() {
...@@ -114,15 +64,11 @@ impl Stream for Messages { ...@@ -114,15 +64,11 @@ impl Stream for Messages {
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end. Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
Ok(Async::Ready(Some(message))) => { Ok(Async::Ready(Some(message))) => {
if message.parent_hash == self.parent_hash { match process_message(message, &self.authorities) {
match process_message(message, &self.authorities) { Ok(message) => return Ok(Async::Ready(Some(message))),
Ok(message) => return Ok(Async::Ready(Some(message))), Err(e) => {
Err(e) => { debug!("Message validation failed: {:?}", e);
debug!("Message validation failed: {:?}", e);
}
} }
} else {
self.collection.push(message);
} }
} }
} }
...@@ -226,18 +172,17 @@ fn start_bft<F, C>( ...@@ -226,18 +172,17 @@ fn start_bft<F, C>(
client: &bft::Authorities, client: &bft::Authorities,
network: Arc<net::ConsensusService>, network: Arc<net::ConsensusService>,
bft_service: &BftService<F, C>, bft_service: &BftService<F, C>,
messages: SharedMessageCollection
) where ) where
F: bft::ProposerFactory + 'static, F: bft::ProposerFactory + 'static,
C: bft::BlockImport + bft::Authorities + 'static, C: bft::BlockImport + bft::Authorities + 'static,
<F as bft::ProposerFactory>::Error: ::std::fmt::Debug, <F as bft::ProposerFactory>::Error: ::std::fmt::Debug,
<F::Proposer as bft::Proposer>::Error: ::std::fmt::Display + Into<error::Error>, <F::Proposer as bft::Proposer>::Error: ::std::fmt::Display + Into<error::Error>,
{ {
let hash = header.blake2_256().into(); let parent_hash = header.blake2_256().into();
if bft_service.live_agreement().map_or(false, |h| h == hash) { if bft_service.live_agreement().map_or(false, |h| h == parent_hash) {
return; return;
} }
let authorities = match client.authorities(&BlockId::Hash(hash)) { let authorities = match client.authorities(&BlockId::Hash(parent_hash)) {
Ok(authorities) => authorities, Ok(authorities) => authorities,
Err(e) => { Err(e) => {
debug!("Error reading authorities: {:?}", e); debug!("Error reading authorities: {:?}", e);
...@@ -245,12 +190,16 @@ fn start_bft<F, C>( ...@@ -245,12 +190,16 @@ fn start_bft<F, C>(
} }
}; };
let input = messages.select(hash, network.bft_messages(), authorities).map_err(|e| e.into()); let input = Messages {
let output = BftSink { network: network, parent_hash: hash.clone(), _e: Default::default() }; network_stream: network.bft_messages(parent_hash),
match bft_service.build_upon(&header, input, output) { authorities,
};
let output = BftSink { network: network, parent_hash: parent_hash, _e: Default::default() };
match bft_service.build_upon(&header, input.map_err(Into::into), output) {
Ok(Some(bft)) => handle.spawn(bft), Ok(Some(bft)) => handle.spawn(bft),
Ok(None) => {}, Ok(None) => {},
Err(e) => debug!(target: "bft","BFT agreement error: {:?}", e), Err(e) => debug!(target: "bft", "BFT agreement error: {:?}", e),
} }
} }
...@@ -281,7 +230,6 @@ impl Service { ...@@ -281,7 +230,6 @@ impl Service {
network: Network(network.clone()), network: Network(network.clone()),
handle: core.handle(), handle: core.handle(),
}; };
let messages = SharedMessageCollection::new();
let bft_service = Arc::new(BftService::new(client.clone(), key, factory)); let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
let notifications = { let notifications = {
...@@ -289,11 +237,10 @@ impl Service { ...@@ -289,11 +237,10 @@ impl Service {
let network = network.clone(); let network = network.clone();
let client = client.clone(); let client = client.clone();
let bft_service = bft_service.clone(); let bft_service = bft_service.clone();
let messages = messages.clone();
client.import_notification_stream().for_each(move |notification| { client.import_notification_stream().for_each(move |notification| {
if notification.is_new_best { if notification.is_new_best {
start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone()); start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service);
} }
Ok(()) Ok(())
}) })
...@@ -316,16 +263,14 @@ impl Service { ...@@ -316,16 +263,14 @@ impl Service {
let c = client.clone(); let c = client.clone();
let s = bft_service.clone(); let s = bft_service.clone();
let n = network.clone(); let n = network.clone();
let m = messages.clone();
let handle = core.handle(); let handle = core.handle();
interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| { interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
if let Ok(best_block) = c.best_block_header() { if let Ok(best_block) = c.best_block_header() {
let hash = best_block.blake2_256(); let hash = best_block.blake2_256();
m.collect_garbage();
if hash == prev_best { if hash == prev_best {
debug!("Starting consensus round after a timeout"); debug!("Starting consensus round after a timeout");
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone()); start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s);
} }
prev_best = hash; prev_best = hash;
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment