Unverified Commit 474b72a5 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

Add spawn_blocking to SubsystemContext (#1570)

* subsystem: add spawn_blocking to SubsystemContext

* candidate-validation: use spawn_blocking for exhaustive tasks
parent d81c98b7
Pipeline #104033 canceled with stages
in 5 minutes and 58 seconds
......@@ -318,7 +318,7 @@ async fn spawn_validate_exhaustive(
let _ = tx.send(res);
};
ctx.spawn("blocking-candidate-validation-task", fut.boxed()).await?;
ctx.spawn_blocking("blocking-candidate-validation-task", fut.boxed()).await?;
rx.await.map_err(Into::into)
}
......
......@@ -112,6 +112,13 @@ enum ToOverseer {
name: &'static str,
s: BoxFuture<'static, ()>,
},
/// Same as `SpawnJob` but for blocking tasks to be executed on a
/// dedicated thread pool.
SpawnBlockingJob {
name: &'static str,
s: BoxFuture<'static, ()>,
},
}
/// An event telling the `Overseer` on the particular block
......@@ -238,7 +245,8 @@ impl Debug for ToOverseer {
ToOverseer::SubsystemMessage(msg) => {
write!(f, "OverseerMessage::SubsystemMessage({:?})", msg)
}
ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)")
ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)"),
ToOverseer::SpawnBlockingJob { .. } => write!(f, "OverseerMessage::SpawnBlocking(..)")
}
}
}
......@@ -290,6 +298,17 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
Ok(())
}
async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-> SubsystemResult<()>
{
self.tx.send(ToOverseer::SpawnBlockingJob {
name,
s,
}).await?;
Ok(())
}
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.tx.send(ToOverseer::SubsystemMessage(msg)).await?;
......@@ -803,6 +822,9 @@ where
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
}
ToOverseer::SpawnBlockingJob { name, s } => {
self.spawn_blocking_job(name, s);
}
}
}
......@@ -992,6 +1014,10 @@ where
fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
self.s.spawn(name, j);
}
fn spawn_blocking_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
self.s.spawn_blocking(name, j);
}
}
fn spawn<S: SpawnNamed, M: Send + 'static>(
......
......@@ -177,6 +177,13 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for Tes
Ok(())
}
async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-> SubsystemResult<()>
{
self.spawn.spawn_blocking(name, s);
Ok(())
}
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.tx.send(msg).await.expect("test overseer no longer live");
Ok(())
......
......@@ -178,6 +178,13 @@ pub trait SubsystemContext: Send + 'static {
/// Spawn a child task on the executor.
async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()>;
/// Spawn a blocking child task on the executor's dedicated thread pool.
async fn spawn_blocking(
&mut self,
name: &'static str,
s: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> SubsystemResult<()>;
/// Send a direct message to some other `Subsystem`, routed based on message type.
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()>;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment