From f8bf17dc4905b8ae98e9bde7ca7bc5418ea8dca5 Mon Sep 17 00:00:00 2001 From: Wei Tang <hi@that.world> Date: Wed, 27 Nov 2019 17:44:34 +0100 Subject: [PATCH] Make block proposing remaining duration configurable (#4215) * Make proposing remaining duration configurable * Pass chain_head to proposing_remaining_duration and change default --- substrate/client/consensus/slots/src/lib.rs | 77 +++++++++++-------- substrate/client/consensus/slots/src/slots.rs | 16 +--- 2 files changed, 51 insertions(+), 42 deletions(-) diff --git a/substrate/client/consensus/slots/src/lib.rs b/substrate/client/consensus/slots/src/lib.rs index a26f8ec6b9f..1e770b5dd8c 100644 --- a/substrate/client/consensus/slots/src/lib.rs +++ b/substrate/client/consensus/slots/src/lib.rs @@ -38,7 +38,7 @@ use inherents::{InherentData, InherentDataProviders}; use log::{debug, error, info, warn}; use sr_primitives::generic::BlockId; use sr_primitives::traits::{ApiRef, Block as BlockT, Header, ProvideRuntimeApi}; -use std::{fmt::Debug, ops::Deref, pin::Pin, sync::Arc}; +use std::{fmt::Debug, ops::Deref, pin::Pin, sync::Arc, time::{Instant, Duration}}; use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_WARN, CONSENSUS_INFO}; use parking_lot::Mutex; use client_api; @@ -113,6 +113,25 @@ pub trait SimpleSlotWorker<B: BlockT> { /// Returns a `Proposer` to author on top of the given block. fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, consensus_common::Error>; + /// Remaining duration of the slot. + fn slot_remaining_duration(&self, slot_info: &SlotInfo) -> Duration { + let now = Instant::now(); + if now < slot_info.ends_at { + slot_info.ends_at.duration_since(now) + } else { + Duration::from_millis(0) + } + } + + /// Remaining duration for proposing. None means unlimited. + fn proposing_remaining_duration( + &self, + _head: &B::Header, + slot_info: &SlotInfo + ) -> Option<Duration> { + Some(self.slot_remaining_duration(slot_info)) + } + /// Implements the `on_slot` functionality from `SlotWorker`. fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Pin<Box<dyn Future<Output = Result<(), consensus_common::Error>> + Send>> where @@ -192,45 +211,43 @@ pub trait SimpleSlotWorker<B: BlockT> { }, }; - let remaining_duration = slot_info.remaining_duration(); + let slot_remaining_duration = self.slot_remaining_duration(&slot_info); + let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info); let logs = self.pre_digest_data(slot_number, &claim); // deadline our production to approx. the end of the slot - let proposal_work = futures::future::select( - proposer.propose( - slot_info.inherent_data, - sr_primitives::generic::Digest { - logs, + let proposing = proposer.propose( + slot_info.inherent_data, + sr_primitives::generic::Digest { + logs, + }, + slot_remaining_duration, + ).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e))); + let delay: Box<dyn Future<Output=()> + Unpin + Send> = match proposing_remaining_duration { + Some(r) => Box::new(Delay::new(r)), + None => Box::new(future::pending()), + }; + + let proposal_work = + Box::new(futures::future::select(proposing, delay).map(move |v| match v { + futures::future::Either::Left((b, _)) => b.map(|b| (b, claim)), + futures::future::Either::Right(_) => { + info!("Discarding proposal for slot {}; block production took too long", slot_number); + // If the node was compiled with debug, tell the user to use release optimizations. + #[cfg(build_type="debug")] + info!("Recompile your node in `--release` mode to mitigate this problem."); + telemetry!(CONSENSUS_INFO; "slots.discarding_proposal_took_too_long"; + "slot" => slot_number, + ); + Err(consensus_common::Error::ClientImport("Timeout in the Slots proposer".into())) }, - remaining_duration, - ).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e))), - Delay::new(remaining_duration) - ).map(|v| match v { - futures::future::Either::Left((b, _)) => b.map(|b| (b, claim)), - futures::future::Either::Right(_) => - Err(consensus_common::Error::ClientImport("Timeout in the Slots proposer".into())), - }); + })); let block_import_params_maker = self.block_import_params(); let block_import = self.block_import(); let logging_target = self.logging_target(); Box::pin(proposal_work.map_ok(move |(block, claim)| { - // minor hack since we don't have access to the timestamp - // that is actually set by the proposer. - let slot_after_building = SignedDuration::default().slot_now(slot_duration); - if slot_after_building != slot_number { - info!("Discarding proposal for slot {}; block production took too long", slot_number); - // If the node was compiled with debug, tell the user to use release optimizations. - #[cfg(build_type="debug")] - info!("Recompile your node in `--release` mode to mitigate this problem."); - telemetry!(CONSENSUS_INFO; "slots.discarding_proposal_took_too_long"; - "slot" => slot_number, - ); - - return; - } - let (header, body) = block.deconstruct(); let header_num = *header.number(); let header_hash = header.hash(); diff --git a/substrate/client/consensus/slots/src/slots.rs b/substrate/client/consensus/slots/src/slots.rs index 0166d09bb74..640b24ec1cd 100644 --- a/substrate/client/consensus/slots/src/slots.rs +++ b/substrate/client/consensus/slots/src/slots.rs @@ -71,6 +71,8 @@ pub fn time_until_next(now: Duration, slot_duration: u64) -> Duration { pub struct SlotInfo { /// The slot number. pub number: u64, + /// The last slot number produced. + pub last_number: u64, /// Current timestamp. pub timestamp: u64, /// The instant at which the slot ends. @@ -81,18 +83,6 @@ pub struct SlotInfo { pub duration: u64, } -impl SlotInfo { - /// Yields the remaining duration in the slot. - pub fn remaining_duration(&self) -> Duration { - let now = Instant::now(); - if now < self.ends_at { - self.ends_at.duration_since(now) - } else { - Duration::from_millis(0) - } - } -} - /// A stream that returns every time there is a new slot. pub(crate) struct Slots<SC> { last_slot: u64, @@ -160,11 +150,13 @@ impl<SC: SlotCompatible + Unpin> Stream for Slots<SC> { // never yield the same slot twice. if slot_num > self.last_slot { + let last_slot = self.last_slot; self.last_slot = slot_num; break Poll::Ready(Some(Ok(SlotInfo { number: slot_num, duration: self.slot_duration, + last_number: last_slot, timestamp, ends_at, inherent_data, -- GitLab