Unverified Commit 8230fef8 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

make `ctx.spawn` blocking (#3337)

* make spawn sync

* improve error type
parent f560edf6
Pipeline #143586 passed with stages
in 35 minutes and 54 seconds
......@@ -412,7 +412,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
"failed to send collation result",
);
}
})).await?;
}))?;
}
}
......
......@@ -2100,7 +2100,6 @@ async fn launch_approval(
let (background, remote_handle) = background.remote_handle();
ctx.spawn("approval-checks", Box::pin(background))
.await
.map(move |()| Some(remote_handle))
}
......
......@@ -107,7 +107,7 @@ async fn run(
let (mut validation_host, task) = polkadot_node_core_pvf::start(
polkadot_node_core_pvf::Config::new(cache_path, program_path),
);
ctx.spawn_blocking("pvf-validation-host", task.boxed()).await?;
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
loop {
match ctx.recv().await? {
......
......@@ -134,20 +134,20 @@ where
}
}
async fn spawn(
fn spawn(
&mut self,
name: &'static str,
s: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> SubsystemResult<()> {
self.inner.spawn(name, s).await
self.inner.spawn(name, s)
}
async fn spawn_blocking(
fn spawn_blocking(
&mut self,
name: &'static str,
s: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> SubsystemResult<()> {
self.inner.spawn_blocking(name, s).await
self.inner.spawn_blocking(name, s)
}
fn sender(&mut self) -> &mut Self::Sender {
......
......@@ -169,14 +169,13 @@ mod tests {
#[test]
fn failed_send_does_not_inc_sent() {
let (mut bounded, _) = channel::<Msg>(5);
let (mut unbounded, _) = unbounded::<Msg>();
let (unbounded, _) = unbounded::<Msg>();
block_on(async move {
assert!(bounded.send(Msg::default()).await.is_err());
assert!(bounded.try_send(Msg::default()).is_err());
assert_eq!(bounded.meter().read(), Readout { sent: 0, received: 0 });
assert!(unbounded.send(Msg::default()).await.is_err());
assert!(unbounded.unbounded_send(Msg::default()).is_err());
assert_eq!(unbounded.meter().read(), Readout { sent: 0, received: 0 });
});
......
......@@ -16,7 +16,7 @@
//! Metered variant of unbounded mpsc channels to be able to extract metrics.
use futures::{channel::mpsc, task::Poll, task::Context, sink::SinkExt, stream::Stream};
use futures::{channel::mpsc, task::Poll, task::Context, stream::Stream};
use std::result;
use std::pin::Pin;
......@@ -130,21 +130,6 @@ impl<T> UnboundedMeteredSender<T> {
&self.meter
}
/// Send message, wait until capacity is available.
pub async fn send(&mut self, item: T) -> result::Result<(), mpsc::SendError>
where
Self: Unpin,
{
self.meter.note_sent();
let fut = self.inner.send(item);
futures::pin_mut!(fut);
fut.await.map_err(|e| {
self.meter.retract_sent();
e
})
}
/// Attempt to send message or fail immediately.
pub fn unbounded_send(&self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
self.meter.note_sent();
......@@ -154,34 +139,3 @@ impl<T> UnboundedMeteredSender<T> {
})
}
}
impl<T> futures::sink::Sink<T> for UnboundedMeteredSender<T> {
type Error = <futures::channel::mpsc::UnboundedSender<T> as futures::sink::Sink<T>>::Error;
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
Pin::new(&mut self.inner).start_send(item)
}
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.inner).poll_ready(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_ready(cx) {
val @ Poll::Ready(_)=> {
val
}
other => other,
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_ready(cx) {
val @ Poll::Ready(_)=> {
self.meter.note_sent();
val
}
other => other,
}
}
}
......@@ -76,7 +76,6 @@ where
.with_validator_index(from_validator)
.with_relay_parent(parent);
ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed())
.await
.map_err(|e| Fatal::SpawnTask(e))?;
Ok(())
}
......
......@@ -189,7 +189,6 @@ impl FetchTask {
let (handle, kill) = oneshot::channel();
ctx.spawn("chunk-fetcher", running.run(kill).boxed())
.await
.map_err(|e| Fatal::SpawnTask(e))?;
Ok(FetchTask {
......
......@@ -650,7 +650,7 @@ async fn launch_interaction(
awaiting: vec![response_sender],
});
if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)).await {
if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)) {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
......
......@@ -887,7 +887,7 @@ where
shared.clone(),
).remote_handle();
ctx.spawn("network-bridge-network-worker", Box::pin(remote)).await?;
ctx.spawn("network-bridge-network-worker", Box::pin(remote))?;
ctx.send_message(AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(statement_receiver)
......
......@@ -1238,8 +1238,7 @@ async fn launch_request(
)
.remote_handle();
let result = ctx.spawn("large-statement-fetcher", task.boxed())
.await;
let result = ctx.spawn("large-statement-fetcher", task.boxed());
if let Err(err) = result {
tracing::error!(target: LOG_TARGET, ?err, "Spawning task failed.");
return None
......@@ -1952,9 +1951,7 @@ impl StatementDistribution {
ctx.spawn(
"large-statement-responder",
respond(receiver, res_sender.clone()).boxed()
)
.await
.map_err(Fatal::SpawnTask)?;
).map_err(Fatal::SpawnTask)?;
}
}
}
......
......@@ -103,7 +103,7 @@ impl Subsystem2 {
Delay::new(Duration::from_secs(1)).await;
}
}),
).await.unwrap();
).unwrap();
loop {
match ctx.try_recv().await {
......
......@@ -926,22 +926,22 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
}
}
async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-> SubsystemResult<()>
{
self.to_overseer.send(ToOverseer::SpawnJob {
self.to_overseer.unbounded_send(ToOverseer::SpawnJob {
name,
s,
}).await.map_err(Into::into)
}).map_err(|_| SubsystemError::TaskSpawn(name))
}
async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-> SubsystemResult<()>
{
self.to_overseer.send(ToOverseer::SpawnBlockingJob {
self.to_overseer.unbounded_send(ToOverseer::SpawnBlockingJob {
name,
s,
}).await.map_err(Into::into)
}).map_err(|_| SubsystemError::TaskSpawn(name))
}
fn sender(&mut self) -> &mut OverseerSubsystemSender {
......
......@@ -224,7 +224,7 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
.ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned()))
}
async fn spawn(
fn spawn(
&mut self,
name: &'static str,
s: Pin<Box<dyn Future<Output = ()> + Send>>,
......@@ -233,7 +233,7 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
Ok(())
}
async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-> SubsystemResult<()>
{
self.spawn.spawn_blocking(name, s);
......
......@@ -726,9 +726,9 @@ impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
}
outgoing = jobs.next() => {
let res = match outgoing.expect("the Jobs stream never ends; qed") {
FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await,
FromJobCommand::Spawn(name, task) => ctx.spawn(name, task),
FromJobCommand::SpawnBlocking(name, task)
=> ctx.spawn_blocking(name, task).await,
=> ctx.spawn_blocking(name, task),
};
if let Err(e) = res {
......
......@@ -197,8 +197,8 @@ pub enum SubsystemError {
#[error(transparent)]
QueueError(#[from] mpsc::SendError),
#[error(transparent)]
TaskSpawn(#[from] futures::task::SpawnError),
#[error("Failed to spawn a task: {0}")]
TaskSpawn(&'static str),
#[error(transparent)]
Infallible(#[from] std::convert::Infallible),
......@@ -293,10 +293,10 @@ pub trait SubsystemContext: Send + Sized + 'static {
async fn recv(&mut self) -> SubsystemResult<FromOverseer<Self::Message>>;
/// Spawn a child task on the executor.
async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()>;
fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()>;
/// Spawn a blocking child task on the executor's dedicated thread pool.
async fn spawn_blocking(
fn spawn_blocking(
&mut self,
name: &'static str,
s: Pin<Box<dyn Future<Output = ()> + Send>>,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment