Unverified Commit 9e8b26fe authored by sandreim's avatar sandreim Committed by GitHub
Browse files

Per subsystem CPU usage tracking (#4239)



* SubsystemContext: add subsystem name str
Signed-off-by: sandreim's avatarAndrei Sandu <sandu.andrei@gmail.com>

* Overseer builder proc macro changes

* initialize SubsystemContext name field.
* Add subsystem name in TaskKind::launch_task()
Signed-off-by: sandreim's avatarAndrei Sandu <sandu.andrei@gmail.com>

* Update ToOverseer enum
Signed-off-by: sandreim's avatarAndrei Sandu <sandu.andrei@gmail.com>

* Assign subsystem names to orphan tasks
Signed-off-by: sandreim's avatarAndrei Sandu <sandu.andrei@gmail.com>

* cargo fmt
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* SubsystemContext: add subsystem name str
Signed-off-by: sandreim's avatarAndrei Sandu <sandu.andrei@gmail.com>

* Overseer builder proc macro changes

* initialize SubsystemContext name field.
* Add subsystem name in TaskKind::launch_task()
Signed-off-by: sandreim's avatarAndrei Sandu <sandu.andrei@gmail.com>

* Update ToOverseer enum
Signed-off-by: sandreim's avatarAndrei Sandu <sandu.andrei@gmail.com>

* Assign subsystem names to orphan tasks
Signed-off-by: sandreim's avatarAndrei Sandu <sandu.andrei@gmail.com>

* cargo fmt
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Rebase changes for new spawn() group param
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Add subsystem constant in JobTrait
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Add subsystem string
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Fix tests
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Fix spawn() calls
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* cargo fmt
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Fix
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Fix tests
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* fix
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Fix more tests
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Address PR review feedback #1

Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Address PR review round 2
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Fixes
- remove JobTrait::Subsystem
- fix tests
Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* update Cargo.lock
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
parent 83e5955b
Pipeline #165611 failed with stages
in 36 minutes and 32 seconds
This diff is collapsed.
......@@ -291,7 +291,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
let mut task_sender = sender.clone();
let metrics = metrics.clone();
ctx.spawn(
"collation generation collation builder",
"collation-builder",
Box::pin(async move {
let persisted_validation_data_hash = validation_data.hash();
......
......@@ -383,8 +383,9 @@ impl Error {
fn trace(&self) {
match self {
// don't spam the log with spurious errors
Self::RuntimeApi(_) | Self::Oneshot(_) =>
tracing::debug!(target: LOG_TARGET, err = ?self),
Self::RuntimeApi(_) | Self::Oneshot(_) => {
tracing::debug!(target: LOG_TARGET, err = ?self)
},
// it's worth reporting otherwise
_ => tracing::warn!(target: LOG_TARGET, err = ?self),
}
......
......@@ -659,7 +659,7 @@ impl CandidateBackingJob {
}
};
sender
.send_command(FromJobCommand::Spawn("Backing Validation", bg.boxed()))
.send_command(FromJobCommand::Spawn("backing-validation", bg.boxed()))
.await?;
}
......@@ -900,11 +900,13 @@ impl CandidateBackingJob {
.await;
match confirmation_rx.await {
Err(oneshot::Canceled) =>
tracing::debug!(target: LOG_TARGET, "Dispute coordinator confirmation lost",),
Err(oneshot::Canceled) => {
tracing::debug!(target: LOG_TARGET, "Dispute coordinator confirmation lost",)
},
Ok(ImportStatementsResult::ValidImport) => {},
Ok(ImportStatementsResult::InvalidImport) =>
tracing::warn!(target: LOG_TARGET, "Failed to import statements of validity",),
Ok(ImportStatementsResult::InvalidImport) => {
tracing::warn!(target: LOG_TARGET, "Failed to import statements of validity",)
},
}
}
......@@ -1168,7 +1170,7 @@ impl util::JobTrait for CandidateBackingJob {
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;
const NAME: &'static str = "CandidateBackingJob";
const NAME: &'static str = "candidate-backing-job";
fn run<S: SubsystemSender>(
parent: Hash,
......
......@@ -233,7 +233,7 @@ impl JobTrait for BitfieldSigningJob {
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;
const NAME: &'static str = "BitfieldSigningJob";
const NAME: &'static str = "bitfield-signing-job";
/// Run a job for the parent block indicated
fn run<S: SubsystemSender>(
......
......@@ -148,7 +148,7 @@ impl JobTrait for ProvisioningJob {
type RunArgs = ();
type Metrics = Metrics;
const NAME: &'static str = "ProvisioningJob";
const NAME: &'static str = "provisioner-job";
/// Run a job for the parent block indicated
//
......
......@@ -278,11 +278,21 @@ impl TaskExecutor {
}
impl sp_core::traits::SpawnNamed for TaskExecutor {
fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
/// Hands `future` to the wrapped executor via `spawn_ok`.
///
/// Both `_task_name` and `_subsystem_name` are accepted to satisfy the trait
/// signature but are ignored; the call made here is the same one `spawn` makes.
fn spawn_blocking(
&self,
_task_name: &'static str,
_subsystem_name: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}
fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
/// Hands `future` to the wrapped executor via `spawn_ok`.
///
/// Both `_task_name` and `_subsystem_name` are accepted to satisfy the trait
/// signature but are ignored.
fn spawn(
&self,
_task_name: &'static str,
_subsystem_name: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}
}
......
......@@ -270,7 +270,8 @@ where
)
}
} else {
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, request);
self.spawn_handle
.spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), request);
self.active_requests.push(receiver);
}
}
......@@ -288,7 +289,8 @@ where
}
if let Some((req, recv)) = self.waiting_requests.pop_front() {
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, req);
self.spawn_handle
.spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), req);
self.active_requests.push(recv);
}
}
......
......@@ -112,6 +112,7 @@ impl Jaeger {
// Spawn a background task that pulls span information and sends them on the network.
spawner.spawn(
"jaeger-collector",
Some("jaeger"),
Box::pin(async move {
match async_std::net::UdpSocket::bind("0.0.0.0:0").await {
Ok(udp_socket) => loop {
......
......@@ -197,7 +197,8 @@ impl TestState {
// lock ;-)
let update_tx = tx.clone();
harness.pool.spawn(
"Sending active leaves updates",
"sending-active-leaves-updates",
None,
async move {
for update in updates {
overseer_signal(update_tx.clone(), OverseerSignal::ActiveLeaves(update)).await;
......@@ -308,7 +309,8 @@ fn to_incoming_req(
let (tx, rx): (oneshot::Sender<netconfig::OutgoingResponse>, oneshot::Receiver<_>) =
oneshot::channel();
executor.spawn(
"Message forwarding",
"message-forwarding",
None,
async {
let response = rx.await;
let payload = response.expect("Unexpected canceled request").result;
......
......@@ -782,7 +782,7 @@ where
awaiting: vec![response_sender],
});
if let Err(e) = ctx.spawn("recovery task", Box::pin(remote)) {
if let Err(e) = ctx.spawn("recovery-task", Box::pin(remote)) {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
......
......@@ -102,12 +102,22 @@ struct Xxx {
struct DummySpawner;
impl SpawnNamed for DummySpawner {
fn spawn_blocking(&self, name: &'static str, _future: futures::future::BoxFuture<'static, ()>) {
unimplemented!("spawn blocking {}", name)
/// Never spawns anything: always panics via `unimplemented!`.
///
/// The panic message contains the task name followed by the subsystem name,
/// with `"default"` substituted when `subsystem_name` is `None`.
fn spawn_blocking(
&self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
_future: futures::future::BoxFuture<'static, ()>,
) {
unimplemented!("spawn blocking {} {}", task_name, subsystem_name.unwrap_or("default"))
}
fn spawn(&self, name: &'static str, _future: futures::future::BoxFuture<'static, ()>) {
unimplemented!("spawn {}", name)
/// Never spawns anything: always panics via `unimplemented!`.
///
/// The panic message contains the task name followed by the subsystem name,
/// with `"default"` substituted when `subsystem_name` is `None`.
fn spawn(
&self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
_future: futures::future::BoxFuture<'static, ()>,
) {
unimplemented!("spawn {} {}", task_name, subsystem_name.unwrap_or("default"))
}
}
......
......@@ -337,7 +337,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
// TODO generate a builder pattern that ensures this
// TODO https://github.com/paritytech/polkadot/issues/3427
let #subsystem_name = match self. #subsystem_name {
FieldInitMethod::Fn(func) => func(handle.clone())?,
FieldInitMethod::Fn(func) => func(handle.clone())?,
FieldInitMethod::Value(val) => val,
FieldInitMethod::Uninitialized =>
panic!("All subsystems must exist with the builder pattern."),
......@@ -349,11 +349,18 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#channel_name_rx, #channel_name_unbounded_rx
);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(SIGNAL_CHANNEL_CAPACITY);
// Generate subsystem name based on overseer field name.
let mut subsystem_string = String::from(stringify!(#subsystem_name));
// Convert owned `snake case` string to a `kebab case` static str.
let subsystem_static_str = Box::leak(subsystem_string.replace("_", "-").into_boxed_str());
let ctx = #subsyste_ctx_name::< #consumes >::new(
signal_rx,
message_rx,
channels_out.clone(),
to_overseer_tx.clone(),
subsystem_static_str
);
let #subsystem_name: OverseenSubsystem< #consumes > =
......@@ -364,6 +371,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
unbounded_meter,
ctx,
#subsystem_name,
subsystem_static_str,
&mut running_subsystems,
)?;
)*
......@@ -489,22 +497,22 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
/// Task kind to launch.
pub trait TaskKind {
/// Spawn a task, it depends on the implementer if this is blocking or not.
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>);
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>);
}
#[allow(missing_docs)]
struct Regular;
impl TaskKind for Regular {
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>) {
spawner.spawn(name, future)
/// Launches `future` by calling `spawner.spawn`, passing `task_name` and the
/// subsystem name wrapped in `Some`.
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
spawner.spawn(task_name, Some(subsystem_name), future)
}
}
#[allow(missing_docs)]
struct Blocking;
impl TaskKind for Blocking {
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>) {
spawner.spawn_blocking(name, future)
/// Launches `future` as a blocking task.
///
/// Delegates to `spawner.spawn_blocking`, passing `task_name` and the
/// subsystem name wrapped in `Some`. This must stay `spawn_blocking` (not
/// `spawn`): the blocking task kind exists precisely so its futures are handed
/// to the spawner's blocking entry point.
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
    spawner.spawn_blocking(task_name, Some(subsystem_name), future)
}
}
......@@ -517,6 +525,7 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
unbounded_meter: #support_crate ::metered::Meter,
ctx: Ctx,
s: SubSys,
subsystem_name: &'static str,
futures: &mut #support_crate ::FuturesUnordered<BoxFuture<'static, ::std::result::Result<(), #error_ty> >>,
) -> ::std::result::Result<OverseenSubsystem<M>, #error_ty >
where
......@@ -540,7 +549,7 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
let _ = tx.send(());
});
<TK as TaskKind>::launch_task(spawner, name, fut);
<TK as TaskKind>::launch_task(spawner, name, subsystem_name, fut);
futures.push(Box::pin(
rx.map(|e| {
......
......@@ -112,6 +112,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
>,
signals_received: SignalsReceived,
pending_incoming: Option<(usize, M)>,
name: &'static str
}
impl<M> #subsystem_ctx_name<M> {
......@@ -121,6 +122,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
messages: SubsystemIncomingMessages<M>,
to_subsystems: ChannelsOut,
to_overseer: #support_crate ::metered::UnboundedMeteredSender<#support_crate:: ToOverseer>,
name: &'static str
) -> Self {
let signals_received = SignalsReceived::default();
#subsystem_ctx_name {
......@@ -133,8 +135,13 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
to_overseer,
signals_received,
pending_incoming: None,
name
}
}
/// Returns the `&'static str` stored in this context's `name` field.
fn name(&self) -> &'static str {
self.name
}
}
#[#support_crate ::async_trait]
......@@ -229,6 +236,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
{
self.to_overseer.unbounded_send(#support_crate ::ToOverseer::SpawnJob {
name,
subsystem: Some(self.name()),
s,
}).map_err(|_| #support_crate ::OverseerError::TaskSpawn(name))?;
Ok(())
......@@ -239,6 +247,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
{
self.to_overseer.unbounded_send(#support_crate ::ToOverseer::SpawnBlockingJob {
name,
subsystem: Some(self.name()),
s,
}).map_err(|_| #support_crate ::OverseerError::TaskSpawn(name))?;
Ok(())
......
......@@ -111,6 +111,8 @@ pub enum ToOverseer {
SpawnJob {
/// Name of the task to spawn, which will be shown in jaeger and tracing logs.
name: &'static str,
/// Subsystem of the task to spawn, which will be shown in jaeger and tracing logs.
subsystem: Option<&'static str>,
/// The future to execute.
s: BoxFuture<'static, ()>,
},
......@@ -120,6 +122,8 @@ pub enum ToOverseer {
SpawnBlockingJob {
/// Name of the task to spawn which be shown in jaeger and tracing logs.
name: &'static str,
/// Subsystem of the task to spawn, which will be shown in jaeger and tracing logs.
subsystem: Option<&'static str>,
/// The future to execute.
s: BoxFuture<'static, ()>,
},
......@@ -128,8 +132,12 @@ pub enum ToOverseer {
impl fmt::Debug for ToOverseer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::SpawnJob { name, .. } => writeln!(f, "SpawnJob{{ {}, ..}}", name),
Self::SpawnBlockingJob { name, .. } => writeln!(f, "SpawnBlockingJob{{ {}, ..}}", name),
Self::SpawnJob { name, subsystem, .. } => {
writeln!(f, "SpawnJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
},
Self::SpawnBlockingJob { name, subsystem, .. } => {
writeln!(f, "SpawnBlockingJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
},
}
}
}
......
......@@ -562,7 +562,10 @@ where
futures::future::ready(())
});
overseer.spawner().spawn("metrics_metronome", Box::pin(metronome));
overseer
.spawner()
.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
Ok(())
}
......@@ -616,11 +619,11 @@ where
},
msg = self.to_overseer_rx.select_next_some() => {
match msg {
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
ToOverseer::SpawnJob { name, subsystem, s } => {
self.spawn_job(name, subsystem, s);
}
ToOverseer::SpawnBlockingJob { name, s } => {
self.spawn_blocking_job(name, s);
ToOverseer::SpawnBlockingJob { name, subsystem, s } => {
self.spawn_blocking_job(name, subsystem, s);
}
}
},
......@@ -772,11 +775,21 @@ where
}
}
fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
self.spawner.spawn(name, j);
/// Spawns the future `j` by calling `spawn` on `self.spawner`, forwarding
/// `task_name` and the optional `subsystem_name` unchanged.
fn spawn_job(
&mut self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
j: BoxFuture<'static, ()>,
) {
self.spawner.spawn(task_name, subsystem_name, j);
}
fn spawn_blocking_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
self.spawner.spawn_blocking(name, j);
/// Spawns the future `j` by calling `spawn_blocking` on `self.spawner`,
/// forwarding `task_name` and the optional `subsystem_name` unchanged.
fn spawn_blocking_job(
&mut self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
j: BoxFuture<'static, ()>,
) {
self.spawner.spawn_blocking(task_name, subsystem_name, j);
}
}
......@@ -1151,6 +1151,7 @@ fn context_holds_onto_message_until_enough_signals_received() {
stream::select(bounded_rx, unbounded_rx),
channels_out,
to_overseer_tx,
"test",
);
assert_eq!(ctx.signals_received.load(), 0);
......
......@@ -380,7 +380,11 @@ where
let telemetry = telemetry.map(|(worker, telemetry)| {
if let Some(worker) = worker {
task_manager.spawn_handle().spawn("telemetry", worker.run());
task_manager.spawn_handle().spawn(
"telemetry",
Some("telemetry"),
Box::pin(worker.run()),
);
}
telemetry
});
......@@ -805,6 +809,7 @@ where
// Start the offchain workers to have
task_manager.spawn_handle().spawn(
"offchain-notifications",
None,
sc_offchain::notification_future(
config.role.is_authority(),
client.clone(),
......@@ -904,7 +909,11 @@ where
prometheus_registry.clone(),
);
task_manager.spawn_handle().spawn("authority-discovery-worker", worker.run());
task_manager.spawn_handle().spawn(
"authority-discovery-worker",
Some("authority-discovery"),
Box::pin(worker.run()),
);
Some(service)
} else {
None
......@@ -950,6 +959,7 @@ where
let handle = handle.clone();
task_manager.spawn_essential_handle().spawn_blocking(
"overseer",
None,
Box::pin(async move {
use futures::{pin_mut, select, FutureExt};
......@@ -1038,7 +1048,7 @@ where
};
let babe = babe::start_babe(babe_config)?;
task_manager.spawn_essential_handle().spawn_blocking("babe", babe);
task_manager.spawn_essential_handle().spawn_blocking("babe", None, babe);
}
// if the node isn't actively participating in consensus then it doesn't
......@@ -1063,9 +1073,11 @@ where
// Wococo's purpose is to be a testbed for BEEFY, so if it fails we'll
// bring the node down with it to make sure it is noticed.
if chain_spec.is_wococo() {
task_manager.spawn_essential_handle().spawn_blocking("beefy-gadget", gadget);
task_manager
.spawn_essential_handle()
.spawn_blocking("beefy-gadget", None, gadget);
} else {
task_manager.spawn_handle().spawn_blocking("beefy-gadget", gadget);
task_manager.spawn_handle().spawn_blocking("beefy-gadget", None, gadget);
}
}
......@@ -1119,9 +1131,11 @@ where
telemetry: telemetry.as_ref().map(|x| x.handle()),
};
task_manager
.spawn_essential_handle()
.spawn_blocking("grandpa-voter", grandpa::run_grandpa_voter(grandpa_config)?);
task_manager.spawn_essential_handle().spawn_blocking(
"grandpa-voter",
None,
grandpa::run_grandpa_voter(grandpa_config)?,
);
}
network_starter.start_network();
......
......@@ -212,7 +212,7 @@ where
name: &'static str,
s: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> SubsystemResult<()> {
self.spawn.spawn(name, s);
self.spawn.spawn(name, None, s);
Ok(())
}
......@@ -221,7 +221,7 @@ where
name: &'static str,
s: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> SubsystemResult<()> {
self.spawn.spawn_blocking(name, s);
self.spawn.spawn_blocking(name, None, s);
Ok(())
}
......@@ -396,7 +396,7 @@ mod tests {
let mut handle = Handle::new(handle);
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
spawner.spawn("overseer", None, overseer.run().then(|_| async { () }).boxed());
block_on(handle.send_msg_anon(CollatorProtocolMessage::CollateOn(Default::default())));
assert!(matches!(
......
......@@ -485,7 +485,7 @@ pub trait JobTrait: Unpin + Sized {
/// The `delegate_subsystem!` macro should take care of this.
type Metrics: 'static + metrics::Metrics + Send;
/// Name of the job, i.e. `CandidateBackingJob`
/// Name of the job, e.g. `candidate-backing-job`
const NAME: &'static str;
/// Run a job for the given relay `parent`.
......@@ -577,7 +577,11 @@ where
Ok(())
});
self.spawner.spawn(Job::NAME, future.map(drop).boxed());
self.spawner.spawn(
Job::NAME,
Some(Job::NAME.strip_suffix("-job").unwrap_or(Job::NAME)),
future.map(drop).boxed(),
);
self.outgoing_msgs.push(from_job_rx);
let handle = JobHandle { _abort_handle: AbortOnDrop(abort_handle), to_job: to_job_tx };
......@@ -750,6 +754,6 @@ where
Ok(())
});
SpawnedSubsystem { name: Job::NAME.strip_suffix("Job").unwrap_or(Job::NAME), future }
SpawnedSubsystem { name: Job::NAME.strip_suffix("-job").unwrap_or(Job::NAME), future }
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment