Unverified Commit 9ba989dd authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Use `SpawnNamed` to give tasks names (#1379)

parent 8ab7370e
Pipeline #99818 passed with stages
in 25 minutes and 10 seconds
......@@ -23,7 +23,7 @@
#![warn(missing_docs)]
use futures::prelude::*;
use futures::{channel::{mpsc, oneshot}, task::Spawn};
use futures::channel::{mpsc, oneshot};
use keystore::KeyStorePtr;
use polkadot_primitives::{
Hash, Block,
......@@ -39,6 +39,7 @@ use client::{
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use codec::{Encode, Decode};
use sp_core::traits::SpawnNamed;
use log::warn;
......@@ -174,7 +175,7 @@ impl Store {
&self,
wrapped_block_import: I,
client: Arc<P>,
spawner: impl Spawn,
spawner: impl SpawnNamed,
keystore: KeyStorePtr,
) -> ClientResult<AvailabilityBlockImport<I, P>>
where
......
......@@ -20,7 +20,7 @@ use std::sync::Arc;
use std::thread;
use log::{error, info, trace, warn};
use sp_blockchain::{Result as ClientResult};
use sp_blockchain::Result as ClientResult;
use sp_runtime::traits::{Header as HeaderT, Block as BlockT, HashFor, BlakeTwo256};
use sp_api::{ApiExt, ProvideRuntimeApi};
use client::{
......@@ -32,12 +32,13 @@ use consensus_common::{
ImportResult,
import_queue::CacheKeyId,
};
use sp_core::traits::SpawnNamed;
use polkadot_primitives::{Block, BlockId, Hash};
use polkadot_primitives::parachain::{
ParachainHost, ValidatorId, AbridgedCandidateReceipt, AvailableData,
ValidatorPair, ErasureChunk,
};
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::{Spawn, SpawnExt}};
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}};
use futures::future::AbortHandle;
use keystore::KeyStorePtr;
......@@ -641,7 +642,7 @@ impl<I, P> AvailabilityBlockImport<I, P> {
pub(crate) fn new(
client: Arc<P>,
block_import: I,
spawner: impl Spawn,
spawner: impl SpawnNamed,
keystore: KeyStorePtr,
to_worker: mpsc::UnboundedSender<WorkerMsg>,
) -> Self
......@@ -662,9 +663,7 @@ impl<I, P> AvailabilityBlockImport<I, P> {
to_worker.clone(),
));
if let Err(_) = spawner.spawn(prune_available.map(drop)) {
error!(target: LOG_TARGET, "Failed to spawn availability pruning task");
}
spawner.spawn("polkadot-prune-availibility", prune_available.map(drop).boxed());
AvailabilityBlockImport {
client,
......
......@@ -50,7 +50,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;
use futures::{future, Future, Stream, FutureExt, StreamExt, task::Spawn};
use futures::{future, Future, Stream, FutureExt, StreamExt};
use log::warn;
use sc_client_api::{StateBackend, BlockchainEvents};
use sp_blockchain::HeaderBackend;
......@@ -82,6 +82,7 @@ use polkadot_service_new::{
Error as ServiceError, FullNodeHandles, PolkadotClient,
};
use sc_service::SpawnTaskHandle;
use sp_core::traits::SpawnNamed;
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
......@@ -133,7 +134,7 @@ pub trait BuildParachainContext {
Client::Api: RuntimeApiCollection<Extrinsic>,
<Client::Api as ApiExt<Block>>::StateBackend: StateBackend<HashFor<Block>>,
Extrinsic: codec::Codec + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static;
SP: SpawnNamed + Clone + Send + Sync + 'static;
}
/// Parachain context needed for collation.
......@@ -233,7 +234,7 @@ fn build_collator_service<SP, P, C, R, Extrinsic>(
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
Extrinsic: service::Codec + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static,
SP: SpawnNamed + Clone + Send + Sync + 'static,
{
Err("Collator is not functional with the new service yet".into())
}
......
......@@ -295,7 +295,7 @@ pub(crate) fn pov_block_topic(parent_hash: Hash) -> Hash {
pub fn register_validator<C: ChainContext + 'static>(
service: Arc<NetworkService<Block, Hash>>,
chain: C,
executor: &impl futures::task::Spawn,
executor: &impl sp_core::traits::SpawnNamed,
) -> RegisteredMessageValidator
{
let s = service.clone();
......@@ -331,12 +331,7 @@ pub fn register_validator<C: ChainContext + 'static>(
let fut = futures::future::poll_fn(move |cx| {
gossip_engine.lock().poll_unpin(cx)
});
let spawn_res = executor.spawn_obj(futures::task::FutureObj::from(Box::new(fut)));
// Note: we consider the chances of an error to spawn a background task almost null.
if spawn_res.is_err() {
log::error!(target: "polkadot-gossip", "Failed to spawn background task");
}
executor.spawn("polkadot-legacy-gossip-engine", fut.boxed());
}
RegisteredMessageValidator {
......
......@@ -26,7 +26,7 @@ use codec::{Decode, Encode};
use futures::channel::{mpsc, oneshot};
use futures::future::Either;
use futures::prelude::*;
use futures::task::{Spawn, SpawnExt, Context, Poll};
use futures::task::{Context, Poll};
use futures::stream::{FuturesUnordered, StreamFuture};
use log::{debug, trace};
......@@ -44,6 +44,7 @@ use polkadot_validation::{
use sc_network::{ObservedRole, Event, PeerId};
use sp_api::ProvideRuntimeApi;
use sp_runtime::ConsensusEngineId;
use sp_core::traits::SpawnNamed;
use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::pin::Pin;
......@@ -126,7 +127,9 @@ enum ServiceToWorkerMsg {
/// Messages from a background task to the main worker task.
enum BackgroundToWorkerMsg {
// Spawn a given future.
Spawn(future::BoxFuture<'static, ()>),
//
// The name is used for the future task.
Spawn(&'static str, future::BoxFuture<'static, ()>),
}
/// Operations that a handle to an underlying network service should provide.
......@@ -221,7 +224,7 @@ pub fn start<C, Api, SP>(
C: ChainContext + 'static,
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
SP: Spawn + Clone + Send + 'static,
SP: SpawnNamed + Clone + Send + 'static,
{
const SERVICE_TO_WORKER_BUF: usize = 256;
......@@ -234,67 +237,73 @@ pub fn start<C, Api, SP>(
chain_context,
&executor,
);
executor.spawn(worker_loop(
config,
service.clone(),
gossip_validator,
api,
worker_receiver,
executor.clone(),
))?;
executor.spawn(
"polkadot-network-worker",
worker_loop(
config,
service.clone(),
gossip_validator,
api,
worker_receiver,
executor.clone(),
).boxed(),
);
let polkadot_service = Service {
sender: worker_sender.clone(),
network_service: service.clone(),
};
executor.spawn(async move {
while let Some(event) = event_stream.next().await {
let res = match event {
Event::Dht(_) => continue,
Event::NotificationStreamOpened {
remote,
engine_id,
role,
} => {
if engine_id != POLKADOT_ENGINE_ID { continue }
worker_sender.send(ServiceToWorkerMsg::PeerConnected(remote, role)).await
},
Event::NotificationStreamClosed {
remote,
engine_id,
} => {
if engine_id != POLKADOT_ENGINE_ID { continue }
worker_sender.send(ServiceToWorkerMsg::PeerDisconnected(remote)).await
},
Event::NotificationsReceived {
remote,
messages,
} => {
let our_notifications = messages.into_iter()
.filter_map(|(engine, message)| if engine == POLKADOT_ENGINE_ID {
Some(message)
} else {
None
})
.collect();
executor.spawn(
"polkadot-network-notifications",
async move {
while let Some(event) = event_stream.next().await {
let res = match event {
Event::Dht(_) => continue,
Event::NotificationStreamOpened {
remote,
engine_id,
role,
} => {
if engine_id != POLKADOT_ENGINE_ID { continue }
worker_sender.send(ServiceToWorkerMsg::PeerConnected(remote, role)).await
},
Event::NotificationStreamClosed {
remote,
engine_id,
} => {
if engine_id != POLKADOT_ENGINE_ID { continue }
worker_sender.send(
ServiceToWorkerMsg::PeerMessage(remote, our_notifications)
).await
}
};
worker_sender.send(ServiceToWorkerMsg::PeerDisconnected(remote)).await
},
Event::NotificationsReceived {
remote,
messages,
} => {
let our_notifications = messages.into_iter()
.filter_map(|(engine, message)| if engine == POLKADOT_ENGINE_ID {
Some(message)
} else {
None
})
.collect();
worker_sender.send(
ServiceToWorkerMsg::PeerMessage(remote, our_notifications)
).await
}
};
if let Err(e) = res {
// full is impossible here, as we've `await`ed the value being sent.
if e.is_disconnected() {
break
if let Err(e) = res {
// full is impossible here, as we've `await`ed the value being sent.
if e.is_disconnected() {
break
}
}
}
}
})?;
}.boxed(),
);
Ok(polkadot_service)
}
......@@ -845,7 +854,7 @@ struct Worker<Api, Sp, Gossip> {
impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
Sp: Spawn + Clone,
Sp: SpawnNamed + Clone,
Gossip: GossipOps,
{
// spawns a background task to spawn consensus networking.
......@@ -888,14 +897,17 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
// glue the incoming messages, shared table, and validation
// work together.
let _ = self.executor.spawn(statement_import_loop(
relay_parent,
table,
self.api.clone(),
self.gossip_handle.clone(),
self.background_to_main_sender.clone(),
exit,
));
self.executor.spawn(
"polkadot-statement-import-loop",
statement_import_loop(
relay_parent,
table,
self.api.clone(),
self.gossip_handle.clone(),
self.background_to_main_sender.clone(),
exit,
).boxed(),
);
}
fn handle_service_message(&mut self, message: ServiceToWorkerMsg) {
......@@ -932,12 +944,15 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
// before placing in the pool, so we can safely check by candidate hash.
let get_msg = fetch_pov_from_gossip(&candidate, &self.gossip_handle);
let _ = self.executor.spawn(async move {
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
if let Either::Left((pov_block, _)) = res {
let _ = sender.send(pov_block);
}
});
self.executor.spawn(
"polkadot-fetch-pov-block",
async move {
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
if let Either::Left((pov_block, _)) = res {
let _ = sender.send(pov_block);
}
}.boxed(),
);
}
ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, mut sender) => {
let topic = crate::erasure_coding_topic(&candidate_hash);
......@@ -963,12 +978,15 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
"gossip message streams do not conclude early; qed"
));
let _ = self.executor.spawn(async move {
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
if let Either::Left((chunk, _)) = res {
let _ = sender.send(chunk);
}
});
self.executor.spawn(
"polkadot-fetch-erasure-chunk",
async move {
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
if let Either::Left((chunk, _)) = res {
let _ = sender.send(chunk);
}
}.boxed(),
);
}
ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => {
let topic = crate::erasure_coding_topic(&candidate_hash);
......@@ -1017,8 +1035,8 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
fn handle_background_message(&mut self, message: BackgroundToWorkerMsg) {
match message {
BackgroundToWorkerMsg::Spawn(task) => {
let _ = self.executor.spawn(task);
BackgroundToWorkerMsg::Spawn(name, task) => {
let _ = self.executor.spawn(name, task);
}
}
}
......@@ -1068,7 +1086,7 @@ async fn worker_loop<Api, Sp>(
) where
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
Sp: Spawn + Clone,
Sp: SpawnNamed + Clone,
{
const BACKGROUND_TO_MAIN_BUF: usize = 16;
......@@ -1250,7 +1268,7 @@ async fn statement_import_loop<Api>(
let work = future::select(work.boxed(), exit.clone()).map(drop);
if let Err(_) = to_worker.send(
BackgroundToWorkerMsg::Spawn(work.boxed())
BackgroundToWorkerMsg::Spawn("polkadot-statement-import-loop-sub-task", work.boxed())
).await {
// can fail only if remote has hung up - worker is dead,
// we should die too. this is defensive, since the exit future
......
......@@ -30,11 +30,24 @@ use av_store::{Store as AvailabilityStore, ErasureNetworking};
use sc_network_gossip::TopicNotification;
use sp_api::{ApiRef, ProvideRuntimeApi};
use sp_runtime::traits::Block as BlockT;
use sp_core::crypto::Pair;
use sp_core::{crypto::Pair, traits::SpawnNamed};
use sp_keyring::Sr25519Keyring;
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use futures::executor::{LocalPool, LocalSpawner};
use futures::task::{LocalSpawnExt, SpawnExt};
#[derive(Clone)]
struct Executor(LocalSpawner);
impl SpawnNamed for Executor {
fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.0.spawn_local(future).unwrap();
}
fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.spawn(name, future);
}
}
#[derive(Default)]
pub struct MockNetworkOps {
......@@ -243,7 +256,7 @@ fn test_setup(config: Config) -> (
mock_gossip.clone(),
api.clone(),
worker_rx,
pool.spawner(),
Executor(pool.spawner()),
);
let service = Service {
......
......@@ -26,13 +26,12 @@
//!
//! These attestation sessions are kept live until they are periodically garbage-collected.
use std::{time::{Duration, Instant}, sync::Arc, pin::Pin};
use std::collections::HashMap;
use std::{time::{Duration, Instant}, sync::Arc, pin::Pin, collections::HashMap};
use crate::pipeline::FullOutput;
use sc_client_api::{BlockchainEvents, BlockBackend};
use consensus::SelectChain;
use futures::{prelude::*, task::{Spawn, SpawnExt}};
use futures::prelude::*;
use polkadot_primitives::{Block, Hash, BlockId};
use polkadot_primitives::parachain::{
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
......@@ -42,17 +41,15 @@ use keystore::KeyStorePtr;
use sp_api::{ProvideRuntimeApi, ApiExt};
use runtime_primitives::traits::HashFor;
use availability_store::Store as AvailabilityStore;
use primitives::traits::SpawnNamed;
use ansi_term::Colour;
use log::{warn, error, info, debug, trace};
use log::{warn, info, debug, trace};
use super::{Network, Collators, SharedTable, TableRouter};
use crate::Error;
use crate::pipeline::ValidationPool;
/// A handle to spawn background tasks onto.
pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
// Remote processes may request for a validation instance to be cloned or instantiated.
// They send a oneshot channel.
type ValidationInstanceRequest = (
......@@ -148,7 +145,7 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
N::BuildTableRouter: Send + Unpin + 'static,
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
SC: SelectChain<Block> + 'static,
SP: Spawn + Send + 'static,
SP: SpawnNamed + Send + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
{
......@@ -337,7 +334,7 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
N::TableRouter: Send + 'static + Sync,
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
N::BuildTableRouter: Unpin + Send + 'static,
SP: Spawn + Send + 'static,
SP: SpawnNamed + Send + 'static,
CF: CollationFetch + Clone + Send + Sync + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
......@@ -453,19 +450,16 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
let collation_fetch = self.collation_fetch.clone();
let router = router.clone();
let res = self.spawner.spawn(
self.spawner.spawn(
"polkadot-parachain-validation-work",
launch_work(
move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators),
availability_store,
router,
n_validators,
index,
),
).boxed(),
);
if let Err(e) = res {
error!(target: "validation", "Failed to launch work: {:?}", e);
}
}
let tracker = ValidationInstanceHandle {
......@@ -549,7 +543,7 @@ async fn launch_work<CFF, E>(
#[cfg(test)]
mod tests {
use super::*;
use futures::{executor::{ThreadPool, self}, future::ready, channel::mpsc};
use futures::{executor, future::ready, channel::mpsc};
use availability_store::ErasureNetworking;
use polkadot_primitives::parachain::{
PoVBlock, AbridgedCandidateReceipt, ErasureChunk, ValidatorIndex,
......@@ -559,6 +553,7 @@ mod tests {
use runtime_primitives::traits::Block as BlockT;
use std::pin::Pin;
use sp_keyring::sr25519::Keyring;
use primitives::testing::SpawnBlockingExecutor;
/// Events fired while running mock implementations to follow execution.
enum Events {
......@@ -719,7 +714,7 @@ mod tests {
#[test]
fn launch_work_is_executed_properly() {
let executor = ThreadPool::new().unwrap();
let executor = SpawnBlockingExecutor::new();
let keystore = keystore::Store::new_in_memory();
// Make sure `Bob` key is in the keystore, so this mocked node will be a parachain validator.
......@@ -759,7 +754,7 @@ mod tests {
#[test]
fn router_is_built_on_relay_chain_validator() {
let executor = ThreadPool::new().unwrap();
let executor = SpawnBlockingExecutor::new();
let keystore = keystore::Store::new_in_memory();
// Make sure `Alice` key is in the keystore, so this mocked node will be a relay-chain validator.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment