Unverified commit 39124b0a authored by Robert Klotzner, committed by GitHub

Request based PoV distribution (#2640)

* Indentation fix.

* Prepare request-response for PoV fetching.

* Drop old PoV distribution.

* WIP: Fetch PoV directly from backing.

* Backing compiles.

* Runtime access and connection management for PoV distribution.

* Get rid of seemingly dead code.

* Implement PoV fetching.

Backing does not yet use it.

* Don't send `ConnectToValidators` for empty list.

* Even better - no need to check over and over again.

* PoV fetching implemented.

+ Typechecks
+ Should work

Missing:

- Guide
- Tests
- Do fallback fetching in case fetching from seconding validator fails.

* Check PoV hash upon reception.

* Implement retry of PoV fetching in backing.

* Avoid pointless validation spawning.

* Add jaeger span to pov requesting.

* Add back tracing.

* Review remarks.

* Whitespace.

* Whitespace again.

* Cleanup + fix tests.

* Log to log target in overseer.

* Fix more tests.

* Don't fail if group cannot be found.

* Simple test for PoV fetcher.

* Handle missing group membership better.

* Add test for retry functionality.

* Fix flaky test.

* Spaces again.

* Guide updates.

* Spaces.
parent 549f0ebd
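
For orientation, the sketch below shows how a backing subsystem might drive the new `FetchPoV` flow added by this PR. Only the `AvailabilityDistributionMessage::FetchPoV` variant and its fields are taken from the diff further down; the helper name, the exact crate paths and the error handling are illustrative assumptions, not code from this change.

```rust
use futures::channel::oneshot;
use polkadot_primitives::v1::{CandidateHash, Hash, PoV, ValidatorIndex};
use polkadot_subsystem::{
	messages::{AllMessages, AvailabilityDistributionMessage},
	SubsystemContext,
};

/// Hypothetical helper: ask availability-distribution to fetch a PoV from one
/// validator of our backing group and wait for the result.
async fn request_pov<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	from_validator: ValidatorIndex,
	candidate_hash: CandidateHash,
	pov_hash: Hash,
) -> Option<PoV>
where
	Context: SubsystemContext,
{
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::AvailabilityDistribution(
		AvailabilityDistributionMessage::FetchPoV {
			relay_parent,
			from_validator,
			candidate_hash,
			pov_hash,
			tx,
		},
	))
	.await;
	// On failure (e.g. `NoSuchPoV` or a hash mismatch) the sender is simply
	// dropped, so the receiver resolves to `Err(Canceled)`; backing can then
	// retry with another validator of the group.
	rx.await.ok()
}
```

Per the commit list above, the backing changes (not shown in this excerpt) wrap this flow in a retry, so that a failure to fetch from the seconding validator leads to asking another validator of the group.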
@@ -5627,6 +5627,7 @@ dependencies = [
"sp-core",
"sp-keyring",
"sp-keystore",
"sp-tracing",
"thiserror",
"tracing",
]
@@ -5778,6 +5779,7 @@ dependencies = [
"polkadot-primitives",
"sc-network",
"strum",
"thiserror",
]
[[package]]
@@ -5926,25 +5928,6 @@ dependencies = [
"thiserror",
]
[[package]]
name = "polkadot-pov-distribution"
version = "0.1.0"
dependencies = [
"assert_matches",
"env_logger 0.8.2",
"futures 0.3.13",
"log",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-core",
"sp-keyring",
"thiserror",
"tracing",
]
[[package]]
name = "polkadot-primitives"
version = "0.8.30"
@@ -6231,7 +6214,6 @@ dependencies = [
"polkadot-node-subsystem-util",
"polkadot-overseer",
"polkadot-parachain",
"polkadot-pov-distribution",
"polkadot-primitives",
"polkadot-rpc",
"polkadot-runtime",
......
@@ -57,7 +57,6 @@ members = [
"node/core/runtime-api",
"node/network/approval-distribution",
"node/network/bridge",
"node/network/pov-distribution",
"node/network/protocol",
"node/network/statement-distribution",
"node/network/bitfield-distribution",
......
@@ -22,6 +22,7 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.12", features = ["thread-pool"] }
assert_matches = "1.4.0"
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
@@ -17,20 +17,26 @@
//! Error handling related code and Error/Result definitions.
use polkadot_node_network_protocol::request_response::request::RequestError;
use thiserror::Error;
use futures::channel::oneshot;
use polkadot_node_subsystem_util::Error as UtilError;
use polkadot_primitives::v1::SessionIndex;
use polkadot_primitives::v1::{CompressedPoVError, SessionIndex};
use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};
use crate::LOG_TARGET;
/// Errors of this subsystem.
#[derive(Debug, Error)]
pub enum Error {
#[error("Response channel to obtain QueryChunk failed")]
#[error("Response channel to obtain chunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Response channel to obtain available data failed")]
QueryAvailableDataResponseChannel(#[source] oneshot::Canceled),
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
@@ -53,24 +59,43 @@ pub enum Error {
/// Sending response failed.
#[error("Sending a request's response failed.")]
SendResponse,
}
/// Error that we should handle gracefully by logging it.
#[derive(Debug)]
pub enum NonFatalError {
/// Some request to utility functions failed.
/// This can be either `RuntimeRequestCanceled` or `RuntimeApiError`.
#[error("Utility request failed")]
UtilRequest(UtilError),
/// Runtime API subsystem is down, which means we're shutting down.
#[error("Runtime request canceled")]
RuntimeRequestCanceled(oneshot::Canceled),
/// Some request to the runtime failed.
/// For example if we prune a block we're requesting info about.
#[error("Runtime API error")]
RuntimeRequest(RuntimeApiError),
/// We tried fetching session info which was not available.
#[error("There was no session with the given index")]
NoSuchSession(SessionIndex),
/// Decompressing PoV failed.
#[error("PoV could not be decompressed")]
PoVDecompression(CompressedPoVError),
/// Fetching PoV failed with `RequestError`.
#[error("FetchPoV request error")]
FetchPoV(#[source] RequestError),
/// Fetching PoV failed as the received PoV did not match the expected hash.
#[error("Fetched PoV does not match expected hash")]
UnexpectedPoV,
#[error("Remote responded with `NoSuchPoV`")]
NoSuchPoV,
/// No validator with the given index could be found in the current session.
#[error("Given validator index could not be found")]
InvalidValidatorIndex,
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -87,9 +112,20 @@ pub(crate) async fn recv_runtime<V>(
oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
UtilError,
>,
) -> std::result::Result<V, NonFatalError> {
r.map_err(NonFatalError::UtilRequest)?
) -> std::result::Result<V, Error> {
r.map_err(Error::UtilRequest)?
.await
.map_err(NonFatalError::RuntimeRequestCanceled)?
.map_err(NonFatalError::RuntimeRequest)
.map_err(Error::RuntimeRequestCanceled)?
.map_err(Error::RuntimeRequest)
}
/// Utility for consuming top-level errors and logging them.
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them.
pub fn log_error(result: Result<()>, ctx: &'static str) {
if let Err(error) = result {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
}
}
@@ -26,15 +26,23 @@ use polkadot_subsystem::{
/// Error and [`Result`] type for this subsystem.
mod error;
pub use error::Error;
use error::Result;
use error::{Result, log_error};
/// Runtime requests.
mod runtime;
use runtime::Runtime;
/// `Requester` taking care of requesting chunks for candidates pending availability.
mod requester;
use requester::Requester;
/// Handling requests for PoVs during backing.
mod pov_requester;
use pov_requester::PoVRequester;
/// Responding to erasure chunk and PoV requests:
mod responder;
use responder::answer_request_log;
use responder::{answer_chunk_request_log, answer_pov_request_log};
/// Cache for session information.
mod session_cache;
@@ -52,6 +60,8 @@ const LOG_TARGET: &'static str = "parachain::availability-distribution";
pub struct AvailabilityDistributionSubsystem {
/// Pointer to a keystore, which is required for determining this node's validator index.
keystore: SyncCryptoStorePtr,
/// Easy and efficient runtime access for this subsystem.
runtime: Runtime,
/// Prometheus metrics.
metrics: Metrics,
}
@@ -74,17 +84,20 @@ where
}
impl AvailabilityDistributionSubsystem {
/// Create a new instance of the availability distribution.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
Self { keystore, metrics }
let runtime = Runtime::new(keystore.clone());
Self { keystore, runtime, metrics }
}
/// Start processing work as passed on from the Overseer.
async fn run<Context>(self, mut ctx: Context) -> Result<()>
async fn run<Context>(mut self, mut ctx: Context) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
{
let mut requester = Requester::new(self.keystore.clone(), self.metrics.clone()).fuse();
let mut pov_requester = PoVRequester::new();
loop {
let action = {
let mut subsystem_next = ctx.recv().fuse();
@@ -107,14 +120,14 @@ impl AvailabilityDistributionSubsystem {
};
match message {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
// Update the relay chain heads we are fetching our pieces for:
if let Some(e) = requester
.get_mut()
.update_fetching_heads(&mut ctx, update)
.await?
{
tracing::debug!(target: LOG_TARGET, "Error processing ActiveLeavesUpdate: {:?}", e);
}
log_error(
pov_requester.update_connected_validators(&mut ctx, &mut self.runtime, &update).await,
"PoVRequester::update_connected_validators"
);
log_error(
requester.get_mut().update_fetching_heads(&mut ctx, update).await,
"Error in Requester::update_fetching_heads"
);
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
FromOverseer::Signal(OverseerSignal::Conclude) => {
@@ -123,7 +136,34 @@
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::ChunkFetchingRequest(req),
} => {
answer_request_log(&mut ctx, req, &self.metrics).await
answer_chunk_request_log(&mut ctx, req, &self.metrics).await
}
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::PoVFetchingRequest(req),
} => {
answer_pov_request_log(&mut ctx, req, &self.metrics).await
}
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::FetchPoV {
relay_parent,
from_validator,
candidate_hash,
pov_hash,
tx,
},
} => {
log_error(
pov_requester.fetch_pov(
&mut ctx,
&mut self.runtime,
relay_parent,
from_validator,
candidate_hash,
pov_hash,
tx,
).await,
"PoVRequester::fetch_pov"
);
}
}
}
......
@@ -24,7 +24,7 @@ pub const SUCCEEDED: &'static str = "succeeded";
/// Label for fail counters.
pub const FAILED: &'static str = "failed";
/// Label for chunks that could not be served, because they were not available.
/// Label for chunks/PoVs that could not be served, because they were not available.
pub const NOT_FOUND: &'static str = "not-found";
/// Availability Distribution metrics.
@@ -47,6 +47,12 @@ struct MetricsInner {
/// to a chunk request. This includes `NoSuchChunk` responses.
served_chunks: CounterVec<U64>,
/// Number of PoVs served.
///
/// Note: Right now, `Succeeded` gets incremented whenever we were able to successfully respond
/// to a PoV request. This includes `NoSuchPoV` responses.
served_povs: CounterVec<U64>,
/// Number of times our first set of validators did not provide the needed chunk and we had to
/// query further validators.
retries: Counter<U64>,
@@ -66,12 +72,19 @@ impl Metrics {
}
/// Increment counter on served chunks.
pub fn on_served(&self, label: &'static str) {
pub fn on_served_chunk(&self, label: &'static str) {
if let Some(metrics) = &self.0 {
metrics.served_chunks.with_label_values(&[label]).inc()
}
}
/// Increment counter on served PoVs.
pub fn on_served_pov(&self, label: &'static str) {
if let Some(metrics) = &self.0 {
metrics.served_povs.with_label_values(&[label]).inc()
}
}
/// Increment retry counter.
pub fn on_retry(&self) {
if let Some(metrics) = &self.0 {
@@ -103,6 +116,16 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
served_povs: prometheus::register(
CounterVec::new(
Opts::new(
"parachain_served_povs_total",
"Total number of povs served by this backer.",
),
&["success"]
)?,
registry,
)?,
retries: prometheus::register(
Counter::new(
"parachain_fetch_retries_total",
......
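
The responder changes that feed this counter are not part of this excerpt. The sketch below only ties together names that are visible above (`SUCCEEDED`, `FAILED`, `NOT_FOUND`, `Metrics::on_served_pov`); the outcome enum and the wrapper function are made-up names for illustration and assume they live in the same module as `Metrics`.

```rust
/// Illustrative only: possible outcomes when answering a PoV fetching request.
enum PovServeOutcome {
	Served,
	NotAvailable,
	Failed,
}

/// Map an outcome to one of the labels defined above and bump the `served_povs` counter.
fn record_pov_outcome(metrics: &Metrics, outcome: PovServeOutcome) {
	let label = match outcome {
		PovServeOutcome::Served => SUCCEEDED,
		PovServeOutcome::NotAvailable => NOT_FOUND,
		PovServeOutcome::Failed => FAILED,
	};
	metrics.on_served_pov(label);
}
```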
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! PoV requester takes care of requesting PoVs from validators of a backing group.
use futures::{FutureExt, channel::{mpsc, oneshot}, future::BoxFuture};
use lru::LruCache;
use polkadot_subsystem::jaeger;
use polkadot_node_network_protocol::{
PeerId, peer_set::PeerSet,
request_response::{OutgoingRequest, Recipient, request::{RequestError, Requests},
v1::{PoVFetchingRequest, PoVFetchingResponse}}
};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateHash, Hash, PoV, SessionIndex, ValidatorIndex
};
use polkadot_subsystem::{
ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
messages::{AllMessages, NetworkBridgeMessage, IfDisconnected}
};
use crate::{error::{Error, log_error}, runtime::{Runtime, ValidatorInfo}};
/// Number of sessions we want to keep in the LRU.
const NUM_SESSIONS: usize = 2;
/// Requester for PoVs, fetching them on demand from validators of our backing group.
pub struct PoVRequester {
/// We only ever care about being connected to validators of at most two sessions.
///
/// So we keep an LRU of size 2 for managing connection requests.
/// The cache will contain `None` if we are not a validator in that session.
connected_validators: LruCache<SessionIndex, Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>>,
}
impl PoVRequester {
/// Create a new requester for PoVs.
pub fn new() -> Self {
Self {
connected_validators: LruCache::new(NUM_SESSIONS),
}
}
/// Make sure we are connected to the right set of validators.
///
/// On every `ActiveLeavesUpdate`, we check whether we are connected properly to our current
/// validator group.
pub async fn update_connected_validators<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut Runtime,
update: &ActiveLeavesUpdate,
) -> super::Result<()>
where
Context: SubsystemContext,
{
let activated = update.activated.iter().map(|ActivatedLeaf { hash: h, .. }| h);
let activated_sessions =
get_activated_sessions(ctx, runtime, activated).await?;
for (parent, session_index) in activated_sessions {
if self.connected_validators.contains(&session_index) {
continue
}
let rx = connect_to_relevant_validators(ctx, runtime, parent, session_index).await?;
self.connected_validators.put(session_index, rx);
}
Ok(())
}
/// Start a background worker that fetches the requested `PoV` from the network.
pub async fn fetch_pov<Context>(
&self,
ctx: &mut Context,
runtime: &mut Runtime,
parent: Hash,
from_validator: ValidatorIndex,
candidate_hash: CandidateHash,
pov_hash: Hash,
tx: oneshot::Sender<PoV>
) -> super::Result<()>
where
Context: SubsystemContext,
{
let info = &runtime.get_session_info(ctx, parent).await?.session_info;
let authority_id = info.discovery_keys.get(from_validator.0 as usize)
.ok_or(Error::InvalidValidatorIndex)?
.clone();
let (req, pending_response) = OutgoingRequest::new(
Recipient::Authority(authority_id),
PoVFetchingRequest {
candidate_hash,
},
);
let full_req = Requests::PoVFetching(req);
ctx.send_message(
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
vec![full_req],
// We are supposed to be connected to validators of our group via `PeerSet`,
// but at session boundaries that can be racy if a connection takes longer
// to get established, so we try to connect in any case.
IfDisconnected::TryConnect
)
)).await;
let span = jaeger::Span::new(candidate_hash, "fetch-pov")
.with_validator_index(from_validator);
ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed())
.await
.map_err(|e| Error::SpawnTask(e))
}
}
/// Future to be spawned, taking care of receiving the PoV response and passing the PoV on.
async fn fetch_pov_job(
pov_hash: Hash,
pending_response: BoxFuture<'static, Result<PoVFetchingResponse, RequestError>>,
span: jaeger::Span,
tx: oneshot::Sender<PoV>,
) {
log_error(
do_fetch_pov(pov_hash, pending_response, span, tx).await,
"fetch_pov_job",
)
}
/// Do the actual work of waiting for the response.
async fn do_fetch_pov(
pov_hash: Hash,
pending_response: BoxFuture<'static, Result<PoVFetchingResponse, RequestError>>,
_span: jaeger::Span,
tx: oneshot::Sender<PoV>,
)
-> super::Result<()>
{
let response = pending_response.await.map_err(Error::FetchPoV)?;
let pov = match response {
PoVFetchingResponse::PoV(compressed) => {
compressed.decompress().map_err(Error::PoVDecompression)?
}
PoVFetchingResponse::NoSuchPoV => {
return Err(Error::NoSuchPoV)
}
};
if pov.hash() == pov_hash {
tx.send(pov).map_err(|_| Error::SendResponse)
} else {
Err(Error::UnexpectedPoV)
}
}
/// Get the session indices for the given relay chain parents.
async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut Runtime, new_heads: impl Iterator<Item = &Hash>)
-> super::Result<impl Iterator<Item = (Hash, SessionIndex)>>
where
Context: SubsystemContext,
{
let mut sessions = Vec::new();
for parent in new_heads {
sessions.push((*parent, runtime.get_session_index(ctx, *parent).await?));
}
Ok(sessions.into_iter())
}
/// Connect to validators of our validator group.
async fn connect_to_relevant_validators<Context>(
ctx: &mut Context,
runtime: &mut Runtime,
parent: Hash,
session: SessionIndex
)
-> super::Result<Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>>
where
Context: SubsystemContext,
{
if let Some(validator_ids) = determine_relevant_validators(ctx, runtime, parent, session).await? {
// We don't actually care about the `PeerId`s, we just keep the receiver so we stay connected:
let (tx, rx) = mpsc::channel(0);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators {
validator_ids, peer_set: PeerSet::Validation, connected: tx
})).await;
Ok(Some(rx))
} else {
Ok(None)
}
}
/// Get the validators in our validator group.
///
/// Returns `None` if we are not a validator.
async fn determine_relevant_validators<Context>(
ctx: &mut Context,
runtime: &mut Runtime,
parent: Hash,
session: SessionIndex,
)
-> super::Result<Option<Vec<AuthorityDiscoveryId>>>
where
Context: SubsystemContext,
{
let info = runtime.get_session_info_by_index(ctx, parent, session).await?;
if let ValidatorInfo {
our_index: Some(our_index),
our_group: Some(our_group)
} = &info.validator_info {
let indeces = info.session_info.validator_groups.get(our_group.0 as usize)
.expect("Our group got retrieved from that session info, it must exist. qed.")
.clone();
Ok(Some(
indeces.into_iter()
.filter(|i| *i != *our_index)
.map(|i| info.session_info.discovery_keys[i.0 as usize].clone())
.collect()
))
} else {
Ok(None)
}
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use futures::{executor, future};
use parity_scale_codec::Encode;
use sp_core::testing::TaskExecutor;
use polkadot_primitives::v1::{BlockData, CandidateHash, CompressedPoV, Hash, ValidatorIndex};
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_subsystem::messages::{AvailabilityDistributionMessage, RuntimeApiMessage, RuntimeApiRequest};
use super::*;
use crate::LOG_TARGET;