diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index f22e20930c0fd83e009719d63c0f60862e9c6529..c9692cbde22c8f0104171b96bf626c1684d90651 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -179,16 +179,15 @@ pub fn start_aura_thread<B, C, E, I, SO, Error>( } }; - runtime.spawn(start_aura( + let _ = runtime.block_on(start_aura( slot_duration, local_key, client, block_import, env, sync_oracle, + on_exit, )); - - runtime.block_on(on_exit).expect("Exit future should not fail"); }); } @@ -200,6 +199,7 @@ pub fn start_aura<B, C, E, I, SO, Error>( block_import: Arc<I>, env: Arc<E>, sync_oracle: SO, + on_exit: impl Future<Item=(),Error=()>, ) -> impl Future<Item=(),Error=()> where B: Block, C: Authorities<B> + ChainHead<B>, @@ -352,7 +352,7 @@ pub fn start_aura<B, C, E, I, SO, Error>( }) }; - future::loop_fn((), move |()| { + let work = future::loop_fn((), move |()| { let authorship_task = ::std::panic::AssertUnwindSafe(make_authorship()); authorship_task.catch_unwind().then(|res| { match res { @@ -369,7 +369,9 @@ pub fn start_aura<B, C, E, I, SO, Error>( Ok(future::Loop::Continue(())) }) - }) + }); + + work.select(on_exit).then(|_| Ok(())) } // a header which has been checked @@ -760,6 +762,7 @@ mod tests { client, environ.clone(), DummyOracle, + futures::empty(), ); runtime.spawn(aura); diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index dfbea1fe8135a9a046882c8b4c610cd1ad681ea6..3e699ba3823ae085bff87eefa321d98e702082d0 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -1186,6 +1186,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>( config: Config, link: LinkHalf<B, E, Block, RA>, network: N, + on_exit: impl Future<Item=(),Error=()> + Send + 'static, ) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where Block::Hash: Ord, B: Backend<Block, Blake2Hasher> + 'static, @@ -1312,5 +1313,5 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>( })) }).map_err(|e| warn!("GRANDPA Voter failed: {:?}", e)); - Ok(voter_work) + Ok(voter_work.select(on_exit).then(|_| Ok(()))) } diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index d01a48d62f3b749f4e54172c0a81344723de53be..de6c4f1cb3977a85b8d893cf70588ab68b8380d2 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -376,6 +376,7 @@ fn finalize_3_voters_no_observers() { }, link, MessageRouting::new(net.clone(), peer_id), + futures::empty(), ).expect("all in order with client and network"); assert_send(&voter); @@ -436,6 +437,7 @@ fn finalize_3_voters_1_observer() { }, link, MessageRouting::new(net.clone(), peer_id), + futures::empty(), ).expect("all in order with client and network"); runtime.spawn(voter); @@ -592,6 +594,7 @@ fn transition_3_voters_twice_1_observer() { }, link, MessageRouting::new(net.clone(), peer_id), + futures::empty(), ).expect("all in order with client and network"); runtime.spawn(voter); diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 04d8e8582ae50310e2cf3d255aaaa725c7680a87..f316b49250d99d2ac4fee8036c5ef12e50e8c8b9 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -107,7 +107,7 @@ pub struct Service<Components: components::Components> { /// Configuration of this Service pub config: FactoryFullConfiguration<Components::Factory>, _rpc: Box<::std::any::Any + Send + Sync>, - _telemetry: Option<tel::Telemetry>, + _telemetry: Option<Arc<tel::Telemetry>>, } /// Creates bare client without any networking. @@ -263,7 +263,7 @@ impl<Components: components::Components> Service<Components> { let impl_name = config.impl_name.to_owned(); let version = version.clone(); let chain_name = config.chain_spec.name().to_owned(); - Some(tel::init_telemetry(tel::TelemetryConfig { + Some(Arc::new(tel::init_telemetry(tel::TelemetryConfig { url: url, on_connect: Box::new(move || { telemetry!("system.connected"; @@ -276,7 +276,7 @@ impl<Components: components::Components> Service<Components> { "authority" => is_authority ); }), - })) + }))) }, None => None, }; @@ -306,6 +306,10 @@ impl<Components: components::Components> Service<Components> { None } } + + pub fn telemetry(&self) -> Option<Arc<tel::Telemetry>> { + self._telemetry.as_ref().map(|t| t.clone()) + } } impl<Components> Service<Components> where Components: components::Components { diff --git a/substrate/core/service/test/src/lib.rs b/substrate/core/service/test/src/lib.rs index d5aafbe675606995d6ddef704e3481394c2e20d6..4335bffe86551eefdf240c7652468edaae058d69 100644 --- a/substrate/core/service/test/src/lib.rs +++ b/substrate/core/service/test/src/lib.rs @@ -33,7 +33,7 @@ use std::iter; use std::sync::Arc; use std::net::Ipv4Addr; use std::time::Duration; -use futures::Stream; +use futures::{Future, Stream}; use tempdir::TempDir; use tokio::runtime::Runtime; use tokio::timer::Interval; @@ -188,7 +188,7 @@ pub fn connectivity<F: ServiceFactory, Inherent>(spec: FactoryChainSpec<F>) wher const NUM_NODES: u32 = 10; { let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir"); - { + let runtime = { let mut network = TestNet::<F>::new(&temp, spec.clone(), NUM_NODES, 0, vec![], 30400); info!("Checking star topology"); let first_address = network.full_nodes[0].1.network().node_id().expect("No node address"); @@ -198,13 +198,17 @@ pub fn connectivity<F: ServiceFactory, Inherent>(spec: FactoryChainSpec<F>) wher network.run_until_all_full(|_index, service| service.network().status().num_peers == NUM_NODES as usize - 1 ); - } + network.runtime + }; + + runtime.shutdown_on_idle().wait().expect("Error shutting down runtime"); + temp.close().expect("Error removing temp dir"); } { let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir"); { - let mut network = TestNet::<F>::new(&temp, spec, NUM_NODES, 0, vec![], 30500); + let mut network = TestNet::<F>::new(&temp, spec, NUM_NODES, 0, vec![], 30400); info!("Checking linked topology"); let mut address = network.full_nodes[0].1.network().node_id().expect("No node address"); for (_, service) in network.full_nodes.iter().skip(1) { diff --git a/substrate/node/cli/src/lib.rs b/substrate/node/cli/src/lib.rs index 2be83134041ab21f997ef49caceea0ac1189b827..88059728f8e5b5a7a05ccfaaf24fec8be4a9b495 100644 --- a/substrate/node/cli/src/lib.rs +++ b/substrate/node/cli/src/lib.rs @@ -50,6 +50,7 @@ pub mod chain_spec; mod service; mod params; +use tokio::prelude::Future; use tokio::runtime::Runtime; pub use cli::{VersionInfo, IntoExit}; use substrate_service::{ServiceFactory, Roles as ServiceRoles}; @@ -136,8 +137,8 @@ pub fn run<I, T, E>(args: I, exit: E, version: cli::VersionInfo) -> error::Resul let mut runtime = Runtime::new()?; let executor = runtime.executor(); match config.roles == ServiceRoles::LIGHT { - true => run_until_exit(&mut runtime, service::Factory::new_light(config, executor)?, exit)?, - false => run_until_exit(&mut runtime, service::Factory::new_full(config, executor)?, exit)?, + true => run_until_exit(runtime, service::Factory::new_light(config, executor)?, exit)?, + false => run_until_exit(runtime, service::Factory::new_full(config, executor)?, exit)?, } } } @@ -145,7 +146,7 @@ pub fn run<I, T, E>(args: I, exit: E, version: cli::VersionInfo) -> error::Resul } fn run_until_exit<T, C, E>( - runtime: &mut Runtime, + mut runtime: Runtime, service: T, e: E, ) -> error::Result<()> @@ -161,5 +162,14 @@ fn run_until_exit<T, C, E>( let _ = runtime.block_on(e.into_exit()); exit_send.fire(); + + // we eagerly drop the service so that the internal exit future is fired, + // but we need to keep holding a reference to the global telemetry guard + let _telemetry = service.telemetry(); + drop(service); + + // TODO [andre]: timeout this future #1318 + let _ = runtime.shutdown_on_idle().wait(); + Ok(()) } diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs index f4796343b5c05237b9b8fcf3a0ead3f7c10fbc51..d173c7419e4cd0127069ac9797216c9b54dfb38b 100644 --- a/substrate/node/cli/src/service.rs +++ b/substrate/node/cli/src/service.rs @@ -19,19 +19,20 @@ //! Service and ServiceFactory implementation. Specialized wrapper over substrate service. use std::sync::Arc; -use transaction_pool::{self, txpool::{Pool as TransactionPool}}; -use node_runtime::{GenesisConfig, RuntimeApi}; +use std::time::Duration; + +use client; +use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, NothingExtra}; +use grandpa; +use node_executor; +use primitives::ed25519::Pair; use node_primitives::{Block, InherentData}; +use node_runtime::{GenesisConfig, RuntimeApi}; use substrate_service::{ FactoryFullConfiguration, LightComponents, FullComponents, FullBackend, FullClient, LightClient, LightBackend, FullExecutor, LightExecutor, TaskExecutor }; -use node_executor; -use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, NothingExtra}; -use primitives::ed25519::Pair; -use client; -use std::time::Duration; -use grandpa; +use transaction_pool::{self, txpool::{Pool as TransactionPool}}; construct_simple_protocol! { /// Demo protocol attachment for substrate. @@ -89,12 +90,13 @@ construct_service_factory! { block_import.clone(), proposer, service.network(), + service.on_exit(), )); info!("Running Grandpa session as Authority {}", key.public()); } - let voter = grandpa::run_grandpa( + executor.spawn(grandpa::run_grandpa( grandpa::Config { local_key, gossip_duration: Duration::new(4, 0), // FIXME: make this available through chainspec? @@ -102,9 +104,8 @@ construct_service_factory! { }, link_half, grandpa::NetworkBridge::new(service.network()), - )?; - - executor.spawn(voter); + service.on_exit(), + )?); Ok(service) }