diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index f12f6b1b9d7b37b61d28278816f763e825c55945..179f9be309e7a4b9577ecfcca72820a5802a5541 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -318,7 +318,7 @@ async fn spawn_validate_exhaustive( let _ = tx.send(res); }; - ctx.spawn("blocking-candidate-validation-task", fut.boxed()).await?; + ctx.spawn_blocking("blocking-candidate-validation-task", fut.boxed()).await?; rx.await.map_err(Into::into) } diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 185a0f5614bad064ad3ab48774f8c0e3d3a72ec6..4db8209b8b6dbe30caf2471af91c452b8344d700 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -112,6 +112,13 @@ enum ToOverseer { name: &'static str, s: BoxFuture<'static, ()>, }, + + /// Same as `SpawnJob` but for blocking tasks to be executed on a + /// dedicated thread pool. + SpawnBlockingJob { + name: &'static str, + s: BoxFuture<'static, ()>, + }, } /// An event telling the `Overseer` on the particular block @@ -238,7 +245,8 @@ impl Debug for ToOverseer { ToOverseer::SubsystemMessage(msg) => { write!(f, "OverseerMessage::SubsystemMessage({:?})", msg) } - ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)") + ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)"), + ToOverseer::SpawnBlockingJob { .. } => write!(f, "OverseerMessage::SpawnBlocking(..)") } } } @@ -290,6 +298,17 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> { Ok(()) } + async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) + -> SubsystemResult<()> + { + self.tx.send(ToOverseer::SpawnBlockingJob { + name, + s, + }).await?; + + Ok(()) + } + async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { self.tx.send(ToOverseer::SubsystemMessage(msg)).await?; @@ -803,6 +822,9 @@ where ToOverseer::SpawnJob { name, s } => { self.spawn_job(name, s); } + ToOverseer::SpawnBlockingJob { name, s } => { + self.spawn_blocking_job(name, s); + } } } @@ -992,6 +1014,10 @@ where fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) { self.s.spawn(name, j); } + + fn spawn_blocking_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) { + self.s.spawn_blocking(name, j); + } } fn spawn<S: SpawnNamed, M: Send + 'static>( diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index 4d9b70d4f8e3037bcd62b880ed0bead93e43e8bf..26c55354fbe1230f7a203de2edb6eafaf20ce997 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -177,6 +177,13 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for Tes Ok(()) } + async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) + -> SubsystemResult<()> + { + self.spawn.spawn_blocking(name, s); + Ok(()) + } + async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { self.tx.send(msg).await.expect("test overseer no longer live"); Ok(()) diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 91d485739bd1f8248b0fb9f3ab3dd610e8fbe65f..97d09f88cd814190be389de6fe7b3b7e559682c2 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -178,6 +178,13 @@ pub trait SubsystemContext: Send + 'static { /// Spawn a child task on the executor. async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()>; + /// Spawn a blocking child task on the executor's dedicated thread pool. + async fn spawn_blocking( + &mut self, + name: &'static str, + s: Pin<Box<dyn Future<Output = ()> + Send>>, + ) -> SubsystemResult<()>; + /// Send a direct message to some other `Subsystem`, routed based on message type. async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()>;