Unverified Commit 8fab1d8f authored by Robert Klotzner's avatar Robert Klotzner Committed by GitHub
Browse files

Further improved availability recovery (#3711)



* WiP.

* Things compile.

* cargo fmt

* Passing tests + fix warnings.

* Metrics for availability recovery.

* Basic test.

* Fix typos and actually check for overflow.

* cargo fmt

* Register metrics.

* More tests.

* Fix warning.

* cargo +nightly fmt

* Fix metrics

* Get rid of unsafe.

* tabify

* spellcheck
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
Co-authored-by: default avatarBastian Köcher <info@kchr.de>
parent cada9124
Pipeline #154508 passed with stages
in 35 minutes and 36 seconds
......@@ -18,6 +18,7 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
[dev-dependencies]
assert_matches = "1.4.0"
......
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! FuturesUndead: A `FuturesUnordered` with support for semi canceled futures. Those undead
//! futures will still get polled, but will not count towards length. So length will only count
//! futures, which are still considered live.
//!
//! Usecase: If futures take longer than we would like them to, we may be able to request the data
//! from somewhere else as well. We don't really want to cancel the old future, because maybe it
//! was almost done, thus we would have wasted time with our impatience. By simply making them
//! not count towards length, we can make sure to have enough "live" requests ongoing, while at the
//! same time taking advantage of some maybe "late" response from the undead.
//!
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures::{future::BoxFuture, stream::FuturesUnordered, Future, Stream, StreamExt};
use polkadot_node_subsystem_util::TimeoutExt;
/// `FuturesUndead` - `FuturesUnordered` with semi canceled (undead) futures.
///
/// Invariant: `undead <= inner.len()` at all times, because undead futures are never removed
/// from `inner` — they merely stop counting towards `len`.
///
/// Limitations: Keeps track of undead futures by means of a sequence counter, which is a
/// `usize` — so `2^64 - 1` on 64-bit targets (about `1.8*10^19` pushed futures) before this
/// implementation panics; proportionally fewer on 32-bit targets.
pub struct FuturesUndead<Output> {
	/// Actual `FuturesUnordered` holding both live and undead futures.
	inner: FuturesUnordered<Undead<Output>>,
	/// Next sequence number to assign to the next future that gets pushed.
	next_sequence: SequenceNumber,
	/// Sequence number of first future considered live.
	///
	/// `None` until the first `soft_cancel`; everything pushed before this number is undead.
	first_live: Option<SequenceNumber>,
	/// How many undead are there right now (futures soft-canceled but still being polled).
	undead: usize,
}
/// All futures get a number, to determine which are live.
///
/// Sequence numbers are handed out monotonically by `FuturesUndead::push`; a future is undead
/// exactly when its number is below `first_live`.
//
// Consistency fix: the original derived `Eq, PartialEq, PartialOrd` but not `Ord`. A newtype
// over `usize` has a total order, so deriving `Ord` alongside is the consistent derive set
// (and backward compatible — it only adds a trait impl).
#[derive(Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Debug)]
struct SequenceNumber(usize);
/// A pushed future together with the sequence number it was assigned on `push`.
///
/// Polling an `Undead` polls the wrapped future and tags its output with the sequence number,
/// so the stream implementation can tell whether a completed future was live or undead.
struct Undead<Output> {
	/// The wrapped future.
	inner: BoxFuture<'static, Output>,
	/// Sequence number assigned at push time; compared against `first_live` on completion.
	our_sequence: SequenceNumber,
}
impl<Output> FuturesUndead<Output> {
pub fn new() -> Self {
Self {
inner: FuturesUnordered::new(),
next_sequence: SequenceNumber(0),
first_live: None,
undead: 0,
}
}
pub fn push(&mut self, f: BoxFuture<'static, Output>) {
self.inner.push(Undead { inner: f, our_sequence: self.next_sequence });
self.next_sequence.inc();
}
/// Make all contained futures undead.
///
/// They will no longer be counted on a call to `len`.
pub fn soft_cancel(&mut self) {
self.undead = self.inner.len();
self.first_live = Some(self.next_sequence);
}
/// Number of contained futures minus undead.
pub fn len(&self) -> usize {
self.inner.len() - self.undead
}
/// Total number of futures, including undead.
pub fn total_len(&self) -> usize {
self.inner.len()
}
/// Wait for next future to return with timeout.
///
/// When timeout passes, return `None` and make all currently contained futures undead.
pub async fn next_with_timeout(&mut self, timeout: Duration) -> Option<Output> {
match self.next().timeout(timeout).await {
// Timeout:
None => {
self.soft_cancel();
None
},
Some(inner) => inner,
}
}
}
impl<Output> Stream for FuturesUndead<Output> {
	type Item = Output;

	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
		match self.inner.poll_next_unpin(cx) {
			Poll::Pending => Poll::Pending,
			Poll::Ready(None) => Poll::Ready(None),
			// A wrapped future completed; `sequence` tells us whether it was live or undead.
			Poll::Ready(Some((sequence, v))) => {
				// Cleanup in case we became completely empty: reset all bookkeeping
				// (undead counter, `first_live`, and the sequence counter) by starting over.
				if self.inner.len() == 0 {
					*self = Self::new();
					return Poll::Ready(Some(v))
				}

				// No `soft_cancel` has happened yet — every future is live, nothing to
				// account for.
				let first_live = match self.first_live {
					None => return Poll::Ready(Some(v)),
					Some(first_live) => first_live,
				};
				// An undead came back: it was pushed before the last `soft_cancel`, so it
				// was not counted by `len` — decrement the undead counter to keep
				// `len = inner.len() - undead` consistent. `saturating_sub` guards against
				// going below zero.
				if sequence < first_live {
					self.undead = self.undead.saturating_sub(1);
				}
				Poll::Ready(Some(v))
			},
		}
	}
}
impl SequenceNumber {
	/// Advance to the next sequence number.
	///
	/// Panics on overflow of the underlying `usize`, which is not expected to ever happen in
	/// practice.
	pub fn inc(&mut self) {
		let incremented = self.0.checked_add(1).expect(
			"We don't expect an `UndeadFuture` to live long enough for 2^64 entries ever getting inserted."
		);
		self.0 = incremented;
	}
}
impl<T> Future for Undead<T> {
	type Output = (SequenceNumber, T);

	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		// `SequenceNumber` is `Copy`, so read it up front before mutably borrowing `inner`.
		let sequence = self.our_sequence;
		// Delegate to the wrapped future and tag its result with our sequence number, so the
		// stream can tell live completions from undead ones.
		self.inner.as_mut().poll(cx).map(|output| (sequence, output))
	}
}
#[cfg(test)]
mod tests {
	use super::*;
	use futures::{executor, pending, FutureExt};

	// `soft_cancel` makes every pending future undead, so `len` drops to zero while the
	// future is still contained.
	#[test]
	fn cancel_sets_len_to_zero() {
		let mut undead = FuturesUndead::new();
		undead.push((async { () }).boxed());
		assert_eq!(undead.len(), 1);
		undead.soft_cancel();
		assert_eq!(undead.len(), 0);
	}

	// Undead futures resolving must not decrement `len` below what the live futures account
	// for: only the undead counter is adjusted when they come back.
	#[test]
	fn finished_undead_does_not_change_len() {
		executor::block_on(async {
			let mut undead = FuturesUndead::new();
			undead.push(async { 1_i32 }.boxed());
			undead.push(async { 2_i32 }.boxed());
			assert_eq!(undead.len(), 2);
			undead.soft_cancel();
			assert_eq!(undead.len(), 0);
			// A never-ready future pushed after the cancel is live again:
			undead.push(
				async {
					pending!();
					0_i32
				}
				.boxed(),
			);
			// One of the two undead futures resolves; the live one must still count.
			undead.next().await;
			assert_eq!(undead.len(), 1);
			undead.push(async { 9_i32 }.boxed());
			undead.soft_cancel();
			assert_eq!(undead.len(), 0);
		});
	}

	// When live futures (pushed after a `soft_cancel`) resolve, `len` decreases accordingly
	// while the undead count stays untouched.
	#[test]
	fn len_stays_correct_when_live_future_ends() {
		executor::block_on(async {
			let mut undead = FuturesUndead::new();
			// Two never-ready futures that will become undead:
			undead.push(
				async {
					pending!();
					1_i32
				}
				.boxed(),
			);
			undead.push(
				async {
					pending!();
					2_i32
				}
				.boxed(),
			);
			assert_eq!(undead.len(), 2);
			undead.soft_cancel();
			assert_eq!(undead.len(), 0);
			// Two live futures, resolving one by one:
			undead.push(async { 0_i32 }.boxed());
			undead.push(async { 1_i32 }.boxed());
			undead.next().await;
			assert_eq!(undead.len(), 1);
			undead.next().await;
			assert_eq!(undead.len(), 0);
			undead.push(async { 9_i32 }.boxed());
			assert_eq!(undead.len(), 1);
		});
	}

	// Once the collection drains completely, all bookkeeping (including `first_live`) is
	// reset by the cleanup branch in `poll_next`.
	#[test]
	fn cleanup_works() {
		executor::block_on(async {
			let mut undead = FuturesUndead::new();
			undead.push(async { 1_i32 }.boxed());
			undead.soft_cancel();
			undead.push(async { 2_i32 }.boxed());
			undead.next().await;
			undead.next().await;
			assert_eq!(undead.first_live, None);
		});
	}
}
......@@ -26,7 +26,7 @@ use std::{
use futures::{
channel::oneshot,
future::{BoxFuture, FutureExt, RemoteHandle},
future::{FutureExt, RemoteHandle},
pin_mut,
prelude::*,
stream::FuturesUnordered,
......@@ -36,6 +36,8 @@ use lru::LruCache;
use rand::seq::SliceRandom;
use polkadot_erasure_coding::{branch_hash, branches, obtain_chunks_v1, recovery_threshold};
#[cfg(not(test))]
use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT;
use polkadot_node_network_protocol::{
request_response::{
self as req_res, incoming, outgoing::RequestError, v1 as request_v1,
......@@ -44,7 +46,7 @@ use polkadot_node_network_protocol::{
IfDisconnected, UnifiedReputationChange as Rep,
};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_node_subsystem_util::{request_session_info, TimeoutExt};
use polkadot_node_subsystem_util::request_session_info;
use polkadot_primitives::v1::{
AuthorityDiscoveryId, BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex,
Hash, HashT, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex,
......@@ -59,6 +61,12 @@ use polkadot_subsystem::{
};
mod error;
mod futures_undead;
mod metrics;
use metrics::Metrics;
use futures_undead::FuturesUndead;
use sc_network::{OutboundFailure, RequestFailure};
#[cfg(test)]
mod tests;
......@@ -73,15 +81,27 @@ const LRU_SIZE: usize = 16;
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
/// Max time we want to wait for responses, before calling `launch_parallel_requests` again to fill
/// up slots.
const MAX_CHUNK_WAIT: Duration = Duration::from_secs(1);
/// Time after which we consider a request to have failed
///
/// and we should try more peers. Note in theory the request times out at the network level,
/// measurements have shown, that in practice requests might actually take longer to fail in
/// certain occasions. (The very least, authority discovery is not part of the timeout.)
///
/// For the time being this value is the same as the timeout on the networking layer, but as this
/// timeout is more soft than the networking one, it might make sense to pick different values as
/// well.
#[cfg(not(test))]
const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
#[cfg(test)]
const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(4);
/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
fast_path: bool,
/// Receiver for available data requests.
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
/// Metrics for this subsystem.
metrics: Metrics,
}
struct RequestFromBackersPhase {
......@@ -91,13 +111,18 @@ struct RequestFromBackersPhase {
}
struct RequestChunksPhase {
// a random shuffling of the validators which indicates the order in which we connect to the validators and
// request the chunk from them.
/// How many request have been unsuccessful so far.
error_count: usize,
/// Total number of responses that have been received.
///
/// including failed ones.
total_received_responses: usize,
/// a random shuffling of the validators which indicates the order in which we connect to the validators and
/// request the chunk from them.
shuffling: VecDeque<ValidatorIndex>,
received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
requesting_chunks: FuturesUnordered<
BoxFuture<'static, Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
>,
/// Pending chunk requests with soft timeout.
requesting_chunks: FuturesUndead<Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
}
struct InteractionParams {
......@@ -115,6 +140,9 @@ struct InteractionParams {
/// The root of the erasure encoding of the para block.
erasure_root: Hash,
/// Metrics to report
metrics: Metrics,
}
enum InteractionPhase {
......@@ -219,16 +247,18 @@ impl RequestChunksPhase {
shuffling.shuffle(&mut rand::thread_rng());
RequestChunksPhase {
error_count: 0,
total_received_responses: 0,
shuffling: shuffling.into(),
received_chunks: HashMap::new(),
requesting_chunks: FuturesUnordered::new(),
requesting_chunks: FuturesUndead::new(),
}
}
fn is_unavailable(&self, params: &InteractionParams) -> bool {
is_unavailable(
self.received_chunks.len(),
self.requesting_chunks.len(),
self.requesting_chunks.total_len(),
self.shuffling.len(),
params.threshold,
)
......@@ -238,13 +268,40 @@ impl RequestChunksPhase {
self.received_chunks.len() >= params.threshold || self.is_unavailable(params)
}
	/// Desired number of parallel requests.
	///
	/// For the given threshold (total required number of chunks) get the desired number of
	/// requests we want to have running in parallel at this time.
	fn get_desired_request_count(&self, threshold: usize) -> usize {
		// Upper bound for parallel requests.
		// We want to limit this, so requests can be processed within the timeout and we limit the
		// following feedback loop:
		// 1. Requests fail due to timeout
		// 2. We request more chunks to make up for it
		// 3. Bandwidth is spread out even more, so we get even more timeouts
		// 4. We request more chunks to make up for it ...
		let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
		// How many chunks are still needed?
		let remaining_chunks = threshold.saturating_sub(self.received_chunks.len());
		// What is the current error rate, so we can make up for it?
		// `checked_div` yields `None` when `error_count` is zero (no failures yet), which
		// `unwrap_or(0)` maps to an `inv_error_rate` of 0 — i.e. no over-provisioning.
		let inv_error_rate =
			self.total_received_responses.checked_div(self.error_count).unwrap_or(0);
		// Actual number of requests we want to have in flight in parallel:
		// `remaining_chunks / inv_error_rate` extra requests compensate for the observed
		// failure rate; the second `checked_div` guards the `inv_error_rate == 0` case.
		std::cmp::min(
			max_requests_boundary,
			remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0),
		)
	}
async fn launch_parallel_requests(
&mut self,
params: &InteractionParams,
sender: &mut impl SubsystemSender,
) {
let max_requests = std::cmp::min(N_PARALLEL, params.threshold);
while self.requesting_chunks.len() < max_requests {
let num_requests = self.get_desired_request_count(params.threshold);
let mut requests = Vec::with_capacity(num_requests - self.requesting_chunks.len());
while self.requesting_chunks.len() < num_requests {
if let Some(validator_index) = self.shuffling.pop_back() {
let validator = params.validator_authority_keys[validator_index.0 as usize].clone();
tracing::trace!(
......@@ -263,18 +320,13 @@ impl RequestChunksPhase {
let (req, res) =
OutgoingRequest::new(Recipient::Authority(validator), raw_request.clone());
requests.push(Requests::ChunkFetching(req));
sender
.send_message(
NetworkBridgeMessage::SendRequests(
vec![Requests::ChunkFetching(req)],
IfDisconnected::TryConnect,
)
.into(),
)
.await;
params.metrics.on_chunk_request_issued();
let timer = params.metrics.time_chunk_request();
self.requesting_chunks.push(Box::pin(async move {
let _timer = timer;
match res.await {
Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) =>
Ok(Some(chunk.recombine_into_chunk(&raw_request))),
......@@ -286,15 +338,25 @@ impl RequestChunksPhase {
break
}
}
sender
.send_message(
NetworkBridgeMessage::SendRequests(requests, IfDisconnected::TryConnect).into(),
)
.await;
}
async fn wait_for_chunks(&mut self, params: &InteractionParams) {
let metrics = &params.metrics;
// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
// We will also stop, if there has not been a response for `MAX_CHUNK_WAIT`, so
// `launch_parallel_requests` can fill up slots again.
// We also declare requests undead, once `TIMEOUT_START_NEW_REQUESTS` is reached and will
// return in that case for `launch_parallel_requests` to fill up slots again.
while let Some(request_result) =
self.requesting_chunks.next().timeout(MAX_CHUNK_WAIT).await.flatten()
self.requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await
{
self.total_received_responses += 1;
match request_result {
Ok(Some(chunk)) => {
// Check merkle proofs of any received chunks.
......@@ -309,6 +371,9 @@ impl RequestChunksPhase {
let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
if erasure_chunk_hash != anticipated_hash {
metrics.on_chunk_request_invalid();
self.error_count += 1;
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
......@@ -316,6 +381,8 @@ impl RequestChunksPhase {
"Merkle proof mismatch",
);
} else {
metrics.on_chunk_request_succeeded();
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
......@@ -325,6 +392,9 @@ impl RequestChunksPhase {
self.received_chunks.insert(validator_index, chunk);
}
} else {
metrics.on_chunk_request_invalid();
self.error_count += 1;
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
......@@ -333,8 +403,13 @@ impl RequestChunksPhase {
);
}
},
Ok(None) => {},
Ok(None) => {
metrics.on_chunk_request_no_such_chunk();
self.error_count += 1;
},
Err((validator_index, e)) => {
self.error_count += 1;
tracing::debug!(
target: LOG_TARGET,
candidate_hash= ?params.candidate_hash,
......@@ -344,8 +419,21 @@ impl RequestChunksPhase {
);
match e {
RequestError::InvalidResponse(_) => {},
RequestError::NetworkError(_) | RequestError::Canceled(_) => {
RequestError::InvalidResponse(_) => {
metrics.on_chunk_request_invalid();
},
RequestError::NetworkError(err) => {
if let RequestFailure::Network(OutboundFailure::Timeout) = err {
metrics.on_chunk_request_timeout();
} else {
metrics.on_chunk_request_error();
}
self.shuffling.push_front(validator_index);
},
RequestError::Canceled(_) => {
metrics.on_chunk_request_error();
self.shuffling.push_front(validator_index);
},
}
......@@ -403,6 +491,7 @@ impl RequestChunksPhase {
erasure_root = ?params.erasure_root,
received = %self.received_chunks.len(),
requesting = %self.requesting_chunks.len(),
total_requesting = %self.requesting_chunks.total_len(),
n_validators = %params.validators.len(),
"Data recovery is not possible",
);
......@@ -653,6 +742,7 @@ async fn launch_interaction<Context>(
receipt: CandidateReceipt,
backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
metrics: &Metrics,
) -> error::Result<()>
where
Context: SubsystemContext<Message = AvailabilityRecoveryMessage>,
......@@ -666,6 +756,7 @@ where
threshold: recovery_threshold(session_info.validators.len())?,
candidate_hash,
erasure_root: receipt.descriptor.erasure_root,
metrics: metrics.clone(),
};
let phase = backing_group
......@@ -706,6 +797,7 @@ async fn handle_recover<Context>(
session_index: SessionIndex,
backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
metrics: &Metrics,
) -> error::Result<()>
where
Context: SubsystemContext<Message = AvailabilityRecoveryMessage>,
......@@ -741,8 +833,16 @@ where
let _span = span.child("session-info-ctx-received");
match session_info {
Some(session_info) =>
launch_interaction(state, ctx, session_info, receipt, backing_group, response_sender)
.await,
launch_interaction(
state,
ctx,
session_info,
receipt,
backing_group,
response_sender,
metrics,
)
.await,
None => {
tracing::warn!(target: LOG_TARGET, "SessionInfo is `None` at {:?}", state.live_block);
response_sender
......@@ -770,18 +870,21 @@ where
}
impl AvailabilityRecoverySubsystem {
/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to request data from backers.
/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
/// request data from backers.
pub fn with_fast_path(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: true, req_receiver }
Self { fast_path: true, req_receiver, metrics }
}
/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
pub fn with_chunks_only(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: false, req_receiver }
Self { fast_path: false, req_receiver, metrics }
}
async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()>
......@@ -790,7 +893,7 @@ impl AvailabilityRecoverySubsystem {
Context: overseer::SubsystemContext<Message = AvailabilityRecoveryMessage>,
{