diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index c80aaee005bfaf8f0e8c19294e78e1e8e405612c..536f131b6bbf470f3d77b01870732cd09b9404d4 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6568,6 +6568,7 @@ dependencies = [ "sp-keystore", "sp-staking", "sp-tracing", + "thiserror", "tracing", ] diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs index c15669f406fdb5a8dbc92c108d39044fd350ee11..6b6f62ae988ef0023601a58301caf4752a2aa356 100644 --- a/polkadot/node/network/availability-distribution/src/error.rs +++ b/polkadot/node/network/availability-distribution/src/error.rs @@ -23,30 +23,67 @@ use thiserror::Error; use futures::channel::oneshot; -use polkadot_node_subsystem_util::{ - runtime, - Error as UtilError, -}; +use polkadot_node_subsystem_util::{Fault, Error as UtilError, runtime, unwrap_non_fatal}; use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError}; use crate::LOG_TARGET; -/// Errors of this subsystem. #[derive(Debug, Error)] -pub enum Error { - #[error("Response channel to obtain chunk failed")] - QueryChunkResponseChannel(#[source] oneshot::Canceled), +#[error(transparent)] +pub struct Error(pub Fault<NonFatal, Fatal>); - #[error("Response channel to obtain available data failed")] - QueryAvailableDataResponseChannel(#[source] oneshot::Canceled), +impl From<NonFatal> for Error { + fn from(e: NonFatal) -> Self { + Self(Fault::from_non_fatal(e)) + } +} - #[error("Receive channel closed")] - IncomingMessageChannel(#[source] SubsystemError), +impl From<Fatal> for Error { + fn from(f: Fatal) -> Self { + Self(Fault::from_fatal(f)) + } +} + +impl From<runtime::Error> for Error { + fn from(o: runtime::Error) -> Self { + Self(Fault::from_other(o)) + } +} +/// Fatal errors of this subsystem. +#[derive(Debug, Error)] +pub enum Fatal { /// Spawning a running task failed. #[error("Spawning subsystem task failed")] SpawnTask(#[source] SubsystemError), + /// Runtime API subsystem is down, which means we're shutting down. + #[error("Runtime request canceled")] + RuntimeRequestCanceled(oneshot::Canceled), + + /// Requester stream exhausted. + #[error("Erasure chunk requester stream exhausted")] + RequesterExhausted, + + #[error("Receive channel closed")] + IncomingMessageChannel(#[source] SubsystemError), + + /// Errors coming from runtime::Runtime. + #[error("Error while accessing runtime information")] + Runtime(#[from] #[source] runtime::Fatal), +} + +/// Non fatal errors of this subsystem. +#[derive(Debug, Error)] +pub enum NonFatal { + /// av-store will drop the sender on any error that happens. + #[error("Response channel to obtain chunk failed")] + QueryChunkResponseChannel(#[source] oneshot::Canceled), + + /// av-store will drop the sender on any error that happens. + #[error("Response channel to obtain available data failed")] + QueryAvailableDataResponseChannel(#[source] oneshot::Canceled), + /// We tried accessing a session that was not cached. #[error("Session is not cached.")] NoSuchCachedSession, @@ -55,11 +92,7 @@ pub enum Error { #[error("Not a validator.")] NotAValidator, - /// Requester stream exhausted. - #[error("Erasure chunk requester stream exhausted")] - RequesterExhausted, - - /// Sending response failed. + /// Sending request response failed (Can happen on timeouts for example). 
#[error("Sending a request's response failed.")] SendResponse, @@ -68,10 +101,6 @@ pub enum Error { #[error("Utility request failed")] UtilRequest(UtilError), - /// Runtime API subsystem is down, which means we're shutting down. - #[error("Runtime request canceled")] - RuntimeRequestCanceled(oneshot::Canceled), - /// Some request to the runtime failed. /// For example if we prune a block we're requesting info about. #[error("Runtime API error")] @@ -98,39 +127,30 @@ pub enum Error { /// Errors coming from runtime::Runtime. #[error("Error while accessing runtime information")] - Runtime(#[source] runtime::Error), + Runtime(#[from] #[source] runtime::NonFatal), } pub type Result<T> = std::result::Result<T, Error>; -impl From<runtime::Error> for Error { - fn from(err: runtime::Error) -> Self { - Self::Runtime(err) - } -} - -impl From<SubsystemError> for Error { - fn from(err: SubsystemError) -> Self { - Self::IncomingMessageChannel(err) - } -} - -/// Receive a response from a runtime request and convert errors. -pub(crate) async fn recv_runtime<V>( - r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>, -) -> std::result::Result<V, Error> { - r.await - .map_err(Error::RuntimeRequestCanceled)? - .map_err(Error::RuntimeRequest) -} - - /// Utility for eating top level errors and log them. /// /// We basically always want to try and continue on error. This utility function is meant to /// consume top-level errors by simply logging them -pub fn log_error(result: Result<()>, ctx: &'static str) { - if let Err(error) = result { +pub fn log_error(result: Result<()>, ctx: &'static str) + -> std::result::Result<(), Fatal> +{ + if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? { tracing::warn!(target: LOG_TARGET, error = ?error, ctx); } + Ok(()) +} + +/// Receive a response from a runtime request and convert errors. +pub(crate) async fn recv_runtime<V>( + r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>, +) -> Result<V> { + let result = r.await + .map_err(Fatal::RuntimeRequestCanceled)? + .map_err(NonFatal::RuntimeRequest)?; + Ok(result) } diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 7c522859a0799ba7bb6c3d62c194cbbe91a6fbad..f0c80eb2b2d9f4f6de48ccdc4831df763952a4a9 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -25,10 +25,10 @@ use polkadot_subsystem::{ /// Error and [`Result`] type for this subsystem. mod error; -pub use error::Error; +pub use error::{Fatal, NonFatal}; use error::{Result, log_error}; -use polkadot_node_subsystem_util::runtime::Runtime; +use polkadot_node_subsystem_util::runtime::RuntimeInfo; /// `Requester` taking care of requesting chunks for candidates pending availability. mod requester; @@ -59,7 +59,7 @@ pub struct AvailabilityDistributionSubsystem { /// Pointer to a keystore, which is required for determining this nodes validator index. keystore: SyncCryptoStorePtr, /// Easy and efficient runtime access for this subsystem. - runtime: Runtime, + runtime: RuntimeInfo, /// Prometheus metrics. metrics: Metrics, } @@ -85,12 +85,12 @@ impl AvailabilityDistributionSubsystem { /// Create a new instance of the availability distribution. 
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self { - let runtime = Runtime::new(keystore.clone()); + let runtime = RuntimeInfo::new(keystore.clone()); Self { keystore, runtime, metrics } } /// Start processing work as passed on from the Overseer. - async fn run<Context>(mut self, mut ctx: Context) -> Result<()> + async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), Fatal> where Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send, { @@ -108,10 +108,10 @@ impl AvailabilityDistributionSubsystem { // Handle task messages sending: let message = match action { Either::Left(subsystem_msg) => { - subsystem_msg.map_err(|e| Error::IncomingMessageChannel(e))? + subsystem_msg.map_err(|e| Fatal::IncomingMessageChannel(e))? } Either::Right(from_task) => { - let from_task = from_task.ok_or(Error::RequesterExhausted)?; + let from_task = from_task.ok_or(Fatal::RequesterExhausted)?; ctx.send_message(from_task).await; continue; } @@ -133,7 +133,7 @@ impl AvailabilityDistributionSubsystem { log_error( requester.get_mut().update_fetching_heads(&mut ctx, update).await, "Error in Requester::update_fetching_heads" - ); + )?; } FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {} FromOverseer::Signal(OverseerSignal::Conclude) => { @@ -169,7 +169,7 @@ impl AvailabilityDistributionSubsystem { tx, ).await, "PoVRequester::fetch_pov" - ); + )?; } } } diff --git a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs index 7bb5f253979c3c799521c0d7484fa4d91ff0403a..e53fdd4b241fcef11c7bf6b42c43197c86f0f00a 100644 --- a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs @@ -33,9 +33,10 @@ use polkadot_subsystem::{ ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf, messages::{AllMessages, NetworkBridgeMessage, IfDisconnected} }; -use polkadot_node_subsystem_util::runtime::{Runtime, ValidatorInfo}; +use polkadot_node_subsystem_util::runtime::{RuntimeInfo, ValidatorInfo}; -use crate::error::{Error, log_error}; +use crate::error::{Fatal, NonFatal}; +use crate::LOG_TARGET; /// Number of sessions we want to keep in the LRU. const NUM_SESSIONS: usize = 2; @@ -63,7 +64,7 @@ impl PoVRequester { pub async fn update_connected_validators<Context>( &mut self, ctx: &mut Context, - runtime: &mut Runtime, + runtime: &mut RuntimeInfo, update: &ActiveLeavesUpdate, ) -> super::Result<()> where @@ -87,7 +88,7 @@ impl PoVRequester { pub async fn fetch_pov<Context>( &self, ctx: &mut Context, - runtime: &mut Runtime, + runtime: &mut RuntimeInfo, parent: Hash, from_validator: ValidatorIndex, candidate_hash: CandidateHash, @@ -99,7 +100,7 @@ impl PoVRequester { { let info = &runtime.get_session_info(ctx, parent).await?.session_info; let authority_id = info.discovery_keys.get(from_validator.0 as usize) - .ok_or(Error::InvalidValidatorIndex)? + .ok_or(NonFatal::InvalidValidatorIndex)? 
.clone(); let (req, pending_response) = OutgoingRequest::new( Recipient::Authority(authority_id), @@ -125,7 +126,8 @@ impl PoVRequester { .with_relay_parent(parent); ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed()) .await - .map_err(|e| Error::SpawnTask(e)) + .map_err(|e| Fatal::SpawnTask(e))?; + Ok(()) } } @@ -136,10 +138,13 @@ async fn fetch_pov_job( span: jaeger::Span, tx: oneshot::Sender<PoV>, ) { - log_error( - do_fetch_pov(pov_hash, pending_response, span, tx).await, - "fetch_pov_job", - ) + if let Err(err) = do_fetch_pov(pov_hash, pending_response, span, tx).await { + tracing::warn!( + target: LOG_TARGET, + ?err, + "fetch_pov_job" + ); + } } /// Do the actual work of waiting for the response. @@ -149,24 +154,24 @@ async fn do_fetch_pov( _span: jaeger::Span, tx: oneshot::Sender<PoV>, ) - -> super::Result<()> + -> std::result::Result<(), NonFatal> { - let response = pending_response.await.map_err(Error::FetchPoV)?; + let response = pending_response.await.map_err(NonFatal::FetchPoV)?; let pov = match response { PoVFetchingResponse::PoV(pov) => pov, PoVFetchingResponse::NoSuchPoV => { - return Err(Error::NoSuchPoV) + return Err(NonFatal::NoSuchPoV) } }; if pov.hash() == pov_hash { - tx.send(pov).map_err(|_| Error::SendResponse) + tx.send(pov).map_err(|_| NonFatal::SendResponse) } else { - Err(Error::UnexpectedPoV) + Err(NonFatal::UnexpectedPoV) } } /// Get the session indeces for the given relay chain parents. -async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut Runtime, new_heads: impl Iterator<Item = &Hash>) +async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut RuntimeInfo, new_heads: impl Iterator<Item = &Hash>) -> super::Result<impl Iterator<Item = (Hash, SessionIndex)>> where Context: SubsystemContext, @@ -181,7 +186,7 @@ where /// Connect to validators of our validator group. async fn connect_to_relevant_validators<Context>( ctx: &mut Context, - runtime: &mut Runtime, + runtime: &mut RuntimeInfo, parent: Hash, session: SessionIndex ) @@ -206,7 +211,7 @@ where /// Return: `None` if not a validator. 
async fn determine_relevant_validators<Context>( ctx: &mut Context, - runtime: &mut Runtime, + runtime: &mut RuntimeInfo, parent: Hash, session: SessionIndex, ) @@ -275,7 +280,7 @@ mod tests { let (mut context, mut virtual_overseer) = test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, TaskExecutor>(pool.clone()); let keystore = make_ferdie_keystore(); - let mut runtime = polkadot_node_subsystem_util::runtime::Runtime::new(keystore); + let mut runtime = polkadot_node_subsystem_util::runtime::RuntimeInfo::new(keystore); let (tx, rx) = oneshot::channel(); let testee = async { diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index c5527eedc4b6d3aebb49d0a5df49fadd489276f2..8fe099770fbe15dacc6ca15b07234f59450c834f 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -34,7 +34,7 @@ use polkadot_subsystem::messages::{ use polkadot_subsystem::{SubsystemContext, jaeger}; use crate::{ - error::{Error, Result}, + error::{Fatal, Result}, session_cache::{BadValidators, SessionInfo}, LOG_TARGET, metrics::{Metrics, SUCCEEDED, FAILED}, @@ -191,7 +191,7 @@ impl FetchTask { ctx.spawn("chunk-fetcher", running.run(kill).boxed()) .await - .map_err(|e| Error::SpawnTask(e))?; + .map_err(|e| Fatal::SpawnTask(e))?; Ok(FetchTask { live_in, diff --git a/polkadot/node/network/availability-distribution/src/responder.rs b/polkadot/node/network/availability-distribution/src/responder.rs index da3b87a7c378d191aa6330225d35185ec4ea65ff..a811607d0f329872bfee321a89af60cde9f38108 100644 --- a/polkadot/node/network/availability-distribution/src/responder.rs +++ b/polkadot/node/network/availability-distribution/src/responder.rs @@ -28,7 +28,7 @@ use polkadot_subsystem::{ SubsystemContext, jaeger, }; -use crate::error::{Error, Result}; +use crate::error::{NonFatal, Result}; use crate::{LOG_TARGET, metrics::{Metrics, SUCCEEDED, FAILED, NOT_FOUND}}; /// Variant of `answer_pov_request` that does Prometheus metric and logging on errors. @@ -107,7 +107,7 @@ where } }; - req.send_response(response).map_err(|_| Error::SendResponse)?; + req.send_response(response).map_err(|_| NonFatal::SendResponse)?; Ok(result) } @@ -144,7 +144,7 @@ where Some(chunk) => v1::ChunkFetchingResponse::Chunk(chunk.into()), }; - req.send_response(response).map_err(|_| Error::SendResponse)?; + req.send_response(response).map_err(|_| NonFatal::SendResponse)?; Ok(result) } @@ -164,7 +164,7 @@ where )) .await; - rx.await.map_err(|e| { + let result = rx.await.map_err(|e| { tracing::trace!( target: LOG_TARGET, ?validator_index, @@ -172,8 +172,9 @@ where error = ?e, "Error retrieving chunk", ); - Error::QueryChunkResponseChannel(e) - }) + NonFatal::QueryChunkResponseChannel(e) + })?; + Ok(result) } /// Query PoV from the availability store. 
@@ -191,5 +192,6 @@ where )) .await; - rx.await.map_err(|e| Error::QueryAvailableDataResponseChannel(e)) + let result = rx.await.map_err(|e| NonFatal::QueryAvailableDataResponseChannel(e))?; + Ok(result) } diff --git a/polkadot/node/network/availability-distribution/src/session_cache.rs b/polkadot/node/network/availability-distribution/src/session_cache.rs index 0b2b519bb64f7e87cc7ec0c0ec8498549bf51796..bbd40e2a5db2b0f0d92e0281804a9e7ab7941e28 100644 --- a/polkadot/node/network/availability-distribution/src/session_cache.rs +++ b/polkadot/node/network/availability-distribution/src/session_cache.rs @@ -33,7 +33,7 @@ use polkadot_primitives::v1::{ use polkadot_subsystem::SubsystemContext; use super::{ - error::{recv_runtime, Error}, + error::{recv_runtime, Error, NonFatal}, LOG_TARGET, }; @@ -189,9 +189,9 @@ impl SessionCache { let session = self .session_info_cache .get_mut(&report.session_index) - .ok_or(Error::NoSuchCachedSession)? + .ok_or(NonFatal::NoSuchCachedSession)? .as_mut() - .ok_or(Error::NotAValidator)?; + .ok_or(NonFatal::NotAValidator)?; let group = session .validator_groups .get_mut(report.group_index.0 as usize) @@ -231,7 +231,7 @@ impl SessionCache { .. } = recv_runtime(request_session_info(parent, session_index, ctx.sender()).await) .await? - .ok_or(Error::NoSuchSession(session_index))?; + .ok_or(NonFatal::NoSuchSession(session_index))?; if let Some(our_index) = self.get_our_index(validators).await { // Get our group index: diff --git a/polkadot/node/network/statement-distribution/Cargo.toml b/polkadot/node/network/statement-distribution/Cargo.toml index b4648e1ac2df002eba6f30e2fbb0882fbe36cdd3..a958c6df62ded8182dce8424a61311f3a7a639df 100644 --- a/polkadot/node/network/statement-distribution/Cargo.toml +++ b/polkadot/node/network/statement-distribution/Cargo.toml @@ -10,6 +10,7 @@ futures = "0.3.12" tracing = "0.1.25" polkadot-primitives = { path = "../../../primitives" } sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } +sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-node-primitives = { path = "../../primitives" } @@ -18,6 +19,7 @@ polkadot-node-network-protocol = { path = "../../network/protocol" } arrayvec = "0.5.2" indexmap = "1.6.1" parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] } +thiserror = "1.0.23" [dev-dependencies] polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } diff --git a/polkadot/node/network/statement-distribution/src/error.rs b/polkadot/node/network/statement-distribution/src/error.rs new file mode 100644 index 0000000000000000000000000000000000000000..82e918dfd18f16fdf80c4d670d9be92770371584 --- /dev/null +++ b/polkadot/node/network/statement-distribution/src/error.rs @@ -0,0 +1,122 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. +// + +//! Error handling related code and Error/Result definitions. + +use polkadot_node_network_protocol::PeerId; +use polkadot_primitives::v1::{CandidateHash, Hash}; +use polkadot_subsystem::SubsystemError; +use thiserror::Error; + +use polkadot_node_subsystem_util::{Fault, runtime, unwrap_non_fatal}; + +use crate::LOG_TARGET; + +/// General result. +pub type Result<T> = std::result::Result<T, Error>; +/// Result for non fatal only failures. +pub type NonFatalResult<T> = std::result::Result<T, NonFatal>; +/// Result for fatal only failures. +pub type FatalResult<T> = std::result::Result<T, Fatal>; + +/// Errors for statement distribution. +#[derive(Debug, Error)] +#[error(transparent)] +pub struct Error(pub Fault<NonFatal, Fatal>); + +impl From<NonFatal> for Error { + fn from(e: NonFatal) -> Self { + Self(Fault::from_non_fatal(e)) + } +} + +impl From<Fatal> for Error { + fn from(f: Fatal) -> Self { + Self(Fault::from_fatal(f)) + } +} + +impl From<runtime::Error> for Error { + fn from(o: runtime::Error) -> Self { + Self(Fault::from_other(o)) + } +} + +/// Fatal runtime errors. +#[derive(Debug, Error)] +pub enum Fatal { + /// Requester channel is never closed. + #[error("Requester receiver stream finished.")] + RequesterReceiverFinished, + + /// Responder channel is never closed. + #[error("Responder receiver stream finished.")] + ResponderReceiverFinished, + + /// Spawning a running task failed. + #[error("Spawning subsystem task failed")] + SpawnTask(#[source] SubsystemError), + + /// Receiving subsystem message from overseer failed. + #[error("Receiving message from overseer failed")] + SubsystemReceive(#[source] SubsystemError), + + /// Errors coming from runtime::Runtime. + #[error("Error while accessing runtime information")] + Runtime(#[from] #[source] runtime::Fatal), +} + +/// Errors for fetching of runtime information. +#[derive(Debug, Error)] +pub enum NonFatal { + /// Errors coming from runtime::Runtime. + #[error("Error while accessing runtime information")] + Runtime(#[from] #[source] runtime::NonFatal), + + /// Relay parent was not present in active heads. + #[error("Relay parent could not be found in active heads")] + NoSuchHead(Hash), + + /// Peer requested statement data for candidate that was never announced to it. + #[error("Peer requested data for candidate it never received a notification for")] + RequestedUnannouncedCandidate(PeerId, CandidateHash), + + /// A large statement status was requested, which could not be found. + #[error("Statement status does not exist")] + NoSuchLargeStatementStatus(Hash, CandidateHash), + + /// A fetched large statement was requested, but could not be found. + #[error("Fetched large statement does not exist")] + NoSuchFetchedLargeStatement(Hash, CandidateHash), + + /// Responder no longer waits for our data. (Should not happen right now.) + #[error("Oneshot `GetData` channel closed")] + ResponderGetDataCanceled, +} + +/// Utility for eating top level errors and log them. +/// +/// We basically always want to try and continue on error. This utility function is meant to +/// consume top-level errors by simply logging them. +pub fn log_error(result: Result<()>, ctx: &'static str) + -> FatalResult<()> +{ + if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? 
{ + tracing::debug!(target: LOG_TARGET, error = ?error, ctx) + } + Ok(()) +} diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 97d527ebbb763be66ee2894c384ad2edba6cb8bd..b7ddb74dff37c0ad01937a43916252e1a9592a06 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -22,14 +22,15 @@ #![deny(unused_crate_dependencies)] #![warn(missing_docs)] +use error::{FatalResult, NonFatalResult, log_error}; use parity_scale_codec::Encode; use polkadot_subsystem::{ ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan, SpawnedSubsystem, Subsystem, - SubsystemContext, SubsystemError, SubsystemResult, jaeger, + SubsystemContext, SubsystemError, jaeger, messages::{ AllMessages, NetworkBridgeMessage, StatementDistributionMessage, - CandidateBackingMessage, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent, + CandidateBackingMessage, NetworkBridgeEvent, }, }; use polkadot_node_subsystem_util::{ @@ -39,7 +40,7 @@ use polkadot_node_subsystem_util::{ use polkadot_node_primitives::{SignedFullStatement, Statement}; use polkadot_primitives::v1::{ CandidateHash, CommittedCandidateReceipt, CompactStatement, Hash, - SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature + SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, AuthorityDiscoveryId, }; use polkadot_node_network_protocol::{ IfDisconnected, PeerId, UnifiedReputationChange as Rep, View, @@ -54,9 +55,14 @@ use polkadot_node_network_protocol::{ use futures::{channel::mpsc, future::RemoteHandle, prelude::*}; use futures::channel::oneshot; use indexmap::{IndexSet, IndexMap, map::Entry as IEntry}; +use sp_keystore::SyncCryptoStorePtr; +use util::{Fault, runtime::RuntimeInfo}; use std::collections::{HashMap, HashSet, hash_map::Entry}; +mod error; +pub use error::{Error, NonFatal, Fatal, Result}; + /// Background task logic for requesting of large statements. mod requester; use requester::{RequesterMessage, fetch}; @@ -89,6 +95,8 @@ const LOG_TARGET: &str = "parachain::statement-distribution"; /// The statement distribution subsystem. pub struct StatementDistribution { + /// Pointer to a keystore, which is required for determining this nodes validator index. + keystore: SyncCryptoStorePtr, // Prometheus metrics metrics: Metrics, } @@ -101,15 +109,19 @@ impl<C> Subsystem<C> for StatementDistribution // within `run`. SpawnedSubsystem { name: "statement-distribution-subsystem", - future: self.run(ctx).boxed(), + future: self + .run(ctx) + .map_err(|e| SubsystemError::with_origin("statement-distribution", e)) + .boxed(), } } } impl StatementDistribution { /// Create a new Statement Distribution Subsystem - pub fn new(metrics: Metrics) -> StatementDistribution { + pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> StatementDistribution { StatementDistribution { + keystore, metrics, } } @@ -262,7 +274,7 @@ impl PeerRelayParentKnowledge { &mut self, fingerprint: &(CompactStatement, ValidatorIndex), max_message_count: usize, - ) -> Result<bool, Rep> { + ) -> std::result::Result<bool, Rep> { // We don't check `sent_statements` because a statement could be in-flight from both // sides at the same time. 
if self.received_statements.contains(fingerprint) { @@ -313,7 +325,7 @@ impl PeerRelayParentKnowledge { &self, fingerprint: &(CompactStatement, ValidatorIndex), max_message_count: usize, - ) -> Result<(), Rep> { + ) -> std::result::Result<(), Rep> { // We don't check `sent_statements` because a statement could be in-flight from both // sides at the same time. if self.received_statements.contains(fingerprint) { @@ -361,6 +373,8 @@ impl PeerRelayParentKnowledge { struct PeerData { view: View, view_knowledge: HashMap<Hash, PeerRelayParentKnowledge>, + // Peer might be an authority. + maybe_authority: Option<AuthorityDiscoveryId>, } impl PeerData { @@ -423,7 +437,7 @@ impl PeerData { relay_parent: &Hash, fingerprint: &(CompactStatement, ValidatorIndex), max_message_count: usize, - ) -> Result<bool, Rep> { + ) -> std::result::Result<bool, Rep> { self.view_knowledge .get_mut(relay_parent) .ok_or(COST_UNEXPECTED_STATEMENT)? @@ -438,7 +452,7 @@ impl PeerData { relay_parent: &Hash, fingerprint: &(CompactStatement, ValidatorIndex), max_message_count: usize, - ) -> Result<(), Rep> { + ) -> std::result::Result<(), Rep> { self.view_knowledge .get(relay_parent) .ok_or(COST_UNEXPECTED_STATEMENT)? @@ -534,7 +548,7 @@ struct FetchingInfo { /// Messages to be handled in this subsystem. enum Message { /// Messages from other subsystems. - Subsystem(SubsystemResult<FromOverseer<StatementDistributionMessage>>), + Subsystem(FatalResult<FromOverseer<StatementDistributionMessage>>), /// Messages from spawned requester background tasks. Requester(Option<RequesterMessage>), /// Messages from spawned responder background task. @@ -554,7 +568,7 @@ impl Message { let from_responder = from_responder.next().fuse(); futures::pin_mut!(from_overseer, from_requester, from_responder); futures::select!( - msg = from_overseer => Message::Subsystem(msg), + msg = from_overseer => Message::Subsystem(msg.map_err(Fatal::SubsystemReceive)), msg = from_requester => Message::Requester(msg), msg = from_responder => Message::Responder(msg), ) @@ -706,7 +720,9 @@ impl ActiveHeadData { /// Returns an error if the statement is already known or not useful /// without modifying the internal state. 
- fn check_useful_or_unknown(&self, statement: SignedFullStatement) -> Result<(), DeniedStatement> { + fn check_useful_or_unknown(&self, statement: SignedFullStatement) + -> std::result::Result<(), DeniedStatement> + { let validator_index = statement.validator_index(); let compact = statement.payload().to_compact(); let comparator = StoredStatementComparator { @@ -786,7 +802,7 @@ fn check_statement_signature( head: &ActiveHeadData, relay_parent: Hash, statement: &SignedFullStatement, -) -> Result<(), ()> { +) -> std::result::Result<(), ()> { let signing_context = SigningContext { session_index: head.session_index, parent_hash: relay_parent, @@ -808,6 +824,7 @@ async fn circulate_statement_and_dependents( ctx: &mut impl SubsystemContext, relay_parent: Hash, statement: SignedFullStatement, + priority_peers: Vec<PeerId>, metrics: &Metrics, ) { let active_head = match active_heads.get_mut(&relay_parent) { @@ -827,7 +844,7 @@ async fn circulate_statement_and_dependents( { Some(( *stored.compact().candidate_hash(), - circulate_statement(peers, ctx, relay_parent, stored).await, + circulate_statement(peers, ctx, relay_parent, stored, priority_peers).await, )) }, _ => None, @@ -907,17 +924,45 @@ async fn circulate_statement( ctx: &mut impl SubsystemContext, relay_parent: Hash, stored: &StoredStatement, + mut priority_peers: Vec<PeerId>, ) -> Vec<PeerId> { let fingerprint = stored.fingerprint(); - let peers_to_send: Vec<PeerId> = peers.iter().filter_map(|(peer, data)| { + let mut peers_to_send: Vec<PeerId> = peers.iter().filter_map(|(peer, data)| { if data.can_send(&relay_parent, &fingerprint) { Some(peer.clone()) } else { None } }).collect(); - let peers_to_send = util::choose_random_sqrt_subset(peers_to_send, MIN_GOSSIP_PEERS); + + let good_peers: HashSet<&PeerId> = peers_to_send.iter().collect(); + // Only take priority peers we can send data to: + priority_peers.retain(|p| good_peers.contains(p)); + + // Avoid duplicates: + let priority_set: HashSet<&PeerId> = priority_peers.iter().collect(); + peers_to_send.retain(|p| !priority_set.contains(p)); + + let mut peers_to_send = + util::choose_random_sqrt_subset(peers_to_send, MIN_GOSSIP_PEERS); + // We don't want to use less peers, than we would without any priority peers: + let min_size = std::cmp::max(peers_to_send.len(), MIN_GOSSIP_PEERS); + // Make set full: + let needed_peers = min_size as i64 - priority_peers.len() as i64; + if needed_peers > 0 { + peers_to_send.truncate(needed_peers as usize); + // Order important here - priority peers are placed first, so will be sent first. + // This gives backers a chance to be among the first in requesting any large statement + // data. + priority_peers.append(&mut peers_to_send); + } + peers_to_send = priority_peers; + // We must not have duplicates: + debug_assert!( + peers_to_send.len() == peers_to_send.clone().into_iter().collect::<HashSet<_>>().len(), + "We filter out duplicates above. 
qed.", + ); let peers_to_send: Vec<(PeerId, bool)> = peers_to_send.into_iter() .map(|peer_id| { let new = peers.get_mut(&peer_id) @@ -1246,6 +1291,7 @@ async fn handle_incoming_message_and_circulate<'a>( ctx, relay_parent, statement, + Vec::new(), ).await; } } @@ -1425,6 +1471,7 @@ async fn update_peer_view_and_send_unlocked( async fn handle_network_update( peers: &mut HashMap<PeerId, PeerData>, + authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>, active_heads: &mut HashMap<Hash, ActiveHeadData>, ctx: &mut impl SubsystemContext, req_sender: &mpsc::Sender<RequesterMessage>, @@ -1432,7 +1479,7 @@ async fn handle_network_update( metrics: &Metrics, ) { match update { - NetworkBridgeEvent::PeerConnected(peer, role, _) => { + NetworkBridgeEvent::PeerConnected(peer, role, maybe_authority) => { tracing::trace!( target: LOG_TARGET, ?peer, @@ -1442,7 +1489,11 @@ async fn handle_network_update( peers.insert(peer, PeerData { view: Default::default(), view_knowledge: Default::default(), + maybe_authority: maybe_authority.clone(), }); + if let Some(authority) = maybe_authority { + authorities.insert(authority, peer); + } } NetworkBridgeEvent::PeerDisconnected(peer) => { tracing::trace!( @@ -1450,7 +1501,9 @@ async fn handle_network_update( ?peer, "Peer disconnected", ); - peers.remove(&peer); + if let Some(auth_id) = peers.remove(&peer).and_then(|p| p.maybe_authority) { + authorities.remove(&auth_id); + } } NetworkBridgeEvent::PeerMessage(peer, message) => { handle_incoming_message_and_circulate( @@ -1495,9 +1548,13 @@ impl StatementDistribution { async fn run( self, mut ctx: impl SubsystemContext<Message = StatementDistributionMessage>, - ) -> SubsystemResult<()> { + ) -> std::result::Result<(), Fatal> { let mut peers: HashMap<PeerId, PeerData> = HashMap::new(); + let mut authorities: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new(); let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new(); + + let mut runtime = RuntimeInfo::new(self.keystore.clone()); + // Sender/Receiver for getting news from our statement fetching tasks. let (req_sender, mut req_receiver) = mpsc::channel(1); // Sender/Receiver for getting news from our responder task. @@ -1505,43 +1562,48 @@ impl StatementDistribution { loop { let message = Message::receive(&mut ctx, &mut req_receiver, &mut res_receiver).await; - let finished = match message { - Message::Subsystem(result) => - self.handle_subsystem_message( + match message { + Message::Subsystem(result) => { + let result = self.handle_subsystem_message( &mut ctx, + &mut runtime, &mut peers, + &mut authorities, &mut active_heads, &req_sender, &res_sender, result?, ) - .await?, - Message::Requester(result) => - self.handle_requester_message( + .await; + match result { + Ok(true) => break, + Ok(false) => {} + Err(Error(Fault::Fatal(f))) => return Err(f), + Err(Error(Fault::Err(error))) => + tracing::debug!(target: LOG_TARGET, ?error) + } + } + Message::Requester(result) => { + let result = self.handle_requester_message( &mut ctx, &mut peers, &mut active_heads, &req_sender, - result.ok_or(SubsystemError::Context( - "Failed to read from requester receiver (stream finished)" - .to_string() - ))? + result.ok_or(Fatal::RequesterReceiverFinished)? 
) - .await?, - Message::Responder(result) => - self.handle_responder_message( + .await; + log_error(result.map_err(From::from), "handle_requester_message")?; + } + Message::Responder(result) => { + let result = self.handle_responder_message( &peers, &mut active_heads, - result.ok_or(SubsystemError::Context( - "Failed to read from responder receiver (stream finished)" - .to_string() - ))? + result.ok_or(Fatal::ResponderReceiverFinished)? ) - .await?, + .await; + log_error(result.map_err(From::from), "handle_responder_message")?; + } }; - if finished { - break - } } Ok(()) } @@ -1552,7 +1614,7 @@ impl StatementDistribution { peers: &HashMap<PeerId, PeerData>, active_heads: &mut HashMap<Hash, ActiveHeadData>, message: ResponderMessage, - ) -> SubsystemResult<bool> { + ) -> NonFatalResult<()> { match message { ResponderMessage::GetData { requesting_peer, @@ -1566,39 +1628,28 @@ impl StatementDistribution { &relay_parent, &candidate_hash ) { - tracing::warn!( - target: LOG_TARGET, - "Peer requested candidate, although we never announced it to that peer." - ); - return Ok(false) + return Err( + NonFatal::RequestedUnannouncedCandidate(requesting_peer, candidate_hash) + ) } - let active_head = match active_heads.get(&relay_parent) { - Some(head) => head, - None => return Ok(false), - }; + let active_head = active_heads + .get(&relay_parent) + .ok_or(NonFatal::NoSuchHead(relay_parent))?; + let committed = match active_head.waiting_large_statements.get(&candidate_hash) { Some(LargeStatementStatus::Fetched(committed)) => committed.clone(), _ => { - tracing::debug!( - target: LOG_TARGET, - ?candidate_hash, - "Requested data not found - this should not happen under normal circumstances." - ); - return Ok(false) + return Err( + NonFatal::NoSuchFetchedLargeStatement(relay_parent, candidate_hash) + ) } }; - if let Err(_) = tx.send(committed) { - tracing::debug!( - target: LOG_TARGET, - "Sending data to responder failed" - ); - return Ok(false) - } + tx.send(committed).map_err(|_| NonFatal::ResponderGetDataCanceled)?; } } - Ok(false) + Ok(()) } async fn handle_requester_message( @@ -1608,7 +1659,7 @@ impl StatementDistribution { active_heads: &mut HashMap<Hash, ActiveHeadData>, req_sender: &mpsc::Sender<RequesterMessage>, message: RequesterMessage, - ) -> SubsystemResult<bool> { + ) -> NonFatalResult<()> { match message { RequesterMessage::Finished { relay_parent, @@ -1622,10 +1673,9 @@ impl StatementDistribution { } report_peer(ctx, from_peer, BENEFIT_VALID_RESPONSE).await; - let active_head = match active_heads.get_mut(&relay_parent) { - Some(head) => head, - None => return Ok(false), - }; + let active_head = active_heads + .get_mut(&relay_parent) + .ok_or(NonFatal::NoSuchHead(relay_parent))?; let status = active_head .waiting_large_statements @@ -1634,15 +1684,12 @@ impl StatementDistribution { let info = match status { Some(LargeStatementStatus::Fetching(info)) => info, Some(LargeStatementStatus::Fetched(_)) => { - debug_assert!(false, "On status fetched, fetching task already succeeded. qed."); - return Ok(false) + panic!("On status fetched, fetching task already succeeded. qed."); } None => { - tracing::warn!( - target: LOG_TARGET, - "Received finished task event for non existent status - not supposed to happen." 
- ); - return Ok(false) + return Err( + NonFatal::NoSuchLargeStatementStatus(relay_parent, candidate_hash) + ) } }; @@ -1682,10 +1729,9 @@ impl StatementDistribution { candidate_hash, tx, } => { - let active_head = match active_heads.get_mut(&relay_parent) { - Some(head) => head, - None => return Ok(false), - }; + let active_head = active_heads + .get_mut(&relay_parent) + .ok_or(NonFatal::NoSuchHead(relay_parent))?; let status = active_head .waiting_large_statements @@ -1693,16 +1739,12 @@ impl StatementDistribution { let info = match status { Some(LargeStatementStatus::Fetching(info)) => info, - Some(LargeStatementStatus::Fetched(_)) => { - debug_assert!(false, "On status fetched, fetching task already succeeded. qed."); - return Ok(false) - } + Some(LargeStatementStatus::Fetched(_)) => + panic!("On status fetched, fetching task already succeeded. qed."), None => { - tracing::warn!( - target: LOG_TARGET, - "Received 'get more peers' event for non existent status - not supposed to happen." - ); - return Ok(false) + return Err( + NonFatal::NoSuchLargeStatementStatus(relay_parent, candidate_hash) + ) } }; @@ -1719,18 +1761,21 @@ impl StatementDistribution { RequesterMessage::ReportPeer(peer, rep) => report_peer(ctx, peer, rep).await, } - Ok(false) + Ok(()) } + async fn handle_subsystem_message( &self, ctx: &mut impl SubsystemContext, + runtime: &mut RuntimeInfo, peers: &mut HashMap<PeerId, PeerData>, + authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>, active_heads: &mut HashMap<Hash, ActiveHeadData>, req_sender: &mpsc::Sender<RequesterMessage>, res_sender: &mpsc::Sender<ResponderMessage>, message: FromOverseer<StatementDistributionMessage>, - ) -> SubsystemResult<bool> { + ) -> Result<bool> { let metrics = &self.metrics; match message { @@ -1746,45 +1791,12 @@ impl StatementDistribution { "New active leaf", ); - let (validators, session_index) = { - let (val_tx, val_rx) = oneshot::channel(); - let (session_tx, session_rx) = oneshot::channel(); - - let val_message = AllMessages::RuntimeApi( - RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(val_tx), - ), - ); - let session_message = AllMessages::RuntimeApi( - RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(session_tx), - ), - ); - ctx.send_messages( - std::iter::once(val_message).chain(std::iter::once(session_message)) - ).await; - - match (val_rx.await?, session_rx.await?) { - (Ok(v), Ok(s)) => (v, s), - (Err(e), _) | (_, Err(e)) => { - tracing::warn!( - target: LOG_TARGET, - err = ?e, - "Failed to fetch runtime API data for active leaf", - ); - - // Lacking this bookkeeping might make us behave funny, although - // not in any slashable way. But we shouldn't take down the node - // on what are likely spurious runtime API errors. 
- return Ok(false) - } - } - }; + let session_index = runtime.get_session_index(ctx, relay_parent).await?; + let info = runtime.get_session_info_by_index(ctx, relay_parent, session_index).await?; + let session_info = &info.session_info; active_heads.entry(relay_parent) - .or_insert(ActiveHeadData::new(validators, session_index, span)); + .or_insert(ActiveHeadData::new(session_info.validators.clone(), session_index, span)); active_heads.retain(|h, _| { let live = !deactivated.contains(h); @@ -1810,19 +1822,11 @@ impl StatementDistribution { // Make sure we have data in cache: if is_statement_large(&statement) { if let Statement::Seconded(committed) = &statement.payload() { - let active_head = match active_heads.get_mut(&relay_parent) { - Some(h) => h, - None => { - // This should never be out-of-sync with our view if the view updates - // correspond to actual `StartWork` messages. So we just log and ignore. - tracing::warn!( - target: LOG_TARGET, - %relay_parent, - "our view out-of-sync with active heads; head not found", - ); - return Ok(false) - } - }; + let active_head = active_heads + .get_mut(&relay_parent) + // This should never be out-of-sync with our view if the view + // updates correspond to actual `StartWork` messages. + .ok_or(NonFatal::NoSuchHead(relay_parent))?; active_head.waiting_large_statements.insert( statement.payload().candidate_hash(), LargeStatementStatus::Fetched(committed.clone()) @@ -1830,12 +1834,36 @@ impl StatementDistribution { } } + let info = runtime.get_session_info(ctx, relay_parent).await?; + let session_info = &info.session_info; + let validator_info = &info.validator_info; + + // Get peers in our group, so we can make sure they get our statement + // directly: + let group_peers = { + if let Some(our_group) = validator_info.our_group { + let our_group = &session_info.validator_groups[our_group.0 as usize]; + + our_group.into_iter() + .filter_map(|i| { + if Some(*i) == validator_info.our_index { + return None + } + let authority_id = &session_info.discovery_keys[i.0 as usize]; + authorities.get(authority_id).map(|p| *p) + }) + .collect() + } else { + Vec::new() + } + }; circulate_statement_and_dependents( peers, active_heads, ctx, relay_parent, statement, + group_peers, metrics, ).await; } @@ -1844,6 +1872,7 @@ impl StatementDistribution { handle_network_update( peers, + authorities, active_heads, ctx, req_sender, @@ -1855,7 +1884,9 @@ impl StatementDistribution { ctx.spawn( "large-statement-responder", respond(receiver, res_sender.clone()).boxed() - ).await?; + ) + .await + .map_err(Fatal::SpawnTask)?; } } } @@ -2009,16 +2040,16 @@ mod tests { use super::*; use std::sync::Arc; use sp_keyring::Sr25519Keyring; - use sp_application_crypto::AppKey; + use sp_application_crypto::{AppKey, sr25519::Pair, Pair as TraitPair}; use polkadot_node_primitives::Statement; - use polkadot_primitives::v1::{CommittedCandidateReceipt, ValidationCode}; + use polkadot_primitives::v1::{CommittedCandidateReceipt, ValidationCode, SessionInfo}; use assert_matches::assert_matches; use futures::executor::{self, block_on}; use futures_timer::Delay; use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore}; use sc_keystore::LocalKeystore; use polkadot_node_network_protocol::{view, ObservedRole, request_response::Recipient}; - use polkadot_subsystem::{jaeger, ActivatedLeaf}; + use polkadot_subsystem::{jaeger, ActivatedLeaf, messages::{RuntimeApiMessage, RuntimeApiRequest}}; use polkadot_node_network_protocol::request_response::{ Requests, v1::{ @@ -2418,6 +2449,7 @@ mod tests 
{ k }, + maybe_authority: None, }; let pool = sp_core::testing::TaskExecutor::new(); @@ -2505,6 +2537,7 @@ mod tests { let peer_data_from_view = |view: View| PeerData { view: view.clone(), view_knowledge: view.iter().map(|v| (v.clone(), Default::default())).collect(), + maybe_authority: None, }; let mut peer_data: HashMap<_, _> = vec![ @@ -2554,6 +2587,7 @@ mod tests { &mut ctx, hash_b, &statement, + Vec::new(), ).await; { @@ -2611,18 +2645,20 @@ mod tests { let peer_b = PeerId::random(); let validators = vec![ - Sr25519Keyring::Alice.public().into(), - Sr25519Keyring::Bob.public().into(), - Sr25519Keyring::Charlie.public().into(), + Sr25519Keyring::Alice.pair(), + Sr25519Keyring::Bob.pair(), + Sr25519Keyring::Charlie.pair(), ]; + let session_info = make_session_info(validators, vec![]); + let session_index = 1; let pool = sp_core::testing::TaskExecutor::new(); let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); let bg = async move { - let s = StatementDistribution { metrics: Default::default() }; + let s = StatementDistribution { metrics: Default::default(), keystore: Arc::new(LocalKeystore::in_memory()) }; s.run(ctx).await.unwrap(); }; @@ -2640,22 +2676,22 @@ mod tests { assert_matches!( handle.recv().await, AllMessages::RuntimeApi( - RuntimeApiMessage::Request(r, RuntimeApiRequest::Validators(tx)) + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx)) ) if r == hash_a => { - let _ = tx.send(Ok(validators)); + let _ = tx.send(Ok(session_index)); } ); assert_matches!( handle.recv().await, AllMessages::RuntimeApi( - RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx)) + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx)) ) - if r == hash_a + if r == hash_a && sess_index == session_index => { - let _ = tx.send(Ok(session_index)); + let _ = tx.send(Ok(Some(session_info))); } ); @@ -2767,24 +2803,31 @@ mod tests { c }; - let peer_a = PeerId::random(); - let peer_b = PeerId::random(); - let peer_c = PeerId::random(); - let peer_bad = PeerId::random(); + let peer_a = PeerId::random(); // Alice + let peer_b = PeerId::random(); // Bob + let peer_c = PeerId::random(); // Charlie + let peer_bad = PeerId::random(); // No validator let validators = vec![ - Sr25519Keyring::Alice.public().into(), - Sr25519Keyring::Bob.public().into(), - Sr25519Keyring::Charlie.public().into(), + Sr25519Keyring::Alice.pair(), + Sr25519Keyring::Bob.pair(), + Sr25519Keyring::Charlie.pair(), + // We: + Sr25519Keyring::Ferdie.pair(), ]; + let session_info = make_session_info( + validators, + vec![vec![0,1,2,4], vec![3]] + ); + let session_index = 1; let pool = sp_core::testing::TaskExecutor::new(); let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); let bg = async move { - let s = StatementDistribution { metrics: Default::default() }; + let s = StatementDistribution { metrics: Default::default(), keystore: make_ferdie_keystore()}; s.run(ctx).await.unwrap(); }; @@ -2808,40 +2851,52 @@ mod tests { assert_matches!( handle.recv().await, AllMessages::RuntimeApi( - RuntimeApiMessage::Request(r, RuntimeApiRequest::Validators(tx)) + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx)) ) if r == hash_a => { - let _ = tx.send(Ok(validators)); + let _ = tx.send(Ok(session_index)); } ); assert_matches!( handle.recv().await, AllMessages::RuntimeApi( - RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx)) + RuntimeApiMessage::Request(r, 
RuntimeApiRequest::SessionInfo(sess_index, tx)) ) - if r == hash_a + if r == hash_a && sess_index == session_index => { - let _ = tx.send(Ok(session_index)); + let _ = tx.send(Ok(Some(session_info))); } ); // notify of peers and view handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None) + NetworkBridgeEvent::PeerConnected( + peer_a.clone(), + ObservedRole::Full, + Some(Sr25519Keyring::Alice.public().into()) + ) ) }).await; handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None) + NetworkBridgeEvent::PeerConnected( + peer_b.clone(), + ObservedRole::Full, + Some(Sr25519Keyring::Bob.public().into()) + ) ) }).await; handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full, None) + NetworkBridgeEvent::PeerConnected( + peer_c.clone(), + ObservedRole::Full, + Some(Sr25519Keyring::Charlie.public().into()) + ) ) }).await; handle.send(FromOverseer::Communication { @@ -3111,7 +3166,15 @@ mod tests { ), ) ) => { - assert_eq!(recipients.sort(), vec![peer_b.clone(), peer_c.clone()].sort()); + tracing::debug!( + target: LOG_TARGET, + ?recipients, + "Recipients received" + ); + recipients.sort(); + let mut expected = vec![peer_b, peer_c, peer_bad]; + expected.sort(); + assert_eq!(recipients, expected); assert_eq!(meta.relay_parent, hash_a); assert_eq!(meta.candidate_hash, statement.payload().candidate_hash()); assert_eq!(meta.signed_by, statement.validator_index()); @@ -3181,4 +3244,307 @@ mod tests { executor::block_on(future::join(test_fut, bg)); } + + #[test] + fn share_prioritizes_backing_group() { + sp_tracing::try_init_simple(); + let hash_a = Hash::repeat_byte(1); + + let candidate = { + let mut c = CommittedCandidateReceipt::default(); + c.descriptor.relay_parent = hash_a; + c.descriptor.para_id = 1.into(); + c.commitments.new_validation_code = Some(ValidationCode(vec![1,2,3])); + c + }; + + let peer_a = PeerId::random(); // Alice + let peer_b = PeerId::random(); // Bob + let peer_c = PeerId::random(); // Charlie + let peer_bad = PeerId::random(); // No validator + let peer_other_group = PeerId::random(); //Ferdie + + let mut validators = vec![ + Sr25519Keyring::Alice.pair(), + Sr25519Keyring::Bob.pair(), + Sr25519Keyring::Charlie.pair(), + // other group + Sr25519Keyring::Dave.pair(), + // We: + Sr25519Keyring::Ferdie.pair(), + ]; + + // Strictly speaking we only need MIN_GOSSIP_PEERS - 3 to make sure only priority peers + // will be served, but by using a larger value we test for overflow errors: + let dummy_count = MIN_GOSSIP_PEERS; + + // We artificially inflate our group, so there won't be any free slots for other peers. 
(We + // want to test that our group is prioritized): + let dummy_pairs: Vec<_> = std::iter::repeat_with(|| Pair::generate().0).take(dummy_count).collect(); + let dummy_peers: Vec<_> = std::iter::repeat_with(|| PeerId::random()).take(dummy_count).collect(); + + validators = validators.into_iter().chain(dummy_pairs.clone()).collect(); + + let mut first_group = vec![0,1,2,4]; + first_group.append(&mut (0..dummy_count as u32).map(|v| v + 5).collect()); + let session_info = make_session_info( + validators, + vec![first_group, vec![3]] + ); + + let session_index = 1; + + let pool = sp_core::testing::TaskExecutor::new(); + let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); + + let bg = async move { + let s = StatementDistribution { metrics: Default::default(), keystore: make_ferdie_keystore()}; + s.run(ctx).await.unwrap(); + }; + + let (mut tx_reqs, rx_reqs) = mpsc::channel(1); + + let test_fut = async move { + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::StatementFetchingReceiver(rx_reqs) + }).await; + + // register our active heads. + handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: vec![ActivatedLeaf { + hash: hash_a, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }].into(), + deactivated: vec![].into(), + }))).await; + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx)) + ) + if r == hash_a + => { + let _ = tx.send(Ok(session_index)); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx)) + ) + if r == hash_a && sess_index == session_index + => { + let _ = tx.send(Ok(Some(session_info))); + } + ); + + // notify of dummy peers and view + for (peer, pair) in dummy_peers.clone().into_iter().zip(dummy_pairs) { + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer, + ObservedRole::Full, + Some(pair.public().into()), + ) + ) + }).await; + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer, view![hash_a]) + ) + }).await; + } + + // notify of peers and view + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_a.clone(), + ObservedRole::Full, + Some(Sr25519Keyring::Alice.public().into()) + ) + ) + }).await; + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_b.clone(), + ObservedRole::Full, + Some(Sr25519Keyring::Bob.public().into()) + ) + ) + }).await; + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_c.clone(), + ObservedRole::Full, + Some(Sr25519Keyring::Charlie.public().into()) + ) + ) + }).await; + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full, None) + ) + }).await; + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_other_group.clone(), + ObservedRole::Full, + 
Some(Sr25519Keyring::Dave.public().into()) + ) + ) + }).await; + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]) + ) + }).await; + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a]) + ) + }).await; + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_c.clone(), view![hash_a]) + ) + }).await; + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_bad.clone(), view![hash_a]) + ) + }).await; + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_other_group.clone(), view![hash_a]) + ) + }).await; + + // receive a seconded statement from peer A, which does not provide the request data, + // then get that data from peer C. It should be propagated onwards to peer B and to + // candidate backing. + let statement = { + let signing_context = SigningContext { + parent_hash: hash_a, + session_index, + }; + + let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory()); + let ferdie_public = CryptoStore::sr25519_generate_new( + &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Ferdie.to_seed()) + ).await.unwrap(); + + SignedFullStatement::sign( + &keystore, + Statement::Seconded(candidate.clone()), + &signing_context, + ValidatorIndex(4), + &ferdie_public.into(), + ).await.ok().flatten().expect("should be signed") + }; + + let metadata = + protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()).get_metadata(); + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::Share(hash_a, statement.clone()) + }).await; + + // Messages should go out: + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage( + mut recipients, + protocol_v1::ValidationProtocol::StatementDistribution( + protocol_v1::StatementDistributionMessage::LargeStatement(meta) + ), + ) + ) => { + tracing::debug!( + target: LOG_TARGET, + ?recipients, + "Recipients received" + ); + recipients.sort(); + // We expect only our backing group to be the recipients, du to the inflated + // test group above: + let mut expected: Vec<_> = vec![peer_a, peer_b, peer_c].into_iter().chain(dummy_peers).collect(); + expected.sort(); + assert_eq!(recipients.len(), expected.len()); + assert_eq!(recipients, expected); + assert_eq!(meta.relay_parent, hash_a); + assert_eq!(meta.candidate_hash, statement.payload().candidate_hash()); + assert_eq!(meta.signed_by, statement.validator_index()); + assert_eq!(&meta.signature, statement.signature()); + } + ); + + // Now that it has the candidate it should answer requests accordingly: + + let (pending_response, response_rx) = oneshot::channel(); + let inner_req = StatementFetchingRequest { + relay_parent: metadata.relay_parent, + candidate_hash: metadata.candidate_hash, + }; + let req = sc_network::config::IncomingRequest { + peer: peer_b, + payload: inner_req.encode(), + pending_response, + }; + tx_reqs.send(req).await.unwrap(); + let StatementFetchingResponse::Statement(committed) = + Decode::decode(&mut response_rx.await.unwrap().result.unwrap().as_ref()).unwrap(); + assert_eq!(committed, 
candidate); + + handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::pin_mut!(test_fut); + futures::pin_mut!(bg); + + executor::block_on(future::join(test_fut, bg)); + } + + fn make_session_info(validators: Vec<Pair>, groups: Vec<Vec<u32>>) -> SessionInfo { + + let validator_groups: Vec<Vec<ValidatorIndex>> = groups + .iter().map(|g| g.into_iter().map(|v| ValidatorIndex(*v)).collect()).collect(); + + SessionInfo { + discovery_keys: validators.iter().map(|k| k.public().into()).collect(), + // Not used: + n_cores: validator_groups.len() as u32, + validator_groups, + validators: validators.iter().map(|k| k.public().into()).collect(), + // Not used values: + assignment_keys: Vec::new(), + zeroth_delay_tranche_width: 0, + relay_vrf_modulo_samples: 0, + n_delay_tranches: 0, + no_show_slots: 0, + needed_approvals: 0, + } + } + + pub fn make_ferdie_keystore() -> SyncCryptoStorePtr { + let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory()); + SyncCryptoStore::sr25519_generate_new( + &*keystore, + ValidatorId::ID, + Some(&Sr25519Keyring::Ferdie.to_seed()), + ) + .expect("Insert key into keystore"); + keystore + } } diff --git a/polkadot/node/network/statement-distribution/src/requester.rs b/polkadot/node/network/statement-distribution/src/requester.rs index 66a7979eda23477882d3d246a5fe221fb454d6ab..2368da3cd2c169d46dac20f3919fd5401b58c6be 100644 --- a/polkadot/node/network/statement-distribution/src/requester.rs +++ b/polkadot/node/network/statement-distribution/src/requester.rs @@ -20,7 +20,7 @@ use futures::{SinkExt, channel::{mpsc, oneshot}}; use polkadot_node_network_protocol::{ PeerId, UnifiedReputationChange, - request_response::{ + request_response::{ OutgoingRequest, Recipient, Requests, v1::{ StatementFetchingRequest, StatementFetchingResponse diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 28989d23f3568c5ff9604ee28473c2b31ae4767b..13582d3d233cf746121a2e320b94bf811817abba 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -530,6 +530,7 @@ where spawner.clone(), ), statement_distribution: StatementDistributionSubsystem::new( + keystore.clone(), Metrics::register(registry)?, ), approval_distribution: ApprovalDistributionSubsystem::new( diff --git a/polkadot/node/subsystem-util/src/error_handling.rs b/polkadot/node/subsystem-util/src/error_handling.rs new file mode 100644 index 0000000000000000000000000000000000000000..b2040e53ab00acbf679c25ae166f1e9d859d7f02 --- /dev/null +++ b/polkadot/node/subsystem-util/src/error_handling.rs @@ -0,0 +1,201 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Utilities for general error handling in Polkadot. +//! +//! Goals: +//! +//! - Ergonomic API with little repetition. +//! - Still explicitness where it matters - fatal errors should be visible and justified. 
+//! - Easy recovery from non fatal errors. +//! - Errors start as non fatal and can be made fatal at the level where it is really clear they +//! are fatal. E.g. cancellation of a oneshot might be fatal in one case, but absolutely expected +//! in another. +//! - Good error messages. Fatal errors don't need to be properly structured (as we won't handle +//! them), but should provide a good error message describing what is going on. +//! - Encourage many error types. One per module or even per function is totally fine - it makes +//! error handling more robust, as you only need to handle errors that can actually happen, and +//! error messages get better as well. + +use thiserror::Error; + +/// Error abstraction. +/// +/// Errors are either fatal and should bring the subsystem down, or are at least at the point +/// of occurrence deemed potentially recoverable. +/// +/// Upper layers might have a better view and might make a non fatal error of a called function a +/// fatal one. The opposite should not happen, therefore don't make an error fatal if you don't +/// know it is in all cases. +/// +/// Usage pattern: +/// +/// ``` +/// use thiserror::Error; +/// use polkadot_node_subsystem::errors::RuntimeApiError; +/// use polkadot_primitives::v1::SessionIndex; +/// use futures::channel::oneshot; +/// use polkadot_node_subsystem_util::{Fault, runtime}; +/// +/// #[derive(Debug, Error)] +/// #[error(transparent)] +/// pub struct Error(pub Fault<NonFatal, Fatal>); +/// +/// pub type Result<T> = std::result::Result<T, Error>; +/// pub type NonFatalResult<T> = std::result::Result<T, NonFatal>; +/// pub type FatalResult<T> = std::result::Result<T, Fatal>; +/// +/// // Make an error from a `NonFatal` one. +/// impl From<NonFatal> for Error { +/// fn from(e: NonFatal) -> Self { +/// Self(Fault::from_non_fatal(e)) +/// } +/// } +/// +/// // Make an Error from a `Fatal` one. +/// impl From<Fatal> for Error { +/// fn from(f: Fatal) -> Self { +/// Self(Fault::from_fatal(f)) +/// } +/// } +/// +/// // Easy conversion from sub error types from other modules: +/// impl From<runtime::Error> for Error { +/// fn from(o: runtime::Error) -> Self { +/// Self(Fault::from_other(o)) +/// } +/// } +/// +/// #[derive(Debug, Error)] +/// pub enum Fatal { +/// /// Really fatal stuff. +/// #[error("Something fatal happened.")] +/// SomeFatalError, +/// /// Errors coming from runtime::Runtime. +/// #[error("Error while accessing runtime information")] +/// Runtime(#[from] #[source] runtime::Fatal), +/// } +/// +/// #[derive(Debug, Error)] +/// pub enum NonFatal { +/// /// Some non fatal error. +/// /// For example if we prune a block we're requesting info about. +/// #[error("Non fatal error happened.")] +/// SomeNonFatalError, +/// +/// /// Errors coming from runtime::Runtime. +/// #[error("Error while accessing runtime information")] +/// Runtime(#[from] #[source] runtime::NonFatal), +/// } +/// ``` +/// Then mostly use `Error` in functions; you may also use `NonFatal` and `Fatal` directly in +/// functions that strictly only fail with non fatal or fatal errors respectively, as `Fatal` and +/// `NonFatal` can be automatically converted into the above defined `Error`. +#[derive(Debug, Error)] +pub enum Fault<E, F> + where + E: std::fmt::Debug + std::error::Error + 'static, + F: std::fmt::Debug + std::error::Error + 'static, { + /// Error is fatal and should be escalated up. 
+ /// + /// While we usually won't want to pattern match on those, a concrete descriptive enum might + /// still be a good idea for easy auditing of what can go wrong in a module and also makes for + /// good error messages thanks to `thiserror`. + #[error("Fatal error occurred.")] + Fatal(#[source] F), + /// Error that is not fatal, at least not yet at this level of execution. + #[error("Non fatal error occurred.")] + Err(#[source] E), +} + +/// Due to typesystem constraints we cannot implement the following methods as standard +/// `From::from` implementations, so there are no automatic conversions by default; a simple +/// `Result::map_err` is not too bad though. +impl<E, F> Fault<E, F> + where + E: std::fmt::Debug + std::error::Error + 'static, + F: std::fmt::Debug + std::error::Error + 'static, +{ + /// Build a `Fault` from a compatible fatal error. + pub fn from_fatal<F1: Into<F>>(f: F1) -> Self { + Self::Fatal(f.into()) + } + + /// Build a `Fault` from a compatible non fatal error. + pub fn from_non_fatal<E1: Into<E>>(e: E1) -> Self { + Self::Err(e.into()) + } + + /// Build a `Fault` from another compatible `Fault`. + pub fn from_other<E1, F1>(e: Fault<E1, F1>) -> Self + where + E1: Into<E> + std::fmt::Debug + std::error::Error + 'static, + F1: Into<F> + std::fmt::Debug + std::error::Error + 'static, + { + match e { + Fault::Fatal(f) => Self::from_fatal(f), + Fault::Err(e) => Self::from_non_fatal(e), + } + } +} + +/// Unwrap a non fatal error and report a fatal one. +/// +/// This function is useful for top level error handling. Fatal errors will be extracted, +/// non fatal errors will be returned for handling. +/// +/// Usage: +/// +/// ```no_run +/// # use thiserror::Error; +/// # use polkadot_node_subsystem_util::{Fault, unwrap_non_fatal}; +/// # use polkadot_node_subsystem::SubsystemError; +/// # #[derive(Error, Debug)] +/// # enum Fatal { +/// # } +/// # #[derive(Error, Debug)] +/// # enum NonFatal { +/// # } +/// # fn computation() -> Result<(), Fault<NonFatal, Fatal>> { +/// # panic!(); +/// # } +/// # +/// // Use run like so: +/// // run(ctx) +/// // .map_err(|e| SubsystemError::with_origin("subsystem-name", e)) +/// fn run() -> std::result::Result<(), Fatal> { +/// loop { +/// // .... +/// if let Some(err) = unwrap_non_fatal(computation())? { +/// println!("Something bad happened: {}", err); +/// continue +/// } +/// } +/// } +/// +/// ``` +pub fn unwrap_non_fatal<E,F>(result: Result<(), Fault<E,F>>) -> Result<Option<E>, F> + where + E: std::fmt::Debug + std::error::Error + 'static, + F: std::fmt::Debug + std::error::Error + Send + Sync + 'static +{ + match result { + Ok(()) => Ok(None), + Err(Fault::Fatal(f)) => Err(f), + Err(Fault::Err(e)) => Ok(Some(e)), + } +} diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index eb55d31cae32f98ac7192c74c1e2e80fa072d5bb..98f3dda1a345f5b68a0f90316b4bff109ff5742c 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -55,6 +55,11 @@ pub mod validator_discovery; pub use metered_channel as metered; pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS; +mod error_handling; + +/// Error classification. +pub use error_handling::{Fault, unwrap_non_fatal}; + /// These reexports are required so that external crates can use the `delegated_subsystem` macro properly. 
pub mod reexports { pub use sp_core::traits::SpawnNamed; diff --git a/polkadot/node/subsystem-util/src/runtime/error.rs b/polkadot/node/subsystem-util/src/runtime/error.rs index 9b298f5279e52ab90ad938769592cf3e0688ef35..94cbe05e945380420f0d45039842e38784087603 100644 --- a/polkadot/node/subsystem-util/src/runtime/error.rs +++ b/polkadot/node/subsystem-util/src/runtime/error.rs @@ -23,15 +23,36 @@ use futures::channel::oneshot; use polkadot_node_subsystem::errors::RuntimeApiError; use polkadot_primitives::v1::SessionIndex; +use crate::Fault; + pub type Result<T> = std::result::Result<T, Error>; -/// Errors for fetching of runtime information. +/// Errors for `Runtime` cache. +pub type Error = Fault<NonFatal, Fatal>; + +impl From<NonFatal> for Error { + fn from(e: NonFatal) -> Self { + Self::from_non_fatal(e) + } +} + +impl From<Fatal> for Error { + fn from(f: Fatal) -> Self { + Self::from_fatal(f) + } +} + +/// Fatal runtime errors. #[derive(Debug, Error)] -pub enum Error { +pub enum Fatal { /// Runtime API subsystem is down, which means we're shutting down. - #[error("Runtime request canceled")] + #[error("Runtime request got canceled")] RuntimeRequestCanceled(oneshot::Canceled), +} +/// Errors for fetching of runtime information. +#[derive(Debug, Error)] +pub enum NonFatal { /// Some request to the runtime failed. /// For example if we prune a block we're requesting info about. #[error("Runtime API error")] @@ -45,8 +66,9 @@ pub enum Error { /// Receive a response from a runtime request and convert errors. pub(crate) async fn recv_runtime<V>( r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>, -) -> std::result::Result<V, Error> { - r.await - .map_err(Error::RuntimeRequestCanceled)? - .map_err(Error::RuntimeRequest) +) -> Result<V> { + let result = r.await + .map_err(Fatal::RuntimeRequestCanceled)? + .map_err(NonFatal::RuntimeRequest)?; + Ok(result) } diff --git a/polkadot/node/subsystem-util/src/runtime/mod.rs b/polkadot/node/subsystem-util/src/runtime/mod.rs index 266415e0f5e4d530b2703188a1cf57828cc90f3c..0012c9f6b441bf9439c8028e3149025d1c38aca9 100644 --- a/polkadot/node/subsystem-util/src/runtime/mod.rs +++ b/polkadot/node/subsystem-util/src/runtime/mod.rs @@ -33,12 +33,12 @@ use crate::{ mod error; use error::{recv_runtime, Result}; -pub use error::Error; +pub use error::{Error, NonFatal, Fatal}; /// Caching of session info. /// /// It should be ensured that a cached session stays live in the cache as long as we might need it. -pub struct Runtime { +pub struct RuntimeInfo { /// Get the session index for a given relay parent. /// /// We query this up to a 100 times per block, so caching it here without roundtrips over the @@ -70,8 +70,8 @@ pub struct ValidatorInfo { pub our_group: Option<GroupIndex>, } -impl Runtime { - /// Create a new `Runtime` for convenient runtime fetches. +impl RuntimeInfo { + /// Create a new `RuntimeInfo` for convenient runtime fetches. pub fn new(keystore: SyncCryptoStorePtr) -> Self { Self { // Adjust, depending on how many forks we want to support. @@ -134,7 +134,7 @@ impl Runtime { let session_info = recv_runtime(request_session_info(parent, session_index, ctx.sender()).await) .await? 
- .ok_or(Error::NoSuchSession(session_index))?; + .ok_or(NonFatal::NoSuchSession(session_index))?; let validator_info = self.get_validator_info(&session_info).await?; let full_info = ExtendedSessionInfo { diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 1c8fa9ab91c7050b5ff077db46d86b371a642bbc..7d6d39938ec081c0c7f86afc9daaf46d51d8a1e2 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -224,9 +224,13 @@ pub enum NetworkBridgeMessage { SendCollationMessage(Vec<PeerId>, protocol_v1::CollationProtocol), /// Send a batch of validation messages. + /// + /// NOTE: Messages will be processed in order (at least statement distribution relies on this). SendValidationMessages(Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>), /// Send a batch of collation messages. + /// + /// NOTE: Messages will be processed in order. SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>), /// Send requests via substrate request/response.
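
The error handling goals above call for errors to start out as non fatal and only be escalated where it is clearly the right thing to do. A minimal sketch of that escalation, using only the `Fault` type and `unwrap_non_fatal` helper added by this patch; the `Fatal`/`NonFatal` enums and the `fetch_from_peer`, `fetch_required` and `run` functions below are hypothetical names made up for illustration, not code from the tree:

```
use futures::channel::oneshot;
use polkadot_node_subsystem_util::{unwrap_non_fatal, Fault};
use thiserror::Error;

/// Hypothetical fatal errors of some subsystem (illustration only).
#[derive(Debug, Error)]
enum Fatal {
	/// Cancellation is fatal here: without the answer we cannot make progress.
	#[error("Required data request got canceled")]
	RequestCanceled(oneshot::Canceled),
}

/// Hypothetical non fatal errors of the same subsystem (illustration only).
#[derive(Debug, Error)]
enum NonFatal {
	/// Cancellation is recoverable here: the peer simply went away.
	#[error("Peer response channel got canceled")]
	ResponseCanceled(oneshot::Canceled),
}

/// The combined error type, following the pattern from `error_handling.rs`.
type Error = Fault<NonFatal, Fatal>;

/// The callee treats a canceled (or not yet ready) channel as a recoverable miss.
fn fetch_from_peer(mut rx: oneshot::Receiver<u32>) -> Result<u32, NonFatal> {
	rx.try_recv()
		.map_err(NonFatal::ResponseCanceled)?
		.ok_or(NonFatal::ResponseCanceled(oneshot::Canceled))
}

/// A caller that cannot make progress without the value escalates the same error to fatal.
fn fetch_required(rx: oneshot::Receiver<u32>) -> Result<u32, Error> {
	fetch_from_peer(rx).map_err(|e| match e {
		NonFatal::ResponseCanceled(c) => Fault::from_fatal(Fatal::RequestCanceled(c)),
	})
}

/// Top level: fatal errors bubble up, non fatal ones are only logged.
fn run(rx: oneshot::Receiver<u32>) -> Result<(), Fatal> {
	if let Some(err) = unwrap_non_fatal(fetch_required(rx).map(|_| ()))? {
		eprintln!("recoverable: {}", err);
	}
	Ok(())
}

fn main() {
	let (tx, rx) = oneshot::channel::<u32>();
	drop(tx); // Simulate the sender going away.
	if let Err(fatal) = run(rx) {
		eprintln!("shutting down: {}", fatal);
	}
}
```

With the sender dropped, the cancellation surfaces as a `NonFatal` error in the callee, is escalated to `Fatal` by the caller that cannot proceed without the value, and `unwrap_non_fatal` lets `run` bubble it up while merely logging anything recoverable.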