diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 25ccb0912a040037509d742883416659528cbb5d..4ceb36fcff45ed83765f5036a37e2ca46ab25c11 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -161,6 +161,7 @@ check-web-wasm: - time cargo web build -p substrate-keystore - time cargo web build -p substrate-executor - time cargo web build -p substrate-network + - time cargo web build -p substrate-offchain - time cargo web build -p substrate-panic-handler - time cargo web build -p substrate-peerset - time cargo web build -p substrate-primitives diff --git a/Cargo.lock b/Cargo.lock index 1cb4beeeb4280daaec8e5cd0ffa0c759967b97df..e444bc1e7491fafc8ea54c51391e47bfd7f5d373 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4252,6 +4252,8 @@ dependencies = [ "substrate-telemetry 2.0.0", "substrate-test-runtime-client 2.0.0", "tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -4511,7 +4513,7 @@ dependencies = [ "substrate-transaction-pool 2.0.0", "sysinfo 0.8.6 (registry+https://github.com/rust-lang/crates.io-index)", "target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/core/cli/src/lib.rs b/core/cli/src/lib.rs index c14c9625917e51d9a6b487668b634151d38429c8..4fe5ac3a829c938f3844b8dc26a66e5e092a023f 100644 --- a/core/cli/src/lib.rs +++ b/core/cli/src/lib.rs @@ -645,7 +645,9 @@ where None => Box::new(stdin()), }; - service::chain_ops::import_blocks::<F, _, _>(config, exit.into_exit(), file).map_err(Into::into) + let fut = service::chain_ops::import_blocks::<F, _, _>(config, exit.into_exit(), file)?; + tokio::run(fut); + Ok(()) } fn revert_chain<F, S>( diff --git a/core/consensus/common/src/import_queue.rs b/core/consensus/common/src/import_queue.rs index 6cbb8ee413ed6b2373df3e25ffbfac1e46d25469..68a39a9d9cc3ee2c4b37dbc472b48099c6a244b2 100644 --- a/core/consensus/common/src/import_queue.rs +++ b/core/consensus/common/src/import_queue.rs @@ -26,7 +26,7 @@ //! queues to be instantiated simply. use std::{sync::Arc, collections::HashMap}; -use futures::{prelude::*, sync::mpsc}; +use futures::{prelude::*, future::Executor, sync::mpsc}; use runtime_primitives::{Justification, traits::{ Block as BlockT, Header as HeaderT, NumberFor, }}; @@ -133,6 +133,10 @@ pub struct BasicQueue<B: BlockT> { /// If `Some`, contains the task to spawn in the background. If `None`, the future has already /// been spawned. future_to_spawn: Option<Box<dyn Future<Item = (), Error = ()> + Send>>, + /// If it isn't possible to spawn the future in `future_to_spawn` (which is notably the case in + /// "no std" environment), we instead put it in `manual_poll`. It is then polled manually from + /// `poll_actions`. + manual_poll: Option<Box<dyn Future<Item = (), Error = ()> + Send>>, } impl<B: BlockT> BasicQueue<B> { @@ -161,6 +165,7 @@ impl<B: BlockT> BasicQueue<B> { result_port, finality_proof_request_builder, future_to_spawn: Some(Box::new(future)), + manual_poll: None, } } @@ -200,8 +205,21 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> { } fn poll_actions(&mut self, link: &mut dyn Link<B>) { + // Try to spawn the future in `future_to_spawn`. if let Some(future) = self.future_to_spawn.take() { - tokio_executor::spawn(future); + if let Err(err) = tokio_executor::DefaultExecutor::current().execute(future) { + debug_assert!(self.manual_poll.is_none()); + self.manual_poll = Some(err.into_future()); + } + } + + // As a backup mechanism, if we failed to spawn the `future_to_spawn`, we instead poll + // manually here. + if let Some(manual_poll) = self.manual_poll.as_mut() { + match manual_poll.poll() { + Ok(Async::NotReady) => {} + _ => self.manual_poll = None, + } } if let Some(fprb) = self.finality_proof_request_builder.take() { diff --git a/core/finality-grandpa/Cargo.toml b/core/finality-grandpa/Cargo.toml index 6333cf6a5bd6ae1262a60ff03258ce007a4737c4..856fe3c0332f5efcee0ef46ecf90d15146400729 100644 --- a/core/finality-grandpa/Cargo.toml +++ b/core/finality-grandpa/Cargo.toml @@ -9,7 +9,8 @@ fork-tree = { path = "../../core/util/fork-tree" } futures = "0.1" log = "0.4" parking_lot = "0.8.0" -tokio = "0.1.7" +tokio-executor = "0.1.7" +tokio-timer = "0.2.11" rand = "0.6" parity-codec = { version = "4.1.1", features = ["derive"] } runtime_primitives = { package = "sr-primitives", path = "../sr-primitives" } @@ -31,6 +32,7 @@ network = { package = "substrate-network", path = "../network", features = ["tes keyring = { package = "substrate-keyring", path = "../keyring" } test-client = { package = "substrate-test-runtime-client", path = "../test-runtime/client"} env_logger = "0.6" +tokio = "0.1.17" [features] default = ["service-integration"] diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index cbcfef0d41a2ebd65f9e5a29471d760874d5a088..adfdf06291021877b8f0abb9c50ae315ff130cd4 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -34,6 +34,7 @@ use grandpa::Message::{Prevote, Precommit, PrimaryPropose}; use futures::prelude::*; use futures::sync::{oneshot, mpsc}; use log::{debug, trace}; +use tokio_executor::Executor; use parity_codec::{Encode, Decode}; use substrate_primitives::{ed25519, Pair}; use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO}; @@ -291,8 +292,11 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> { let startup_work = futures::future::lazy(move || { // lazily spawn these jobs onto their own tasks. the lazy future has access // to tokio globals, which aren't available outside. - tokio::spawn(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(()))); - tokio::spawn(reporting_job.select(on_exit.clone()).then(|_| Ok(()))); + let mut executor = tokio_executor::DefaultExecutor::current(); + executor.spawn(Box::new(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(())))) + .expect("failed to spawn grandpa rebroadcast job task"); + executor.spawn(Box::new(reporting_job.select(on_exit.clone()).then(|_| Ok(())))) + .expect("failed to spawn grandpa reporting job task"); Ok(()) }); diff --git a/core/finality-grandpa/src/communication/periodic.rs b/core/finality-grandpa/src/communication/periodic.rs index c6121370421bccefffb524e5afb3c9929a36e6f6..8490ff2f794ebcd789b8cb05ec46c029b7771838 100644 --- a/core/finality-grandpa/src/communication/periodic.rs +++ b/core/finality-grandpa/src/communication/periodic.rs @@ -21,7 +21,7 @@ use futures::prelude::*; use futures::sync::mpsc; use runtime_primitives::traits::{NumberFor, Block as BlockT}; use network::PeerId; -use tokio::timer::Delay; +use tokio_timer::Delay; use log::warn; use parity_codec::Encode; diff --git a/core/finality-grandpa/src/environment.rs b/core/finality-grandpa/src/environment.rs index c646c23323aa45c99aa902d233fee58da9a37e8f..9d5078116df2c44f564d9c460a6181f23be0df12 100644 --- a/core/finality-grandpa/src/environment.rs +++ b/core/finality-grandpa/src/environment.rs @@ -22,7 +22,7 @@ use std::time::{Duration, Instant}; use log::{debug, warn, info}; use parity_codec::{Decode, Encode}; use futures::prelude::*; -use tokio::timer::Delay; +use tokio_timer::Delay; use parking_lot::RwLock; use client::{ diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index e7410ca44a1ebdbe43b55e9e5f33246dbc75797a..7d82c0f2e44843895c280eb524c3bb2f22aa07b8 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -179,7 +179,7 @@ pub enum Error { /// An invariant has been violated (e.g. not finalizing pending change blocks in-order) Safety(String), /// A timer failed to fire. - Timer(::tokio::timer::Error), + Timer(tokio_timer::Error), } impl From<GrandpaError> for Error { @@ -443,7 +443,7 @@ fn register_finality_tracker_inherent_data_provider<B, E, Block: BlockT<Hash=H25 } /// Parameters used to run Grandpa. -pub struct GrandpaParams<'a, B, E, Block: BlockT<Hash=H256>, N, RA, SC, X> { +pub struct GrandpaParams<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X> { /// Configuration for the GRANDPA service. pub config: Config, /// A link to the block import worker. @@ -455,7 +455,7 @@ pub struct GrandpaParams<'a, B, E, Block: BlockT<Hash=H256>, N, RA, SC, X> { /// Handle to a future that will resolve on exit. pub on_exit: X, /// If supplied, can be used to hook on telemetry connection established events. - pub telemetry_on_connect: Option<TelemetryOnConnect<'a>>, + pub telemetry_on_connect: Option<TelemetryOnConnect>, } /// Run a GRANDPA voter as a task. Provide configuration and a link to a @@ -503,7 +503,7 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>( register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?; - if let Some(telemetry_on_connect) = telemetry_on_connect { + let telemetry_task = if let Some(telemetry_on_connect) = telemetry_on_connect { let authorities = authority_set.clone(); let events = telemetry_on_connect.telemetry_connection_sinks .for_each(move |_| { @@ -520,10 +520,11 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>( ); Ok(()) }) - .then(|_| Ok(())); - let events = events.select(telemetry_on_connect.on_exit).then(|_| Ok(())); - telemetry_on_connect.executor.spawn(events); - } + .then(|_| -> Result<(), ()> { Ok(()) }); + futures::future::Either::A(events) + } else { + futures::future::Either::B(futures::future::empty()) + }; let voters = authority_set.current_authorities(); let initial_environment = Arc::new(Environment { @@ -723,7 +724,11 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>( let voter_work = network_startup.and_then(move |()| voter_work); - Ok(voter_work.select(on_exit).then(|_| Ok(()))) + // Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa. + let telemetry_task = telemetry_task + .then(|_| futures::future::empty::<(), ()>()); + + Ok(voter_work.select(on_exit).select2(telemetry_task).then(|_| Ok(()))) } #[deprecated(since = "1.1", note = "Please switch to run_grandpa_voter.")] diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index 7c981050dd4aea6a850f05636077fad2bd3c5361..daef7cafb1f4a53f42a92b61a63688467f02ef4a 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -28,7 +28,7 @@ use futures::prelude::*; use futures::stream::Fuse; use parking_lot::Mutex; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; -use tokio::timer::Interval; +use tokio_timer::Interval; use std::collections::{HashMap, VecDeque}; use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; @@ -411,7 +411,7 @@ pub(crate) type UntilCommitBlocksImported<Block, Status, I, U> = UntilImported< mod tests { use super::*; use tokio::runtime::current_thread::Runtime; - use tokio::timer::Delay; + use tokio_timer::Delay; use test_client::runtime::{Block, Hash, Header}; use consensus_common::BlockOrigin; use client::BlockImportNotification; diff --git a/core/offchain/Cargo.toml b/core/offchain/Cargo.toml index 4ea963a773148cb0c0e71750df02d565c21651db..c272653b6437f3a9edb03049bff6656dc6353129 100644 --- a/core/offchain/Cargo.toml +++ b/core/offchain/Cargo.toml @@ -16,12 +16,12 @@ parity-codec = { version = "4.1.1", features = ["derive"] } parking_lot = "0.8.0" primitives = { package = "substrate-primitives", path = "../../core/primitives" } runtime_primitives = { package = "sr-primitives", path = "../../core/sr-primitives" } -tokio = "0.1.7" transaction_pool = { package = "substrate-transaction-pool", path = "../../core/transaction-pool" } [dev-dependencies] env_logger = "0.6" test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } +tokio = "0.1.7" [features] default = [] diff --git a/core/offchain/src/lib.rs b/core/offchain/src/lib.rs index 081ae61a5bcaeecc975bab74f911a4f2cfa7a41f..96c0a190774451db56b936340138b550325824a4 100644 --- a/core/offchain/src/lib.rs +++ b/core/offchain/src/lib.rs @@ -34,6 +34,7 @@ #![warn(missing_docs)] use std::{ + fmt, marker::PhantomData, sync::Arc, }; @@ -45,7 +46,7 @@ use runtime_primitives::{ generic::BlockId, traits::{self, ProvideRuntimeApi}, }; -use tokio::runtime::TaskExecutor; +use futures::future::Future; use transaction_pool::txpool::{Pool, ChainApi}; mod api; @@ -55,22 +56,24 @@ pub mod testing; pub use offchain_primitives::OffchainWorkerApi; /// An offchain workers manager. -#[derive(Debug)] pub struct OffchainWorkers<C, Block: traits::Block> { client: Arc<C>, - executor: TaskExecutor, _block: PhantomData<Block>, } +impl<C, Block: traits::Block> fmt::Debug for OffchainWorkers<C, Block> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("OffchainWorkers").finish() + } +} + impl<C, Block: traits::Block> OffchainWorkers<C, Block> { /// Creates new `OffchainWorkers`. pub fn new( client: Arc<C>, - executor: TaskExecutor, ) -> Self { Self { client, - executor, _block: PhantomData, } } @@ -82,11 +85,12 @@ impl<C, Block> OffchainWorkers<C, Block> where C::Api: OffchainWorkerApi<Block>, { /// Start the offchain workers after given block. + #[must_use] pub fn on_block_imported<A>( &self, number: &<Block::Header as traits::Header>::Number, pool: &Arc<Pool<A>>, - ) where + ) -> impl Future<Item = (), Error = ()> where A: ChainApi<Block=Block> + 'static, { let runtime = self.client.runtime_api(); @@ -96,11 +100,12 @@ impl<C, Block> OffchainWorkers<C, Block> where if has_api.unwrap_or(false) { let (api, runner) = api::Api::new(pool.clone(), at.clone()); - self.executor.spawn(runner.process()); - debug!("Running offchain workers at {:?}", at); let api = Box::new(api); runtime.offchain_worker_with_context(&at, ExecutionContext::OffchainWorker(api), *number).unwrap(); + futures::future::Either::A(runner.process()) + } else { + futures::future::Either::B(futures::future::ok(())) } } } @@ -119,8 +124,8 @@ mod tests { let pool = Arc::new(Pool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone()))); // when - let offchain = OffchainWorkers::new(client, runtime.executor()); - offchain.on_block_imported(&0u64, &pool); + let offchain = OffchainWorkers::new(client); + runtime.executor().spawn(offchain.on_block_imported(&0u64, &pool)); // then runtime.shutdown_on_idle().wait().unwrap(); diff --git a/core/rpc-servers/src/lib.rs b/core/rpc-servers/src/lib.rs index adf560ce5a6375eeee780ecace7ef6c42908959a..37ea83353714febfcd11e865aa870c443f3dee25 100644 --- a/core/rpc-servers/src/lib.rs +++ b/core/rpc-servers/src/lib.rs @@ -66,6 +66,7 @@ pub fn start_http( http::ServerBuilder::new(io) .threads(4) .health_api(("/health", "system_health")) + .allowed_hosts(hosts_filtering(cors.is_some())) .rest_api(if cors.is_some() { http::RestApi::Secure } else { @@ -87,6 +88,7 @@ pub fn start_ws( .max_payload(MAX_PAYLOAD) .max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS)) .allowed_origins(map_cors(cors)) + .allowed_hosts(hosts_filtering(cors.is_some())) .start(addr) .map_err(|err| match err { ws::Error::Io(io) => io, @@ -103,3 +105,14 @@ fn map_cors<T: for<'a> From<&'a str>>( ) -> http::DomainsValidation<T> { cors.map(|x| x.iter().map(AsRef::as_ref).map(Into::into).collect::<Vec<_>>()).into() } + +fn hosts_filtering(enable: bool) -> http::DomainsValidation<http::Host> { + if enable { + // NOTE The listening address is whitelisted by default. + // Setting an empty vector here enables the validation + // and allows only the listening address. + http::DomainsValidation::AllowOnly(vec![]) + } else { + http::DomainsValidation::Disabled + } +} diff --git a/core/rpc/Cargo.toml b/core/rpc/Cargo.toml index 81662b037a2788b89c442496b3017b341dc7cfc2..7833f17f6bd1d232912415af505fe9b9fde075ad 100644 --- a/core/rpc/Cargo.toml +++ b/core/rpc/Cargo.toml @@ -24,7 +24,6 @@ state_machine = { package = "substrate-state-machine", path = "../state-machine" transaction_pool = { package = "substrate-transaction-pool", path = "../transaction-pool" } runtime_primitives = { package = "sr-primitives", path = "../sr-primitives" } runtime_version = { package = "sr-version", path = "../sr-version" } -tokio = "0.1.7" [dev-dependencies] assert_matches = "1.1" @@ -32,3 +31,4 @@ futures = "0.1.17" sr-io = { path = "../sr-io" } test-client = { package = "substrate-test-runtime-client", path = "../test-runtime/client" } rustc-hex = "2.0" +tokio = "0.1.17" diff --git a/core/rpc/src/author/tests.rs b/core/rpc/src/author/tests.rs index 4c6a724acd5ae35bbf4ccbfbe71e7999a7d684e6..cf320ee1442e7aff950850143d65155137c62d49 100644 --- a/core/rpc/src/author/tests.rs +++ b/core/rpc/src/author/tests.rs @@ -44,7 +44,7 @@ fn submit_transaction_should_not_cause_error() { let p = Author { client: client.clone(), pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client))), - subscriptions: Subscriptions::new(runtime.executor()), + subscriptions: Subscriptions::new(Arc::new(runtime.executor())), }; let xt = uxt(AccountKeyring::Alice, 1).encode(); let h: H256 = blake2_256(&xt).into(); @@ -65,7 +65,7 @@ fn submit_rich_transaction_should_not_cause_error() { let p = Author { client: client.clone(), pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))), - subscriptions: Subscriptions::new(runtime.executor()), + subscriptions: Subscriptions::new(Arc::new(runtime.executor())), }; let xt = uxt(AccountKeyring::Alice, 0).encode(); let h: H256 = blake2_256(&xt).into(); @@ -88,7 +88,7 @@ fn should_watch_extrinsic() { let p = Author { client, pool: pool.clone(), - subscriptions: Subscriptions::new(runtime.executor()), + subscriptions: Subscriptions::new(Arc::new(runtime.executor())), }; let (subscriber, id_rx, data) = ::jsonrpc_pubsub::typed::Subscriber::new_test("test"); @@ -128,7 +128,7 @@ fn should_return_pending_extrinsics() { let p = Author { client, pool: pool.clone(), - subscriptions: Subscriptions::new(runtime.executor()), + subscriptions: Subscriptions::new(Arc::new(runtime.executor())), }; let ex = uxt(AccountKeyring::Alice, 0); AuthorApi::submit_extrinsic(&p, ex.encode().into()).unwrap(); @@ -146,7 +146,7 @@ fn should_remove_extrinsics() { let p = Author { client, pool: pool.clone(), - subscriptions: Subscriptions::new(runtime.executor()), + subscriptions: Subscriptions::new(Arc::new(runtime.executor())), }; let ex1 = uxt(AccountKeyring::Alice, 0); p.submit_extrinsic(ex1.encode().into()).unwrap(); diff --git a/core/rpc/src/chain/tests.rs b/core/rpc/src/chain/tests.rs index eed9ae836b8b5cbdb88da859cb58a54b9ae16207..e6fa4d94e5b0dfa96bfebc615eeb9aede6e40082 100644 --- a/core/rpc/src/chain/tests.rs +++ b/core/rpc/src/chain/tests.rs @@ -29,7 +29,7 @@ fn should_return_header() { let client = Chain { client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(remote), + subscriptions: Subscriptions::new(Arc::new(remote)), }; assert_matches!( @@ -67,7 +67,7 @@ fn should_return_a_block() { let api = Chain { client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(remote), + subscriptions: Subscriptions::new(Arc::new(remote)), }; let block = api.client.new_block(Default::default()).unwrap().bake().unwrap(); @@ -121,7 +121,7 @@ fn should_return_block_hash() { let client = Chain { client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(remote), + subscriptions: Subscriptions::new(Arc::new(remote)), }; assert_matches!( @@ -165,7 +165,7 @@ fn should_return_finalized_hash() { let client = Chain { client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(remote), + subscriptions: Subscriptions::new(Arc::new(remote)), }; assert_matches!( @@ -199,7 +199,7 @@ fn should_notify_about_latest_block() { { let api = Chain { client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(remote), + subscriptions: Subscriptions::new(Arc::new(remote)), }; api.subscribe_new_head(Default::default(), subscriber); @@ -230,7 +230,7 @@ fn should_notify_about_finalized_block() { { let api = Chain { client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(remote), + subscriptions: Subscriptions::new(Arc::new(remote)), }; api.subscribe_finalized_heads(Default::default(), subscriber); diff --git a/core/rpc/src/state/tests.rs b/core/rpc/src/state/tests.rs index f8cb19451337aae09b0c475aa67a9cb04f386791..6a8eefa10b660d2785e16740e7e064a959963e83 100644 --- a/core/rpc/src/state/tests.rs +++ b/core/rpc/src/state/tests.rs @@ -32,7 +32,7 @@ fn should_return_storage() { let core = tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let genesis_hash = client.genesis_hash(); - let client = State::new(client, Subscriptions::new(core.executor())); + let client = State::new(client, Subscriptions::new(Arc::new(core.executor()))); let key = StorageKey(b":code".to_vec()); assert_eq!( @@ -57,7 +57,7 @@ fn should_return_child_storage() { .add_child_storage("test", "key", vec![42_u8]) .build()); let genesis_hash = client.genesis_hash(); - let client = State::new(client, Subscriptions::new(core.executor())); + let client = State::new(client, Subscriptions::new(Arc::new(core.executor()))); let child_key = StorageKey(well_known_keys::CHILD_STORAGE_KEY_PREFIX.iter().chain(b"test").cloned().collect()); let key = StorageKey(b"key".to_vec()); @@ -82,7 +82,7 @@ fn should_call_contract() { let core = tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let genesis_hash = client.genesis_hash(); - let client = State::new(client, Subscriptions::new(core.executor())); + let client = State::new(client, Subscriptions::new(Arc::new(core.executor()))); assert_matches!( client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()), @@ -97,7 +97,7 @@ fn should_notify_about_storage_changes() { let (subscriber, id, transport) = Subscriber::new_test("test"); { - let api = State::new(Arc::new(test_client::new()), Subscriptions::new(remote)); + let api = State::new(Arc::new(test_client::new()), Subscriptions::new(Arc::new(remote))); api.subscribe_storage(Default::default(), subscriber, None.into()); @@ -128,7 +128,7 @@ fn should_send_initial_storage_changes_and_notifications() { let (subscriber, id, transport) = Subscriber::new_test("test"); { - let api = State::new(Arc::new(test_client::new()), Subscriptions::new(remote)); + let api = State::new(Arc::new(test_client::new()), Subscriptions::new(Arc::new(remote))); let alice_balance_key = blake2_256(&runtime::system::balance_of_key(AccountKeyring::Alice.into())); @@ -163,7 +163,7 @@ fn should_send_initial_storage_changes_and_notifications() { fn should_query_storage() { fn run_tests(client: Arc<TestClient>) { let core = tokio::runtime::Runtime::new().unwrap(); - let api = State::new(client.clone(), Subscriptions::new(core.executor())); + let api = State::new(client.clone(), Subscriptions::new(Arc::new(core.executor()))); let add_block = |nonce| { let mut builder = client.new_block(Default::default()).unwrap(); @@ -254,7 +254,7 @@ fn should_return_runtime_version() { let core = tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); - let api = State::new(client.clone(), Subscriptions::new(core.executor())); + let api = State::new(client.clone(), Subscriptions::new(Arc::new(core.executor()))); let result = "{\"specName\":\"test\",\"implName\":\"parity-test\",\"authoringVersion\":1,\ \"specVersion\":1,\"implVersion\":1,\"apis\":[[\"0xdf6acb689907609b\",2],\ @@ -274,7 +274,7 @@ fn should_notify_on_runtime_version_initially() { { let client = Arc::new(test_client::new()); - let api = State::new(client.clone(), Subscriptions::new(core.executor())); + let api = State::new(client.clone(), Subscriptions::new(Arc::new(core.executor()))); api.subscribe_runtime_version(Default::default(), subscriber); diff --git a/core/rpc/src/subscriptions.rs b/core/rpc/src/subscriptions.rs index 500f3dac4545ccf966cf558f0dc94ccbb6f192de..77e1d958f683fed3f639f8093d048bd946984677 100644 --- a/core/rpc/src/subscriptions.rs +++ b/core/rpc/src/subscriptions.rs @@ -17,12 +17,11 @@ use std::collections::HashMap; use std::sync::{Arc, atomic::{self, AtomicUsize}}; -use log::warn; +use log::{error, warn}; use jsonrpc_pubsub::{SubscriptionId, typed::{Sink, Subscriber}}; use parking_lot::Mutex; use crate::rpc::futures::sync::oneshot; use crate::rpc::futures::{Future, future}; -use tokio::runtime::TaskExecutor; type Id = u64; @@ -50,16 +49,16 @@ impl IdProvider { /// /// Takes care of assigning unique subscription ids and /// driving the sinks into completion. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct Subscriptions { next_id: IdProvider, active_subscriptions: Arc<Mutex<HashMap<Id, oneshot::Sender<()>>>>, - executor: TaskExecutor, + executor: Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>, } impl Subscriptions { /// Creates new `Subscriptions` object. - pub fn new(executor: TaskExecutor) -> Self { + pub fn new(executor: Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>) -> Self { Subscriptions { next_id: Default::default(), active_subscriptions: Default::default(), @@ -86,7 +85,9 @@ impl Subscriptions { .then(|_| Ok(())); self.active_subscriptions.lock().insert(id, tx); - self.executor.spawn(future); + if self.executor.execute(Box::new(future)).is_err() { + error!("Failed to spawn RPC subscription task"); + } } } diff --git a/core/service/Cargo.toml b/core/service/Cargo.toml index d70bf9cfefb5e18a2aa4b11e4345f7ba5c0ed7df..494ec0edefd158ab71778a93353c374995d37c98 100644 --- a/core/service/Cargo.toml +++ b/core/service/Cargo.toml @@ -11,7 +11,7 @@ parking_lot = "0.8.0" lazy_static = "1.0" log = "0.4" slog = {version = "^2", features = ["nested-values"]} -tokio = "0.1.7" +tokio-executor = "0.1.7" tokio-timer = "0.2" exit-future = "0.1" serde = { version = "1.0", features = ["derive"] } diff --git a/core/service/src/chain_ops.rs b/core/service/src/chain_ops.rs index 6c4a03ee7b003e38d39fc33b85504d4d9725b9ac..4573382c94880357e62164d38d9ce43067998bd7 100644 --- a/core/service/src/chain_ops.rs +++ b/core/service/src/chain_ops.rs @@ -116,12 +116,12 @@ impl<B: Block> Link<B> for WaitLink { } } -/// Import blocks from a binary stream. +/// Returns a future that import blocks from a binary stream. pub fn import_blocks<F, E, R>( mut config: FactoryFullConfiguration<F>, exit: E, mut input: R -) -> error::Result<()> +) -> error::Result<impl Future<Item = (), Error = ()>> where F: ServiceFactory, E: Future<Item=(),Error=()> + Send + 'static, R: Read, { let client = new_client::<F>(&config)?; @@ -175,7 +175,7 @@ pub fn import_blocks<F, E, R>( } let mut link = WaitLink::new(); - tokio::run(futures::future::poll_fn(move || { + Ok(futures::future::poll_fn(move || { let blocks_before = link.imported_blocks; queue.poll_actions(&mut link); if link.imported_blocks / 1000 != blocks_before / 1000 { @@ -186,15 +186,12 @@ pub fn import_blocks<F, E, R>( ); } if link.imported_blocks >= count { + info!("Imported {} blocks. Best: #{}", block_count, client.info().chain.best_number); Ok(Async::Ready(())) } else { Ok(Async::NotReady) } - })); - - info!("Imported {} blocks. Best: #{}", block_count, client.info().chain.best_number); - - Ok(()) + })) } /// Revert the chain. diff --git a/core/service/src/components.rs b/core/service/src/components.rs index a41f9e94fff40f647d1ac37bbb3cc9f4eb0642ee..600841f110afad33daffd6dc69c1de726ea1652d 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -18,7 +18,6 @@ use std::{sync::Arc, net::SocketAddr, ops::Deref, ops::DerefMut}; use serde::{Serialize, de::DeserializeOwned}; -use tokio::runtime::TaskExecutor; use crate::chain_spec::ChainSpec; use client_db; use client::{self, Client, runtime_api}; @@ -34,7 +33,7 @@ use crate::config::Configuration; use primitives::{Blake2Hasher, H256}; use rpc::{self, apis::system::SystemInfo}; use parking_lot::Mutex; -use futures::sync::mpsc; +use futures::{prelude::*, future::Executor, sync::mpsc}; // Type aliases. // These exist mainly to avoid typing `<F as Factory>::Foo` all over the code. @@ -262,7 +261,7 @@ pub trait OffchainWorker<C: Components> { number: &FactoryBlockNumber<C::Factory>, offchain: &offchain::OffchainWorkers<ComponentClient<C>, ComponentBlock<C>>, pool: &Arc<TransactionPool<C::TransactionPoolApi>>, - ) -> error::Result<()>; + ) -> error::Result<Box<dyn Future<Item = (), Error = ()> + Send>>; } impl<C: Components> OffchainWorker<Self> for C where @@ -273,8 +272,8 @@ impl<C: Components> OffchainWorker<Self> for C where number: &FactoryBlockNumber<C::Factory>, offchain: &offchain::OffchainWorkers<ComponentClient<C>, ComponentBlock<C>>, pool: &Arc<TransactionPool<C::TransactionPoolApi>>, - ) -> error::Result<()> { - Ok(offchain.on_block_imported(number, pool)) + ) -> error::Result<Box<dyn Future<Item = (), Error = ()> + Send>> { + Ok(Box::new(offchain.on_block_imported(number, pool))) } } @@ -298,6 +297,9 @@ impl<C: Components, T> ServiceTrait<C> for T where + OffchainWorker<C> {} +/// Alias for a an implementation of `futures::future::Executor`. +pub type TaskExecutor = Arc<dyn Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>; + /// A collection of types and methods to build a service on top of the substrate service. pub trait ServiceFactory: 'static + Sized { /// Block type. @@ -351,10 +353,10 @@ pub trait ServiceFactory: 'static + Sized { ) -> Result<Self::SelectChain, error::Error>; /// Build full service. - fn new_full(config: FactoryFullConfiguration<Self>, executor: TaskExecutor) + fn new_full(config: FactoryFullConfiguration<Self>) -> Result<Self::FullService, error::Error>; /// Build light service. - fn new_light(config: FactoryFullConfiguration<Self>, executor: TaskExecutor) + fn new_light(config: FactoryFullConfiguration<Self>) -> Result<Self::LightService, error::Error>; /// ImportQueue for a full client @@ -455,12 +457,11 @@ pub struct FullComponents<Factory: ServiceFactory> { impl<Factory: ServiceFactory> FullComponents<Factory> { /// Create new `FullComponents` pub fn new( - config: FactoryFullConfiguration<Factory>, - task_executor: TaskExecutor + config: FactoryFullConfiguration<Factory> ) -> Result<Self, error::Error> { Ok( Self { - service: Service::new(config, task_executor)?, + service: Service::new(config)?, } ) } @@ -480,6 +481,15 @@ impl<Factory: ServiceFactory> DerefMut for FullComponents<Factory> { } } +impl<Factory: ServiceFactory> Future for FullComponents<Factory> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + self.service.poll() + } +} + impl<Factory: ServiceFactory> Components for FullComponents<Factory> { type Factory = Factory; type Executor = FullExecutor<Factory>; @@ -555,11 +565,10 @@ impl<Factory: ServiceFactory> LightComponents<Factory> { /// Create new `LightComponents` pub fn new( config: FactoryFullConfiguration<Factory>, - task_executor: TaskExecutor ) -> Result<Self, error::Error> { Ok( Self { - service: Service::new(config, task_executor)?, + service: Service::new(config)?, } ) } @@ -573,6 +582,15 @@ impl<Factory: ServiceFactory> Deref for LightComponents<Factory> { } } +impl<Factory: ServiceFactory> Future for LightComponents<Factory> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + self.service.poll() + } +} + impl<Factory: ServiceFactory> Components for LightComponents<Factory> { type Factory = Factory; type Executor = LightExecutor<Factory>; diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index b6a35475ae0f572b46772f046288f2d08dbce746..7467f00f419064772fee634ac8f89b2bbe1a17b3 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -36,7 +36,7 @@ use client::{BlockchainEvents, backend::Backend}; use exit_future::Signal; use futures::prelude::*; use keystore::Store as Keystore; -use log::{info, warn, debug}; +use log::{info, warn, debug, error}; use parity_codec::{Encode, Decode}; use primitives::Pair; use runtime_primitives::generic::BlockId; @@ -67,7 +67,7 @@ pub use std::{ops::Deref, result::Result, sync::Arc}; #[doc(hidden)] pub use network::{FinalityProofProvider, OnDemand}; #[doc(hidden)] -pub use tokio::runtime::TaskExecutor; +pub use futures::future::Executor; const DEFAULT_PROTOCOL_ID: &str = "sup"; @@ -82,6 +82,14 @@ pub struct Service<Components: components::Components> { keystore: Keystore, exit: ::exit_future::Exit, signal: Option<Signal>, + /// Sender for futures that must be spawned as background tasks. + to_spawn_tx: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>, + /// Receiver for futures that must be spawned as background tasks. + to_spawn_rx: Mutex<mpsc::UnboundedReceiver<Box<dyn Future<Item = (), Error = ()> + Send>>>, + /// List of futures to poll from `poll`. + /// If spawning a background task is not possible, we instead push the task into this `Vec`. + /// The elements must then be polled manually. + to_poll: Mutex<Vec<Box<dyn Future<Item = (), Error = ()> + Send>>>, /// Configuration of this Service pub config: FactoryFullConfiguration<Components::Factory>, _rpc: Box<dyn std::any::Any + Send + Sync>, @@ -106,13 +114,9 @@ pub fn new_client<Factory: components::ServiceFactory>(config: &FactoryFullConfi pub type TelemetryOnConnectNotifications = mpsc::UnboundedReceiver<()>; /// Used to hook on telemetry connection established events. -pub struct TelemetryOnConnect<'a> { - /// Handle to a future that will resolve on exit. - pub on_exit: Box<dyn Future<Item=(), Error=()> + Send + 'static>, +pub struct TelemetryOnConnect { /// Event stream. pub telemetry_connection_sinks: TelemetryOnConnectNotifications, - /// Executor to which the hook is spawned. - pub executor: &'a TaskExecutor, } impl<Components: components::Components> Service<Components> { @@ -126,10 +130,13 @@ impl<Components: components::Components> Service<Components> { /// Creates a new service. pub fn new( mut config: FactoryFullConfiguration<Components::Factory>, - task_executor: TaskExecutor, ) -> Result<Self, error::Error> { let (signal, exit) = ::exit_future::signal(); + // List of asynchronous tasks to spawn. We collect them, then spawn them all at once. + let (to_spawn_tx, to_spawn_rx) = + mpsc::unbounded::<Box<dyn Future<Item = (), Error = ()> + Send>>(); + // Create client let executor = NativeExecutor::new(config.default_heap_pages); @@ -209,16 +216,13 @@ impl<Components: components::Components> Service<Components> { let network = network_mut.service().clone(); let network_status_sinks = Arc::new(Mutex::new(Vec::new())); - task_executor.spawn(build_network_future(network_mut, network_status_sinks.clone()) + let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(network_mut, network_status_sinks.clone()) .map_err(|_| ()) .select(exit.clone()) - .then(|_| Ok(()))); + .then(|_| Ok(())))); let offchain_workers = if config.offchain_worker { - Some(Arc::new(offchain::OffchainWorkers::new( - client.clone(), - task_executor.clone(), - ))) + Some(Arc::new(offchain::OffchainWorkers::new(client.clone()))) } else { None }; @@ -229,6 +233,7 @@ impl<Components: components::Components> Service<Components> { let txpool = Arc::downgrade(&transaction_pool); let wclient = Arc::downgrade(&client); let offchain = offchain_workers.as_ref().map(Arc::downgrade); + let to_spawn_tx_ = to_spawn_tx.clone(); let events = client.import_notification_stream() .for_each(move |notification| { @@ -247,18 +252,19 @@ impl<Components: components::Components> Service<Components> { } if let (Some(txpool), Some(offchain)) = (txpool.upgrade(), offchain.as_ref().and_then(|o| o.upgrade())) { - Components::RuntimeServices::offchain_workers( + let future = Components::RuntimeServices::offchain_workers( &number, &offchain, &txpool, ).map_err(|e| warn!("Offchain workers error processing new block: {:?}", e))?; + let _ = to_spawn_tx_.unbounded_send(future); } Ok(()) }) .select(exit.clone()) .then(|_| Ok(())); - task_executor.spawn(events); + let _ = to_spawn_tx.unbounded_send(Box::new(events)); } { @@ -304,7 +310,7 @@ impl<Components: components::Components> Service<Components> { .select(exit.clone()) .then(|_| Ok(())); - task_executor.spawn(events); + let _ = to_spawn_tx.unbounded_send(Box::new(events)); } { @@ -326,7 +332,7 @@ impl<Components: components::Components> Service<Components> { .select(exit.clone()) .then(|_| Ok(())); - task_executor.spawn(events); + let _ = to_spawn_tx.unbounded_send(Box::new(events)); } // Periodically notify the telemetry. @@ -337,7 +343,7 @@ impl<Components: components::Components> Service<Components> { let self_pid = get_current_pid(); let (netstat_tx, netstat_rx) = mpsc::unbounded(); network_status_sinks.lock().push(netstat_tx); - task_executor.spawn(netstat_rx.for_each(move |net_status| { + let tel_task = netstat_rx.for_each(move |net_status| { let info = client_.info(); let best_number = info.chain.best_number.saturated_into::<u64>(); let best_hash = info.chain.best_hash; @@ -380,7 +386,8 @@ impl<Components: components::Components> Service<Components> { ); Ok(()) - }).select(exit.clone()).then(|_| Ok(()))); + }).select(exit.clone()).then(|_| Ok(())); + let _ = to_spawn_tx.unbounded_send(Box::new(tel_task)); // RPC let system_info = rpc::apis::system::SystemInfo { @@ -390,6 +397,19 @@ impl<Components: components::Components> Service<Components> { properties: config.chain_spec.properties(), }; let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded(); + struct ExecutorWithTx(mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>); + impl futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for ExecutorWithTx { + fn execute( + &self, + future: Box<dyn Future<Item = (), Error = ()> + Send> + ) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> { + self.0.unbounded_send(future) + .map_err(|err| { + let kind = futures::future::ExecuteErrorKind::Shutdown; + futures::future::ExecuteError::new(kind, err.into_inner()) + }) + } + } let rpc = Components::RuntimeServices::start_rpc( client.clone(), system_rpc_tx, @@ -398,14 +418,14 @@ impl<Components: components::Components> Service<Components> { config.rpc_ws, config.rpc_ws_max_connections, config.rpc_cors.clone(), - task_executor.clone(), + Arc::new(ExecutorWithTx(to_spawn_tx.clone())), transaction_pool.clone(), )?; - task_executor.spawn(build_system_rpc_handler::<Components>( + let _ = to_spawn_tx.unbounded_send(Box::new(build_system_rpc_handler::<Components>( network.clone(), system_rpc_rx, has_bootnodes - )); + ))); let telemetry_connection_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default(); @@ -444,9 +464,9 @@ impl<Components: components::Components> Service<Components> { }); Ok(()) }); - task_executor.spawn(future + let _ = to_spawn_tx.unbounded_send(Box::new(future .select(exit.clone()) - .then(|_| Ok(()))); + .then(|_| Ok(())))); telemetry }); @@ -457,6 +477,9 @@ impl<Components: components::Components> Service<Components> { select_chain, transaction_pool, signal: Some(signal), + to_spawn_tx, + to_spawn_rx: Mutex::new(to_spawn_rx), + to_poll: Mutex::new(Vec::new()), keystore, config, exit, @@ -484,9 +507,12 @@ impl<Components: components::Components> Service<Components> { pub fn telemetry(&self) -> Option<tel::Telemetry> { self._telemetry.as_ref().map(|t| t.clone()) } -} -impl<Components> Service<Components> where Components: components::Components { + /// Spawns a task in the background that runs the future passed as parameter. + pub fn spawn_task(&self, task: Box<dyn Future<Item = (), Error = ()> + Send>) { + let _ = self.to_spawn_tx.unbounded_send(task); + } + /// Get shared client instance. pub fn client(&self) -> Arc<ComponentClient<Components>> { self.client.clone() @@ -525,6 +551,50 @@ impl<Components> Service<Components> where Components: components::Components { } } +impl<Components> Future for Service<Components> where Components: components::Components { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + Future::poll(&mut &*self) + } +} + +// Note that this implementation is totally unnecessary. It exists only because of tests. The tests +// should eventually be reworked, as it would make it possible to remove the `Mutex`es. that we +// lock here. +impl<'a, Components> Future for &'a Service<Components> where Components: components::Components { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + // The user is supposed to poll only one service, so it doesn't matter if we keep this + // mutex locked. + let mut to_poll = self.to_poll.lock(); + let mut to_spawn_rx = self.to_spawn_rx.lock(); + + while let Ok(Async::Ready(Some(task_to_spawn))) = to_spawn_rx.poll() { + let executor = tokio_executor::DefaultExecutor::current(); + if let Err(err) = executor.execute(task_to_spawn) { + debug!( + target: "service", + "Failed to spawn background task: {:?}; falling back to manual polling", + err + ); + to_poll.push(err.into_future()); + } + } + + // Polling all the `to_poll` futures. + while let Some(pos) = to_poll.iter_mut().position(|t| t.poll().map(|t| t.is_ready()).unwrap_or(true)) { + to_poll.remove(pos); + } + + // The service future never ends. + Ok(Async::NotReady) + } +} + /// Builds a never-ending future that continuously polls the network. /// /// The `status_sink` contain a list of senders to send a periodic network status to. @@ -726,7 +796,7 @@ fn build_system_rpc_handler<Components: components::Components>( /// ``` /// # use substrate_service::{ /// # construct_service_factory, Service, FullBackend, FullExecutor, LightBackend, LightExecutor, -/// # FullComponents, LightComponents, FactoryFullConfiguration, FullClient, TaskExecutor +/// # FullComponents, LightComponents, FactoryFullConfiguration, FullClient /// # }; /// # use transaction_pool::{self, txpool::{Pool as TransactionPool}}; /// # use network::construct_simple_protocol; @@ -775,14 +845,14 @@ fn build_system_rpc_handler<Components: components::Components>( /// Genesis = GenesisConfig, /// Configuration = (), /// FullService = FullComponents<Self> -/// { |config, executor| <FullComponents<Factory>>::new(config, executor) }, +/// { |config| <FullComponents<Factory>>::new(config) }, /// // Setup as Consensus Authority (if the role and key are given) /// AuthoritySetup = { -/// |service: Self::FullService, executor: TaskExecutor, key: Option<Arc<ed25519::Pair>>| { +/// |service: Self::FullService, key: Option<Arc<ed25519::Pair>>| { /// Ok(service) /// }}, /// LightService = LightComponents<Self> -/// { |config, executor| <LightComponents<Factory>>::new(config, executor) }, +/// { |config| <LightComponents<Factory>>::new(config) }, /// FullImportQueue = BasicQueue<Block> /// { |_, client, _| Ok(BasicQueue::new(Arc::new(MyVerifier), client, None, None, None)) }, /// LightImportQueue = BasicQueue<Block> @@ -893,21 +963,19 @@ macro_rules! construct_service_factory { } fn new_light( - config: $crate::FactoryFullConfiguration<Self>, - executor: $crate::TaskExecutor + config: $crate::FactoryFullConfiguration<Self> ) -> $crate::Result<Self::LightService, $crate::Error> { - ( $( $light_service_init )* ) (config, executor) + ( $( $light_service_init )* ) (config) } fn new_full( - config: $crate::FactoryFullConfiguration<Self>, - executor: $crate::TaskExecutor, + config: $crate::FactoryFullConfiguration<Self> ) -> Result<Self::FullService, $crate::Error> { - ( $( $full_service_init )* ) (config, executor.clone()).and_then(|service| { + ( $( $full_service_init )* ) (config).and_then(|service| { let key = (&service).authority_key().map(Arc::new); - ($( $authority_setup )*)(service, executor, key) + ($( $authority_setup )*)(service, key) }) } } diff --git a/core/service/test/src/lib.rs b/core/service/test/src/lib.rs index e8d45750fdc025f4e8026fcc37f62c33894d6da1..a871b1cb0dc7a3035d6847e5f5bc94d83947e012 100644 --- a/core/service/test/src/lib.rs +++ b/core/service/test/src/lib.rs @@ -22,12 +22,14 @@ use std::net::Ipv4Addr; use std::time::Duration; use std::collections::HashMap; use log::info; -use futures::{Future, Stream}; +use futures::{Future, Stream, Poll}; use tempdir::TempDir; use tokio::{runtime::Runtime, prelude::FutureExt}; use tokio::timer::Interval; use service::{ + Service, ServiceFactory, + Components, Configuration, FactoryFullConfiguration, FactoryChainSpec, @@ -52,6 +54,17 @@ struct TestNet<F: ServiceFactory> { nodes: usize, } +/// Wraps around an `Arc<Service>>` and implements `Future`. +struct ArcService<T>(Arc<T>); +impl<T, C: Components> Future for ArcService<T> where T: std::ops::Deref<Target = Service<C>> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + Future::poll(&mut &**self.0) + } +} + impl<F: ServiceFactory> TestNet<F> { pub fn run_until_all_full<FP, LP>( &mut self, @@ -186,8 +199,9 @@ impl<F: ServiceFactory> TestNet<F> { self.authority_nodes.extend(authorities.iter().enumerate().map(|(index, key)| { let node_config = node_config::<F>(index as u32, &spec, Roles::AUTHORITY, Some(key.clone()), base_port, &temp); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); - let service = Arc::new(F::new_full(node_config, executor.clone()) + let service = Arc::new(F::new_full(node_config) .expect("Error creating test node service")); + executor.spawn(ArcService(service.clone())); let addr = addr.with(multiaddr::Protocol::P2p(service.network().local_peer_id().into())); ((index + nodes) as u32, service, addr) })); @@ -196,8 +210,9 @@ impl<F: ServiceFactory> TestNet<F> { self.full_nodes.extend((nodes..nodes + full as usize).map(|index| { let node_config = node_config::<F>(index as u32, &spec, Roles::FULL, None, base_port, &temp); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); - let service = Arc::new(F::new_full(node_config, executor.clone()) + let service = Arc::new(F::new_full(node_config) .expect("Error creating test node service")); + executor.spawn(ArcService(service.clone())); let addr = addr.with(multiaddr::Protocol::P2p(service.network().local_peer_id().into())); (index as u32, service, addr) })); @@ -206,8 +221,9 @@ impl<F: ServiceFactory> TestNet<F> { self.light_nodes.extend((nodes..nodes + light as usize).map(|index| { let node_config = node_config::<F>(index as u32, &spec, Roles::LIGHT, None, base_port, &temp); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); - let service = Arc::new(F::new_light(node_config, executor.clone()) + let service = Arc::new(F::new_light(node_config) .expect("Error creating test node service")); + executor.spawn(ArcService(service.clone())); let addr = addr.with(multiaddr::Protocol::P2p(service.network().local_peer_id().into())); (index as u32, service, addr) })); @@ -247,7 +263,7 @@ pub fn connectivity<F: ServiceFactory>(spec: FactoryChainSpec<F>) { network.runtime }; - runtime.shutdown_on_idle().wait().expect("Error shutting down runtime"); + runtime.shutdown_now().wait().expect("Error shutting down runtime"); temp.close().expect("Error removing temp dir"); } diff --git a/node-template/src/cli.rs b/node-template/src/cli.rs index cd148f3462dce8cac7ffa981d37439967954ba3c..b799a5d9aee4233988302bb22fcd706eda724f79 100644 --- a/node-template/src/cli.rs +++ b/node-template/src/cli.rs @@ -25,16 +25,15 @@ pub fn run<I, T, E>(args: I, exit: E, version: VersionInfo) -> error::Result<()> info!("Node name: {}", config.name); info!("Roles: {:?}", config.roles); let runtime = Runtime::new().map_err(|e| format!("{:?}", e))?; - let executor = runtime.executor(); match config.roles { ServiceRoles::LIGHT => run_until_exit( runtime, - service::Factory::new_light(config, executor).map_err(|e| format!("{:?}", e))?, + service::Factory::new_light(config).map_err(|e| format!("{:?}", e))?, exit ), _ => run_until_exit( runtime, - service::Factory::new_full(config, executor).map_err(|e| format!("{:?}", e))?, + service::Factory::new_full(config).map_err(|e| format!("{:?}", e))?, exit ), }.map_err(|e| format!("{:?}", e)) @@ -55,7 +54,7 @@ fn run_until_exit<T, C, E>( e: E, ) -> error::Result<()> where - T: Deref<Target=substrate_service::Service<C>>, + T: Deref<Target=substrate_service::Service<C>> + Future<Item = (), Error = ()> + Send + 'static, C: substrate_service::Components, E: IntoExit, { @@ -64,13 +63,13 @@ fn run_until_exit<T, C, E>( let informant = informant::build(&service); runtime.executor().spawn(exit.until(informant).map(|_| ())); - let _ = runtime.block_on(e.into_exit()); - exit_send.fire(); - // we eagerly drop the service so that the internal exit future is fired, // but we need to keep holding a reference to the global telemetry guard let _telemetry = service.telemetry(); - drop(service); + + let _ = runtime.block_on(service.select(e.into_exit())); + exit_send.fire(); + Ok(()) } diff --git a/node-template/src/service.rs b/node-template/src/service.rs index 1de7bb47675242df4a45a84b9eaf56b4f40f2b47..248e13e1ce6cfaf85858212e418827e839cba11e 100644 --- a/node-template/src/service.rs +++ b/node-template/src/service.rs @@ -9,7 +9,6 @@ use node_template_runtime::{self, GenesisConfig, opaque::Block, RuntimeApi}; use substrate_service::{ FactoryFullConfiguration, LightComponents, FullComponents, FullBackend, FullClient, LightClient, LightBackend, FullExecutor, LightExecutor, - TaskExecutor, error::{Error as ServiceError}, }; use basic_authorship::ProposerFactory; @@ -62,11 +61,11 @@ construct_service_factory! { Genesis = GenesisConfig, Configuration = NodeConfig, FullService = FullComponents<Self> - { |config: FactoryFullConfiguration<Self>, executor: TaskExecutor| - FullComponents::<Factory>::new(config, executor) + { |config: FactoryFullConfiguration<Self>| + FullComponents::<Factory>::new(config) }, AuthoritySetup = { - |service: Self::FullService, executor: TaskExecutor, key: Option<Arc<Pair>>| { + |service: Self::FullService, key: Option<Arc<Pair>>| { if let Some(key) = key { info!("Using authority key {}", key.public()); let proposer = Arc::new(ProposerFactory { @@ -87,14 +86,14 @@ construct_service_factory! { service.config.custom.inherent_data_providers.clone(), service.config.force_authoring, )?; - executor.spawn(aura.select(service.on_exit()).then(|_| Ok(()))); + service.spawn_task(Box::new(aura.select(service.on_exit()).then(|_| Ok(())))); } Ok(service) } }, LightService = LightComponents<Self> - { |config, executor| <LightComponents<Factory>>::new(config, executor) }, + { |config| <LightComponents<Factory>>::new(config) }, FullImportQueue = AuraImportQueue< Self::Block, > diff --git a/node/cli/src/lib.rs b/node/cli/src/lib.rs index ab1fd03ae7b24eb7afc4a4ba44a710f143bc09c9..b18fa57411ce5bdd04b971dbcbf7e0ab4c2b275a 100644 --- a/node/cli/src/lib.rs +++ b/node/cli/src/lib.rs @@ -156,16 +156,15 @@ pub fn run<I, T, E>(args: I, exit: E, version: cli::VersionInfo) -> error::Resul info!("Roles: {:?}", config.roles); let runtime = RuntimeBuilder::new().name_prefix("main-tokio-").build() .map_err(|e| format!("{:?}", e))?; - let executor = runtime.executor(); match config.roles { ServiceRoles::LIGHT => run_until_exit( runtime, - service::Factory::new_light(config, executor).map_err(|e| format!("{:?}", e))?, + service::Factory::new_light(config).map_err(|e| format!("{:?}", e))?, exit ), _ => run_until_exit( runtime, - service::Factory::new_full(config, executor).map_err(|e| format!("{:?}", e))?, + service::Factory::new_full(config).map_err(|e| format!("{:?}", e))?, exit ), }.map_err(|e| format!("{:?}", e)) @@ -207,7 +206,7 @@ fn run_until_exit<T, C, E>( e: E, ) -> error::Result<()> where - T: Deref<Target=substrate_service::Service<C>>, + T: Deref<Target=substrate_service::Service<C>> + Future<Item = (), Error = ()> + Send + 'static, C: substrate_service::Components, E: IntoExit, { @@ -216,13 +215,12 @@ fn run_until_exit<T, C, E>( let informant = cli::informant::build(&service); runtime.executor().spawn(exit.until(informant).map(|_| ())); - let _ = runtime.block_on(e.into_exit()); - exit_send.fire(); - // we eagerly drop the service so that the internal exit future is fired, // but we need to keep holding a reference to the global telemetry guard let _telemetry = service.telemetry(); - drop(service); + + let _ = runtime.block_on(service.select(e.into_exit())); + exit_send.fire(); // TODO [andre]: timeout this future #1318 let _ = runtime.shutdown_on_idle().wait(); diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 058f58c9c2c7e1c8b9d631934e20dde1c77f53e7..e13c5b901d6ad4c95d5d58fa4ac52d4d7fc9928c 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -31,7 +31,7 @@ use node_primitives::Block; use node_runtime::{GenesisConfig, RuntimeApi}; use substrate_service::{ FactoryFullConfiguration, LightComponents, FullComponents, FullBackend, - FullClient, LightClient, LightBackend, FullExecutor, LightExecutor, TaskExecutor, + FullClient, LightClient, LightBackend, FullExecutor, LightExecutor, error::{Error as ServiceError}, }; use transaction_pool::{self, txpool::{Pool as TransactionPool}}; @@ -76,10 +76,10 @@ construct_service_factory! { Genesis = GenesisConfig, Configuration = NodeConfig<Self>, FullService = FullComponents<Self> - { |config: FactoryFullConfiguration<Self>, executor: TaskExecutor| - FullComponents::<Factory>::new(config, executor) }, + { |config: FactoryFullConfiguration<Self>| + FullComponents::<Factory>::new(config) }, AuthoritySetup = { - |mut service: Self::FullService, executor: TaskExecutor, local_key: Option<Arc<ed25519::Pair>>| { + |mut service: Self::FullService, local_key: Option<Arc<ed25519::Pair>>| { let (block_import, link_half) = service.config.custom.grandpa_import_setup.take() .expect("Link Half and Block Import are present for Full Services or setup failed before. qed"); @@ -104,7 +104,7 @@ construct_service_factory! { service.config.custom.inherent_data_providers.clone(), service.config.force_authoring, )?; - executor.spawn(aura.select(service.on_exit()).then(|_| Ok(()))); + service.spawn_task(Box::new(aura.select(service.on_exit()).then(|_| Ok(())))); info!("Running Grandpa session as Authority {}", key.public()); } @@ -125,18 +125,16 @@ construct_service_factory! { match config.local_key { None => { - executor.spawn(grandpa::run_grandpa_observer( + service.spawn_task(Box::new(grandpa::run_grandpa_observer( config, link_half, service.network(), service.on_exit(), - )?); + )?)); }, Some(_) => { let telemetry_on_connect = TelemetryOnConnect { - on_exit: Box::new(service.on_exit()), telemetry_connection_sinks: service.telemetry_on_connect_stream(), - executor: &executor, }; let grandpa_config = grandpa::GrandpaParams { config: config, @@ -146,7 +144,7 @@ construct_service_factory! { on_exit: service.on_exit(), telemetry_on_connect: Some(telemetry_on_connect), }; - executor.spawn(grandpa::run_grandpa_voter(grandpa_config)?); + service.spawn_task(Box::new(grandpa::run_grandpa_voter(grandpa_config)?)); }, } @@ -154,7 +152,7 @@ construct_service_factory! { } }, LightService = LightComponents<Self> - { |config, executor| <LightComponents<Factory>>::new(config, executor) }, + { |config| <LightComponents<Factory>>::new(config) }, FullImportQueue = AuraImportQueue<Self::Block> { |config: &mut FactoryFullConfiguration<Self> , client: Arc<FullClient<Self>>, select_chain: Self::SelectChain| { let slot_duration = SlotDuration::get_or_compute(&*client)?;