Unverified Commit d34585dd authored by Fedor Sakharov's avatar Fedor Sakharov Committed by GitHub
Browse files

Change SpawnedSubsystem type to log subsystem errors (#1878)

* Change SpawnedSubsystem type to log subsystem errors

* Remove clone
parent ac9bc28a
Pipeline #112510 passed with stages
in 15 minutes and 49 seconds
......@@ -165,7 +165,10 @@ where
Context: SubsystemContext<Message = CollationGenerationMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = Box::pin(self.run(ctx));
let future = Box::pin(async move {
self.run(ctx).await;
Ok(())
});
SpawnedSubsystem {
name: "collation-generation-subsystem",
......
......@@ -27,7 +27,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};
use codec::{Encode, Decode};
use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt};
use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt, TryFutureExt};
use futures_timer::Delay;
use kvdb_rocksdb::{Database, DatabaseConfig};
use kvdb::{KeyValueDB, DBTransaction};
......@@ -969,9 +969,7 @@ where
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run(self, ctx)
.map(|r| if let Err(e) = r {
log::error!(target: "availabilitystore", "Subsystem exited with an error {:?}", e);
})
.map_err(|e| SubsystemError::with_origin("availability-store", e))
.boxed();
SpawnedSubsystem {
......
......@@ -121,7 +121,6 @@ impl<S, C> Subsystem<C> for CandidateValidationSubsystem<S> where
fn start(self, ctx: C) -> SpawnedSubsystem {
let future = run(ctx, self.spawn, self.metrics)
.map_err(|e| SubsystemError::with_origin("candidate-validation", e))
.map(|_| ())
.boxed();
SpawnedSubsystem {
name: "candidate-validation-subsystem",
......
......@@ -67,7 +67,6 @@ impl<Client, Context> Subsystem<Context> for ChainApiSubsystem<Client> where
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run(ctx, self)
.map_err(|e| SubsystemError::with_origin("chain-api", e))
.map(|_| ())
.boxed();
SpawnedSubsystem {
future,
......
......@@ -60,7 +60,7 @@ impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem {
future: run(ctx, self).map(|_| ()).boxed(),
future: run(ctx, self).boxed(),
name: "runtime-api-subsystem",
}
}
......
......@@ -823,7 +823,6 @@ where
let future = self
.run(ctx)
.map_err(|e| SubsystemError::with_origin("availability-distribution", e))
.map(|_| ())
.boxed();
SpawnedSubsystem {
......
......@@ -585,7 +585,7 @@ where
.map_err(|e| {
SubsystemError::with_origin("bitfield-distribution", e)
})
.map(|_| ()).boxed();
.boxed();
SpawnedSubsystem {
name: "bitfield-distribution-subsystem",
......
......@@ -231,7 +231,6 @@ impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
.map_err(|e| {
SubsystemError::with_origin("network-bridge", e)
})
.map(|_| ())
.boxed();
SpawnedSubsystem {
name: "network-bridge-subsystem",
......
......@@ -20,7 +20,7 @@
#![deny(missing_docs, unused_crate_dependencies)]
use std::time::Duration;
use futures::{channel::oneshot, FutureExt};
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use log::trace;
use thiserror::Error;
......@@ -122,9 +122,14 @@ where
Context: SubsystemContext<Message = CollatorProtocolMessage> + Sync + Send,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self
.run(ctx)
.map_err(|e| SubsystemError::with_origin("collator-protocol", e))
.boxed();
SpawnedSubsystem {
name: "collator-protocol-subsystem",
future: self.run(ctx).map(|_| ()).boxed(),
future,
}
}
}
......
......@@ -66,7 +66,6 @@ impl<C> Subsystem<C> for PoVDistribution
// within `run`.
let future = self.run(ctx)
.map_err(|e| SubsystemError::with_origin("pov-distribution", e))
.map(|_| ())
.boxed();
SpawnedSubsystem {
name: "pov-distribution-subsystem",
......@@ -616,4 +615,4 @@ impl metrics::Metrics for Metrics {
}
#[cfg(test)]
mod tests;
\ No newline at end of file
mod tests;
......@@ -81,7 +81,7 @@ impl<C> Subsystem<C> for StatementDistribution
// within `run`.
SpawnedSubsystem {
name: "statement-distribution-subsystem",
future: self.run(ctx).map(|_| ()).boxed(),
future: self.run(ctx).boxed(),
}
}
}
......
......@@ -76,6 +76,7 @@ impl<C> Subsystem<C> for Subsystem1
fn start(self, ctx: C) -> SpawnedSubsystem {
let future = Box::pin(async move {
Self::run(ctx).await;
Ok(())
});
SpawnedSubsystem {
......@@ -121,6 +122,7 @@ impl<C> Subsystem<C> for Subsystem2
fn start(self, ctx: C) -> SpawnedSubsystem {
let future = Box::pin(async move {
Self::run(ctx).await;
Ok(())
});
SpawnedSubsystem {
......
......@@ -1606,7 +1606,9 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
let (tx, rx) = oneshot::channel();
let fut = Box::pin(async move {
future.await;
if let Err(e) = future.await {
log::error!("Subsystem {} exited with error {:?}", name, e);
}
let _ = tx.send(());
});
......@@ -1658,8 +1660,8 @@ mod tests {
i += 1;
continue;
}
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
Err(_) => return,
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
Err(_) => return Ok(()),
_ => (),
}
}
......@@ -1704,11 +1706,13 @@ mod tests {
Ok(Some(_)) => {
continue;
}
Err(_) => return,
Err(_) => return Ok(()),
_ => (),
}
pending!();
}
Ok(())
}),
}
}
......@@ -1724,6 +1728,7 @@ mod tests {
name: "test-subsystem-4",
future: Box::pin(async move {
// Do nothing and exit.
Ok(())
}),
}
}
......@@ -1902,11 +1907,13 @@ mod tests {
continue;
},
Ok(Some(_)) => continue,
Err(_) => return,
Err(_) => break,
_ => (),
}
pending!();
}
Ok(())
}),
}
}
......@@ -1931,11 +1938,13 @@ mod tests {
continue;
},
Ok(Some(_)) => continue,
Err(_) => return,
Err(_) => break,
_ => (),
}
pending!();
}
Ok(())
}),
}
}
......@@ -2180,6 +2189,8 @@ mod tests {
}
pending!();
}
Ok(())
}),
}
}
......
......@@ -303,11 +303,11 @@ impl<C: SubsystemContext<Message = Msg>, Msg: Send + 'static> Subsystem<C> for F
let future = Box::pin(async move {
loop {
match ctx.recv().await {
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(FromOverseer::Communication { msg }) => {
let _ = self.0.send(msg).await;
},
Err(_) => return,
Err(_) => return Ok(()),
_ => (),
}
}
......
......@@ -942,6 +942,7 @@ where
let future = Box::pin(async move {
Self::run(ctx, run_args, metrics, spawner, errors).await;
Ok(())
});
SpawnedSubsystem {
......
......@@ -164,7 +164,7 @@ pub struct SpawnedSubsystem {
/// Name of the subsystem being spawned.
pub name: &'static str,
/// The task of the subsystem being spawned.
pub future: BoxFuture<'static, ()>,
pub future: BoxFuture<'static, SubsystemResult<()>>,
}
/// A `Result` type that wraps [`SubsystemError`].
......@@ -233,8 +233,8 @@ impl<C: SubsystemContext> Subsystem<C> for DummySubsystem {
let future = Box::pin(async move {
loop {
match ctx.recv().await {
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
Err(_) => return,
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
Err(_) => return Ok(()),
_ => continue,
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment