Unverified Commit aaf5e55f authored by Ashley's avatar Ashley
Browse files

Switch to Spawn trait

parent 2ab282f5
...@@ -31,7 +31,7 @@ use block_builder::BlockBuilderApi; ...@@ -31,7 +31,7 @@ use block_builder::BlockBuilderApi;
use consensus::SelectChain; use consensus::SelectChain;
use availability_store::Store as AvailabilityStore; use availability_store::Store as AvailabilityStore;
use futures01::prelude::*; use futures01::prelude::*;
use futures::{StreamExt, FutureExt, Future, future::{ready, select}}; use futures::{StreamExt, FutureExt, Future, future::{ready, select}, task::{Spawn, SpawnExt}};
use polkadot_primitives::{Block, BlockId}; use polkadot_primitives::{Block, BlockId};
use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost}; use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost};
use runtime_primitives::traits::{ProvideRuntimeApi}; use runtime_primitives::traits::{ProvideRuntimeApi};
...@@ -40,11 +40,11 @@ use keystore::KeyStorePtr; ...@@ -40,11 +40,11 @@ use keystore::KeyStorePtr;
use sr_api::ApiExt; use sr_api::ApiExt;
use tokio::{runtime::Runtime as LocalRuntime}; use tokio::{runtime::Runtime as LocalRuntime};
use log::warn; use log::{warn, error};
use super::{Network, Collators}; use super::{Network, Collators};
type TaskExecutor = futures::executor::ThreadPool; type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
/// Gets a list of the candidates in a block. /// Gets a list of the candidates in a block.
pub(crate) fn fetch_candidates<P: BlockBody<Block>>(client: &P, block: &BlockId) pub(crate) fn fetch_candidates<P: BlockBody<Block>>(client: &P, block: &BlockId)
...@@ -183,7 +183,9 @@ pub(crate) fn start<C, N, P, SC>( ...@@ -183,7 +183,9 @@ pub(crate) fn start<C, N, P, SC>(
}; };
runtime.spawn(notifications); runtime.spawn(notifications);
thread_pool.spawn_ok(prune_old_sessions); if let Err(_) = thread_pool.spawn(prune_old_sessions) {
error!("Failed to spawn old sessions pruning task");
}
let prune_available = futures::future::select( let prune_available = futures::future::select(
prune_unneeded_availability(client, availability_store), prune_unneeded_availability(client, availability_store),
...@@ -192,7 +194,9 @@ pub(crate) fn start<C, N, P, SC>( ...@@ -192,7 +194,9 @@ pub(crate) fn start<C, N, P, SC>(
.map(|_| ()); .map(|_| ());
// spawn this on the tokio executor since it's fine on a thread pool. // spawn this on the tokio executor since it's fine on a thread pool.
thread_pool.spawn_ok(prune_available); if let Err(_) = thread_pool.spawn(prune_available) {
error!("Failed to spawn available pruning task");
}
runtime.block_on(exit); runtime.block_on(exit);
}); });
......
...@@ -61,24 +61,23 @@ use attestation_service::ServiceHandle; ...@@ -61,24 +61,23 @@ use attestation_service::ServiceHandle;
use futures01::prelude::*; use futures01::prelude::*;
use futures::{ use futures::{
future::{self, Either, select}, FutureExt, StreamExt, compat::Future01CompatExt, Stream, future::{self, Either, select}, FutureExt, StreamExt, compat::Future01CompatExt, Stream,
stream::unfold stream::unfold, task::{Spawn, SpawnExt}
}; };
use collation::CollationFetch; use collation::CollationFetch;
use dynamic_inclusion::DynamicInclusion; use dynamic_inclusion::DynamicInclusion;
use inherents::InherentData; use inherents::InherentData;
use sp_timestamp::TimestampInherentData; use sp_timestamp::TimestampInherentData;
use log::{info, debug, warn, trace}; use log::{info, debug, warn, trace, error};
use keystore::KeyStorePtr; use keystore::KeyStorePtr;
use sr_api::ApiExt; use sr_api::ApiExt;
type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
fn interval(duration: Duration) -> impl Stream<Item=()> + Send + Unpin { fn interval(duration: Duration) -> impl Stream<Item=()> + Send + Unpin {
unfold((), move |_| { unfold((), move |_| {
futures_timer::Delay::new(duration).map(|_| Some(((), ()))) futures_timer::Delay::new(duration).map(|_| Some(((), ())))
}).map(drop) }).map(drop)
} }
type TaskExecutor = futures::executor::ThreadPool;
pub use self::collation::{ pub use self::collation::{
validate_collation, validate_incoming, message_queue_root, egress_roots, Collators, validate_collation, validate_incoming, message_queue_root, egress_roots, Collators,
}; };
...@@ -423,7 +422,9 @@ impl<C, N, P> ParachainValidation<C, N, P> where ...@@ -423,7 +422,9 @@ impl<C, N, P> ParachainValidation<C, N, P> where
let cancellable_work = select(exit, router).map(|_| ()); let cancellable_work = select(exit, router).map(|_| ());
// spawn onto thread pool. // spawn onto thread pool.
self.handle.spawn_ok(cancellable_work) if self.handle.spawn(cancellable_work).is_err() {
error!("Failed to spawn cancellable work task");
}
} }
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment