Commit 2f6a4d9c authored by Arkadiy Paronyan's avatar Arkadiy Paronyan Committed by GitHub
Browse files

[backport] Lower bft timeout (#484)

* v0.2.3

* Lower bft timeout

* force BFT delay in consensus service, not in proposer logic (#477)

* move forced delay to consensus service

* fiddle with logging
parent 1089a294
......@@ -258,8 +258,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
) -> Result<(Self::Proposer, Self::Input, Self::Output), Error> {
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
const DELAY_UNTIL: Duration = Duration::from_millis(5000);
let parent_hash = parent_header.hash().into();
let id = BlockId::hash(parent_hash);
......@@ -290,9 +288,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
self.parachain_empty_duration.clone(),
);
debug!(target: "bft", "Initialising consensus proposer. Refusing to evaluate for {:?} from now.",
DELAY_UNTIL);
let validation_para = match local_duty.validation {
Chain::Relay => None,
Chain::Parachain(id) => Some(id),
......@@ -315,7 +310,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
client: self.client.clone(),
dynamic_inclusion,
local_key: sign_with,
minimum_delay: now + DELAY_UNTIL,
parent_hash,
parent_id: id,
parent_number: parent_header.number,
......@@ -370,7 +364,6 @@ pub struct Proposer<C: PolkadotApi> {
client: Arc<C>,
dynamic_inclusion: DynamicInclusion,
local_key: Arc<ed25519::Pair>,
minimum_delay: Instant,
parent_hash: Hash,
parent_id: BlockId,
parent_number: BlockNumber,
......@@ -401,17 +394,10 @@ impl<C> bft::Proposer<Block> for Proposer<C>
initial_included,
).unwrap_or_else(|| now + Duration::from_millis(1));
let minimum_delay = if self.minimum_delay > now + ATTEMPT_PROPOSE_EVERY {
Some(Delay::new(self.minimum_delay))
} else {
None
};
let timing = ProposalTiming {
attempt_propose: Interval::new(now + ATTEMPT_PROPOSE_EVERY, ATTEMPT_PROPOSE_EVERY),
enough_candidates: Delay::new(enough_candidates),
dynamic_inclusion: self.dynamic_inclusion.clone(),
minimum_delay,
last_included: initial_included,
};
......@@ -484,11 +470,7 @@ impl<C> bft::Proposer<Block> for Proposer<C>
// delay casting vote until able according to minimum block time,
// timestamp delay, and count delay.
// construct a future from the maximum of the two durations.
let max_delay = [timestamp_delay, count_delay, Some(self.minimum_delay)]
.iter()
.cloned()
.max()
.expect("iterator not empty; thus max returns `Some`; qed");
let max_delay = ::std::cmp::max(timestamp_delay, count_delay);
let temporary_delay = match max_delay {
Some(duration) => future::Either::A(
......@@ -610,7 +592,6 @@ struct ProposalTiming {
attempt_propose: Interval,
dynamic_inclusion: DynamicInclusion,
enough_candidates: Delay,
minimum_delay: Option<Delay>,
last_included: usize,
}
......@@ -627,12 +608,6 @@ impl ProposalTiming {
x.expect("timer still alive; intervals never end; qed");
}
if let Some(ref mut min) = self.minimum_delay {
try_ready!(min.poll().map_err(ErrorKind::Timer));
}
self.minimum_delay = None; // after this point, the future must have completed.
if included == self.last_included {
return self.enough_candidates.poll().map_err(ErrorKind::Timer);
}
......
......@@ -38,7 +38,7 @@ use transaction_pool::TransactionPool;
use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle;
use tokio::runtime::TaskExecutor as ThreadPoolHandle;
use tokio::runtime::current_thread::Runtime as LocalRuntime;
use tokio::timer::Interval;
use tokio::timer::{Delay, Interval};
use super::{Network, Collators, ProposerFactory};
use error;
......@@ -49,8 +49,8 @@ const TIMER_INTERVAL_MS: u64 = 500;
// spin up an instance of BFT agreement on the current thread's executor.
// panics if there is no current thread executor.
fn start_bft<F, C>(
header: &Header,
bft_service: &BftService<Block, F, C>,
header: Header,
bft_service: Arc<BftService<Block, F, C>>,
) where
F: bft::Environment<Block> + 'static,
C: bft::BlockImport<Block> + bft::Authorities<Block> + 'static,
......@@ -58,14 +58,35 @@ fn start_bft<F, C>(
<F::Proposer as bft::Proposer<Block>>::Error: ::std::fmt::Display + Into<error::Error>,
<F as bft::Environment<Block>>::Error: ::std::fmt::Display
{
const DELAY_UNTIL: Duration = Duration::from_millis(5000);
let mut handle = LocalThreadHandle::current();
match bft_service.build_upon(&header) {
Ok(Some(bft)) => if let Err(e) = handle.spawn_local(Box::new(bft)) {
debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
},
Ok(None) => {},
Err(e) => warn!(target: "bft", "BFT agreement error: {}", e),
}
let work = Delay::new(Instant::now() + DELAY_UNTIL)
.then(move |res| {
if let Err(e) = res {
warn!(target: "bft", "Failed to force delay of consensus: {:?}", e);
}
match bft_service.build_upon(&header) {
Ok(maybe_bft_work) => {
if maybe_bft_work.is_some() {
debug!(target: "bft", "Starting agreement. After forced delay for {:?}",
DELAY_UNTIL);
}
maybe_bft_work
}
Err(e) => {
warn!(target: "bft", "BFT agreement error: {}", e);
None
}
}
})
.map(|_| ());
if let Err(e) = handle.spawn_local(Box::new(work)) {
debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
}
}
/// Consensus service. Starts working when created.
......@@ -113,7 +134,7 @@ impl Service {
client.import_notification_stream().for_each(move |notification| {
if notification.is_new_best {
start_bft(&notification.header, &*bft_service);
start_bft(notification.header, bft_service.clone());
}
Ok(())
})
......@@ -139,9 +160,9 @@ impl Service {
interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
if let Ok(best_block) = c.best_block_header() {
let hash = best_block.hash();
if hash == prev_best {
if hash == prev_best && s.live_agreement() != Some(hash) {
debug!("Starting consensus round after a timeout");
start_bft(&best_block, &*s);
start_bft(best_block, s.clone());
}
prev_best = hash;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment