diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index c9692cbde22c8f0104171b96bf626c1684d90651..e6a16df79bcdccc45c0baa2154700c8960751be1 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -38,11 +38,12 @@ extern crate substrate_consensus_common as consensus_common; extern crate tokio; extern crate sr_version as runtime_version; extern crate substrate_network as network; -extern crate futures; extern crate parking_lot; #[macro_use] extern crate log; +#[macro_use] +extern crate futures; #[cfg(test)] extern crate substrate_keyring as keyring; @@ -53,10 +54,10 @@ extern crate substrate_test_client as test_client; #[cfg(test)] extern crate env_logger; -pub use aura_primitives::*; +mod slots; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use codec::Encode; use consensus_common::{Authorities, BlockImport, Environment, Proposer}; @@ -69,10 +70,11 @@ use network::import_queue::{Verifier, BasicQueue}; use primitives::{AuthorityId, ed25519}; use futures::{Stream, Future, IntoFuture, future::{self, Either}}; -use tokio::timer::{Delay, Timeout}; +use tokio::timer::Timeout; use api::AuraApi; +use slots::Slots; -pub use aura_primitives::AuraConsensusData; +pub use aura_primitives::*; pub use consensus_common::SyncOracle; /// A handle to the network. This is generally implemented by providing some @@ -221,135 +223,117 @@ pub fn start_aura<B, C, E, I, SO, Error>( let sync_oracle = sync_oracle.clone(); let SlotDuration(slot_duration) = slot_duration; - fn time_until_next(now: Duration, slot_duration: u64) -> Duration { - let remaining_full_secs = slot_duration - (now.as_secs() % slot_duration) - 1; - let remaining_nanos = 1_000_000_000 - now.subsec_nanos(); - Duration::new(remaining_full_secs, remaining_nanos) - }; + // rather than use a timer interval, we schedule our waits ourselves + Slots::new(slot_duration) + .map_err(|e| debug!(target: "aura", "Faulty timer: {:?}", e)) + .for_each(move |slot_info| { + let client = client.clone(); + let pair = pair.clone(); + let block_import = block_import.clone(); + let env = env.clone(); + let sync_oracle = sync_oracle.clone(); + let public_key = pair.public(); + + // only propose when we are not syncing. + if sync_oracle.is_major_syncing() { + debug!(target: "aura", "Skipping proposal slot due to sync."); + return Either::B(future::ok(())); + } + + let (timestamp, slot_num) = (slot_info.timestamp, slot_info.number); + let chain_head = match client.best_block_header() { + Ok(x) => x, + Err(e) => { + warn!(target:"aura", "Unable to author block in slot {}. \ + no best block header: {:?}", slot_num, e); + return Either::B(future::ok(())) + } + }; - // rather than use an interval, we schedule our waits ourselves - future::loop_fn((), move |()| { - let next_slot_start = duration_now() - .map(|now| Instant::now() + time_until_next(now, slot_duration)) - .unwrap_or_else(|| Instant::now()); - - let client = client.clone(); - let pair = pair.clone(); - let block_import = block_import.clone(); - let env = env.clone(); - let sync_oracle = sync_oracle.clone(); - let public_key = pair.public(); - - Delay::new(next_slot_start) - .map_err(|e| debug!(target: "aura", "Faulty timer: {:?}", e)) - .and_then(move |_| { - // only propose when we are not syncing. - if sync_oracle.is_major_syncing() { - debug!(target: "aura", "Skipping proposal slot due to sync."); + let authorities = match client.authorities(&BlockId::Hash(chain_head.hash())) { + Ok(authorities) => authorities, + Err(e) => { + warn!("Unable to fetch authorities at\ + block {:?}: {:?}", chain_head.hash(), e); return Either::B(future::ok(())); } + }; - let pair = pair.clone(); - let (timestamp, slot_num) = match timestamp_and_slot_now(slot_duration) { - Some(n) => n, - None => return Either::B(future::err(())), - }; - - let chain_head = match client.best_block_header() { - Ok(x) => x, - Err(e) => { - warn!(target:"aura", "Unable to author block in slot {}. \ - no best block header: {:?}", slot_num, e); - return Either::B(future::ok(())) - } - }; - - let authorities = match client.authorities(&BlockId::Hash(chain_head.hash())) { - Ok(authorities) => authorities, - Err(e) => { - warn!("Unable to fetch authorities at\ - block {:?}: {:?}", chain_head.hash(), e); - return Either::B(future::ok(())); - } - }; - - let proposal_work = match slot_author(slot_num, &authorities) { - None => return Either::B(future::ok(())), - Some(author) => if author.0 == public_key.0 { - debug!(target: "aura", "Starting authorship at slot {}; timestamp = {}", - slot_num, timestamp); - - // we are the slot author. make a block and sign it. - let proposer = match env.init(&chain_head, &authorities, pair.clone()) { - Ok(p) => p, - Err(e) => { - warn!("Unable to author block in slot {:?}: {:?}", slot_num, e); - return Either::B(future::ok(())) - } - }; - - let consensus_data = AuraConsensusData { - timestamp, - slot: slot_num, - slot_duration, - }; - - // deadline our production to approx. the end of the - // slot - Timeout::new( - proposer.propose(consensus_data).into_future(), - time_until_next(Duration::from_secs(timestamp), slot_duration), - ) - } else { - return Either::B(future::ok(())); - } - }; - - let block_import = block_import.clone(); - Either::A(proposal_work - .map(move |b| { - // minor hack since we don't have access to the timestamp - // that is actually set by the proposer. - let slot_after_building = slot_now(slot_duration); - if slot_after_building != Some(slot_num) { - info!("Discarding proposal for slot {}; block production took too long", - slot_num); - return + let proposal_work = match slot_author(slot_num, &authorities) { + None => return Either::B(future::ok(())), + Some(author) => if author.0 == public_key.0 { + debug!(target: "aura", "Starting authorship at slot {}; timestamp = {}", + slot_num, timestamp); + + // we are the slot author. make a block and sign it. + let proposer = match env.init(&chain_head, &authorities, pair.clone()) { + Ok(p) => p, + Err(e) => { + warn!("Unable to author block in slot {:?}: {:?}", slot_num, e); + return Either::B(future::ok(())) } + }; + + let consensus_data = AuraConsensusData { + timestamp, + slot: slot_num, + slot_duration, + }; + + // deadline our production to approx. the end of the + // slot + Timeout::new( + proposer.propose(consensus_data).into_future(), + slot_info.remaining_duration(), + ) + } else { + return Either::B(future::ok(())); + } + }; - let (header, body) = b.deconstruct(); - let pre_hash = header.hash(); - let parent_hash = header.parent_hash().clone(); - - // sign the pre-sealed hash of the block and then - // add it to a digest item. - let to_sign = (slot_num, pre_hash).encode(); - let signature = pair.sign(&to_sign[..]); - let item = <DigestItemFor<B> as CompatibleDigestItem>::aura_seal( - slot_num, - signature, - ); - - let import_block = ImportBlock { - origin: BlockOrigin::Own, - header, - justification: None, - post_digests: vec![item], - body: Some(body), - finalized: false, - auxiliary: Vec::new(), - }; - - if let Err(e) = block_import.import_block(import_block, None) { - warn!(target: "aura", "Error with block built on {:?}: {:?}", - parent_hash, e); - } - }) - .map_err(|e| warn!("Failed to construct block: {:?}", e)) - ) - }) - .map(|_| future::Loop::Continue(())) - }) + let block_import = block_import.clone(); + Either::A(proposal_work + .map(move |b| { + // minor hack since we don't have access to the timestamp + // that is actually set by the proposer. + let slot_after_building = slot_now(slot_duration); + if slot_after_building != Some(slot_num) { + info!("Discarding proposal for slot {}; block production took too long", + slot_num); + return + } + + let (header, body) = b.deconstruct(); + let pre_hash = header.hash(); + let parent_hash = header.parent_hash().clone(); + + // sign the pre-sealed hash of the block and then + // add it to a digest item. + let to_sign = (slot_num, pre_hash).encode(); + let signature = pair.sign(&to_sign[..]); + let item = <DigestItemFor<B> as CompatibleDigestItem>::aura_seal( + slot_num, + signature, + ); + + let import_block = ImportBlock { + origin: BlockOrigin::Own, + header, + justification: None, + post_digests: vec![item], + body: Some(body), + finalized: false, + auxiliary: Vec::new(), + }; + + if let Err(e) = block_import.import_block(import_block, None) { + warn!(target: "aura", "Error with block built on {:?}: {:?}", + parent_hash, e); + } + }) + .map_err(|e| warn!("Failed to construct block: {:?}", e)) + ) + }) }; let work = future::loop_fn((), move |()| { diff --git a/substrate/core/consensus/aura/src/slots.rs b/substrate/core/consensus/aura/src/slots.rs new file mode 100644 index 0000000000000000000000000000000000000000..37db3b17636b40880573ff37c9d8baaac61095b4 --- /dev/null +++ b/substrate/core/consensus/aura/src/slots.rs @@ -0,0 +1,122 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +//! Utility stream for yielding slots in a loop. +//! +//! This is used instead of `tokio_timer::Interval` because it was unreliable. + +use std::time::{Instant, Duration}; +use tokio::timer::Delay; +use futures::prelude::*; + +/// Returns the duration until the next slot, based on current duration since +pub(crate) fn time_until_next(now: Duration, slot_duration: u64) -> Duration { + let remaining_full_secs = slot_duration - (now.as_secs() % slot_duration) - 1; + let remaining_nanos = 1_000_000_000 - now.subsec_nanos(); + Duration::new(remaining_full_secs, remaining_nanos) +} + +/// Information about a slot. +#[derive(Debug, Clone)] +pub(crate) struct SlotInfo { + /// The slot number. + pub(crate) number: u64, + /// Current timestamp. + pub(crate) timestamp: u64, + /// The instant at which the slot ends. + pub(crate) ends_at: Instant, +} + +impl SlotInfo { + /// Yields the remaining duration in the slot. + pub(crate) fn remaining_duration(&self) -> Duration { + let now = Instant::now(); + if now < self.ends_at { + self.ends_at.duration_since(now) + } else { + Duration::from_secs(0) + } + } +} + +/// A stream that returns every time there is a new slot. +pub(crate) struct Slots { + last_slot: u64, + slot_duration: u64, + inner_delay: Option<Delay>, +} + +impl Slots { + /// Create a new `slots` stream. + pub(crate) fn new(slot_duration: u64) -> Self { + Slots { + last_slot: 0, + slot_duration, + inner_delay: None, + } + } +} + +impl Stream for Slots { + type Item = SlotInfo; + type Error = tokio::timer::Error; + + fn poll(&mut self) -> Poll<Option<SlotInfo>, Self::Error> { + let slot_duration = self.slot_duration; + self.inner_delay = match self.inner_delay.take() { + None => { + // schedule wait. + let wait_until = match ::duration_now() { + None => return Ok(Async::Ready(None)), + Some(now) => Instant::now() + time_until_next(now, slot_duration), + }; + + Some(Delay::new(wait_until)) + } + Some(d) => Some(d), + }; + + if let Some(ref mut inner_delay) = self.inner_delay { + try_ready!(inner_delay.poll()); + } + + // timeout has fired. + + let (timestamp, slot_num) = match ::timestamp_and_slot_now(slot_duration) { + None => return Ok(Async::Ready(None)), + Some(x) => x, + }; + + // reschedule delay for next slot. + let ends_at = Instant::now() + + time_until_next(Duration::from_secs(timestamp), slot_duration); + self.inner_delay = Some(Delay::new(ends_at)); + + // never yield the same slot twice. + if slot_num > self.last_slot { + self.last_slot = slot_num; + + Ok(Async::Ready(Some(SlotInfo { + number: slot_num, + timestamp, + ends_at, + }))) + } else { + // re-poll until we get a new slot. + self.poll() + } + } +}