Unverified Commit 3d9ae5e8 authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

overseer: observe stalled subsystems and shut down (#2148)

* overseer: observe stalled subsystems and shut down

* notify on send_message failure as well
parent b1a4c82d
Pipeline #117773 passed with stages
in 30 minutes and 29 seconds
......@@ -90,7 +90,7 @@ pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}};
use polkadot_node_primitives::SpawnNamed;
......@@ -289,6 +289,7 @@ impl Debug for ToOverseer {
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance<M> {
tx: mpsc::Sender<FromOverseer<M>>,
name: &'static str,
}
/// A context type that is given to the [`Subsystem`] upon spawning.
......@@ -389,22 +390,41 @@ impl<M> OverseenSubsystem<M> {
///
/// If the inner `instance` is `None`, nothing is happening.
async fn send_message(&mut self, msg: M) -> SubsystemResult<()> {
const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);
if let Some(ref mut instance) = self.instance {
instance.tx.send(FromOverseer::Communication { msg }).await?;
match instance.tx.send(
FromOverseer::Communication { msg }
).timeout(MESSAGE_TIMEOUT).await
{
None => {
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
Err(SubsystemError::SubsystemStalled(instance.name))
}
Some(res) => res.map_err(Into::into),
}
} else {
Ok(())
}
Ok(())
}
/// Send a signal to the wrapped subsystem.
///
/// If the inner `instance` is `None`, nothing is happening.
async fn send_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10);
if let Some(ref mut instance) = self.instance {
instance.tx.send(FromOverseer::Signal(signal)).await?;
match instance.tx.send(FromOverseer::Signal(signal)).timeout(SIGNAL_TIMEOUT).await {
None => {
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
Err(SubsystemError::SubsystemStalled(instance.name))
}
Some(res) => res.map_err(Into::into),
}
} else {
Ok(())
}
Ok(())
}
}
......@@ -1319,7 +1339,7 @@ where
match msg {
Event::MsgToSubsystem(msg) => {
self.route_message(msg).await;
self.route_message(msg).await?;
}
Event::Stop => {
self.stop().await;
......@@ -1344,7 +1364,7 @@ where
};
match msg {
ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await,
ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await?,
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
}
......@@ -1445,55 +1465,57 @@ where
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn route_message(&mut self, msg: AllMessages) {
async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.metrics.on_message_relayed();
match msg {
AllMessages::CandidateValidation(msg) => {
let _ = self.candidate_validation_subsystem.send_message(msg).await;
self.candidate_validation_subsystem.send_message(msg).await?;
},
AllMessages::CandidateBacking(msg) => {
let _ = self.candidate_backing_subsystem.send_message(msg).await;
self.candidate_backing_subsystem.send_message(msg).await?;
},
AllMessages::CandidateSelection(msg) => {
let _ = self.candidate_selection_subsystem.send_message(msg).await;
self.candidate_selection_subsystem.send_message(msg).await?;
},
AllMessages::StatementDistribution(msg) => {
let _ = self.statement_distribution_subsystem.send_message(msg).await;
self.statement_distribution_subsystem.send_message(msg).await?;
},
AllMessages::AvailabilityDistribution(msg) => {
let _ = self.availability_distribution_subsystem.send_message(msg).await;
self.availability_distribution_subsystem.send_message(msg).await?;
},
AllMessages::BitfieldDistribution(msg) => {
let _ = self.bitfield_distribution_subsystem.send_message(msg).await;
self.bitfield_distribution_subsystem.send_message(msg).await?;
},
AllMessages::BitfieldSigning(msg) => {
let _ = self.bitfield_signing_subsystem.send_message(msg).await;
self.bitfield_signing_subsystem.send_message(msg).await?;
},
AllMessages::Provisioner(msg) => {
let _ = self.provisioner_subsystem.send_message(msg).await;
self.provisioner_subsystem.send_message(msg).await?;
},
AllMessages::PoVDistribution(msg) => {
let _ = self.pov_distribution_subsystem.send_message(msg).await;
self.pov_distribution_subsystem.send_message(msg).await?;
},
AllMessages::RuntimeApi(msg) => {
let _ = self.runtime_api_subsystem.send_message(msg).await;
self.runtime_api_subsystem.send_message(msg).await?;
},
AllMessages::AvailabilityStore(msg) => {
let _ = self.availability_store_subsystem.send_message(msg).await;
self.availability_store_subsystem.send_message(msg).await?;
},
AllMessages::NetworkBridge(msg) => {
let _ = self.network_bridge_subsystem.send_message(msg).await;
self.network_bridge_subsystem.send_message(msg).await?;
},
AllMessages::ChainApi(msg) => {
let _ = self.chain_api_subsystem.send_message(msg).await;
self.chain_api_subsystem.send_message(msg).await?;
},
AllMessages::CollationGeneration(msg) => {
let _ = self.collation_generation_subsystem.send_message(msg).await;
self.collation_generation_subsystem.send_message(msg).await?;
},
AllMessages::CollatorProtocol(msg) => {
let _ = self.collator_protocol_subsystem.send_message(msg).await;
self.collator_protocol_subsystem.send_message(msg).await?;
},
}
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
......@@ -1577,6 +1599,7 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
let instance = Some(SubsystemInstance {
tx: to_tx,
name,
});
Ok(OverseenSubsystem {
......
......@@ -144,6 +144,9 @@ pub enum SubsystemError {
#[error("Failed to {0}")]
Context(String),
#[error("Subsystem stalled: {0}")]
SubsystemStalled(&'static str),
/// Per origin (or subsystem) annotations to wrap an error.
#[error("Error originated in {origin}")]
FromOrigin {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment