Unverified commit dcd672cb, authored by Robert Klotzner, committed by GitHub

Send statements to own backing group first (#2927)

* Factor out runtime module into utils.

* First fatal error design.

* Better error handling infra.

* Error handling cleanup.

* Send to peers of our group first.

* Finish backing group prioritization.

* Little cleanup.

* More cleanup.

* Forgot to checkin error.rs.

* Notes.

* Runtime -> RuntimeInfo

* qed in debug assert.

* PolkaErr -> Fault.
parent 34896969
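The bullets above lean on a new error-handling abstraction (`Fault`, `unwrap_non_fatal`) that this commit pulls in from `polkadot-node-subsystem-util`. Below is a minimal Rust sketch of what that utility plausibly looks like, reconstructed from the call sites in the diffs that follow (`from_fatal`, `from_non_fatal`, `from_other`, `unwrap_non_fatal`); it is not the verbatim upstream code.

// Sketch of the `Fault` utility, reconstructed from usage in this commit.
use thiserror::Error;

/// An error that is either fatal (the subsystem should shut down) or
/// non-fatal (log it and keep going).
#[derive(Debug, Error)]
pub enum Fault<E, F>
where
    E: std::fmt::Debug + std::error::Error + 'static,
    F: std::fmt::Debug + std::error::Error + 'static,
{
    /// Error should terminate the subsystem.
    #[error("Fatal error occurred.")]
    Fatal(#[source] F),
    /// Error can be dealt with by logging it and moving on.
    #[error("Non-fatal error occurred.")]
    Err(#[source] E),
}

impl<E, F> Fault<E, F>
where
    E: std::fmt::Debug + std::error::Error + 'static,
    F: std::fmt::Debug + std::error::Error + 'static,
{
    /// Wrap a non-fatal error.
    pub fn from_non_fatal(e: E) -> Self {
        Self::Err(e)
    }

    /// Wrap a fatal error.
    pub fn from_fatal(f: F) -> Self {
        Self::Fatal(f)
    }

    /// Convert from a `Fault` over other error types, preserving fatality.
    /// (Simplified: the real utility also converts from newtype wrappers
    /// such as `runtime::Error`.)
    pub fn from_other<E1, F1>(other: Fault<E1, F1>) -> Self
    where
        E1: Into<E> + std::fmt::Debug + std::error::Error + 'static,
        F1: Into<F> + std::fmt::Debug + std::error::Error + 'static,
    {
        match other {
            Fault::Fatal(f) => Self::from_fatal(f.into()),
            Fault::Err(e) => Self::from_non_fatal(e.into()),
        }
    }
}

/// Unwrap a `Fault`-based result: fatal errors pass through via `?`,
/// non-fatal ones are handed back to the caller for logging.
pub fn unwrap_non_fatal<E, F>(result: Result<(), Fault<E, F>>) -> Result<Option<E>, F>
where
    E: std::fmt::Debug + std::error::Error + 'static,
    F: std::fmt::Debug + std::error::Error + 'static,
{
    match result {
        Ok(()) => Ok(None),
        Err(Fault::Fatal(f)) => Err(f),
        Err(Fault::Err(e)) => Ok(Some(e)),
    }
}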
@@ -6568,6 +6568,7 @@ dependencies = [
  "sp-keystore",
  "sp-staking",
  "sp-tracing",
+ "thiserror",
  "tracing",
 ]
......
@@ -23,30 +23,67 @@ use thiserror::Error;
 use futures::channel::oneshot;

-use polkadot_node_subsystem_util::{
-    runtime,
-    Error as UtilError,
-};
+use polkadot_node_subsystem_util::{Fault, Error as UtilError, runtime, unwrap_non_fatal};
 use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};

 use crate::LOG_TARGET;

 /// Errors of this subsystem.
 #[derive(Debug, Error)]
-pub enum Error {
-    #[error("Response channel to obtain chunk failed")]
-    QueryChunkResponseChannel(#[source] oneshot::Canceled),
+#[error(transparent)]
+pub struct Error(pub Fault<NonFatal, Fatal>);

-    #[error("Response channel to obtain available data failed")]
-    QueryAvailableDataResponseChannel(#[source] oneshot::Canceled),
+impl From<NonFatal> for Error {
+    fn from(e: NonFatal) -> Self {
+        Self(Fault::from_non_fatal(e))
+    }
+}

-    #[error("Receive channel closed")]
-    IncomingMessageChannel(#[source] SubsystemError),
+impl From<Fatal> for Error {
+    fn from(f: Fatal) -> Self {
+        Self(Fault::from_fatal(f))
+    }
+}
+
+impl From<runtime::Error> for Error {
+    fn from(o: runtime::Error) -> Self {
+        Self(Fault::from_other(o))
+    }
+}
+
+/// Fatal errors of this subsystem.
+#[derive(Debug, Error)]
+pub enum Fatal {
+    /// Spawning a running task failed.
+    #[error("Spawning subsystem task failed")]
+    SpawnTask(#[source] SubsystemError),
+
+    /// Runtime API subsystem is down, which means we're shutting down.
+    #[error("Runtime request canceled")]
+    RuntimeRequestCanceled(oneshot::Canceled),
+
+    /// Requester stream exhausted.
+    #[error("Erasure chunk requester stream exhausted")]
+    RequesterExhausted,
+
+    #[error("Receive channel closed")]
+    IncomingMessageChannel(#[source] SubsystemError),
+
+    /// Errors coming from runtime::Runtime.
+    #[error("Error while accessing runtime information")]
+    Runtime(#[from] #[source] runtime::Fatal),
+}
+
+/// Non fatal errors of this subsystem.
+#[derive(Debug, Error)]
+pub enum NonFatal {
+    /// av-store will drop the sender on any error that happens.
+    #[error("Response channel to obtain chunk failed")]
+    QueryChunkResponseChannel(#[source] oneshot::Canceled),
+
+    /// av-store will drop the sender on any error that happens.
+    #[error("Response channel to obtain available data failed")]
+    QueryAvailableDataResponseChannel(#[source] oneshot::Canceled),
+
+    /// We tried accessing a session that was not cached.
+    #[error("Session is not cached.")]
+    NoSuchCachedSession,
@@ -55,11 +92,7 @@ pub enum Error {
     #[error("Not a validator.")]
     NotAValidator,

-    /// Requester stream exhausted.
-    #[error("Erasure chunk requester stream exhausted")]
-    RequesterExhausted,
-
-    /// Sending response failed.
+    /// Sending request response failed (Can happen on timeouts for example).
     #[error("Sending a request's response failed.")]
     SendResponse,
@@ -68,10 +101,6 @@ pub enum Error {
     #[error("Utility request failed")]
     UtilRequest(UtilError),

-    /// Runtime API subsystem is down, which means we're shutting down.
-    #[error("Runtime request canceled")]
-    RuntimeRequestCanceled(oneshot::Canceled),
-
     /// Some request to the runtime failed.
     /// For example if we prune a block we're requesting info about.
     #[error("Runtime API error")]
@@ -98,39 +127,30 @@ pub enum Error {
     /// Errors coming from runtime::Runtime.
     #[error("Error while accessing runtime information")]
-    Runtime(#[source] runtime::Error),
+    Runtime(#[from] #[source] runtime::NonFatal),
 }

 pub type Result<T> = std::result::Result<T, Error>;

-impl From<runtime::Error> for Error {
-    fn from(err: runtime::Error) -> Self {
-        Self::Runtime(err)
-    }
-}
-
-impl From<SubsystemError> for Error {
-    fn from(err: SubsystemError) -> Self {
-        Self::IncomingMessageChannel(err)
-    }
-}
-
-/// Receive a response from a runtime request and convert errors.
-pub(crate) async fn recv_runtime<V>(
-    r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
-) -> std::result::Result<V, Error> {
-    r.await
-        .map_err(Error::RuntimeRequestCanceled)?
-        .map_err(Error::RuntimeRequest)
-}
-
 /// Utility for eating top level errors and log them.
 ///
 /// We basically always want to try and continue on error. This utility function is meant to
 /// consume top-level errors by simply logging them
-pub fn log_error(result: Result<()>, ctx: &'static str) {
-    if let Err(error) = result {
+pub fn log_error(result: Result<()>, ctx: &'static str)
+    -> std::result::Result<(), Fatal>
+{
+    if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
         tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
     }
+    Ok(())
 }
+
+/// Receive a response from a runtime request and convert errors.
+pub(crate) async fn recv_runtime<V>(
+    r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
+) -> Result<V> {
+    let result = r.await
+        .map_err(Fatal::RuntimeRequestCanceled)?
+        .map_err(NonFatal::RuntimeRequest)?;
+    Ok(result)
+}
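With the `From` impls above, `?` wraps either error kind into `Error` automatically; `log_error` then splits them apart again at the top level, logging and swallowing non-fatal errors while propagating fatal ones. A hypothetical illustration (both functions are invented for this example, not part of the commit):

// Hypothetical call sites showing the intended control flow.
async fn handle_one(session_cached: bool) -> Result<()> {
    if !session_cached {
        // NonFatal converts into Error via `From<NonFatal>`, so `.into()`
        // and `?` work on it directly.
        return Err(NonFatal::NoSuchCachedSession.into());
    }
    Ok(())
}

async fn top_level(session_cached: bool) -> std::result::Result<(), Fatal> {
    // `log_error` logs and swallows the NonFatal case; only a Fatal error
    // propagates out of here via `?` and terminates the subsystem.
    log_error(handle_one(session_cached).await, "handle_one")?;
    Ok(())
}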
@@ -25,10 +25,10 @@ use polkadot_subsystem::{
 /// Error and [`Result`] type for this subsystem.
 mod error;
-pub use error::Error;
+pub use error::{Fatal, NonFatal};
 use error::{Result, log_error};

-use polkadot_node_subsystem_util::runtime::Runtime;
+use polkadot_node_subsystem_util::runtime::RuntimeInfo;

 /// `Requester` taking care of requesting chunks for candidates pending availability.
 mod requester;
@@ -59,7 +59,7 @@ pub struct AvailabilityDistributionSubsystem {
     /// Pointer to a keystore, which is required for determining this nodes validator index.
     keystore: SyncCryptoStorePtr,
     /// Easy and efficient runtime access for this subsystem.
-    runtime: Runtime,
+    runtime: RuntimeInfo,
     /// Prometheus metrics.
     metrics: Metrics,
 }
@@ -85,12 +85,12 @@ impl AvailabilityDistributionSubsystem {
     /// Create a new instance of the availability distribution.
     pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
-        let runtime = Runtime::new(keystore.clone());
+        let runtime = RuntimeInfo::new(keystore.clone());
         Self { keystore, runtime, metrics }
     }

     /// Start processing work as passed on from the Overseer.
-    async fn run<Context>(mut self, mut ctx: Context) -> Result<()>
+    async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), Fatal>
     where
         Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
     {
@@ -108,10 +108,10 @@ impl AvailabilityDistributionSubsystem {
             // Handle task messages sending:
             let message = match action {
                 Either::Left(subsystem_msg) => {
-                    subsystem_msg.map_err(|e| Error::IncomingMessageChannel(e))?
+                    subsystem_msg.map_err(|e| Fatal::IncomingMessageChannel(e))?
                 }
                 Either::Right(from_task) => {
-                    let from_task = from_task.ok_or(Error::RequesterExhausted)?;
+                    let from_task = from_task.ok_or(Fatal::RequesterExhausted)?;
                     ctx.send_message(from_task).await;
                     continue;
                 }
@@ -133,7 +133,7 @@ impl AvailabilityDistributionSubsystem {
                     log_error(
                         requester.get_mut().update_fetching_heads(&mut ctx, update).await,
                         "Error in Requester::update_fetching_heads"
-                    );
+                    )?;
                 }
                 FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
                 FromOverseer::Signal(OverseerSignal::Conclude) => {
@@ -169,7 +169,7 @@ impl AvailabilityDistributionSubsystem {
                             tx,
                         ).await,
                         "PoVRequester::fetch_pov"
-                    );
+                    )?;
                 }
             }
         }
......
@@ -33,9 +33,10 @@ use polkadot_subsystem::{
     ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
     messages::{AllMessages, NetworkBridgeMessage, IfDisconnected}
 };
-use polkadot_node_subsystem_util::runtime::{Runtime, ValidatorInfo};
+use polkadot_node_subsystem_util::runtime::{RuntimeInfo, ValidatorInfo};

-use crate::error::{Error, log_error};
+use crate::error::{Fatal, NonFatal};
+use crate::LOG_TARGET;

 /// Number of sessions we want to keep in the LRU.
 const NUM_SESSIONS: usize = 2;
@@ -63,7 +64,7 @@ impl PoVRequester {
     pub async fn update_connected_validators<Context>(
         &mut self,
         ctx: &mut Context,
-        runtime: &mut Runtime,
+        runtime: &mut RuntimeInfo,
         update: &ActiveLeavesUpdate,
     ) -> super::Result<()>
     where
@@ -87,7 +88,7 @@ impl PoVRequester {
     pub async fn fetch_pov<Context>(
         &self,
         ctx: &mut Context,
-        runtime: &mut Runtime,
+        runtime: &mut RuntimeInfo,
         parent: Hash,
         from_validator: ValidatorIndex,
         candidate_hash: CandidateHash,
@@ -99,7 +100,7 @@ impl PoVRequester {
     {
         let info = &runtime.get_session_info(ctx, parent).await?.session_info;
         let authority_id = info.discovery_keys.get(from_validator.0 as usize)
-            .ok_or(Error::InvalidValidatorIndex)?
+            .ok_or(NonFatal::InvalidValidatorIndex)?
             .clone();
         let (req, pending_response) = OutgoingRequest::new(
             Recipient::Authority(authority_id),
@@ -125,7 +126,8 @@ impl PoVRequester {
             .with_relay_parent(parent);
         ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed())
             .await
-            .map_err(|e| Error::SpawnTask(e))
+            .map_err(|e| Fatal::SpawnTask(e))?;
+        Ok(())
     }
 }
@@ -136,10 +138,13 @@ async fn fetch_pov_job(
     span: jaeger::Span,
     tx: oneshot::Sender<PoV>,
 ) {
-    log_error(
-        do_fetch_pov(pov_hash, pending_response, span, tx).await,
-        "fetch_pov_job",
-    )
+    if let Err(err) = do_fetch_pov(pov_hash, pending_response, span, tx).await {
+        tracing::warn!(
+            target: LOG_TARGET,
+            ?err,
+            "fetch_pov_job"
+        );
+    }
 }

 /// Do the actual work of waiting for the response.
/// Do the actual work of waiting for the response.
......@@ -149,24 +154,24 @@ async fn do_fetch_pov(
_span: jaeger::Span,
tx: oneshot::Sender<PoV>,
)
-> super::Result<()>
-> std::result::Result<(), NonFatal>
{
let response = pending_response.await.map_err(Error::FetchPoV)?;
let response = pending_response.await.map_err(NonFatal::FetchPoV)?;
let pov = match response {
PoVFetchingResponse::PoV(pov) => pov,
PoVFetchingResponse::NoSuchPoV => {
return Err(Error::NoSuchPoV)
return Err(NonFatal::NoSuchPoV)
}
};
if pov.hash() == pov_hash {
tx.send(pov).map_err(|_| Error::SendResponse)
tx.send(pov).map_err(|_| NonFatal::SendResponse)
} else {
Err(Error::UnexpectedPoV)
Err(NonFatal::UnexpectedPoV)
}
}
/// Get the session indeces for the given relay chain parents.
async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut Runtime, new_heads: impl Iterator<Item = &Hash>)
async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut RuntimeInfo, new_heads: impl Iterator<Item = &Hash>)
-> super::Result<impl Iterator<Item = (Hash, SessionIndex)>>
where
Context: SubsystemContext,
@@ -181,7 +186,7 @@ where
 /// Connect to validators of our validator group.
 async fn connect_to_relevant_validators<Context>(
     ctx: &mut Context,
-    runtime: &mut Runtime,
+    runtime: &mut RuntimeInfo,
     parent: Hash,
     session: SessionIndex
 )
@@ -206,7 +211,7 @@ where
 /// Return: `None` if not a validator.
 async fn determine_relevant_validators<Context>(
     ctx: &mut Context,
-    runtime: &mut Runtime,
+    runtime: &mut RuntimeInfo,
     parent: Hash,
     session: SessionIndex,
 )
@@ -275,7 +280,7 @@ mod tests {
         let (mut context, mut virtual_overseer) =
             test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, TaskExecutor>(pool.clone());
         let keystore = make_ferdie_keystore();
-        let mut runtime = polkadot_node_subsystem_util::runtime::Runtime::new(keystore);
+        let mut runtime = polkadot_node_subsystem_util::runtime::RuntimeInfo::new(keystore);
         let (tx, rx) = oneshot::channel();

         let testee = async {
......
@@ -34,7 +34,7 @@ use polkadot_subsystem::messages::{
 use polkadot_subsystem::{SubsystemContext, jaeger};

 use crate::{
-    error::{Error, Result},
+    error::{Fatal, Result},
     session_cache::{BadValidators, SessionInfo},
     LOG_TARGET,
     metrics::{Metrics, SUCCEEDED, FAILED},
@@ -191,7 +191,7 @@ impl FetchTask {
         ctx.spawn("chunk-fetcher", running.run(kill).boxed())
             .await
-            .map_err(|e| Error::SpawnTask(e))?;
+            .map_err(|e| Fatal::SpawnTask(e))?;

         Ok(FetchTask {
             live_in,
......
@@ -28,7 +28,7 @@ use polkadot_subsystem::{
     SubsystemContext, jaeger,
 };

-use crate::error::{Error, Result};
+use crate::error::{NonFatal, Result};
 use crate::{LOG_TARGET, metrics::{Metrics, SUCCEEDED, FAILED, NOT_FOUND}};

 /// Variant of `answer_pov_request` that does Prometheus metric and logging on errors.
/// Variant of `answer_pov_request` that does Prometheus metric and logging on errors.
......@@ -107,7 +107,7 @@ where
}
};
req.send_response(response).map_err(|_| Error::SendResponse)?;
req.send_response(response).map_err(|_| NonFatal::SendResponse)?;
Ok(result)
}
@@ -144,7 +144,7 @@ where
         Some(chunk) => v1::ChunkFetchingResponse::Chunk(chunk.into()),
     };

-    req.send_response(response).map_err(|_| Error::SendResponse)?;
+    req.send_response(response).map_err(|_| NonFatal::SendResponse)?;
     Ok(result)
 }
@@ -164,7 +164,7 @@ where
     ))
     .await;

-    rx.await.map_err(|e| {
+    let result = rx.await.map_err(|e| {
         tracing::trace!(
             target: LOG_TARGET,
             ?validator_index,
@@ -172,8 +172,9 @@ where
             error = ?e,
             "Error retrieving chunk",
         );
-        Error::QueryChunkResponseChannel(e)
-    })
+        NonFatal::QueryChunkResponseChannel(e)
+    })?;
+    Ok(result)
 }

 /// Query PoV from the availability store.
/// Query PoV from the availability store.
......@@ -191,5 +192,6 @@ where
))
.await;
rx.await.map_err(|e| Error::QueryAvailableDataResponseChannel(e))
let result = rx.await.map_err(|e| NonFatal::QueryAvailableDataResponseChannel(e))?;
Ok(result)
}
@@ -33,7 +33,7 @@ use polkadot_primitives::v1::{
 use polkadot_subsystem::SubsystemContext;

 use super::{
-    error::{recv_runtime, Error},
+    error::{recv_runtime, Error, NonFatal},
     LOG_TARGET,
 };
@@ -189,9 +189,9 @@ impl SessionCache {
         let session = self
             .session_info_cache
             .get_mut(&report.session_index)
-            .ok_or(Error::NoSuchCachedSession)?
+            .ok_or(NonFatal::NoSuchCachedSession)?
             .as_mut()
-            .ok_or(Error::NotAValidator)?;
+            .ok_or(NonFatal::NotAValidator)?;
         let group = session
             .validator_groups
             .get_mut(report.group_index.0 as usize)
@@ -231,7 +231,7 @@ impl SessionCache {
             ..
         } = recv_runtime(request_session_info(parent, session_index, ctx.sender()).await)
             .await?
-            .ok_or(Error::NoSuchSession(session_index))?;
+            .ok_or(NonFatal::NoSuchSession(session_index))?;

         if let Some(our_index) = self.get_our_index(validators).await {
             // Get our group index:
......
@@ -10,6 +10,7 @@ futures = "0.3.12"
 tracing = "0.1.25"
 polkadot-primitives = { path = "../../../primitives" }
 sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
+sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
 polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
 polkadot-node-primitives = { path = "../../primitives" }
@@ -18,6 +19,7 @@ polkadot-node-network-protocol = { path = "../../network/protocol" }
 arrayvec = "0.5.2"
 indexmap = "1.6.1"
 parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
+thiserror = "1.0.23"

 [dev-dependencies]
 polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
......
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//
//! Error handling related code and Error/Result definitions.

use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::v1::{CandidateHash, Hash};
use polkadot_subsystem::SubsystemError;
use thiserror::Error;

use polkadot_node_subsystem_util::{Fault, runtime, unwrap_non_fatal};

use crate::LOG_TARGET;

/// General result.
pub type Result<T> = std::result::Result<T, Error>;
/// Result for non fatal only failures.
pub type NonFatalResult<T> = std::result::Result<T, NonFatal>;
/// Result for fatal only failures.
pub type FatalResult<T> = std::result::Result<T, Fatal>;

/// Errors for statement distribution.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct Error(pub Fault<NonFatal, Fatal>);

impl From<NonFatal> for Error {
    fn from(e: NonFatal) -> Self {
        Self(Fault::from_non_fatal(e))
    }
}

impl From<Fatal> for Error {
    fn from(f: Fatal) -> Self {
        Self(Fault::from_fatal(f))
    }
}

impl From<runtime::Error> for Error {
    fn from(o: runtime::Error) -> Self {
        Self(Fault::from_other(o))
    }
}

/// Fatal errors of this subsystem.
#[derive(Debug, Error)]
pub enum Fatal {
    /// Requester channel is never closed.
    #[error("Requester receiver stream finished.")]
    RequesterReceiverFinished,

    /// Responder channel is never closed.
    #[error("Responder receiver stream finished.")]
    ResponderReceiverFinished,

    /// Spawning a running task failed.
    #[error("Spawning subsystem task failed")]
    SpawnTask(#[source] SubsystemError),

    /// Receiving subsystem message from overseer failed.
    #[error("Receiving message from overseer failed")]
    SubsystemReceive(#[source] SubsystemError),

    /// Errors coming from runtime::Runtime.
    #[error("Error while accessing runtime information")]
    Runtime(#[from] #[source] runtime::Fatal),
}

/// Non fatal errors of this subsystem.
#[derive(Debug, Error)]
pub enum NonFatal {
    /// Errors coming from runtime::Runtime.
    #[error("Error while accessing runtime information")]
    Runtime(#[from] #[source] runtime::NonFatal),

    /// Relay parent was not present in active heads.
    #[error("Relay parent could not be found in active heads")]
    NoSuchHead(Hash),

    /// Peer requested statement data for candidate that was never announced to it.
    #[error("Peer requested data for candidate it never received a notification for")]
    RequestedUnannouncedCandidate(PeerId, CandidateHash),

    /// A large statement status was requested, which could not be found.
    #[error("Statement status does not exist")]
    NoSuchLargeStatementStatus(Hash, CandidateHash),

    /// A fetched large statement was requested, but could not be found.
    #[error("Fetched large statement does not exist")]
    NoSuchFetchedLargeStatement(Hash, CandidateHash),

    /// Responder no longer waits for our data. (Should not happen right now.)
    #[error("Oneshot `GetData` channel closed")]
    ResponderGetDataCanceled,
}

/// Utility for eating top level errors and logging them.
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them.
pub fn log_error(result: Result<()>, ctx: &'static str)
    -> FatalResult<()>
{
    if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
        tracing::debug!(target: LOG_TARGET, error = ?error, ctx)
    }
    Ok(())
}
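Statement distribution gets the same two-level scheme: the main loop returns `FatalResult`, so only a `Fatal` error can terminate the subsystem. A hypothetical skeleton showing how these pieces are meant to fit together (`process_message` and `run_skeleton` are invented for illustration, not part of the commit):

// Hypothetical skeleton; not part of the commit.
async fn process_message() -> Result<()> {
    // A single failed response is NonFatal; the subsystem keeps serving
    // other peers.
    Err(NonFatal::ResponderGetDataCanceled.into())
}

async fn run_skeleton() -> FatalResult<()> {
    loop {
        // NonFatal: logged at debug level inside `log_error`, loop continues.
        // Fatal: propagated by `?`, terminating the subsystem.
        log_error(process_message().await, "process_message")?;
    }
}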