Unverified Commit 531ee2c6 authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Retry availability until the receiver of the request is dropped (#2763)

* guide updates

* keep interactions alive until receivers drop

* retry indefinitely

* cancel approval tasks on finality

* use swap_remove instead of remove
parent e0da1e64
Pipeline #131844 failed with stages
in 22 minutes and 42 seconds
...@@ -53,6 +53,7 @@ use sp_application_crypto::Pair; ...@@ -53,6 +53,7 @@ use sp_application_crypto::Pair;
use kvdb::KeyValueDB; use kvdb::KeyValueDB;
use futures::prelude::*; use futures::prelude::*;
use futures::future::RemoteHandle;
use futures::channel::{mpsc, oneshot}; use futures::channel::{mpsc, oneshot};
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
...@@ -444,6 +445,7 @@ enum Action { ...@@ -444,6 +445,7 @@ enum Action {
WriteCandidateEntry(CandidateHash, CandidateEntry), WriteCandidateEntry(CandidateHash, CandidateEntry),
LaunchApproval { LaunchApproval {
indirect_cert: IndirectAssignmentCert, indirect_cert: IndirectAssignmentCert,
relay_block_number: BlockNumber,
candidate_index: CandidateIndex, candidate_index: CandidateIndex,
session: SessionIndex, session: SessionIndex,
candidate: CandidateReceipt, candidate: CandidateReceipt,
...@@ -452,6 +454,8 @@ enum Action { ...@@ -452,6 +454,8 @@ enum Action {
Conclude, Conclude,
} }
type BackgroundTaskMap = BTreeMap<BlockNumber, Vec<RemoteHandle<()>>>;
async fn run<C>( async fn run<C>(
mut ctx: C, mut ctx: C,
subsystem: ApprovalVotingSubsystem, subsystem: ApprovalVotingSubsystem,
...@@ -472,6 +476,9 @@ async fn run<C>( ...@@ -472,6 +476,9 @@ async fn run<C>(
let mut wakeups = Wakeups::default(); let mut wakeups = Wakeups::default();
// map block numbers to background work.
let mut background_tasks = BTreeMap::new();
let mut last_finalized_height: Option<BlockNumber> = None; let mut last_finalized_height: Option<BlockNumber> = None;
let mut background_rx = background_rx.fuse(); let mut background_rx = background_rx.fuse();
...@@ -489,7 +496,7 @@ async fn run<C>( ...@@ -489,7 +496,7 @@ async fn run<C>(
)? )?
} }
next_msg = ctx.recv().fuse() => { next_msg = ctx.recv().fuse() => {
handle_from_overseer( let actions = handle_from_overseer(
&mut ctx, &mut ctx,
&mut state, &mut state,
&subsystem.metrics, &subsystem.metrics,
...@@ -497,7 +504,13 @@ async fn run<C>( ...@@ -497,7 +504,13 @@ async fn run<C>(
next_msg?, next_msg?,
&mut last_finalized_height, &mut last_finalized_height,
&wakeups, &wakeups,
).await? ).await?;
if let Some(finalized_height) = last_finalized_height {
cleanup_background_tasks(finalized_height, &mut background_tasks);
}
actions
} }
background_request = background_rx.next().fuse() => { background_request = background_rx.next().fuse() => {
if let Some(req) = background_request { if let Some(req) = background_request {
...@@ -519,6 +532,7 @@ async fn run<C>( ...@@ -519,6 +532,7 @@ async fn run<C>(
&mut wakeups, &mut wakeups,
db_writer, db_writer,
&background_tx, &background_tx,
&mut background_tasks,
actions, actions,
).await? { ).await? {
break; break;
...@@ -535,6 +549,7 @@ async fn handle_actions( ...@@ -535,6 +549,7 @@ async fn handle_actions(
wakeups: &mut Wakeups, wakeups: &mut Wakeups,
db: &dyn KeyValueDB, db: &dyn KeyValueDB,
background_tx: &mpsc::Sender<BackgroundRequest>, background_tx: &mpsc::Sender<BackgroundRequest>,
background_tasks: &mut BackgroundTaskMap,
actions: impl IntoIterator<Item = Action>, actions: impl IntoIterator<Item = Action>,
) -> SubsystemResult<bool> { ) -> SubsystemResult<bool> {
let mut transaction = approval_db::v1::Transaction::default(); let mut transaction = approval_db::v1::Transaction::default();
...@@ -555,6 +570,7 @@ async fn handle_actions( ...@@ -555,6 +570,7 @@ async fn handle_actions(
} }
Action::LaunchApproval { Action::LaunchApproval {
indirect_cert, indirect_cert,
relay_block_number,
candidate_index, candidate_index,
session, session,
candidate, candidate,
...@@ -569,7 +585,7 @@ async fn handle_actions( ...@@ -569,7 +585,7 @@ async fn handle_actions(
candidate_index, candidate_index,
).into()); ).into());
launch_approval( let handle = launch_approval(
ctx, ctx,
background_tx.clone(), background_tx.clone(),
session, session,
...@@ -578,7 +594,11 @@ async fn handle_actions( ...@@ -578,7 +594,11 @@ async fn handle_actions(
block_hash, block_hash,
candidate_index as _, candidate_index as _,
backing_group, backing_group,
).await? ).await?;
if let Some(handle) = handle {
background_tasks.entry(relay_block_number).or_default().push(handle);
}
} }
Action::Conclude => { conclude = true; } Action::Conclude => { conclude = true; }
} }
...@@ -594,6 +614,19 @@ async fn handle_actions( ...@@ -594,6 +614,19 @@ async fn handle_actions(
Ok(conclude) Ok(conclude)
} }
// Clean up all background tasks which are no longer needed as they correspond to a
// finalized block.
fn cleanup_background_tasks(
current_finalized_block: BlockNumber,
tasks: &mut BackgroundTaskMap,
) {
let after = tasks.split_off(&(current_finalized_block + 1));
*tasks = after;
// tasks up to the finalized block are dropped, and `RemoteHandle` cancels
// the task on drop.
}
// Handle an incoming signal from the overseer. Returns true if execution should conclude. // Handle an incoming signal from the overseer. Returns true if execution should conclude.
async fn handle_from_overseer( async fn handle_from_overseer(
ctx: &mut impl SubsystemContext, ctx: &mut impl SubsystemContext,
...@@ -1533,6 +1566,7 @@ fn process_wakeup( ...@@ -1533,6 +1566,7 @@ fn process_wakeup(
// sanity: should always be present. // sanity: should always be present.
actions.push(Action::LaunchApproval { actions.push(Action::LaunchApproval {
indirect_cert, indirect_cert,
relay_block_number: block_entry.block_number(),
candidate_index: i as _, candidate_index: i as _,
session: block_entry.session(), session: block_entry.session(),
candidate: candidate_entry.candidate_receipt().clone(), candidate: candidate_entry.candidate_receipt().clone(),
...@@ -1566,6 +1600,9 @@ fn process_wakeup( ...@@ -1566,6 +1600,9 @@ fn process_wakeup(
Ok(actions) Ok(actions)
} }
// Launch approval work, returning an `AbortHandle` which corresponds to the background task
// spawned. When the background work is no longer needed, the `AbortHandle` should be dropped
// to cancel the background work and any requests it has spawned.
async fn launch_approval( async fn launch_approval(
ctx: &mut impl SubsystemContext, ctx: &mut impl SubsystemContext,
mut background_tx: mpsc::Sender<BackgroundRequest>, mut background_tx: mpsc::Sender<BackgroundRequest>,
...@@ -1575,7 +1612,7 @@ async fn launch_approval( ...@@ -1575,7 +1612,7 @@ async fn launch_approval(
block_hash: Hash, block_hash: Hash,
candidate_index: usize, candidate_index: usize,
backing_group: GroupIndex, backing_group: GroupIndex,
) -> SubsystemResult<()> { ) -> SubsystemResult<Option<RemoteHandle<()>>> {
let (a_tx, a_rx) = oneshot::channel(); let (a_tx, a_rx) = oneshot::channel();
let (code_tx, code_rx) = oneshot::channel(); let (code_tx, code_rx) = oneshot::channel();
let (context_num_tx, context_num_rx) = oneshot::channel(); let (context_num_tx, context_num_rx) = oneshot::channel();
...@@ -1610,7 +1647,7 @@ async fn launch_approval( ...@@ -1610,7 +1647,7 @@ async fn launch_approval(
candidate.descriptor.relay_parent, candidate.descriptor.relay_parent,
); );
return Ok(()); return Ok(None);
} }
}; };
...@@ -1719,7 +1756,10 @@ async fn launch_approval( ...@@ -1719,7 +1756,10 @@ async fn launch_approval(
} }
}; };
ctx.spawn("approval-checks", Box::pin(background)).await let (background, remote_handle) = background.remote_handle();
ctx.spawn("approval-checks", Box::pin(background))
.await
.map(move |()| Some(remote_handle))
} }
// Issue and import a local approval vote. Should only be invoked after approval checks // Issue and import a local approval vote. Should only be invoked after approval checks
......
...@@ -397,6 +397,11 @@ impl BlockEntry { ...@@ -397,6 +397,11 @@ impl BlockEntry {
pub fn candidates(&self) -> &[(CoreIndex, CandidateHash)] { pub fn candidates(&self) -> &[(CoreIndex, CandidateHash)] {
&self.candidates &self.candidates
} }
/// Access the block number of the block entry.
pub fn block_number(&self) -> BlockNumber {
self.block_number
}
} }
impl From<crate::approval_db::v1::BlockEntry> for BlockEntry { impl From<crate::approval_db::v1::BlockEntry> for BlockEntry {
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
//! The `Error` and `Result` types used by the subsystem. //! The `Error` and `Result` types used by the subsystem.
use futures::channel::{mpsc, oneshot}; use futures::channel::oneshot;
use thiserror::Error; use thiserror::Error;
/// Error type used by the Availability Recovery subsystem. /// Error type used by the Availability Recovery subsystem.
...@@ -34,9 +34,6 @@ pub enum Error { ...@@ -34,9 +34,6 @@ pub enum Error {
#[error("failed to send response")] #[error("failed to send response")]
CanceledResponseSender, CanceledResponseSender,
#[error("to_state channel is closed")]
ClosedToState(#[source] mpsc::SendError),
#[error(transparent)] #[error(transparent)]
Runtime(#[from] polkadot_subsystem::errors::RuntimeApiError), Runtime(#[from] polkadot_subsystem::errors::RuntimeApiError),
......
...@@ -19,9 +19,11 @@ ...@@ -19,9 +19,11 @@
#![warn(missing_docs)] #![warn(missing_docs)]
use std::collections::HashMap; use std::collections::HashMap;
use std::pin::Pin;
use futures::{channel::{oneshot, mpsc}, prelude::*, stream::FuturesUnordered}; use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
use futures::future::BoxFuture; use futures::future::{BoxFuture, RemoteHandle, FutureExt};
use futures::task::{Context, Poll};
use lru::LruCache; use lru::LruCache;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
...@@ -33,7 +35,7 @@ use polkadot_primitives::v1::{ ...@@ -33,7 +35,7 @@ use polkadot_primitives::v1::{
use polkadot_node_primitives::{ErasureChunk, AvailableData}; use polkadot_node_primitives::{ErasureChunk, AvailableData};
use polkadot_subsystem::{ use polkadot_subsystem::{
SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer, SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer,
OverseerSignal, ActiveLeavesUpdate, OverseerSignal, ActiveLeavesUpdate, SubsystemSender,
errors::RecoveryError, errors::RecoveryError,
jaeger, jaeger,
messages::{ messages::{
...@@ -67,21 +69,6 @@ pub struct AvailabilityRecoverySubsystem { ...@@ -67,21 +69,6 @@ pub struct AvailabilityRecoverySubsystem {
fast_path: bool, fast_path: bool,
} }
/// Accumulate all awaiting sides for some particular `AvailableData`.
struct InteractionHandle {
awaiting: Vec<oneshot::Sender<Result<AvailableData, RecoveryError>>>,
}
/// A message received by main code from an async `Interaction` task.
#[derive(Debug)]
enum FromInteraction {
/// An interaction concluded.
Concluded(CandidateHash, Result<AvailableData, RecoveryError>),
/// Send a request on the network service.
NetworkRequest(Requests),
}
struct RequestFromBackersPhase { struct RequestFromBackersPhase {
// a random shuffling of the validators from the backing group which indicates the order // a random shuffling of the validators from the backing group which indicates the order
// in which we connect to them and request the chunk. // in which we connect to them and request the chunk.
...@@ -95,7 +82,7 @@ struct RequestChunksPhase { ...@@ -95,7 +82,7 @@ struct RequestChunksPhase {
received_chunks: HashMap<ValidatorIndex, ErasureChunk>, received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
requesting_chunks: FuturesUnordered<BoxFuture< requesting_chunks: FuturesUnordered<BoxFuture<
'static, 'static,
Result<Option<ErasureChunk>, RequestError>>, Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
>, >,
} }
...@@ -122,9 +109,8 @@ enum InteractionPhase { ...@@ -122,9 +109,8 @@ enum InteractionPhase {
} }
/// A state of a single interaction reconstructing an available data. /// A state of a single interaction reconstructing an available data.
struct Interaction { struct Interaction<S> {
/// A communication channel with the `State`. sender: S,
to_state: mpsc::Sender<FromInteraction>,
/// The parameters of the interaction. /// The parameters of the interaction.
params: InteractionParams, params: InteractionParams,
...@@ -142,13 +128,12 @@ impl RequestFromBackersPhase { ...@@ -142,13 +128,12 @@ impl RequestFromBackersPhase {
} }
} }
// Run this phase to completion, returning `true` if data was successfully recovered and // Run this phase to completion.
// false otherwise.
async fn run( async fn run(
&mut self, &mut self,
params: &InteractionParams, params: &InteractionParams,
to_state: &mut mpsc::Sender<FromInteraction> sender: &mut impl SubsystemSender,
) -> Result<bool, mpsc::SendError> { ) -> Result<AvailableData, RecoveryError> {
tracing::trace!( tracing::trace!(
target: LOG_TARGET, target: LOG_TARGET,
candidate_hash = ?params.candidate_hash, candidate_hash = ?params.candidate_hash,
...@@ -158,7 +143,7 @@ impl RequestFromBackersPhase { ...@@ -158,7 +143,7 @@ impl RequestFromBackersPhase {
loop { loop {
// Pop the next backer, and proceed to next phase if we're out. // Pop the next backer, and proceed to next phase if we're out.
let validator_index = match self.shuffled_backers.pop() { let validator_index = match self.shuffled_backers.pop() {
None => return Ok(false), None => return Err(RecoveryError::Unavailable),
Some(i) => i, Some(i) => i,
}; };
...@@ -168,21 +153,21 @@ impl RequestFromBackersPhase { ...@@ -168,21 +153,21 @@ impl RequestFromBackersPhase {
req_res::v1::AvailableDataFetchingRequest { candidate_hash: params.candidate_hash }, req_res::v1::AvailableDataFetchingRequest { candidate_hash: params.candidate_hash },
); );
to_state.send(FromInteraction::NetworkRequest(Requests::AvailableDataFetching(req))).await?; sender.send_message(NetworkBridgeMessage::SendRequests(
vec![Requests::AvailableDataFetching(req)],
IfDisconnected::TryConnect,
).into()).await;
match res.await { match res.await {
Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => { Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
if reconstructed_data_matches_root(params.validators.len(), &params.erasure_root, &data) { if reconstructed_data_matches_root(params.validators.len(), &params.erasure_root, &data) {
to_state.send(
FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
).await?;
tracing::trace!( tracing::trace!(
target: LOG_TARGET, target: LOG_TARGET,
candidate_hash = ?params.candidate_hash, candidate_hash = ?params.candidate_hash,
"Received full data", "Received full data",
); );
return Ok(true);
return Ok(data);
} else { } else {
tracing::debug!( tracing::debug!(
target: LOG_TARGET, target: LOG_TARGET,
...@@ -222,8 +207,8 @@ impl RequestChunksPhase { ...@@ -222,8 +207,8 @@ impl RequestChunksPhase {
async fn launch_parallel_requests( async fn launch_parallel_requests(
&mut self, &mut self,
params: &InteractionParams, params: &InteractionParams,
to_state: &mut mpsc::Sender<FromInteraction>, sender: &mut impl SubsystemSender,
) -> Result<(), mpsc::SendError> { ) {
let max_requests = std::cmp::min(N_PARALLEL, params.threshold); let max_requests = std::cmp::min(N_PARALLEL, params.threshold);
while self.requesting_chunks.len() < max_requests { while self.requesting_chunks.len() < max_requests {
if let Some(validator_index) = self.shuffling.pop() { if let Some(validator_index) = self.shuffling.pop() {
...@@ -247,39 +232,36 @@ impl RequestChunksPhase { ...@@ -247,39 +232,36 @@ impl RequestChunksPhase {
raw_request.clone(), raw_request.clone(),
); );
to_state.send(FromInteraction::NetworkRequest(Requests::ChunkFetching(req))).await?; sender.send_message(NetworkBridgeMessage::SendRequests(
vec![Requests::ChunkFetching(req)],
IfDisconnected::TryConnect,
).into()).await;
self.requesting_chunks.push(Box::pin(async move { self.requesting_chunks.push(Box::pin(async move {
match res.await { match res.await {
Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk))
=> Ok(Some(chunk.recombine_into_chunk(&raw_request))), => Ok(Some(chunk.recombine_into_chunk(&raw_request))),
Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None), Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None),
Err(e) => Err(e), Err(e) => Err((validator_index, e)),
} }
})); }));
} else { } else {
break; break;
} }
} }
Ok(())
} }
async fn wait_for_chunks( async fn wait_for_chunks(
&mut self, &mut self,
params: &InteractionParams, params: &InteractionParams,
) -> Result<(), mpsc::SendError> { ) {
// Check if the requesting chunks is not empty not to poll to completion.
if self.requesting_chunks.is_empty() {
return Ok(());
}
// Poll for new updates from requesting_chunks. // Poll for new updates from requesting_chunks.
while let Some(request_result) = self.requesting_chunks.next().await { while let Poll::Ready(Some(request_result))
= futures::poll!(self.requesting_chunks.next())
{
match request_result { match request_result {
Ok(Some(chunk)) => { Ok(Some(chunk)) => {
// Check merkle proofs of any received chunks, and any failures should // Check merkle proofs of any received chunks.
// lead to issuance of a FromInteraction::ReportPeer message.
let validator_index = chunk.index; let validator_index = chunk.index;
...@@ -313,24 +295,30 @@ impl RequestChunksPhase { ...@@ -313,24 +295,30 @@ impl RequestChunksPhase {
} }
} }
Ok(None) => {} Ok(None) => {}
Err(e) => { Err((validator_index, e)) => {
tracing::debug!( tracing::debug!(
target: LOG_TARGET, target: LOG_TARGET,
err = ?e, err = ?e,
?validator_index,
"Failure requesting chunk", "Failure requesting chunk",
); );
match e {
RequestError::InvalidResponse(_) => {}
RequestError::NetworkError(_) | RequestError::Canceled(_) => {
self.shuffling.push(validator_index);
}
}
} }
} }
} }
Ok(())
} }
async fn run( async fn run(
&mut self, &mut self,
params: &InteractionParams, params: &InteractionParams,
to_state: &mut mpsc::Sender<FromInteraction>, sender: &mut impl SubsystemSender,
) -> Result<(), mpsc::SendError> { ) -> Result<AvailableData, RecoveryError> {
loop { loop {
if is_unavailable( if is_unavailable(
self.received_chunks.len(), self.received_chunks.len(),
...@@ -347,23 +335,18 @@ impl RequestChunksPhase { ...@@ -347,23 +335,18 @@ impl RequestChunksPhase {
n_validators = %params.validators.len(), n_validators = %params.validators.len(),
"Data recovery is not possible", "Data recovery is not possible",
); );
to_state.send(FromInteraction::Concluded(
params.candidate_hash,
Err(RecoveryError::Unavailable),
)).await?;
return Ok(()); return Err(RecoveryError::Unavailable);
} }