Skip to content
Snippets Groups Projects
Commit a543b1d6 authored by ordian's avatar ordian Committed by GitHub
Browse files

availability distribution: don't early return on runtime errors (#2606)

* availability distribution: don't early return on runtime errors

* log error

* extract runtime api error from Error

* uh

* oh
parent bd2f5b27
Branches
No related merge requests found
......@@ -34,22 +34,6 @@ pub enum Error {
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
/// Some request to utility functions failed.
#[error("Runtime request failed")]
UtilRequest(#[source] UtilError),
/// Some request to the runtime failed.
#[error("Runtime request failed")]
RuntimeRequestCanceled(#[source] oneshot::Canceled),
/// Some request to the runtime failed.
#[error("Runtime request failed")]
RuntimeRequest(#[source] RuntimeApiError),
/// We tried fetching a session which was not available.
#[error("No such session")]
NoSuchSession(SessionIndex),
/// Spawning a running task failed.
#[error("Spawning subsystem task failed")]
SpawnTask(#[source] SubsystemError),
......@@ -57,7 +41,7 @@ pub enum Error {
/// We tried accessing a session that was not cached.
#[error("Session is not cached.")]
NoSuchCachedSession,
/// We tried reporting bad validators, although we are not a validator ourselves.
#[error("Not a validator.")]
NotAValidator,
......@@ -71,6 +55,24 @@ pub enum Error {
SendResponse,
}
/// Errors that should be handled gracefully by logging them, rather than
/// aborting the subsystem (in contrast to the fatal variants kept in `Error`).
#[derive(Debug)]
pub enum NonFatalError {
/// Some request to utility functions failed.
/// This can be either `RuntimeRequestCanceled` or `RuntimeApiError`.
/// NOTE(review): the above refers to what `UtilError` may wrap — confirm against `UtilError`'s definition.
UtilRequest(UtilError),
/// Runtime API subsystem is down, which means we're shutting down.
RuntimeRequestCanceled(oneshot::Canceled),
/// Some request to the runtime failed.
/// For example if we prune a block we're requesting info about.
RuntimeRequest(RuntimeApiError),
/// We tried fetching a session info which was not available.
NoSuchSession(SessionIndex),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<SubsystemError> for Error {
......@@ -85,9 +87,9 @@ pub(crate) async fn recv_runtime<V>(
oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
UtilError,
>,
) -> Result<V> {
r.map_err(Error::UtilRequest)?
) -> std::result::Result<V, NonFatalError> {
r.map_err(NonFatalError::UtilRequest)?
.await
.map_err(Error::RuntimeRequestCanceled)?
.map_err(Error::RuntimeRequest)
.map_err(NonFatalError::RuntimeRequestCanceled)?
.map_err(NonFatalError::RuntimeRequest)
}
......@@ -108,10 +108,13 @@ impl AvailabilityDistributionSubsystem {
match message {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
// Update the relay chain heads we are fetching our pieces for:
requester
if let Some(e) = requester
.get_mut()
.update_fetching_heads(&mut ctx, update)
.await?;
.await?
{
tracing::debug!(target: LOG_TARGET, "Error processing ActiveLeavesUpdate: {:?}", e);
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
FromOverseer::Signal(OverseerSignal::Conclude) => {
......
......@@ -39,7 +39,8 @@ use polkadot_subsystem::{
messages::AllMessages, ActiveLeavesUpdate, jaeger, SubsystemContext,
};
use super::{error::recv_runtime, session_cache::SessionCache, Result, LOG_TARGET, Metrics};
use super::{error::recv_runtime, session_cache::SessionCache, LOG_TARGET, Metrics};
use crate::error::NonFatalError;
/// A task fetching a particular chunk.
mod fetch_task;
......@@ -96,7 +97,7 @@ impl Requester {
&mut self,
ctx: &mut Context,
update: ActiveLeavesUpdate,
) -> Result<()>
) -> super::Result<Option<NonFatalError>>
where
Context: SubsystemContext,
{
......@@ -106,10 +107,9 @@ impl Requester {
} = update;
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
self.start_requesting_chunks(ctx, activated.into_iter())
.await?;
let err = self.start_requesting_chunks(ctx, activated.into_iter()).await?;
self.stop_requesting_chunks(deactivated.into_iter());
Ok(())
Ok(err)
}
/// Start requesting chunks for newly imported heads.
......@@ -117,15 +117,20 @@ impl Requester {
&mut self,
ctx: &mut Context,
new_heads: impl Iterator<Item = (Hash, Arc<jaeger::Span>)>,
) -> Result<()>
) -> super::Result<Option<NonFatalError>>
where
Context: SubsystemContext,
{
for (leaf, _) in new_heads {
let cores = query_occupied_cores(ctx, leaf).await?;
self.add_cores(ctx, leaf, cores).await?;
let cores = match query_occupied_cores(ctx, leaf).await {
Err(err) => return Ok(Some(err)),
Ok(cores) => cores,
};
if let Some(err) = self.add_cores(ctx, leaf, cores).await? {
return Ok(Some(err));
}
}
Ok(())
Ok(None)
}
/// Stop requesting chunks for obsolete heads.
......@@ -150,7 +155,7 @@ impl Requester {
ctx: &mut Context,
leaf: Hash,
cores: impl IntoIterator<Item = OccupiedCore>,
) -> Result<()>
) -> super::Result<Option<NonFatalError>>
where
Context: SubsystemContext,
{
......@@ -165,7 +170,7 @@ impl Requester {
let tx = self.tx.clone();
let metrics = self.metrics.clone();
let task_cfg = self
let task_cfg = match self
.session_cache
.with_session_info(
ctx,
......@@ -175,7 +180,11 @@ impl Requester {
leaf,
|info| FetchTaskConfig::new(leaf, &core, tx, metrics, info),
)
.await?;
.await
{
Err(err) => return Ok(Some(err)),
Ok(task_cfg) => task_cfg,
};
if let Some(task_cfg) = task_cfg {
e.insert(FetchTask::start(task_cfg, ctx).await?);
......@@ -184,7 +193,7 @@ impl Requester {
}
}
}
Ok(())
Ok(None)
}
}
......@@ -219,7 +228,7 @@ impl Stream for Requester {
async fn query_occupied_cores<Context>(
ctx: &mut Context,
relay_parent: Hash,
) -> Result<Vec<OccupiedCore>>
) -> Result<Vec<OccupiedCore>, NonFatalError>
where
Context: SubsystemContext,
{
......
......@@ -33,7 +33,7 @@ use polkadot_primitives::v1::{
use polkadot_subsystem::SubsystemContext;
use super::{
error::{recv_runtime, Result},
error::{recv_runtime, NonFatalError},
Error,
LOG_TARGET,
};
......@@ -122,7 +122,7 @@ impl SessionCache {
ctx: &mut Context,
parent: Hash,
with_info: F,
) -> Result<Option<R>>
) -> Result<Option<R>, NonFatalError>
where
Context: SubsystemContext,
F: FnOnce(&SessionInfo) -> R,
......@@ -173,7 +173,7 @@ impl SessionCache {
if let Err(err) = self.report_bad(report) {
tracing::warn!(
target: LOG_TARGET,
err= ?err,
err = ?err,
"Reporting bad validators failed with error"
);
}
......@@ -184,7 +184,7 @@ impl SessionCache {
/// We assume validators in a group are tried in reverse order, so the reported bad validators
/// will be put at the beginning of the group.
#[tracing::instrument(level = "trace", skip(self, report), fields(subsystem = LOG_TARGET))]
pub fn report_bad(&mut self, report: BadValidators) -> Result<()> {
pub fn report_bad(&mut self, report: BadValidators) -> super::Result<()> {
let session = self
.session_info_cache
.get_mut(&report.session_index)
......@@ -219,7 +219,7 @@ impl SessionCache {
ctx: &mut Context,
parent: Hash,
session_index: SessionIndex,
) -> Result<Option<SessionInfo>>
) -> Result<Option<SessionInfo>, NonFatalError>
where
Context: SubsystemContext,
{
......@@ -230,7 +230,7 @@ impl SessionCache {
..
} = recv_runtime(request_session_info_ctx(parent, session_index, ctx).await)
.await?
.ok_or(Error::NoSuchSession(session_index))?;
.ok_or(NonFatalError::NoSuchSession(session_index))?;
if let Some(our_index) = self.get_our_index(validators).await {
// Get our group index:
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment