Unverified Commit a06b9def authored by Gavin Wood's avatar Gavin Wood Committed by GitHub
Browse files

Update Substrate (#661)

* Make compat with exit-future updates

* Update exit-future entirely

* Tidy

* Bump Substrate
parent 84ece424
Pipeline #71032 passed with stages
in 26 minutes and 59 seconds
This diff is collapsed.
...@@ -15,7 +15,7 @@ log = "0.4.8" ...@@ -15,7 +15,7 @@ log = "0.4.8"
futures01 = "0.1.17" futures01 = "0.1.17"
futures = { package = "futures", version = "0.3.1", features = ["compat"] } futures = { package = "futures", version = "0.3.1", features = ["compat"] }
tokio = "0.1.7" tokio = "0.1.7"
exit-future = "0.1" exit-future = "0.2.0"
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] } codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
......
...@@ -37,9 +37,8 @@ use polkadot_primitives::parachain::{ ...@@ -37,9 +37,8 @@ use polkadot_primitives::parachain::{
CandidateReceipt, ParachainHost, ValidatorId, CandidateReceipt, ParachainHost, ValidatorId,
ValidatorPair, AvailableMessages, BlockData, ErasureChunk, ValidatorPair, AvailableMessages, BlockData, ErasureChunk,
}; };
use futures01::Future;
use futures::channel::{mpsc, oneshot}; use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, Sink, SinkExt, TryFutureExt, StreamExt}; use futures::{FutureExt, Sink, SinkExt, TryFutureExt, StreamExt, future::select};
use keystore::KeyStorePtr; use keystore::KeyStorePtr;
use tokio::runtime::current_thread::{Handle, Runtime as LocalRuntime}; use tokio::runtime::current_thread::{Handle, Runtime as LocalRuntime};
...@@ -166,7 +165,7 @@ impl WorkerHandle { ...@@ -166,7 +165,7 @@ impl WorkerHandle {
impl Drop for WorkerHandle { impl Drop for WorkerHandle {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() { if let Some(signal) = self.exit_signal.take() {
signal.fire(); let _ = signal.fire();
} }
if let Some(thread) = self.thread.take() { if let Some(thread) = self.thread.take() {
...@@ -296,7 +295,7 @@ where ...@@ -296,7 +295,7 @@ where
impl<PGM> Drop for Worker<PGM> { impl<PGM> Drop for Worker<PGM> {
fn drop(&mut self) { fn drop(&mut self) {
for (_, signal) in self.registered_gossip_streams.drain() { for (_, signal) in self.registered_gossip_streams.drain() {
signal.fire(); let _ = signal.fire();
} }
} }
} }
...@@ -356,13 +355,10 @@ where ...@@ -356,13 +355,10 @@ where
self.registered_gossip_streams.insert(topic, signal); self.registered_gossip_streams.insert(topic, signal);
let _ = runtime_handle.spawn( let _ = runtime_handle.spawn(
fut select(fut.boxed(), exit)
.unit_error() .map(|_| Ok(()))
.boxed()
.compat() .compat()
.select(exit) );
.then(|_| Ok(()))
);
Ok(()) Ok(())
} }
...@@ -423,7 +419,7 @@ where ...@@ -423,7 +419,7 @@ where
let topic = erasure_coding_topic(relay_parent, receipt.erasure_root, chunk.index); let topic = erasure_coding_topic(relay_parent, receipt.erasure_root, chunk.index);
// need to remove gossip listener and stop it. // need to remove gossip listener and stop it.
if let Some(signal) = self.registered_gossip_streams.remove(&topic) { if let Some(signal) = self.registered_gossip_streams.remove(&topic) {
signal.fire(); let _ = signal.fire();
} }
} }
...@@ -594,15 +590,12 @@ where ...@@ -594,15 +590,12 @@ where
}; };
runtime.spawn( runtime.spawn(
process_notification futures::future::select(process_notification.boxed(), exit.clone())
.unit_error() .map(|_| Ok(()))
.boxed()
.compat() .compat()
.select(exit.clone())
.then(|_| Ok(()))
); );
if let Err(e) = runtime.block_on(exit) { if let Err(e) = runtime.block_on(exit.unit_error().compat()) {
warn!(target: LOG_TARGET, "Availability worker error {:?}", e); warn!(target: LOG_TARGET, "Availability worker error {:?}", e);
} }
...@@ -636,7 +629,7 @@ pub struct AvailabilityBlockImport<I, P> { ...@@ -636,7 +629,7 @@ pub struct AvailabilityBlockImport<I, P> {
impl<I, P> Drop for AvailabilityBlockImport<I, P> { impl<I, P> Drop for AvailabilityBlockImport<I, P> {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() { if let Some(signal) = self.exit_signal.take() {
signal.fire(); let _ = signal.fire();
} }
} }
} }
...@@ -775,12 +768,10 @@ impl<I, P> AvailabilityBlockImport<I, P> { ...@@ -775,12 +768,10 @@ impl<I, P> AvailabilityBlockImport<I, P> {
// dependent on the types of client and executor, which would prove // dependent on the types of client and executor, which would prove
// not not so handy in the testing code. // not not so handy in the testing code.
let mut exit_signal = Some(signal); let mut exit_signal = Some(signal);
let prune_available = prune_unneeded_availability(client.clone(), to_worker.clone()) let prune_available = select(
.unit_error() prune_unneeded_availability(client.clone(), to_worker.clone()).boxed(),
.boxed() exit.clone()
.compat() ).map(|_| Ok(())).compat();
.select(exit.clone())
.then(|_| Ok(()));
if let Err(_) = thread_pool.execute(Box::new(prune_available)) { if let Err(_) = thread_pool.execute(Box::new(prune_available)) {
error!(target: LOG_TARGET, "Failed to spawn availability pruning task"); error!(target: LOG_TARGET, "Failed to spawn availability pruning task");
......
...@@ -51,7 +51,7 @@ use std::time::Duration; ...@@ -51,7 +51,7 @@ use std::time::Duration;
use futures::{ use futures::{
future, Future, Stream, FutureExt, TryFutureExt, StreamExt, future, Future, Stream, FutureExt, TryFutureExt, StreamExt,
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt} compat::{Future01CompatExt, Stream01CompatExt}
}; };
use futures01::{Future as _}; use futures01::{Future as _};
use log::{warn, error}; use log::{warn, error};
...@@ -248,7 +248,7 @@ struct ApiContext<P, E> { ...@@ -248,7 +248,7 @@ struct ApiContext<P, E> {
impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
P: ProvideRuntimeApi + Send + Sync, P: ProvideRuntimeApi + Send + Sync,
P::Api: ParachainHost<Block>, P::Api: ParachainHost<Block>,
E: futures01::Future<Item=(),Error=()> + Clone + Send + Sync + 'static, E: futures::Future<Output=()> + Clone + Send + Sync + 'static,
{ {
type Error = String; type Error = String;
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Unpin + Send>; type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Unpin + Send>;
...@@ -277,11 +277,11 @@ struct CollationNode<P, E> { ...@@ -277,11 +277,11 @@ struct CollationNode<P, E> {
} }
impl<P, E> IntoExit for CollationNode<P, E> where impl<P, E> IntoExit for CollationNode<P, E> where
E: futures01::Future<Item=(),Error=()> + Unpin + Send + 'static E: futures::Future<Output=()> + Unpin + Send + 'static
{ {
type Exit = future::Map<Compat01As03<E>, fn (Result<(), ()>) -> ()>; type Exit = E;
fn into_exit(self) -> Self::Exit { fn into_exit(self) -> Self::Exit {
self.exit.compat().map(drop) self.exit
} }
} }
...@@ -289,7 +289,7 @@ impl<P, E> Worker for CollationNode<P, E> where ...@@ -289,7 +289,7 @@ impl<P, E> Worker for CollationNode<P, E> where
P: BuildParachainContext + Send + 'static, P: BuildParachainContext + Send + 'static,
P::ParachainContext: Send + 'static, P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static, <P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static,
E: futures01::Future<Item=(),Error=()> + Clone + Unpin + Send + Sync + 'static, E: futures::Future<Output=()> + Clone + Unpin + Send + Sync + 'static,
{ {
type Work = Box<dyn Future<Output=()> + Unpin + Send>; type Work = Box<dyn Future<Output=()> + Unpin + Send>;
...@@ -433,7 +433,8 @@ impl<P, E> Worker for CollationNode<P, E> where ...@@ -433,7 +433,8 @@ impl<P, E> Worker for CollationNode<P, E> where
outgoing, outgoing,
); );
tokio::spawn(res.select(inner_exit_2.clone()).then(|_| Ok(()))); let exit = inner_exit_2.clone().unit_error().compat();
tokio::spawn(res.select(exit).then(|_| Ok(())));
}) })
}); });
...@@ -454,17 +455,15 @@ impl<P, E> Worker for CollationNode<P, E> where ...@@ -454,17 +455,15 @@ impl<P, E> Worker for CollationNode<P, E> where
let future = future::select( let future = future::select(
silenced, silenced,
inner_exit.clone().map(|_| Ok::<_, ()>(())).compat() inner_exit.clone()
).map(|_| Ok::<_, ()>(())).compat(); ).map(|_| Ok::<_, ()>(())).compat();
tokio::spawn(future); tokio::spawn(future);
future::ready(()) future::ready(())
}); });
let work_and_exit = future::select( let work_and_exit = future::select(work, exit)
work, .map(|_| ());
exit.map(|_| Ok::<_, ()>(())).compat()
).map(|_| ());
Box::new(work_and_exit) Box::new(work_and_exit)
} }
...@@ -495,7 +494,7 @@ pub fn run_collator<P, E>( ...@@ -495,7 +494,7 @@ pub fn run_collator<P, E>(
P: BuildParachainContext + Send + 'static, P: BuildParachainContext + Send + 'static,
P::ParachainContext: Send + 'static, P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static, <P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static,
E: futures01::Future<Item = (),Error=()> + Unpin + Send + Clone + Sync + 'static, E: futures::Future<Output = ()> + Unpin + Send + Clone + Sync + 'static,
{ {
let node_logic = CollationNode { build_parachain_context, exit, para_id, key }; let node_logic = CollationNode { build_parachain_context, exit, para_id, key };
polkadot_cli::run(node_logic, version) polkadot_cli::run(node_logic, version)
......
...@@ -19,7 +19,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkad ...@@ -19,7 +19,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkad
futures = "0.1" futures = "0.1"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] } futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
log = "0.4.8" log = "0.4.8"
exit-future = "0.1.4" exit-future = "0.2.0"
sc-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sc-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
......
...@@ -130,7 +130,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w ...@@ -130,7 +130,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>, P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N: NetworkService, N: NetworkService,
T: Clone + Executor + Send + 'static, T: Clone + Executor + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static, E: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
{ {
/// Import a statement whose signature has been checked already. /// Import a statement whose signature has been checked already.
pub(crate) fn import_statement(&self, statement: SignedStatement) { pub(crate) fn import_statement(&self, statement: SignedStatement) {
...@@ -174,7 +174,8 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w ...@@ -174,7 +174,8 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) { if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
trace!(target: "validation", "driving statement work to completion"); trace!(target: "validation", "driving statement work to completion");
let work = work.select2(self.fetcher.exit().clone()).then(|_| Ok(())); let exit = self.fetcher.exit().clone().unit_error().compat();
let work = work.select2(exit).then(|_| Ok(()));
self.fetcher.executor().spawn(work); self.fetcher.executor().spawn(work);
} }
} }
...@@ -224,7 +225,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh ...@@ -224,7 +225,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
P::Api: ParachainHost<Block>, P::Api: ParachainHost<Block>,
N: NetworkService, N: NetworkService,
T: Clone + Executor + Send + 'static, T: Clone + Executor + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static, E: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
{ {
type Error = io::Error; type Error = io::Error;
type FetchValidationProof = validation::PoVReceiver; type FetchValidationProof = validation::PoVReceiver;
......
...@@ -33,6 +33,7 @@ use polkadot_primitives::parachain::{ ...@@ -33,6 +33,7 @@ use polkadot_primitives::parachain::{
use futures::prelude::*; use futures::prelude::*;
use futures::future::{self, Executor as FutureExecutor}; use futures::future::{self, Executor as FutureExecutor};
use futures::sync::oneshot::{self, Receiver}; use futures::sync::oneshot::{self, Receiver};
use futures03::{FutureExt as _, TryFutureExt as _};
use std::collections::hash_map::{HashMap, Entry}; use std::collections::hash_map::{HashMap, Entry};
use std::io; use std::io;
...@@ -123,7 +124,7 @@ impl<P, E: Clone, N, T: Clone> Clone for ValidationNetwork<P, E, N, T> { ...@@ -123,7 +124,7 @@ impl<P, E: Clone, N, T: Clone> Clone for ValidationNetwork<P, E, N, T> {
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
P: ProvideRuntimeApi + Send + Sync + 'static, P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block>, P::Api: ParachainHost<Block>,
E: Clone + Future<Item=(),Error=()> + Send + Sync + 'static, E: Clone + futures03::Future<Output=()> + Send + Sync + 'static,
N: NetworkService, N: NetworkService,
T: Clone + Executor + Send + Sync + 'static, T: Clone + Executor + Send + Sync + 'static,
{ {
...@@ -206,7 +207,7 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService { ...@@ -206,7 +207,7 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
P: ProvideRuntimeApi + Send + Sync + 'static, P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>, P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
E: Clone + Future<Item=(),Error=()> + Send + Sync + 'static, E: Clone + futures03::Future<Output=()> + Send + Sync + Unpin + 'static,
N: NetworkService, N: NetworkService,
T: Clone + Executor + Send + Sync + 'static, T: Clone + Executor + Send + Sync + 'static,
{ {
...@@ -242,8 +243,12 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where ...@@ -242,8 +243,12 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
let table_router_clone = table_router.clone(); let table_router_clone = table_router.clone();
let work = table_router.checked_statements() let work = table_router.checked_statements()
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) }); .for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) })
executor.spawn(work.select(exit.clone()).map(|_| ()).map_err(|_| ())); .select(exit.clone().unit_error().compat())
.map(|_| ())
.map_err(|_| ());
executor.spawn(work);
table_router table_router
}); });
...@@ -670,7 +675,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where ...@@ -670,7 +675,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
P::Api: ParachainHost<Block>, P::Api: ParachainHost<Block>,
N: NetworkService, N: NetworkService,
T: Clone + Executor + Send + 'static, T: Clone + Executor + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static, E: futures03::Future<Output=()> + Clone + Send + 'static,
{ {
/// Fetch PoV block for the given candidate receipt. /// Fetch PoV block for the given candidate receipt.
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver { pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
......
...@@ -31,7 +31,7 @@ sc-executor = { git = "https://github.com/paritytech/substrate", branch = "polka ...@@ -31,7 +31,7 @@ sc-executor = { git = "https://github.com/paritytech/substrate", branch = "polka
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
grandpa = { package = "sc-finality-grandpa", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } grandpa = { package = "sc-finality-grandpa", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
grandpa_primitives = { package = "sp-finality-granpda", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } grandpa_primitives = { package = "sp-finality-grandpa", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
inherents = { package = "sp-inherents", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } inherents = { package = "sp-inherents", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
telemetry = { package = "sc-telemetry", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } telemetry = { package = "sc-telemetry", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
......
...@@ -15,4 +15,4 @@ client-api = { package = "sc-client-api", git = "https://github.com/paritytech/s ...@@ -15,4 +15,4 @@ client-api = { package = "sc-client-api", git = "https://github.com/paritytech/s
parking_lot = "0.9.0" parking_lot = "0.9.0"
ctrlc = { version = "3.1.3", features = ["termination"] } ctrlc = { version = "3.1.3", features = ["termination"] }
futures = "0.3.1" futures = "0.3.1"
exit-future = "0.1.4" exit-future = "0.2.0"
...@@ -145,7 +145,7 @@ fn main() { ...@@ -145,7 +145,7 @@ fn main() {
let exit_send_cell = RefCell::new(Some(exit_send)); let exit_send_cell = RefCell::new(Some(exit_send));
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
if let Some(exit_send) = exit_send_cell.try_borrow_mut().expect("signal handler not reentrant; qed").take() { if let Some(exit_send) = exit_send_cell.try_borrow_mut().expect("signal handler not reentrant; qed").take() {
exit_send.fire(); let _ = exit_send.fire();
} }
}).expect("Error setting up ctrl-c handler"); }).expect("Error setting up ctrl-c handler");
......
...@@ -13,7 +13,7 @@ parking_lot = "0.9.0" ...@@ -13,7 +13,7 @@ parking_lot = "0.9.0"
tokio = "0.1.22" tokio = "0.1.22"
derive_more = "0.14.1" derive_more = "0.14.1"
log = "0.4.8" log = "0.4.8"
exit-future = "0.1.4" exit-future = "0.2.0"
tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] } tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] }
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] } codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
availability_store = { package = "polkadot-availability-store", path = "../availability-store" } availability_store = { package = "polkadot-availability-store", path = "../availability-store" }
......
...@@ -30,7 +30,7 @@ use sp_blockchain::HeaderBackend; ...@@ -30,7 +30,7 @@ use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi; use block_builder::BlockBuilderApi;
use consensus::SelectChain; use consensus::SelectChain;
use futures::prelude::*; use futures::prelude::*;
use futures03::{TryStreamExt as _, StreamExt as _}; use futures03::{TryStreamExt as _, StreamExt as _, FutureExt as _, TryFutureExt as _};
use log::error; use log::error;
use polkadot_primitives::Block; use polkadot_primitives::Block;
use polkadot_primitives::parachain::ParachainHost; use polkadot_primitives::parachain::ParachainHost;
...@@ -107,7 +107,7 @@ pub(crate) fn start<C, N, P, SC>( ...@@ -107,7 +107,7 @@ pub(crate) fn start<C, N, P, SC>(
} }
Ok(()) Ok(())
}) })
.select(exit.clone()) .select(exit.clone().unit_error().compat())
.then(|_| Ok(())) .then(|_| Ok(()))
}; };
...@@ -130,7 +130,7 @@ pub(crate) fn start<C, N, P, SC>( ...@@ -130,7 +130,7 @@ pub(crate) fn start<C, N, P, SC>(
} }
}) })
.map_err(|e| warn!("Timer error {:?}", e)) .map_err(|e| warn!("Timer error {:?}", e))
.select(exit.clone()) .select(exit.clone().unit_error().compat())
.then(|_| Ok(())) .then(|_| Ok(()))
}; };
...@@ -139,7 +139,7 @@ pub(crate) fn start<C, N, P, SC>( ...@@ -139,7 +139,7 @@ pub(crate) fn start<C, N, P, SC>(
error!("Failed to spawn old sessions pruning task"); error!("Failed to spawn old sessions pruning task");
} }
if let Err(e) = runtime.block_on(exit) { if let Err(e) = runtime.block_on(exit.unit_error().compat()) {
debug!("BFT event loop error {:?}", e); debug!("BFT event loop error {:?}", e);
} }
}); });
...@@ -153,7 +153,7 @@ pub(crate) fn start<C, N, P, SC>( ...@@ -153,7 +153,7 @@ pub(crate) fn start<C, N, P, SC>(
impl Drop for ServiceHandle { impl Drop for ServiceHandle {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() { if let Some(signal) = self.exit_signal.take() {
signal.fire(); let _ = signal.fire();
} }
if let Some(thread) = self.thread.take() { if let Some(thread) = self.thread.take() {
......
...@@ -456,7 +456,7 @@ impl<C, N, P> ParachainValidation<C, N, P> where ...@@ -456,7 +456,7 @@ impl<C, N, P> ParachainValidation<C, N, P> where
}) })
.and_then(with_router) .and_then(with_router)
.then(|_| Ok(())) .then(|_| Ok(()))
.select(exit) .select(exit.unit_error().compat())
.then(|_| Ok(())); .then(|_| Ok(()));
// spawn onto thread pool. // spawn onto thread pool.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment