diff --git a/substrate/client/consensus/slots/src/lib.rs b/substrate/client/consensus/slots/src/lib.rs index 02f6c15b79492edf4eced4ef1ea4daeba3176ad4..9819e1487d066f6f4cb5071b35c76acae8b46722 100644 --- a/substrate/client/consensus/slots/src/lib.rs +++ b/substrate/client/consensus/slots/src/lib.rs @@ -122,6 +122,19 @@ pub trait SimpleSlotWorker<B: BlockT> { let (timestamp, slot_number, slot_duration) = (slot_info.timestamp, slot_info.number, slot_info.duration); + { + let slot_now = SignedDuration::default().slot_now(slot_duration); + if slot_now > slot_number { + // if this is behind, return. + debug!(target: self.logging_target(), + "Skipping proposal slot {} since our current view is {}", + slot_number, slot_now, + ); + + return Box::pin(future::ready(Ok(()))); + } + } + let epoch_data = match self.epoch_data(&chain_head, slot_number) { Ok(epoch_data) => epoch_data, Err(err) => { diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 049dc5d92e897236e992425acc745f7b58b81472..03eea9eb3f3065a240f107785dd6baa46a7a8fcb 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -49,7 +49,7 @@ use sr_primitives::traits::{ use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; use std::{ io::{Read, Write, Seek}, - marker::PhantomData, sync::Arc, sync::atomic::AtomicBool, time::SystemTime + marker::PhantomData, sync::Arc, time::SystemTime }; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use tel::{telemetry, SUBSTRATE_INFO}; @@ -823,6 +823,9 @@ ServiceBuilder< let (to_spawn_tx, to_spawn_rx) = mpsc::unbounded::<Box<dyn Future<Item = (), Error = ()> + Send>>(); + // A side-channel for essential tasks to communicate shutdown. + let (essential_failed_tx, essential_failed_rx) = mpsc::unbounded(); + let import_queue = Box::new(import_queue); let chain_info = client.info().chain; @@ -1163,7 +1166,7 @@ ServiceBuilder< let _ = to_spawn_tx.unbounded_send(Box::new(future)); } - + // Instrumentation if let Some(tracing_targets) = config.tracing_targets.as_ref() { let subscriber = substrate_tracing::ProfilingSubscriber::new( @@ -1183,7 +1186,8 @@ ServiceBuilder< transaction_pool, exit, signal: Some(signal), - essential_failed: Arc::new(AtomicBool::new(false)), + essential_failed_tx, + essential_failed_rx, to_spawn_tx, to_spawn_rx, to_poll: Vec::new(), diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index dd982e5a2ea74803662485d73deaa29078dc835c..6ddc6ab797d59527db968967cc0da28dbbae6d75 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -31,7 +31,6 @@ use std::io; use std::marker::PhantomData; use std::net::SocketAddr; use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; use futures::sync::mpsc; use parking_lot::Mutex; @@ -85,9 +84,11 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> { exit: exit_future::Exit, /// A signal that makes the exit future above resolve, fired on service drop. signal: Option<Signal>, - /// Set to `true` when a spawned essential task has failed. The next time + /// Send a signal when a spawned essential task has concluded. The next time /// the service future is polled it should complete with an error. - essential_failed: Arc<AtomicBool>, + essential_failed_tx: mpsc::UnboundedSender<()>, + /// A receiver for spawned essential-tasks concluding. + essential_failed_rx: mpsc::UnboundedReceiver<()>, /// Sender for futures that must be spawned as background tasks. to_spawn_tx: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>, /// Receiver for futures that must be spawned as background tasks. @@ -239,12 +240,12 @@ where } fn spawn_essential_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) { - let essential_failed = self.essential_failed.clone(); + let essential_failed = self.essential_failed_tx.clone(); let essential_task = std::panic::AssertUnwindSafe(task) .catch_unwind() .then(move |_| { error!("Essential task failed. Shutting down service."); - essential_failed.store(true, Ordering::Relaxed); + let _ = essential_failed.send(()); Ok(()) }); let task = essential_task.select(self.on_exit()).then(|_| Ok(())); @@ -297,8 +298,13 @@ impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Future for type Error = Error; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.essential_failed.load(Ordering::Relaxed) { - return Err(Error::Other("Essential task failed.".into())); + match self.essential_failed_rx.poll() { + Ok(Async::NotReady) => {}, + Ok(Async::Ready(_)) | Err(_) => { + // Ready(None) should not be possible since we hold a live + // sender. + return Err(Error::Other("Essential task failed.".into())); + } } while let Ok(Async::Ready(Some(task_to_spawn))) = self.to_spawn_rx.poll() {