diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs
index 3206d20c8231ab958dee4d226a20e72a7afe2a95..9960e7f1d7d08b214f0e1f8fde4099d9f5cf2c5c 100644
--- a/polkadot/node/network/availability-distribution/src/error.rs
+++ b/polkadot/node/network/availability-distribution/src/error.rs
@@ -34,22 +34,6 @@ pub enum Error {
 	#[error("Receive channel closed")]
 	IncomingMessageChannel(#[source] SubsystemError),
 
-	/// Some request to utility functions failed.
-	#[error("Runtime request failed")]
-	UtilRequest(#[source] UtilError),
-
-	/// Some request to the runtime failed.
-	#[error("Runtime request failed")]
-	RuntimeRequestCanceled(#[source] oneshot::Canceled),
-
-	/// Some request to the runtime failed.
-	#[error("Runtime request failed")]
-	RuntimeRequest(#[source] RuntimeApiError),
-
-	/// We tried fetching a session which was not available.
-	#[error("No such session")]
-	NoSuchSession(SessionIndex),
-
 	/// Spawning a running task failed.
 	#[error("Spawning subsystem task failed")]
 	SpawnTask(#[source] SubsystemError),
@@ -57,7 +41,7 @@ pub enum Error {
 	/// We tried accessing a session that was not cached.
 	#[error("Session is not cached.")]
 	NoSuchCachedSession,
-	
+
 	/// We tried reporting bad validators, although we are not a validator ourselves.
 	#[error("Not a validator.")]
 	NotAValidator,
@@ -71,6 +55,24 @@ pub enum Error {
 	SendResponse,
 }
 
+/// Error that we should handle gracefully by logging it.
+#[derive(Debug)]
+pub enum NonFatalError {
+	/// Some request to utility functions failed.
+	/// This can be either `RuntimeRequestCanceled` or `RuntimeApiError`.
+	UtilRequest(UtilError),
+
+	/// Runtime API subsystem is down, which means we're shutting down.
+	RuntimeRequestCanceled(oneshot::Canceled),
+
+	/// Some request to the runtime failed.
+	/// For example if we prune a block we're requesting info about.
+	RuntimeRequest(RuntimeApiError),
+
+	/// We tried fetching a session info which was not available.
+	NoSuchSession(SessionIndex),
+}
+
 pub type Result<T> = std::result::Result<T, Error>;
 
 impl From<SubsystemError> for Error {
@@ -85,9 +87,9 @@ pub(crate) async fn recv_runtime<V>(
 		oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
 		UtilError,
 	>,
-) -> Result<V> {
-	r.map_err(Error::UtilRequest)?
+) -> std::result::Result<V, NonFatalError> {
+	r.map_err(NonFatalError::UtilRequest)?
 		.await
-		.map_err(Error::RuntimeRequestCanceled)?
-		.map_err(Error::RuntimeRequest)
+		.map_err(NonFatalError::RuntimeRequestCanceled)?
+		.map_err(NonFatalError::RuntimeRequest)
 }
diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs
index 03cc06ffc72100fdd490662e7a0d6e7ff00f45e1..07f3db51b5a23bb0a228eed3d9b9ac3f12224ae9 100644
--- a/polkadot/node/network/availability-distribution/src/lib.rs
+++ b/polkadot/node/network/availability-distribution/src/lib.rs
@@ -108,10 +108,13 @@ impl AvailabilityDistributionSubsystem {
 			match message {
 				FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
 					// Update the relay chain heads we are fetching our pieces for:
-					requester
+					if let Some(e) = requester
 						.get_mut()
 						.update_fetching_heads(&mut ctx, update)
-						.await?;
+						.await?
+					{
+						tracing::debug!(target: LOG_TARGET, "Error processing ActiveLeavesUpdate: {:?}", e);
+					}
 				}
 				FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
 				FromOverseer::Signal(OverseerSignal::Conclude) => {
diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs
index f613632ccbdee6557b8dcdb0ad10ea6b62be7c81..adbca626d1e4f5002b8fdff4ff9ef2f2b3332841 100644
--- a/polkadot/node/network/availability-distribution/src/requester/mod.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs
@@ -39,7 +39,8 @@ use polkadot_subsystem::{
 	messages::AllMessages, ActiveLeavesUpdate, jaeger, SubsystemContext,
 };
 
-use super::{error::recv_runtime, session_cache::SessionCache, Result, LOG_TARGET, Metrics};
+use super::{error::recv_runtime, session_cache::SessionCache, LOG_TARGET, Metrics};
+use crate::error::NonFatalError;
 
 /// A task fetching a particular chunk.
 mod fetch_task;
@@ -96,7 +97,7 @@ impl Requester {
 		&mut self,
 		ctx: &mut Context,
 		update: ActiveLeavesUpdate,
-	) -> Result<()>
+	) -> super::Result<Option<NonFatalError>>
 	where
 		Context: SubsystemContext,
 	{
@@ -106,10 +107,9 @@ impl Requester {
 		} = update;
 		// Order important! We need to handle activated, prior to deactivated, otherwise we might
 		// cancel still needed jobs.
-		self.start_requesting_chunks(ctx, activated.into_iter())
-			.await?;
+		let err = self.start_requesting_chunks(ctx, activated.into_iter()).await?;
 		self.stop_requesting_chunks(deactivated.into_iter());
-		Ok(())
+		Ok(err)
 	}
 
 	/// Start requesting chunks for newly imported heads.
@@ -117,15 +117,20 @@ impl Requester {
 		&mut self,
 		ctx: &mut Context,
 		new_heads: impl Iterator<Item = (Hash, Arc<jaeger::Span>)>,
-	) -> Result<()>
+	) -> super::Result<Option<NonFatalError>>
 	where
 		Context: SubsystemContext,
 	{
 		for (leaf, _) in new_heads {
-			let cores = query_occupied_cores(ctx, leaf).await?;
-			self.add_cores(ctx, leaf, cores).await?;
+			let cores = match query_occupied_cores(ctx, leaf).await {
+				Err(err) => return Ok(Some(err)),
+				Ok(cores) => cores,
+			};
+			if let Some(err) = self.add_cores(ctx, leaf, cores).await? {
+				return Ok(Some(err));
+			}
 		}
-		Ok(())
+		Ok(None)
 	}
 
 	/// Stop requesting chunks for obsolete heads.
@@ -150,7 +155,7 @@ impl Requester {
 		ctx: &mut Context,
 		leaf: Hash,
 		cores: impl IntoIterator<Item = OccupiedCore>,
-	) -> Result<()>
+	) -> super::Result<Option<NonFatalError>>
 	where
 		Context: SubsystemContext,
 	{
@@ -165,7 +170,7 @@ impl Requester {
 					let tx = self.tx.clone();
 					let metrics = self.metrics.clone();
 
-					let task_cfg = self
+					let task_cfg = match self
 						.session_cache
 						.with_session_info(
 							ctx,
@@ -175,7 +180,11 @@ impl Requester {
 							leaf,
 							|info| FetchTaskConfig::new(leaf, &core, tx, metrics, info),
 						)
-						.await?;
+						.await
+					{
+						Err(err) => return Ok(Some(err)),
+						Ok(task_cfg) => task_cfg,
+					};
 
 					if let Some(task_cfg) = task_cfg {
 						e.insert(FetchTask::start(task_cfg, ctx).await?);
@@ -184,7 +193,7 @@ impl Requester {
 				}
 			}
 		}
-		Ok(())
+		Ok(None)
 	}
 }
 
@@ -219,7 +228,7 @@ impl Stream for Requester {
 async fn query_occupied_cores<Context>(
 	ctx: &mut Context,
 	relay_parent: Hash,
-) -> Result<Vec<OccupiedCore>>
+) -> Result<Vec<OccupiedCore>, NonFatalError>
 where
 	Context: SubsystemContext,
 {
diff --git a/polkadot/node/network/availability-distribution/src/session_cache.rs b/polkadot/node/network/availability-distribution/src/session_cache.rs
index 0212717767ba5d94f952b498d6f661e718269f4d..a18bf8bcee298d2d20cb16ea6912ca86ef765c57 100644
--- a/polkadot/node/network/availability-distribution/src/session_cache.rs
+++ b/polkadot/node/network/availability-distribution/src/session_cache.rs
@@ -33,7 +33,7 @@ use polkadot_primitives::v1::{
 use polkadot_subsystem::SubsystemContext;
 
 use super::{
-	error::{recv_runtime, Result},
+	error::{recv_runtime, NonFatalError},
 	Error, LOG_TARGET,
 };
 
@@ -122,7 +122,7 @@ impl SessionCache {
 		ctx: &mut Context,
 		parent: Hash,
 		with_info: F,
-	) -> Result<Option<R>>
+	) -> Result<Option<R>, NonFatalError>
 	where
 		Context: SubsystemContext,
 		F: FnOnce(&SessionInfo) -> R,
@@ -173,7 +173,7 @@ impl SessionCache {
 			if let Err(err) = self.report_bad(report) {
 				tracing::warn!(
 					target: LOG_TARGET,
-					err= ?err,
+					err = ?err,
 					"Reporting bad validators failed with error"
 				);
 			}
@@ -184,7 +184,7 @@ impl SessionCache {
 	/// We assume validators in a group are tried in reverse order, so the reported bad validators
 	/// will be put at the beginning of the group.
 	#[tracing::instrument(level = "trace", skip(self, report), fields(subsystem = LOG_TARGET))]
-	pub fn report_bad(&mut self, report: BadValidators) -> Result<()> {
+	pub fn report_bad(&mut self, report: BadValidators) -> super::Result<()> {
 		let session = self
 			.session_info_cache
 			.get_mut(&report.session_index)
@@ -219,7 +219,7 @@ impl SessionCache {
 		ctx: &mut Context,
 		parent: Hash,
 		session_index: SessionIndex,
-	) -> Result<Option<SessionInfo>>
+	) -> Result<Option<SessionInfo>, NonFatalError>
 	where
 		Context: SubsystemContext,
 	{
@@ -230,7 +230,7 @@ impl SessionCache {
 			..
 		} = recv_runtime(request_session_info_ctx(parent, session_index, ctx).await)
 			.await?
-			.ok_or(Error::NoSuchSession(session_index))?;
+			.ok_or(NonFatalError::NoSuchSession(session_index))?;
 
 		if let Some(our_index) = self.get_our_index(validators).await {
 			// Get our group index:
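The calling convention this patch introduces — fatal errors keep propagating through `?` as `Err`, while recoverable ones come back as `Ok(Some(non_fatal))` for the caller to log — can be illustrated with a small standalone sketch. The names below (`Fatal`, `NonFatal`, `update_heads`) are hypothetical stand-ins for the subsystem's `Error`, `NonFatalError`, and `update_fetching_heads`, not the actual polkadot API:

```rust
/// Hypothetical stand-in for the subsystem's fatal `Error` type.
#[derive(Debug)]
enum Fatal {
	#[allow(dead_code)]
	ChannelClosed,
}

/// Hypothetical stand-in for `NonFatalError`: expected in normal operation
/// (e.g. querying a pruned block) and only worth a log line.
#[derive(Debug)]
enum NonFatal {
	NoSuchSession(u32),
}

/// Mirrors the signature shape introduced in the diff: fatal errors travel
/// in `Err`, non-fatal ones are handed back as `Ok(Some(_))`.
fn update_heads() -> Result<Option<NonFatal>, Fatal> {
	Ok(Some(NonFatal::NoSuchSession(42)))
}

fn main() -> Result<(), Fatal> {
	// `?` only aborts on fatal errors; a non-fatal one is downgraded to a log,
	// just like the `ActiveLeavesUpdate` handler in lib.rs above.
	if let Some(e) = update_heads()? {
		eprintln!("Error processing ActiveLeavesUpdate: {:?}", e);
	}
	Ok(())
}
```

Returning the non-fatal case inside `Ok` keeps `?` reserved for errors that should take the subsystem down, while forcing each caller to decide explicitly what to do with recoverable failures.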