Unverified commit 117466aa, authored by Robert Klotzner and committed by GitHub.

Remove request multiplexer (#3624)

* WIP: Get rid of request multiplexer.

* WIP

* Receiver for handling incoming requests.

* Get rid of useless `Fault` abstraction.

What the type system lets us do here is not worth abstracting into its own
type. Instead, error handling is going to be merely a pattern.
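
The pattern splits each subsystem's errors into a `Fatal` and a `NonFatal` enum, combined in one top-level `Error`. A minimal sketch of that shape, mirroring the `error.rs` diff further down (the concrete variants here are placeholders):

	use thiserror::Error;

	/// Errors that must bring the subsystem down (placeholder variant).
	#[derive(Debug, Error)]
	pub enum Fatal {
		#[error("Spawning a task failed")]
		SpawnTask,
	}

	/// Errors we can recover from by logging them (placeholder variant).
	#[derive(Debug, Error)]
	pub enum NonFatal {
		#[error("Sending a response failed")]
		SendResponse,
	}

	/// One top-level error, split by severity; `derive_more::From` derives the
	/// `From` impls so `?` works for both halves.
	#[derive(Debug, Error, derive_more::From)]
	#[error(transparent)]
	pub enum Error {
		Fatal(Fatal),
		NonFatal(NonFatal),
	}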

* Make most things compile again.

* Port availability distribution away from request multiplexer.

* Formatting.

* Port dispute distribution over.

* Fixup statement distribution.

* Handle request directly in collator protocol.

+ Only allow fatal errors at top level.

* Use direct request channel for availability recovery.

* Finally get rid of request multiplexer.

Fixes #2842 and paves the way for more back pressure possibilities (a wiring sketch follows this log).

* Fix overseer and statement distribution tests.

* Fix collator protocol and network bridge tests.

* Fix tests in availability recovery.

* Fix availability distribution tests.

* Fix dispute distribution tests.

* Add missing dependency.

* Typos.

* Review remarks.

* More remarks.
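
Net effect of the series: instead of one multiplexer feeding all request traffic through the overseer, each request/response protocol hands its subsystem a dedicated receiver at construction time. A condensed wiring sketch using the names from the availability-distribution diffs below (`get_config_receiver` also appears in the updated test harness; treat the surrounding plumbing as illustrative):

	// Each protocol yields a (receiver, network config) pair.
	let (pov_req_receiver, pov_req_cfg) = IncomingRequest::get_config_receiver();
	let (chunk_req_receiver, chunk_req_cfg) = IncomingRequest::get_config_receiver();

	// The configs are registered with the network; the receivers go straight
	// into the subsystem, which spawns one long-running task per protocol.
	let subsystem = AvailabilityDistributionSubsystem::new(
		keystore,
		IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
		Default::default(),
	);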
......@@ -1532,13 +1532,14 @@ dependencies = [
[[package]]
name = "derive_more"
version = "0.99.14"
version = "0.99.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cc7b9cef1e351660e5443924e4f43ab25fbbed3e9a5f052df3677deb4d6b320"
checksum = "40eebddd2156ce1bb37b20bbe5151340a31828b1f2d22ba4141f3531710e38df"
dependencies = [
"convert_case",
"proc-macro2",
"quote",
"rustc_version 0.3.3",
"syn",
]
......@@ -2476,7 +2477,7 @@ dependencies = [
"cc",
"libc",
"log",
"rustc_version",
"rustc_version 0.2.3",
"winapi 0.3.9",
]
......@@ -2845,7 +2846,7 @@ dependencies = [
"itoa",
"log",
"net2",
"rustc_version",
"rustc_version 0.2.3",
"time",
"tokio 0.1.22",
"tokio-buf",
......@@ -5650,7 +5651,7 @@ checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252"
dependencies = [
"lock_api 0.3.4",
"parking_lot_core 0.6.2",
"rustc_version",
"rustc_version 0.2.3",
]
[[package]]
......@@ -5684,7 +5685,7 @@ dependencies = [
"cloudabi 0.0.3",
"libc",
"redox_syscall 0.1.56",
"rustc_version",
"rustc_version 0.2.3",
"smallvec 0.6.13",
"winapi 0.3.9",
]
......@@ -5942,6 +5943,7 @@ name = "polkadot-availability-distribution"
version = "0.9.9"
dependencies = [
"assert_matches",
"derive_more",
"futures 0.3.16",
"futures-timer 3.0.2",
"lru",
......@@ -6053,10 +6055,12 @@ version = "0.9.9"
dependencies = [
"always-assert",
"assert_matches",
"derive_more",
"env_logger 0.8.4",
"futures 0.3.16",
"futures-timer 3.0.2",
"log",
"parity-scale-codec",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
......@@ -6089,6 +6093,7 @@ version = "0.9.9"
dependencies = [
"assert_matches",
"async-trait",
"derive_more",
"futures 0.3.16",
"futures-timer 3.0.2",
"lazy_static",
......@@ -6170,7 +6175,6 @@ dependencies = [
"sp-consensus",
"sp-core",
"sp-keyring",
"strum",
"tracing",
]
......@@ -6522,6 +6526,7 @@ name = "polkadot-node-network-protocol"
version = "0.9.9"
dependencies = [
"async-trait",
"derive_more",
"futures 0.3.16",
"parity-scale-codec",
"polkadot-node-jaeger",
......@@ -6628,6 +6633,7 @@ version = "0.9.9"
dependencies = [
"assert_matches",
"async-trait",
"derive_more",
"env_logger 0.8.4",
"futures 0.3.16",
"futures-timer 3.0.2",
......@@ -7023,6 +7029,7 @@ dependencies = [
"polkadot-node-core-parachains-inherent",
"polkadot-node-core-provisioner",
"polkadot-node-core-runtime-api",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
......@@ -7144,6 +7151,7 @@ version = "0.9.9"
dependencies = [
"arrayvec 0.5.2",
"assert_matches",
"derive_more",
"futures 0.3.16",
"futures-timer 3.0.2",
"indexmap",
......@@ -8152,6 +8160,15 @@ dependencies = [
"semver 0.9.0",
]
[[package]]
name = "rustc_version"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee"
dependencies = [
"semver 0.11.0",
]
[[package]]
name = "rustls"
version = "0.18.0"
......@@ -9663,7 +9680,7 @@ dependencies = [
"rand 0.7.3",
"rand_core 0.5.1",
"ring",
"rustc_version",
"rustc_version 0.2.3",
"sha2 0.9.2",
"subtle 2.2.3",
"x25519-dalek 0.6.0",
......
......@@ -20,6 +20,7 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master",
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
thiserror = "1.0.26"
rand = "0.8.3"
derive_more = "0.99.11"
lru = "0.6.6"
[dev-dependencies]
......
......@@ -17,35 +17,31 @@
//! Error handling related code and Error/Result definitions.
use polkadot_node_network_protocol::request_response::request::RequestError;
use polkadot_node_network_protocol::request_response::outgoing::RequestError;
use thiserror::Error;
use futures::channel::oneshot;
use polkadot_node_subsystem_util::{runtime, unwrap_non_fatal, Fault};
use polkadot_node_subsystem_util::runtime;
use polkadot_subsystem::SubsystemError;
use crate::LOG_TARGET;
#[derive(Debug, Error)]
#[derive(Debug, Error, derive_more::From)]
#[error(transparent)]
pub struct Error(pub Fault<NonFatal, Fatal>);
impl From<NonFatal> for Error {
fn from(e: NonFatal) -> Self {
Self(Fault::from_non_fatal(e))
}
}
impl From<Fatal> for Error {
fn from(f: Fatal) -> Self {
Self(Fault::from_fatal(f))
}
pub enum Error {
/// All fatal errors.
Fatal(Fatal),
/// All nonfatal/potentially recoverable errors.
NonFatal(NonFatal),
}
impl From<runtime::Error> for Error {
fn from(o: runtime::Error) -> Self {
Self(Fault::from_other(o))
match o {
runtime::Error::Fatal(f) => Self::Fatal(Fatal::Runtime(f)),
runtime::Error::NonFatal(f) => Self::NonFatal(NonFatal::Runtime(f)),
}
}
}
......@@ -107,15 +103,23 @@ pub enum NonFatal {
Runtime(#[from] runtime::NonFatal),
}
/// General result type for fatal/nonfatal errors.
pub type Result<T> = std::result::Result<T, Error>;
/// Results which are never fatal.
pub type NonFatalResult<T> = std::result::Result<T, NonFatal>;
/// Utility for consuming top-level errors and logging them.
///
/// We basically always want to try to continue on error. This utility function consumes
/// top-level errors by simply logging them.
pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), Fatal> {
if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
match result {
Err(Error::Fatal(f)) => Err(f),
Err(Error::NonFatal(error)) => {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
Ok(())
},
Ok(()) => Ok(()),
}
Ok(())
}
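
Usage-wise, `log_error` wraps each fallible step of a subsystem's main loop: non-fatal errors are logged and swallowed, while a `Fatal` escapes through `?` and terminates `run()`. A minimal sketch (`handle_next` is a hypothetical helper returning the combined `Result<()>`):

	loop {
		// On NonFatal: log the error and continue with the next iteration.
		// On Fatal: `?` propagates the error and ends the loop.
		log_error(handle_next(&mut ctx).await, "Error while handling message")?;
	}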
......@@ -18,6 +18,7 @@ use futures::{future::Either, FutureExt, StreamExt, TryFutureExt};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_network_protocol::request_response::{v1, IncomingRequestReceiver};
use polkadot_subsystem::{
messages::AvailabilityDistributionMessage, overseer, FromOverseer, OverseerSignal,
SpawnedSubsystem, SubsystemContext, SubsystemError,
......@@ -38,7 +39,7 @@ mod pov_requester;
/// Responding to erasure chunk requests:
mod responder;
use responder::{answer_chunk_request_log, answer_pov_request_log};
use responder::{run_chunk_receiver, run_pov_receiver};
mod metrics;
/// Prometheus `Metrics` for availability distribution.
......@@ -53,10 +54,20 @@ const LOG_TARGET: &'static str = "parachain::availability-distribution";
pub struct AvailabilityDistributionSubsystem {
/// Easy and efficient runtime access for this subsystem.
runtime: RuntimeInfo,
/// Receivers to receive messages from.
recvs: IncomingRequestReceivers,
/// Prometheus metrics.
metrics: Metrics,
}
/// Receivers to be passed into availability distribution.
pub struct IncomingRequestReceivers {
/// Receiver for incoming PoV requests.
pub pov_req_receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
/// Receiver for incoming availability chunk requests.
pub chunk_req_receiver: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
}
impl<Context> overseer::Subsystem<Context, SubsystemError> for AvailabilityDistributionSubsystem
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
......@@ -74,18 +85,41 @@ where
impl AvailabilityDistributionSubsystem {
/// Create a new instance of the availability distribution subsystem.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
pub fn new(
keystore: SyncCryptoStorePtr,
recvs: IncomingRequestReceivers,
metrics: Metrics,
) -> Self {
let runtime = RuntimeInfo::new(Some(keystore));
Self { runtime, metrics }
Self { runtime, recvs, metrics }
}
/// Start processing work as passed on from the Overseer.
async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), Fatal>
async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), Fatal>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
Context: overseer::SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let mut requester = Requester::new(self.metrics.clone()).fuse();
let Self { mut runtime, recvs, metrics } = self;
let IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver } = recvs;
let mut requester = Requester::new(metrics.clone()).fuse();
{
let sender = ctx.sender().clone();
ctx.spawn(
"pov-receiver",
run_pov_receiver(sender.clone(), pov_req_receiver, metrics.clone()).boxed(),
)
.map_err(Fatal::SpawnTask)?;
ctx.spawn(
"chunk-receiver",
run_chunk_receiver(sender, chunk_req_receiver, metrics.clone()).boxed(),
)
.map_err(Fatal::SpawnTask)?;
}
loop {
let action = {
let mut subsystem_next = ctx.recv().fuse();
......@@ -110,19 +144,13 @@ impl AvailabilityDistributionSubsystem {
log_error(
requester
.get_mut()
.update_fetching_heads(&mut ctx, &mut self.runtime, update)
.update_fetching_heads(&mut ctx, &mut runtime, update)
.await,
"Error in Requester::update_fetching_heads",
)?;
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::ChunkFetchingRequest(req),
} => answer_chunk_request_log(&mut ctx, req, &self.metrics).await,
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::PoVFetchingRequest(req),
} => answer_pov_request_log(&mut ctx, req, &self.metrics).await,
FromOverseer::Communication {
msg:
AvailabilityDistributionMessage::FetchPoV {
......@@ -136,7 +164,7 @@ impl AvailabilityDistributionSubsystem {
log_error(
pov_requester::fetch_pov(
&mut ctx,
&mut self.runtime,
&mut runtime,
relay_parent,
from_validator,
candidate_hash,
......
......@@ -19,7 +19,7 @@
use futures::{channel::oneshot, future::BoxFuture, FutureExt};
use polkadot_node_network_protocol::request_response::{
request::{RequestError, Requests},
outgoing::{RequestError, Requests},
v1::{PoVFetchingRequest, PoVFetchingResponse},
OutgoingRequest, Recipient,
};
......
......@@ -24,7 +24,7 @@ use futures::{
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::request_response::{
request::{OutgoingRequest, Recipient, RequestError, Requests},
outgoing::{OutgoingRequest, Recipient, RequestError, Requests},
v1::{ChunkFetchingRequest, ChunkFetchingResponse},
};
use polkadot_node_primitives::ErasureChunk;
......
......@@ -20,28 +20,93 @@ use std::sync::Arc;
use futures::channel::oneshot;
use polkadot_node_network_protocol::request_response::{request::IncomingRequest, v1};
use polkadot_node_network_protocol::{
request_response::{incoming, v1, IncomingRequest, IncomingRequestReceiver},
UnifiedReputationChange as Rep,
};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_primitives::v1::{CandidateHash, ValidatorIndex};
use polkadot_subsystem::{jaeger, messages::AvailabilityStoreMessage, SubsystemContext};
use polkadot_subsystem::{jaeger, messages::AvailabilityStoreMessage, SubsystemSender};
use crate::{
error::{NonFatal, Result},
error::{NonFatal, NonFatalResult, Result},
metrics::{Metrics, FAILED, NOT_FOUND, SUCCEEDED},
LOG_TARGET,
};
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded.");
/// Receiver task to be spawned separately, handling incoming PoV requests.
pub async fn run_pov_receiver<Sender>(
mut sender: Sender,
mut receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
metrics: Metrics,
) where
Sender: SubsystemSender,
{
loop {
match receiver.recv(|| vec![COST_INVALID_REQUEST]).await {
Ok(msg) => {
answer_pov_request_log(&mut sender, msg, &metrics).await;
},
Err(incoming::Error::Fatal(f)) => {
tracing::debug!(
target: LOG_TARGET,
error = ?f,
"Shutting down POV receiver."
);
return
},
Err(incoming::Error::NonFatal(error)) => {
tracing::debug!(target: LOG_TARGET, ?error, "Error decoding incoming PoV request.");
},
}
}
}
/// Receiver task to be spawned separately, handling incoming chunk requests.
pub async fn run_chunk_receiver<Sender>(
mut sender: Sender,
mut receiver: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
metrics: Metrics,
) where
Sender: SubsystemSender,
{
loop {
match receiver.recv(|| vec![COST_INVALID_REQUEST]).await {
Ok(msg) => {
answer_chunk_request_log(&mut sender, msg, &metrics).await;
},
Err(incoming::Error::Fatal(f)) => {
tracing::debug!(
target: LOG_TARGET,
error = ?f,
"Shutting down chunk receiver."
);
return
},
Err(incoming::Error::NonFatal(error)) => {
tracing::debug!(
target: LOG_TARGET,
?error,
"Error decoding incoming chunk request."
);
},
}
}
}
/// Variant of `answer_pov_request` that reports Prometheus metrics and logs errors.
///
/// Any errors of `answer_pov_request` will simply be logged.
pub async fn answer_pov_request_log<Context>(
ctx: &mut Context,
pub async fn answer_pov_request_log<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::PoVFetchingRequest>,
metrics: &Metrics,
) where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let res = answer_pov_request(ctx, req).await;
let res = answer_pov_request(sender, req).await;
match res {
Ok(result) => metrics.on_served_pov(if result { SUCCEEDED } else { NOT_FOUND }),
Err(err) => {
......@@ -58,15 +123,15 @@ pub async fn answer_pov_request_log<Context>(
/// Variant of `answer_chunk_request` that reports Prometheus metrics and logs errors.
///
/// Any errors of `answer_chunk_request` will simply be logged.
pub async fn answer_chunk_request_log<Context>(
ctx: &mut Context,
pub async fn answer_chunk_request_log<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::ChunkFetchingRequest>,
metrics: &Metrics,
) -> ()
where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let res = answer_chunk_request(ctx, req).await;
let res = answer_chunk_request(sender, req).await;
match res {
Ok(result) => metrics.on_served_chunk(if result { SUCCEEDED } else { NOT_FOUND }),
Err(err) => {
......@@ -83,16 +148,16 @@ where
/// Answer an incoming PoV fetch request by querying the av store.
///
/// Returns: `Ok(true)` if the PoV was found and served.
pub async fn answer_pov_request<Context>(
ctx: &mut Context,
pub async fn answer_pov_request<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::PoVFetchingRequest>,
) -> Result<bool>
where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let _span = jaeger::Span::new(req.payload.candidate_hash, "answer-pov-request");
let av_data = query_available_data(ctx, req.payload.candidate_hash).await?;
let av_data = query_available_data(sender, req.payload.candidate_hash).await?;
let result = av_data.is_some();
......@@ -111,18 +176,18 @@ where
/// Answer an incoming chunk request by querying the av store.
///
/// Returns: `Ok(true)` if the chunk was found and served.
pub async fn answer_chunk_request<Context>(
ctx: &mut Context,
pub async fn answer_chunk_request<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::ChunkFetchingRequest>,
) -> Result<bool>
where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let span = jaeger::Span::new(req.payload.candidate_hash, "answer-chunk-request");
let _child_span = span.child("answer-chunk-request").with_chunk_index(req.payload.index.0);
let chunk = query_chunk(ctx, req.payload.candidate_hash, req.payload.index).await?;
let chunk = query_chunk(sender, req.payload.candidate_hash, req.payload.index).await?;
let result = chunk.is_some();
......@@ -145,16 +210,19 @@ where
}
/// Query chunk from the availability store.
async fn query_chunk<Context>(
ctx: &mut Context,
async fn query_chunk<Sender>(
sender: &mut Sender,
candidate_hash: CandidateHash,
validator_index: ValidatorIndex,
) -> Result<Option<ErasureChunk>>
) -> NonFatalResult<Option<ErasureChunk>>
where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx))
sender
.send_message(
AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx).into(),
)
.await;
let result = rx.await.map_err(|e| {
......@@ -171,15 +239,16 @@ where
}
/// Query PoV from the availability store.
async fn query_available_data<Context>(
ctx: &mut Context,
async fn query_available_data<Sender>(
sender: &mut Sender,
candidate_hash: CandidateHash,
) -> Result<Option<AvailableData>>
) -> NonFatalResult<Option<AvailableData>>
where
Context: SubsystemContext,
Sender: SubsystemSender,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx))
sender
.send_message(AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx).into())
.await;
let result = rx.await.map_err(|e| NonFatal::QueryAvailableDataResponseChannel(e))?;
......
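
Both receiver tasks above follow the same loop contract: `recv` takes a closure producing the reputation penalty to apply when an incoming request cannot be decoded, and its error type is itself split into fatal and non-fatal halves. Condensed to its shape (with `answer_request` standing in for the concrete handler):

	loop {
		match receiver.recv(|| vec![COST_INVALID_REQUEST]).await {
			// A decoded request: answer it and keep serving.
			Ok(msg) => answer_request(&mut sender, msg, &metrics).await,
			// Fatal errors mean the receiver cannot continue: wind the task down.
			Err(incoming::Error::Fatal(f)) => {
				tracing::debug!(target: LOG_TARGET, error = ?f, "Shutting down receiver.");
				return
			},
			// Non-fatal decode errors: log and wait for the next request.
			Err(incoming::Error::NonFatal(error)) =>
				tracing::debug!(target: LOG_TARGET, ?error, "Error decoding incoming request."),
		}
	}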
......@@ -18,6 +18,7 @@ use std::collections::HashSet;
use futures::{executor, future, Future};
use polkadot_node_network_protocol::request_response::IncomingRequest;
use polkadot_primitives::v1::CoreState;
use sp_keystore::SyncCryptoStorePtr;
......@@ -41,17 +42,21 @@ fn test_harness<T: Future<Output = ()>>(
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = AvailabilityDistributionSubsystem::new(keystore, Default::default());
{
let subsystem = subsystem.run(context);
let (pov_req_receiver, pov_req_cfg) = IncomingRequest::get_config_receiver();
let (chunk_req_receiver, chunk_req_cfg) = IncomingRequest::get_config_receiver();
let subsystem = AvailabilityDistributionSubsystem::new(
keystore,
IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
Default::default(),
);
let subsystem = subsystem.run(context);
let test_fut = test_fx(TestHarness { virtual_overseer, pool });
let test_fut = test_fx(TestHarness { virtual_overseer, pov_req_cfg, chunk_req_cfg, pool });
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
executor::block_on(future::join(test_fut, subsystem)).1.unwrap();
}
executor::block_on(future::join(test_fut, subsystem)).1.unwrap();
}
/// Simple sanity check that the subsystem works as expected.
......