From 48aa32bece9ed1bfbd0001262a8b0d1ea47876e4 Mon Sep 17 00:00:00 2001 From: DemiMarie-parity <48690212+DemiMarie-parity@users.noreply.github.com> Date: Sat, 22 Jun 2019 12:21:29 -0400 Subject: [PATCH] Relative slots (#2820) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Initial work on relative slots for BABE * More work * Update core/consensus/babe/src/lib.rs `Aura` → `Babe` Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com> * More work on relative slots * Add missing field in test-runtime * Bump `impl_version` and `authoring_version` * Fix compile errors and warnings * Upgrade dependencies * Update dependencies more * Revert some updates to dependencies Somehow, those broke the build * Fix compilation errors * `Duration` → `u128` in calculations * `slot_duration` is in milliseconds, not seconds * Median algorithm: ignore blocks with slot_num < sl * Fix silly compile error * Store a duration, rather than an instant It is more useful * Fix compilation errors * `INVERSE_NANO` → `NANOS_PER_SEC` Also: `1000_000_000` → `1_000_000_000` Suggested-by: Niklas Adolfsson <niklasadolfsson1@gmail.com> * Un-bump `authoring_version` * Disable median algorithm when `median_required_blocks` is 0 Otherwise it would panic. * Apply suggestions from code review Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com> * Simplify panic * Fix build error * Create `SignedDuration` struct for signed `Duration` values. 
Suggested-by: Bastian Köcher * Refactor median algorithm into separate function * Add issues for FIXMEs and respond to code review * Fix minor warnings --- substrate/core/client/db/src/utils.rs | 1 - substrate/core/consensus/aura/src/lib.rs | 7 +- .../core/consensus/babe/primitives/src/lib.rs | 9 ++ substrate/core/consensus/babe/src/lib.rs | 130 ++++++++++++++++-- substrate/core/consensus/slots/src/lib.rs | 2 +- substrate/core/consensus/slots/src/slots.rs | 49 ++++--- substrate/core/network/src/test/mod.rs | 2 +- substrate/core/test-runtime/src/lib.rs | 2 + 8 files changed, 167 insertions(+), 35 deletions(-) diff --git a/substrate/core/client/db/src/utils.rs b/substrate/core/client/db/src/utils.rs index a0b955a1a2f..a4ab82b5d8b 100644 --- a/substrate/core/client/db/src/utils.rs +++ b/substrate/core/client/db/src/utils.rs @@ -189,7 +189,6 @@ pub fn block_id_to_lookup_key<Block>( /// Maps database error to client error pub fn db_err(err: io::Error) -> client::error::Error { - use std::error::Error; client::error::Error::Backend(format!("{}", err)) } diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index 896b2d4b00d..c85e273a42b 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -63,7 +63,8 @@ use srml_aura::{ }; use substrate_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_WARN, CONSENSUS_INFO}; -use slots::{CheckedHeader, SlotData, SlotWorker, SlotInfo, SlotCompatible, slot_now, check_equivocation}; +use slots::{CheckedHeader, SlotData, SlotWorker, SlotInfo, SlotCompatible}; +use slots::{SignedDuration, check_equivocation}; pub use aura_primitives::*; pub use consensus_common::SyncOracle; @@ -283,8 +284,8 @@ impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> w Box::new(proposal_work.map(move |b| { // minor hack since we don't have access to the timestamp // that is actually set by the proposer. 
- let slot_after_building = slot_now(slot_duration); - if slot_after_building != Some(slot_num) { + let slot_after_building = SignedDuration::default().slot_now(slot_duration); + if slot_after_building != slot_num { info!( "Discarding proposal for slot {}; block production took too long", slot_num diff --git a/substrate/core/consensus/babe/primitives/src/lib.rs b/substrate/core/consensus/babe/primitives/src/lib.rs index cd056544e6a..f1d4a452d68 100644 --- a/substrate/core/consensus/babe/primitives/src/lib.rs +++ b/substrate/core/consensus/babe/primitives/src/lib.rs @@ -51,6 +51,15 @@ pub struct BabeConfiguration { /// /// Dynamic thresholds may be supported in the future. pub threshold: u64, + + /// The minimum number of blocks that must be received before running the + /// median algorithm to compute the offset between the on-chain time and the + /// local time. Currently, only the value provided by this type at genesis + /// will be used, but this is subject to change. + /// + /// Blocks less than `self.median_required_blocks` must be generated by an + /// *initial validator* ― that is, a node that was a validator at genesis. 
+ pub median_required_blocks: u64, } #[cfg(feature = "std")] diff --git a/substrate/core/consensus/babe/src/lib.rs b/substrate/core/consensus/babe/src/lib.rs index 3e1f38077f8..0bab790c991 100644 --- a/substrate/core/consensus/babe/src/lib.rs +++ b/substrate/core/consensus/babe/src/lib.rs @@ -30,15 +30,20 @@ use digest::CompatibleDigestItem; pub use digest::{BabePreDigest, BABE_VRF_PREFIX}; pub use babe_primitives::*; pub use consensus_common::SyncOracle; +use consensus_common::import_queue::{ + SharedBlockImport, SharedJustificationImport, SharedFinalityProofImport, + SharedFinalityProofRequestBuilder, +}; use consensus_common::well_known_cache_keys::Id as CacheKeyId; use runtime_primitives::{generic, generic::{BlockId, OpaqueDigestItemId}, Justification}; use runtime_primitives::traits::{ Block, Header, DigestItemFor, ProvideRuntimeApi, - SimpleBitOps, + SimpleBitOps, Zero, }; -use std::{sync::Arc, u64, fmt::{Debug, Display}}; +use std::{sync::Arc, u64, fmt::{Debug, Display}, time::{Instant, Duration}}; use runtime_support::serde::{Serialize, Deserialize}; use parity_codec::{Decode, Encode}; +use parking_lot::Mutex; use primitives::{crypto::Pair, sr25519}; use merlin::Transcript; use inherents::{InherentDataProviders, InherentData}; @@ -77,7 +82,7 @@ use futures::{Future, IntoFuture, future}; use tokio_timer::Timeout; use log::{error, warn, debug, info, trace}; -use slots::{SlotWorker, SlotData, SlotInfo, SlotCompatible, slot_now}; +use slots::{SlotWorker, SlotData, SlotInfo, SlotCompatible, SignedDuration}; pub use babe_primitives::AuthorityId; @@ -332,8 +337,8 @@ impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> w Box::new(proposal_work.map(move |b| { // minor hack since we don't have access to the timestamp // that is actually set by the proposer. 
- let slot_after_building = slot_now(slot_duration); - if slot_after_building != Some(slot_num) { + let slot_after_building = SignedDuration::default().slot_now(slot_duration); + if slot_after_building != slot_num { info!( target: "babe", "Discarding proposal for slot {}; block production took too long", @@ -512,7 +517,8 @@ fn check_header<B: Block + Sized, C: AuxStore>( pub struct BabeVerifier<C> { client: Arc<C>, inherent_data_providers: inherents::InherentDataProviders, - threshold: u64, + config: Config, + timestamps: Mutex<(Option<Duration>, Vec<(Instant, u64)>)>, } impl<C> BabeVerifier<C> { @@ -540,6 +546,38 @@ impl<C> BabeVerifier<C> { } } +fn median_algorithm( + median_required_blocks: u64, + slot_duration: u64, + slot_num: u64, + slot_now: u64, + timestamps: &mut (Option<Duration>, Vec<(Instant, u64)>), +) { + let num_timestamps = timestamps.1.len(); + if num_timestamps as u64 >= median_required_blocks && median_required_blocks > 0 { + let mut new_list: Vec<_> = timestamps.1.iter().map(|&(t, sl)| { + let offset: u128 = u128::from(slot_duration) + .checked_mul(1_000_000u128) // self.config.get() returns *milliseconds* + .and_then(|x| x.checked_mul(u128::from(slot_num).saturating_sub(u128::from(sl)))) + .expect("we cannot have timespans long enough for this to overflow; qed"); + const NANOS_PER_SEC: u32 = 1_000_000_000; + let nanos = (offset % u128::from(NANOS_PER_SEC)) as u32; + let secs = (offset / u128::from(NANOS_PER_SEC)) as u64; + t + Duration::new(secs, nanos) + }).collect(); + // FIXME #2926: use a selection algorithm instead of a full sorting algorithm. 
+ new_list.sort_unstable(); + let &median = new_list + .get(num_timestamps / 2) + .expect("we have at least one timestamp, so this is a valid index; qed"); + timestamps.1.clear(); + // FIXME #2927: pass this to the block authoring logic somehow + timestamps.0.replace(Instant::now() - median); + } else { + timestamps.1.push((Instant::now(), slot_now)) + } +} + impl<B: Block, C> Verifier<B> for BabeVerifier<C> where C: ProvideRuntimeApi + Send + Sync + AuxStore + ProvideCache<B>, C::Api: BlockBuilderApi<B> + BabeApi<B>, @@ -582,7 +620,7 @@ impl<B: Block, C> Verifier<B> for BabeVerifier<C> where header, hash, &authorities[..], - self.threshold, + self.config.threshold(), )?; match checked_header { CheckedHeader::Checked(pre_header, (pre_digest, seal)) => { @@ -629,7 +667,13 @@ impl<B: Block, C> Verifier<B> for BabeVerifier<C> where auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, }; - + median_algorithm( + self.config.0.median_required_blocks, + self.config.get(), + slot_num, + slot_now, + &mut *self.timestamps.lock(), + ); // FIXME #1019 extract authorities Ok((import_block, maybe_keys)) } @@ -739,6 +783,72 @@ fn claim_slot( get_keypair(key).vrf_sign_n_check(transcript, |inout| check(inout, threshold)) } +fn initialize_authorities_cache<B, C>(client: &C) -> Result<(), ConsensusError> where + B: Block, + C: ProvideRuntimeApi + ProvideCache<B>, + C::Api: BabeApi<B>, +{ + // no cache => no initialization + let cache = match client.cache() { + Some(cache) => cache, + None => return Ok(()), + }; + + // check if we already have initialized the cache + let genesis_id = BlockId::Number(Zero::zero()); + let genesis_authorities: Option<Vec<AuthorityId>> = cache + .get_at(&well_known_cache_keys::AUTHORITIES, &genesis_id) + .and_then(|v| Decode::decode(&mut &v[..])); + if genesis_authorities.is_some() { + return Ok(()); + } + + let map_err = |error| consensus_common::Error::from(consensus_common::Error::ClientImport( + format!( + "Error initializing 
authorities cache: {}", + error, + ))); + let genesis_authorities = authorities(client, &genesis_id)?; + cache.initialize(&well_known_cache_keys::AUTHORITIES, genesis_authorities.encode()) + .map_err(map_err) +} + +/// Start an import queue for the Babe consensus algorithm. +pub fn import_queue<B, C, E>( + config: Config, + block_import: SharedBlockImport<B>, + justification_import: Option<SharedJustificationImport<B>>, + finality_proof_import: Option<SharedFinalityProofImport<B>>, + finality_proof_request_builder: Option<SharedFinalityProofRequestBuilder<B>>, + client: Arc<C>, + inherent_data_providers: InherentDataProviders, +) -> Result<BabeImportQueue<B>, consensus_common::Error> where + B: Block, + C: 'static + ProvideRuntimeApi + ProvideCache<B> + Send + Sync + AuxStore, + C::Api: BlockBuilderApi<B> + BabeApi<B>, + DigestItemFor<B>: CompatibleDigestItem, + E: 'static, +{ + register_babe_inherent_data_provider(&inherent_data_providers, config.get())?; + initialize_authorities_cache(&*client)?; + + let verifier = Arc::new( + BabeVerifier { + client: client, + inherent_data_providers, + timestamps: Default::default(), + config, + } + ); + Ok(BasicQueue::new( + verifier, + block_import, + justification_import, + finality_proof_import, + finality_proof_request_builder, + )) +} + #[cfg(test)] #[allow(dead_code, unused_imports, deprecated)] // FIXME #2532: need to allow deprecated until refactor is done @@ -753,7 +863,6 @@ mod tests { use network::test::{Block as TestBlock, PeersClient}; use runtime_primitives::traits::{Block as BlockT, DigestFor}; use network::config::ProtocolConfig; - use parking_lot::Mutex; use tokio::runtime::current_thread; use keyring::sr25519::Keyring; use super::generic::DigestItem; @@ -837,7 +946,8 @@ mod tests { Arc::new(BabeVerifier { client, inherent_data_providers, - threshold: config.threshold(), + config, + timestamps: Default::default(), }) } diff --git a/substrate/core/consensus/slots/src/lib.rs 
b/substrate/core/consensus/slots/src/lib.rs index 18ec2451beb..c26b6e2ff6c 100644 --- a/substrate/core/consensus/slots/src/lib.rs +++ b/substrate/core/consensus/slots/src/lib.rs @@ -25,7 +25,7 @@ mod slots; mod aux_schema; -pub use slots::{slot_now, SlotInfo, Slots}; +pub use slots::{SignedDuration, SlotInfo, Slots}; pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND}; use codec::{Decode, Encode}; diff --git a/substrate/core/consensus/slots/src/slots.rs b/substrate/core/consensus/slots/src/slots.rs index a848a967629..1bce98487ac 100644 --- a/substrate/core/consensus/slots/src/slots.rs +++ b/substrate/core/consensus/slots/src/slots.rs @@ -23,29 +23,44 @@ use consensus_common::Error; use futures::prelude::*; use futures::try_ready; use inherents::{InherentData, InherentDataProviders}; -use log::warn; + use std::marker::PhantomData; use std::time::{Duration, Instant}; use tokio_timer::Delay; /// Returns current duration since unix epoch. -pub fn duration_now() -> Option<Duration> { +pub fn duration_now() -> Duration { use std::time::SystemTime; - let now = SystemTime::now(); - now.duration_since(SystemTime::UNIX_EPOCH) - .map_err(|e| { - warn!( - "Current time {:?} is before unix epoch. Something is wrong: {:?}", - now, e - ); - }) - .ok() + now.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_else(|e| panic!( + "Current time {:?} is before unix epoch. Something is wrong: {:?}", + now, + e, + )) +} + + +/// A `Duration` with a sign (before or after). Immutable. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub struct SignedDuration { + offset: Duration, + is_positive: bool, } -/// Get the slot for now. -pub fn slot_now(slot_duration: u64) -> Option<u64> { - duration_now().map(|s| s.as_secs() / slot_duration) +impl SignedDuration { + /// Construct a `SignedDuration` + pub fn new(offset: Duration, is_positive: bool) -> Self { + Self { offset, is_positive } + } + + /// Get the slot for now. 
Panics if `slot_duration` is 0. + pub fn slot_now(&self, slot_duration: u64) -> u64 { + if self.is_positive { + duration_now() + self.offset + } else { + duration_now() - self.offset + }.as_secs() / slot_duration + } } /// Returns the duration until the next slot, based on current duration since @@ -112,11 +127,7 @@ impl<SC: SlotCompatible> Stream for Slots<SC> { self.inner_delay = match self.inner_delay.take() { None => { // schedule wait. - let wait_until = match duration_now() { - None => return Ok(Async::Ready(None)), - Some(now) => Instant::now() + time_until_next(now, slot_duration), - }; - + let wait_until = Instant::now() + time_until_next(duration_now(), slot_duration); Some(Delay::new(wait_until)) } Some(d) => Some(d), diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 9931826ae2f..58d8a91c2e5 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -517,7 +517,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { use_tokio: bool, network_to_protocol_sender: mpsc::UnboundedSender<FromNetworkMsg<Block>>, protocol_sender: mpsc::UnboundedSender<ProtocolMsg<Block, S>>, - network_sender: mpsc::UnboundedSender<NetworkMsg<Block>>, + _network_sender: mpsc::UnboundedSender<NetworkMsg<Block>>, network_port: mpsc::UnboundedReceiver<NetworkMsg<Block>>, data: D, ) -> Self { diff --git a/substrate/core/test-runtime/src/lib.rs b/substrate/core/test-runtime/src/lib.rs index 7a68439a07b..0916cad93d0 100644 --- a/substrate/core/test-runtime/src/lib.rs +++ b/substrate/core/test-runtime/src/lib.rs @@ -470,6 +470,7 @@ cfg_if! { slot_duration: 1, expected_block_time: 1, threshold: std::u64::MAX, + median_required_blocks: 100, } } fn authorities() -> Vec<BabeId> { system::authorities() } @@ -611,6 +612,7 @@ cfg_if! 
{ impl consensus_babe::BabeApi<Block> for Runtime { fn startup_data() -> consensus_babe::BabeConfiguration { consensus_babe::BabeConfiguration { + median_required_blocks: 0, slot_duration: 1, expected_block_time: 1, threshold: core::u64::MAX, -- GitLab