diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index bc731dd2de87b8afaca83a3cfed67a7e3b36d6fe..db60c94997f384b828e5ee3d4b4951f53ede08e4 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see <http://www.gnu.org/licenses/>. use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm}; -use crate::{SpawnTaskHandle, start_rpc_servers, build_network_future, TransactionPoolAdapter}; +use crate::{TaskManagerBuilder, start_rpc_servers, build_network_future, TransactionPoolAdapter}; use crate::status_sinks; use crate::config::{Configuration, DatabaseConfig, KeystoreConfig}; use sc_client_api::{ @@ -30,7 +30,7 @@ use sp_consensus::import_queue::ImportQueue; use futures::{ Future, FutureExt, StreamExt, channel::mpsc, - future::{select, ready} + future::ready, }; use sc_keystore::{Store as Keystore}; use log::{info, warn, error}; @@ -44,7 +44,6 @@ use sp_runtime::traits::{ use sp_api::ProvideRuntimeApi; use sc_executor::{NativeExecutor, NativeExecutionDispatch}; use std::{ - borrow::Cow, io::{Read, Write, Seek}, marker::PhantomData, sync::Arc, pin::Pin }; @@ -117,6 +116,7 @@ pub struct ServiceBuilder<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TF config: Configuration<TGen, TCSExt>, pub (crate) client: Arc<TCl>, backend: Arc<Backend>, + tasks_builder: TaskManagerBuilder, keystore: Arc<RwLock<Keystore>>, fetcher: Option<TFchr>, select_chain: Option<TSc>, @@ -181,6 +181,7 @@ type TFullParts<TBl, TRtApi, TExecDisp> = ( TFullClient<TBl, TRtApi, TExecDisp>, Arc<TFullBackend<TBl>>, Arc<RwLock<sc_keystore::Store>>, + TaskManagerBuilder, ); /// Creates a new full client for the given config. @@ -212,6 +213,8 @@ fn new_full_parts<TBl, TRtApi, TExecDisp, TGen, TCSExt>( KeystoreConfig::None => return Err("No keystore config provided!".into()), }; + let tasks_builder = TaskManagerBuilder::new(); + let executor = NativeExecutor::<TExecDisp>::new( config.wasm_method, config.default_heap_pages, @@ -262,7 +265,7 @@ fn new_full_parts<TBl, TRtApi, TExecDisp, TGen, TCSExt>( )? }; - Ok((client, backend, keystore)) + Ok((client, backend, keystore, tasks_builder)) } impl<TGen, TCSExt> ServiceBuilder<(), (), TGen, TCSExt, (), (), (), (), (), (), (), (), ()> @@ -285,7 +288,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension { (), TFullBackend<TBl>, >, Error> { - let (client, backend, keystore) = new_full_parts(&config)?; + let (client, backend, keystore, tasks_builder) = new_full_parts(&config)?; let client = Arc::new(client); @@ -294,6 +297,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension { client, backend, keystore, + tasks_builder, fetcher: None, select_chain: None, import_queue: (), @@ -326,6 +330,8 @@ where TGen: RuntimeGenesis, TCSExt: Extension { (), TLightBackend<TBl>, >, Error> { + let tasks_builder = TaskManagerBuilder::new(); + let keystore = match &config.keystore { KeystoreConfig::Path { path, password } => Keystore::open( path.clone(), @@ -378,6 +384,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension { config, client, backend, + tasks_builder, keystore, fetcher: Some(fetcher.clone()), select_chain: None, @@ -451,6 +458,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T config: self.config, client: self.client, backend: self.backend, + tasks_builder: self.tasks_builder, keystore: self.keystore, fetcher: self.fetcher, select_chain, @@ -494,6 +502,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T config: self.config, client: self.client, backend: self.backend, + tasks_builder: self.tasks_builder, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, @@ -534,6 +543,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T config: self.config, client: self.client, backend: self.backend, + tasks_builder: self.tasks_builder, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, @@ -598,6 +608,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T config: self.config, client: self.client, backend: self.backend, + tasks_builder: self.tasks_builder, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, @@ -657,6 +668,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T Ok(ServiceBuilder { config: self.config, client: self.client, + tasks_builder: self.tasks_builder, backend: self.backend, keystore: self.keystore, fetcher: self.fetcher, @@ -686,6 +698,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T config: self.config, client: self.client, backend: self.backend, + tasks_builder: self.tasks_builder, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, @@ -707,6 +720,7 @@ impl<TBl, TRtApi, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, T config: self.config, client: self.client, backend: self.backend, + tasks_builder: self.tasks_builder, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, @@ -819,6 +833,7 @@ ServiceBuilder< marker: _, mut config, client, + tasks_builder, fetcher: on_demand, backend, keystore, @@ -839,12 +854,6 @@ ServiceBuilder< config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(), )?; - let (signal, exit) = exit_future::signal(); - - // List of asynchronous tasks to spawn. We collect them, then spawn them all at once. - let (to_spawn_tx, to_spawn_rx) = - mpsc::unbounded::<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>(); - // A side-channel for essential tasks to communicate shutdown. let (essential_failed_tx, essential_failed_rx) = mpsc::unbounded(); @@ -869,7 +878,7 @@ ServiceBuilder< imports_external_transactions: !config.roles.is_light(), pool: transaction_pool.clone(), client: client.clone(), - executor: SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }, + executor: tasks_builder.spawn_handle(), }); let protocol_id = { @@ -899,11 +908,9 @@ ServiceBuilder< let network_params = sc_network::config::Params { roles: config.roles, executor: { - let to_spawn_tx = to_spawn_tx.clone(); + let spawn_handle = tasks_builder.spawn_handle(); Some(Box::new(move |fut| { - if let Err(e) = to_spawn_tx.unbounded_send((fut, From::from("libp2p-node"))) { - error!("Failed to spawn libp2p background task: {:?}", e); - } + spawn_handle.spawn("libp2p-node", fut); })) }, network_config: config.network.clone(), @@ -935,20 +942,19 @@ ServiceBuilder< _ => None, }; + let spawn_handle = tasks_builder.spawn_handle(); + // Spawn background tasks which were stacked during the // service building. for (title, background_task) in background_tasks { - let _ = to_spawn_tx.unbounded_send(( - background_task, - title.into(), - )); + spawn_handle.spawn(title, background_task); } { // block notifications let txpool = Arc::downgrade(&transaction_pool); let offchain = offchain_workers.as_ref().map(Arc::downgrade); - let to_spawn_tx_ = to_spawn_tx.clone(); + let notifications_spawn_handle = tasks_builder.spawn_handle(); let network_state_info: Arc<dyn NetworkStateInfo + Send + Sync> = network.clone(); let is_validator = config.roles.is_authority(); @@ -970,15 +976,14 @@ ServiceBuilder< let offchain = offchain.as_ref().and_then(|o| o.upgrade()); match offchain { Some(offchain) if is_new_best => { - let future = offchain.on_block_imported( - &header, - network_state_info.clone(), - is_validator, + notifications_spawn_handle.spawn( + "offchain-on-block", + offchain.on_block_imported( + &header, + network_state_info.clone(), + is_validator, + ), ); - let _ = to_spawn_tx_.unbounded_send(( - Box::pin(future), - From::from("offchain-on-block"), - )); }, Some(_) => log::debug!( target: "sc_offchain", @@ -991,20 +996,19 @@ ServiceBuilder< let txpool = txpool.upgrade(); if let Some(txpool) = txpool.as_ref() { - let future = txpool.maintain(event); - let _ = to_spawn_tx_.unbounded_send(( - Box::pin(future), - From::from("txpool-maintain") - )); + notifications_spawn_handle.spawn( + "txpool-maintain", + txpool.maintain(event), + ); } ready(()) }); - let _ = to_spawn_tx.unbounded_send(( - Box::pin(select(events, exit.clone()).map(drop)), - From::from("txpool-and-offchain-notif"), - )); + spawn_handle.spawn( + "txpool-and-offchain-notif", + events, + ); } { @@ -1024,28 +1028,20 @@ ServiceBuilder< ready(()) }); - let _ = to_spawn_tx.unbounded_send(( - Box::pin(select(events, exit.clone()).map(drop)), - From::from("telemetry-on-block"), - )); + spawn_handle.spawn( + "telemetry-on-block", + events, + ); } // Prometheus metrics let metrics = if let Some((registry, port)) = prometheus_registry_and_port.clone() { let metrics = ServiceMetrics::register(®istry)?; - metrics.node_roles.set(u64::from(config.roles.bits())); - - let future = select( - prometheus_endpoint::init_prometheus(port, registry).boxed(), - exit.clone() - ).map(drop); - - let _ = to_spawn_tx.unbounded_send(( - Box::pin(future), - From::from("prometheus-endpoint") - )); - + spawn_handle.spawn( + "prometheus-endpoint", + prometheus_endpoint::init_prometheus(port, registry).map(drop) + ); Some(metrics) } else { None @@ -1123,10 +1119,10 @@ ServiceBuilder< ready(()) }); - let _ = to_spawn_tx.unbounded_send(( - Box::pin(select(tel_task, exit.clone()).map(drop)), - From::from("telemetry-periodic-send"), - )); + spawn_handle.spawn( + "telemetry-periodic-send", + tel_task, + ); // Periodically send the network state to the telemetry. let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>(); @@ -1139,10 +1135,10 @@ ServiceBuilder< ); ready(()) }); - let _ = to_spawn_tx.unbounded_send(( - Box::pin(select(tel_task_2, exit.clone()).map(drop)), - From::from("telemetry-periodic-network-state"), - )); + spawn_handle.spawn( + "telemetry-periodic-network-state", + tel_task_2, + ); // RPC let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded(); @@ -1156,10 +1152,7 @@ ServiceBuilder< properties: chain_spec.properties().clone(), }; - let subscriptions = sc_rpc::Subscriptions::new(Arc::new(SpawnTaskHandle { - sender: to_spawn_tx.clone(), - on_exit: exit.clone() - })); + let subscriptions = sc_rpc::Subscriptions::new(Arc::new(tasks_builder.spawn_handle())); let (chain, state) = if let (Some(remote_backend), Some(on_demand)) = (remote_backend.as_ref(), on_demand.as_ref()) { @@ -1217,18 +1210,17 @@ ServiceBuilder< let rpc_handlers = gen_handler(); let rpc = start_rpc_servers(&config, gen_handler)?; - - let _ = to_spawn_tx.unbounded_send(( - Box::pin(select(build_network_future( + spawn_handle.spawn( + "network-worker", + build_network_future( config.roles, network_mut, client.clone(), network_status_sinks.clone(), system_rpc_rx, has_bootnodes, - ), exit.clone()).map(drop)), - From::from("network-worker"), - )); + ), + ); let telemetry_connection_sinks: Arc<Mutex<Vec<futures::channel::mpsc::UnboundedSender<()>>>> = Default::default(); @@ -1269,9 +1261,12 @@ ServiceBuilder< }); ready(()) }); - let _ = to_spawn_tx.unbounded_send((Box::pin(select( - future, exit.clone() - ).map(drop)), From::from("telemetry-worker"))); + + spawn_handle.spawn( + "telemetry-worker", + future, + ); + telemetry }); @@ -1288,21 +1283,13 @@ ServiceBuilder< Ok(Service { client, + task_manager: tasks_builder.into_task_manager(config.task_executor.ok_or(Error::TaskExecutorRequired)?), network, network_status_sinks, select_chain, transaction_pool, - exit, - signal: Some(signal), essential_failed_tx, essential_failed_rx, - to_spawn_tx, - to_spawn_rx, - task_executor: if let Some(exec) = config.task_executor { - exec - } else { - return Err(Error::TaskExecutorRequired); - }, rpc_handlers, _rpc: rpc, _telemetry: telemetry, diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 5c59cdf91fcf4105de748d19bca2e8496fb9470a..db56c141db0b5bb7c6765a2ff8f53ec8039592e1 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -26,6 +26,7 @@ pub mod error; mod builder; mod status_sinks; +mod task_manager; use std::{borrow::Cow, io, pin::Pin}; use std::marker::PhantomData; @@ -37,10 +38,9 @@ use std::task::{Poll, Context}; use parking_lot::Mutex; use sc_client::Client; -use exit_future::Signal; use futures::{ Future, FutureExt, Stream, StreamExt, - future::select, channel::mpsc, + channel::mpsc, compat::*, sink::SinkExt, task::{Spawn, FutureObj, SpawnError}, @@ -69,6 +69,8 @@ pub use sc_executor::NativeExecutionDispatch; pub use std::{ops::Deref, result::Result, sync::Arc}; #[doc(hidden)] pub use sc_network::config::{FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder}; +pub use task_manager::{TaskManagerBuilder, SpawnTaskHandle}; +use task_manager::TaskManager; const DEFAULT_PROTOCOL_ID: &str = "sup"; @@ -85,28 +87,18 @@ impl<T> MallocSizeOfWasm for T {} /// Substrate service. pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> { client: Arc<TCl>, + task_manager: TaskManager, select_chain: Option<TSc>, network: Arc<TNet>, /// Sinks to propagate network status updates. /// For each element, every time the `Interval` fires we push an element on the sender. network_status_sinks: Arc<Mutex<status_sinks::StatusSinks<(TNetStatus, NetworkState)>>>, transaction_pool: Arc<TTxPool>, - /// A future that resolves when the service has exited, this is useful to - /// make sure any internally spawned futures stop when the service does. - exit: exit_future::Exit, - /// A signal that makes the exit future above resolve, fired on service drop. - signal: Option<Signal>, /// Send a signal when a spawned essential task has concluded. The next time /// the service future is polled it should complete with an error. essential_failed_tx: mpsc::UnboundedSender<()>, /// A receiver for spawned essential-tasks concluding. essential_failed_rx: mpsc::UnboundedReceiver<()>, - /// Sender for futures that must be spawned as background tasks. - to_spawn_tx: mpsc::UnboundedSender<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>, - /// Receiver for futures that must be spawned as background tasks. - to_spawn_rx: mpsc::UnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>, - /// How to spawn background tasks. - task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>, rpc_handlers: sc_rpc_server::RpcHandler<sc_rpc::Metadata>, _rpc: Box<dyn std::any::Any + Send + Sync>, _telemetry: Option<sc_telemetry::Telemetry>, @@ -119,48 +111,6 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> { impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Unpin for Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {} -/// Alias for a an implementation of `futures::future::Executor`. -pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>; - -/// An handle for spawning tasks in the service. -#[derive(Clone)] -pub struct SpawnTaskHandle { - sender: mpsc::UnboundedSender<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>, - on_exit: exit_future::Exit, -} - -impl SpawnTaskHandle { - /// Spawns the given task with the given name. - pub fn spawn(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) { - let on_exit = self.on_exit.clone(); - let future = async move { - futures::pin_mut!(task); - let _ = select(on_exit, task).await; - }; - if self.sender.unbounded_send((Box::pin(future), name.into())).is_err() { - error!("Failed to send task to spawn over channel"); - } - } -} - -impl Spawn for SpawnTaskHandle { - fn spawn_obj(&self, future: FutureObj<'static, ()>) - -> Result<(), SpawnError> { - let future = select(self.on_exit.clone(), future).map(drop); - self.sender.unbounded_send((Box::pin(future), From::from("unnamed"))) - .map_err(|_| SpawnError::shutdown()) - } -} - -type Boxed01Future01 = Box<dyn futures01::Future<Item = (), Error = ()> + Send + 'static>; - -impl futures01::future::Executor<Boxed01Future01> for SpawnTaskHandle { - fn execute(&self, future: Boxed01Future01) -> Result<(), futures01::future::ExecuteError<Boxed01Future01>>{ - self.spawn("unnamed", future.compat().map(drop)); - Ok(()) - } -} - /// Abstraction over a Substrate service. pub trait AbstractService: 'static + Future<Output = Result<(), Error>> + Spawn + Send + Unpin { @@ -225,6 +175,7 @@ pub trait AbstractService: 'static + Future<Output = Result<(), Error>> + fn transaction_pool(&self) -> Arc<Self::TransactionPool>; /// Get a handle to a future that will resolve on exit. + #[deprecated(note = "Use `spawn_task`/`spawn_essential_task` instead, those functions will attach on_exit signal.")] fn on_exit(&self) -> ::exit_future::Exit; /// Get the prometheus metrics registry, if available. @@ -265,12 +216,7 @@ where } fn spawn_task(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) { - let on_exit = self.on_exit(); - let task = async move { - futures::pin_mut!(task); - let _ = select(on_exit, task).await; - }; - let _ = self.to_spawn_tx.unbounded_send((Box::pin(task), name.into())); + self.task_manager.spawn(name, task) } fn spawn_essential_task(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) { @@ -281,20 +227,12 @@ where error!("Essential task failed. Shutting down service."); let _ = essential_failed.send(()); }); - let on_exit = self.on_exit(); - let task = async move { - futures::pin_mut!(essential_task); - let _ = select(on_exit, essential_task).await; - }; - let _ = self.to_spawn_tx.unbounded_send((Box::pin(task), name.into())); + let _ = self.spawn_task(name, essential_task); } fn spawn_task_handle(&self) -> SpawnTaskHandle { - SpawnTaskHandle { - sender: self.to_spawn_tx.clone(), - on_exit: self.on_exit(), - } + self.task_manager.spawn_handle() } fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin<Box<dyn Future<Output = Option<String>> + Send>> { @@ -330,7 +268,7 @@ where } fn on_exit(&self) -> exit_future::Exit { - self.exit.clone() + self.task_manager.on_exit() } fn prometheus_registry(&self) -> Option<prometheus_endpoint::Registry> { @@ -355,9 +293,7 @@ impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Future for } } - while let Poll::Ready(Some((task_to_spawn, name))) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) { - (this.task_executor)(Box::pin(futures_diagnose::diagnose(name, task_to_spawn))); - } + this.task_manager.process_receiver(cx); // The service future never ends. Poll::Pending @@ -371,7 +307,7 @@ impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Spawn for &self, future: FutureObj<'static, ()> ) -> Result<(), SpawnError> { - self.to_spawn_tx.unbounded_send((Box::pin(future), From::from("unnamed"))) + self.task_manager.scheduler().unbounded_send((Box::pin(future), From::from("unnamed"))) .map_err(|_| SpawnError::shutdown()) } } @@ -525,17 +461,6 @@ pub struct NetworkStatus<B: BlockT> { pub average_upload_per_sec: u64, } -impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Drop for - Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> -{ - fn drop(&mut self) { - debug!(target: "service", "Substrate service shutdown"); - if let Some(signal) = self.signal.take() { - let _ = signal.fire(); - } - } -} - #[cfg(not(target_os = "unknown"))] // Wrapper for HTTP and WS servers that makes sure they are properly shut down. mod waiting { diff --git a/substrate/client/service/src/task_manager.rs b/substrate/client/service/src/task_manager.rs new file mode 100644 index 0000000000000000000000000000000000000000..d7041e44b9c1e17eaa2d82e0e8a80467ddcffcb3 --- /dev/null +++ b/substrate/client/service/src/task_manager.rs @@ -0,0 +1,190 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +//! Substrate service tasks management module. + +use std::{ + result::Result, sync::Arc, + task::{Poll, Context}, + borrow::Cow, pin::Pin, +}; +use exit_future::Signal; +use log::{debug, error}; +use futures::{ + Future, FutureExt, Stream, + future::select, channel::mpsc, + compat::*, + task::{Spawn, FutureObj, SpawnError}, +}; + +/// Type alias for service task executor (usually runtime). +pub type ServiceTaskExecutor = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>; + +/// Type alias for the task scheduler. +pub type TaskScheduler = mpsc::UnboundedSender<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>; + +/// Helper struct to setup background tasks execution for service. +pub struct TaskManagerBuilder { + /// A future that resolves when the service has exited, this is useful to + /// make sure any internally spawned futures stop when the service does. + on_exit: exit_future::Exit, + /// A signal that makes the exit future above resolve, fired on service drop. + signal: Option<Signal>, + /// Sender for futures that must be spawned as background tasks. + to_spawn_tx: TaskScheduler, + /// Receiver for futures that must be spawned as background tasks. + to_spawn_rx: mpsc::UnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>, +} + +impl TaskManagerBuilder { + /// New asynchronous task manager setup. + pub fn new() -> Self { + let (signal, on_exit) = exit_future::signal(); + let (to_spawn_tx, to_spawn_rx) = mpsc::unbounded(); + Self { + on_exit, + signal: Some(signal), + to_spawn_tx, + to_spawn_rx, + } + } + + /// Get spawn handle. + /// + /// Tasks spawned through this handle will get scheduled once + /// service is up and running. + pub fn spawn_handle(&self) -> SpawnTaskHandle { + SpawnTaskHandle { + on_exit: self.on_exit.clone(), + sender: self.to_spawn_tx.clone(), + } + } + + /// Convert into actual task manager from initial setup. + pub(crate) fn into_task_manager(self, executor: ServiceTaskExecutor) -> TaskManager { + let TaskManagerBuilder { + on_exit, + signal, + to_spawn_rx, + to_spawn_tx + } = self; + TaskManager { + on_exit, + signal, + to_spawn_tx, + to_spawn_rx, + executor, + } + } +} + +/// An handle for spawning tasks in the service. +#[derive(Clone)] +pub struct SpawnTaskHandle { + sender: TaskScheduler, + on_exit: exit_future::Exit, +} + +impl SpawnTaskHandle { + /// Spawns the given task with the given name. + pub fn spawn(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) { + let on_exit = self.on_exit.clone(); + let future = async move { + futures::pin_mut!(task); + let _ = select(on_exit, task).await; + }; + if self.sender.unbounded_send((Box::pin(future), name.into())).is_err() { + error!("Failed to send task to spawn over channel"); + } + } +} + +impl Spawn for SpawnTaskHandle { + fn spawn_obj(&self, future: FutureObj<'static, ()>) + -> Result<(), SpawnError> { + let future = select(self.on_exit.clone(), future).map(drop); + self.sender.unbounded_send((Box::pin(future), From::from("unnamed"))) + .map_err(|_| SpawnError::shutdown()) + } +} + +type Boxed01Future01 = Box<dyn futures01::Future<Item = (), Error = ()> + Send + 'static>; + +impl futures01::future::Executor<Boxed01Future01> for SpawnTaskHandle { + fn execute(&self, future: Boxed01Future01) -> Result<(), futures01::future::ExecuteError<Boxed01Future01>>{ + self.spawn("unnamed", future.compat().map(drop)); + Ok(()) + } +} + +/// Helper struct to manage background/async tasks in Service. +pub struct TaskManager { + /// A future that resolves when the service has exited, this is useful to + /// make sure any internally spawned futures stop when the service does. + on_exit: exit_future::Exit, + /// A signal that makes the exit future above resolve, fired on service drop. + signal: Option<Signal>, + /// Sender for futures that must be spawned as background tasks. + to_spawn_tx: TaskScheduler, + /// Receiver for futures that must be spawned as background tasks. + to_spawn_rx: mpsc::UnboundedReceiver<(Pin<Box<dyn Future<Output = ()> + Send>>, Cow<'static, str>)>, + /// How to spawn background tasks. + executor: ServiceTaskExecutor, +} + +impl TaskManager { + /// Spawn background/async task, which will be aware on exit signal. + pub(super) fn spawn(&self, name: impl Into<Cow<'static, str>>, task: impl Future<Output = ()> + Send + 'static) { + let on_exit = self.on_exit.clone(); + let future = async move { + futures::pin_mut!(task); + let _ = select(on_exit, task).await; + }; + if self.to_spawn_tx.unbounded_send((Box::pin(future), name.into())).is_err() { + error!("Failed to send task to spawn over channel"); + } + } + + pub(super) fn spawn_handle(&self) -> SpawnTaskHandle { + SpawnTaskHandle { + on_exit: self.on_exit.clone(), + sender: self.to_spawn_tx.clone(), + } + } + + /// Get sender where background/async tasks can be sent. + pub(super) fn scheduler(&self) -> TaskScheduler { + self.to_spawn_tx.clone() + } + + /// Process background task receiver. + pub(super) fn process_receiver(&mut self, cx: &mut Context) { + while let Poll::Ready(Some((task_to_spawn, name))) = Pin::new(&mut self.to_spawn_rx).poll_next(cx) { + (self.executor)(Box::pin(futures_diagnose::diagnose(name, task_to_spawn))); + } + } + + /// Clone on exit signal. + pub(super) fn on_exit(&self) -> exit_future::Exit { + self.on_exit.clone() + } +} + +impl Drop for TaskManager { + fn drop(&mut self) { + debug!(target: "service", "Tasks manager shutdown"); + if let Some(signal) = self.signal.take() { + let _ = signal.fire(); + } + } +}