From 90a686266fc7284b6ac1716ba7038d6be2f0babe Mon Sep 17 00:00:00 2001 From: Fedor Sakharov <fedor.sakharov@gmail.com> Date: Fri, 15 Jan 2021 05:06:25 +0300 Subject: [PATCH] Availability recovery subsystem (#2122) * Adds message types * Add code skeleton * Adds subsystem code. * Adds a first test * Adds interaction result to availability_lru * Use LruCache instead of a HashMap * Whitespaces to tabs * Do not ignore errors * Change error type * Add a timeout to chunk requests * Add custom errors and log them * Adds replace_availability_recovery method * recovery_threshold computed by erasure crate * change core to std * adds docs to error type * Adds a test for invalid reconstruction * refactors interaction run into multiple methods * Cleanup AwaitedChunks * Even more fixes * Test that recovery with wrong root is an error * Break to launch another requests * Styling fixes * Add SessionIndex to API * Proper relay parents for MakeRequest * Remove validator_discovery and use message * Remove a stream on exhaustion * On cleanup free the request streams * Fix merge and refactor --- polkadot/Cargo.lock | 28 + polkadot/Cargo.toml | 1 + polkadot/erasure-coding/src/lib.rs | 1 + .../network/availability-recovery/Cargo.toml | 34 + .../availability-recovery/src/error.rs | 50 + .../network/availability-recovery/src/lib.rs | 854 ++++++++++++++++++ .../availability-recovery/src/tests.rs | 565 ++++++++++++ polkadot/node/network/protocol/src/lib.rs | 15 +- polkadot/node/overseer/src/lib.rs | 119 ++- polkadot/node/service/Cargo.toml | 2 + polkadot/node/service/src/lib.rs | 3 + polkadot/node/subsystem/src/errors.rs | 18 + polkadot/node/subsystem/src/messages.rs | 15 + 13 files changed, 1682 insertions(+), 23 deletions(-) create mode 100644 polkadot/node/network/availability-recovery/Cargo.toml create mode 100644 polkadot/node/network/availability-recovery/src/error.rs create mode 100644 polkadot/node/network/availability-recovery/src/lib.rs create mode 100644 polkadot/node/network/availability-recovery/src/tests.rs diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 3be3cf423f0..2a7c5d6b9f9 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4942,6 +4942,33 @@ dependencies = [ "tracing-futures", ] +[[package]] +name = "polkadot-availability-recovery" +version = "0.1.0" +dependencies = [ + "assert_matches", + "env_logger 0.8.2", + "futures 0.3.10", + "futures-timer 3.0.2", + "log", + "lru", + "polkadot-erasure-coding", + "polkadot-node-network-protocol", + "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", + "polkadot-primitives", + "rand 0.7.3", + "smallvec 1.6.1", + "sp-application-crypto", + "sp-core", + "sp-keyring", + "streamunordered", + "thiserror", + "tracing", + "tracing-futures", +] + [[package]] name = "polkadot-cli" version = "0.8.27" @@ -5650,6 +5677,7 @@ dependencies = [ "pallet-transaction-payment-rpc-runtime-api", "polkadot-availability-bitfield-distribution", "polkadot-availability-distribution", + "polkadot-availability-recovery", "polkadot-collator-protocol", "polkadot-network-bridge", "polkadot-node-collation-generation", diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index 2d771ceeb4d..5bc47fc78db 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -57,6 +57,7 @@ members = [ "node/network/statement-distribution", "node/network/bitfield-distribution", "node/network/availability-distribution", + "node/network/availability-recovery", "node/network/collator-protocol", "node/overseer", "node/primitives", 
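The subsystem's whole design rests on one property of the erasure coding touched by the next hunk: `recovery_threshold` gives the minimum number of chunks needed to reconstruct the data, and everything beyond it is slack for faulty or missing chunks. A minimal sketch of that relationship (illustrative only, not part of the patch; `recovery_threshold` is the real crate API, while the helper name `tolerated_faults` is ours):

    use polkadot_erasure_coding::recovery_threshold;

    /// Any `threshold` correct chunks out of `n_validators` reconstruct the
    /// data, so recovery survives losing up to `n_validators - threshold`
    /// chunks.
    fn tolerated_faults(n_validators: usize) -> Option<usize> {
        let threshold = recovery_threshold(n_validators).ok()?;
        Some(n_validators - threshold)
    }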
diff --git a/polkadot/erasure-coding/src/lib.rs b/polkadot/erasure-coding/src/lib.rs index 2b335a81688..0b2b5fd592d 100644 --- a/polkadot/erasure-coding/src/lib.rs +++ b/polkadot/erasure-coding/src/lib.rs @@ -119,6 +119,7 @@ impl CodeParams { .expect("this struct is not created with invalid shard number; qed") } } + /// Returns the maximum number of allowed, faulty chunks /// which does not prevent recovery given all other pieces /// are correct. diff --git a/polkadot/node/network/availability-recovery/Cargo.toml b/polkadot/node/network/availability-recovery/Cargo.toml new file mode 100644 index 00000000000..25389f54c5c --- /dev/null +++ b/polkadot/node/network/availability-recovery/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "polkadot-availability-recovery" +version = "0.1.0" +authors = ["Parity Technologies <admin@parity.io>"] +edition = "2018" + +[dependencies] +futures = "0.3.8" +lru = "0.6.1" +rand = "0.7.3" +thiserror = "1.0.21" +tracing = "0.1.22" +tracing-futures = "0.2.4" + +polkadot-erasure-coding = { path = "../../../erasure-coding" } +polkadot-primitives = { path = "../../../primitives" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +polkadot-node-subsystem-util = { path = "../../subsystem-util" } +polkadot-node-network-protocol = { path = "../../network/protocol" } +futures-timer = "3.0.2" +streamunordered = "0.5.1" + +[dev-dependencies] +assert_matches = "1.4.0" +env_logger = "0.8.1" +futures-timer = "3.0.2" +log = "0.4.11" +smallvec = "1.5.1" + +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } + +polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" } diff --git a/polkadot/node/network/availability-recovery/src/error.rs b/polkadot/node/network/availability-recovery/src/error.rs new file mode 100644 index 00000000000..70545020dce --- /dev/null +++ b/polkadot/node/network/availability-recovery/src/error.rs @@ -0,0 +1,50 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! The `Error` and `Result` types used by the subsystem. + +use futures::channel::{mpsc, oneshot}; +use thiserror::Error; + +/// Error type used by the Availability Recovery subsystem. 
+#[derive(Debug, Error)] +pub enum Error { + #[error(transparent)] + Subsystem(#[from] polkadot_subsystem::SubsystemError), + + #[error("failed to query a chunk from store")] + CanceledQueryChunk(#[source] oneshot::Canceled), + + #[error("failed to query session info")] + CanceledSessionInfo(#[source] oneshot::Canceled), + + #[error("failed to send response")] + CanceledResponseSender, + + #[error("to_state channel is closed")] + ClosedToState(#[source] mpsc::SendError), + + #[error(transparent)] + Runtime(#[from] polkadot_subsystem::errors::RuntimeApiError), + + #[error(transparent)] + Erasure(#[from] polkadot_erasure_coding::Error), + + #[error(transparent)] + Util(#[from] polkadot_node_subsystem_util::Error), +} + +pub type Result<T> = std::result::Result<T, Error>; diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs new file mode 100644 index 00000000000..d8ee040d5cd --- /dev/null +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -0,0 +1,854 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Availability Recovery Subsystem of Polkadot. + +#![warn(missing_docs)] + +use std::collections::HashMap; +use std::time::Duration; +use std::pin::Pin; + +use futures::{channel::{oneshot, mpsc}, prelude::*, stream::FuturesUnordered}; +use futures_timer::Delay; +use lru::LruCache; +use rand::{seq::SliceRandom, thread_rng}; +use streamunordered::{StreamUnordered, StreamYield}; + +use polkadot_primitives::v1::{ + AuthorityDiscoveryId, AvailableData, CandidateReceipt, CandidateHash, + Hash, ErasureChunk, ValidatorId, ValidatorIndex, + SessionInfo, SessionIndex, BlakeTwo256, HashT, +}; +use polkadot_subsystem::{ + SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer, + OverseerSignal, ActiveLeavesUpdate, + errors::RecoveryError, + messages::{ + AvailabilityStoreMessage, AvailabilityRecoveryMessage, AllMessages, NetworkBridgeMessage, + }, +}; +use polkadot_node_network_protocol::{ + v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, RequestId, +}; +use polkadot_node_subsystem_util::{ + Timeout, TimeoutExt, + request_session_info_ctx, +}; +use polkadot_erasure_coding::{branches, branch_hash, recovery_threshold, obtain_chunks_v1}; +mod error; + +#[cfg(test)] +mod tests; + +const LOG_TARGET: &str = "availability_recovery"; + +const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid"); +const COST_UNEXPECTED_CHUNK: Rep = Rep::new(-100, "Peer has sent an unexpected chunk"); + +// How many parallel requests an interaction should have going at once. +const N_PARALLEL: usize = 50; + +// Size of the LRU cache where we keep recovered data. +const LRU_SIZE: usize = 16; + +// A timeout for a chunk request.
+const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_secs(3); + +// A period to poll and clean AwaitedChunks. +const AWAITED_CHUNKS_CLEANUP_INTERVAL: Duration = Duration::from_secs(1); + +/// The Availability Recovery Subsystem. +pub struct AvailabilityRecoverySubsystem; + +type ChunkResponse = Result<(PeerId, ErasureChunk), RecoveryError>; + +/// Data we keep around for every chunk that we are awaiting. +struct AwaitedChunk { + /// Index of the validator we have requested this chunk from. + validator_index: ValidatorIndex, + + /// The hash of the candidate the chunk belongs to. + candidate_hash: CandidateHash, + + /// Token to cancel the connection request to the validator. + token: usize, + + /// Result sender. + response: oneshot::Sender<ChunkResponse>, +} + +/// Accumulate all awaiting sides for some particular `AvailableData`. +struct InteractionHandle { + awaiting: Vec<oneshot::Sender<Result<AvailableData, RecoveryError>>>, +} + +/// A message received by main code from an async `Interaction` task. +#[derive(Debug)] +enum FromInteraction { + /// An interaction concluded. + Concluded(CandidateHash, Result<AvailableData, RecoveryError>), + + /// Make a request of a particular chunk from a particular validator. + MakeRequest( + AuthorityDiscoveryId, + CandidateHash, + ValidatorIndex, + oneshot::Sender<ChunkResponse>, + ), + + /// Report a peer. + ReportPeer( + PeerId, + Rep, + ), +} + +/// The state of a single interaction reconstructing available data. +struct Interaction { + /// A communication channel with the `State`. + to_state: mpsc::Sender<FromInteraction>, + + /// Discovery ids of `validators`. + validator_authority_keys: Vec<AuthorityDiscoveryId>, + + /// Validators relevant to this `Interaction`. + validators: Vec<ValidatorId>, + + /// A random shuffling of the validators which indicates the order in which we connect + /// to the validators and request the chunk from them. + shuffling: Vec<ValidatorIndex>, + + /// The number of pieces needed. + threshold: usize, + + /// A hash of the relevant candidate. + candidate_hash: CandidateHash, + + /// The root of the erasure encoding of the para block. + erasure_root: Hash, + + /// The chunks that we have received from peers. + received_chunks: HashMap<PeerId, ErasureChunk>, + + /// The chunk requests that are waiting to complete. + requesting_chunks: FuturesUnordered<Timeout<oneshot::Receiver<ChunkResponse>>>, +} + +const fn is_unavailable( + received_chunks: usize, + requesting_chunks: usize, + n_validators: usize, + threshold: usize, +) -> bool { + received_chunks + requesting_chunks + n_validators < threshold +}
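A minimal sketch of the `is_unavailable` arithmetic, for intuition (illustrative only, not part of the diff; the test name is ours): the three counts are chunks already received, requests still in flight, and validators not yet tried. Once their sum cannot reach the threshold, the interaction gives up.

    #[test]
    fn is_unavailable_illustration() {
        // One received chunk, one request in flight and one untried
        // validator can still reach a threshold of 3...
        assert!(!is_unavailable(1, 1, 1, 3));
        // ...but with no untried validators left, recovery cannot succeed.
        assert!(is_unavailable(1, 1, 0, 3));
    }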
+ +impl Interaction { + async fn launch_parallel_requests(&mut self) -> error::Result<()> { + while self.requesting_chunks.len() < N_PARALLEL { + if let Some(validator_index) = self.shuffling.pop() { + let (tx, rx) = oneshot::channel(); + + self.to_state.send(FromInteraction::MakeRequest( + self.validator_authority_keys[validator_index as usize].clone(), + self.candidate_hash.clone(), + validator_index, + tx, + )).await.map_err(error::Error::ClosedToState)?; + + self.requesting_chunks.push(rx.timeout(CHUNK_REQUEST_TIMEOUT)); + } else { + break; + } + } + + Ok(()) + } + + async fn wait_for_chunks(&mut self) -> error::Result<()> { + // Return early if no requests are in flight, so we don't poll an empty set to completion. + if self.requesting_chunks.is_empty() { + return Ok(()); + } + + // Poll for new updates from requesting_chunks. + while let Some(request_result) = self.requesting_chunks.next().await { + match request_result { + Some(Ok(Ok((peer_id, chunk)))) => { + // Check merkle proofs of any received chunks, and any failures should + // lead to issuance of a FromInteraction::ReportPeer message. + if let Ok(anticipated_hash) = branch_hash( + &self.erasure_root, + &chunk.proof, + chunk.index as usize, + ) { + let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk); + + if erasure_chunk_hash != anticipated_hash { + self.to_state.send(FromInteraction::ReportPeer( + peer_id.clone(), + COST_MERKLE_PROOF_INVALID, + )).await.map_err(error::Error::ClosedToState)?; + } + } else { + self.to_state.send(FromInteraction::ReportPeer( + peer_id.clone(), + COST_MERKLE_PROOF_INVALID, + )).await.map_err(error::Error::ClosedToState)?; + } + + self.received_chunks.insert(peer_id, chunk); + } + Some(Err(e)) => { + tracing::debug!( + target: LOG_TARGET, + err = ?e, + "A response channel was cancelled while waiting for a chunk", + ); + } + Some(Ok(Err(e))) => { + tracing::debug!( + target: LOG_TARGET, + err = ?e, + "A chunk request ended with an error", + ); + } + None => { + tracing::debug!( + target: LOG_TARGET, + "A chunk request has timed out", + ); + // We break here so that another round of requests can be launched. + break; + } + } + } + + Ok(()) + } + + async fn run(mut self) -> error::Result<()> { + loop { + if is_unavailable( + self.received_chunks.len(), + self.requesting_chunks.len(), + self.shuffling.len(), + self.threshold, + ) { + self.to_state.send(FromInteraction::Concluded( + self.candidate_hash, + Err(RecoveryError::Unavailable), + )).await.map_err(error::Error::ClosedToState)?; + + return Ok(()); + } + + self.launch_parallel_requests().await?; + + self.wait_for_chunks().await?; + + // If received_chunks has at least threshold entries, attempt to recover the data. + // If that fails, or a re-encoding of it doesn't match the expected erasure root, + // issue a FromInteraction::Concluded(RecoveryError::Invalid). + // Otherwise, issue a FromInteraction::Concluded(Ok(data)). + if self.received_chunks.len() >= self.threshold { + let concluded = match polkadot_erasure_coding::reconstruct_v1( + self.validators.len(), + self.received_chunks.values().map(|c| (&c.chunk[..], c.index as usize)), + ) { + Ok(data) => { + if reconstructed_data_matches_root(self.validators.len(), &self.erasure_root, &data) { + FromInteraction::Concluded(self.candidate_hash.clone(), Ok(data)) + } else { + FromInteraction::Concluded( + self.candidate_hash.clone(), + Err(RecoveryError::Invalid), + ) + } + } + Err(_) => FromInteraction::Concluded( + self.candidate_hash.clone(), + Err(RecoveryError::Invalid), + ), + }; + + self.to_state.send(concluded).await.map_err(error::Error::ClosedToState)?; + return Ok(()); + } + } + } +} + +fn reconstructed_data_matches_root( + n_validators: usize, + expected_root: &Hash, + data: &AvailableData, +) -> bool { + let chunks = match obtain_chunks_v1(n_validators, data) { + Ok(chunks) => chunks, + Err(e) => { + tracing::debug!( + target: LOG_TARGET, + err = ?e, + "Failed to obtain chunks", + ); + return false; + } + }; + + let branches = branches(&chunks); + + branches.root() == *expected_root +}
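A sketch of the encode/reconstruct round trip that `Interaction::run` relies on (illustrative only, not part of the diff; it uses only the erasure-coding items already imported above, and the helper name `erasure_round_trip_sketch` is ours):

    // Any `threshold` chunks suffice to rebuild `AvailableData`, and
    // re-encoding the result must reproduce the original erasure root.
    fn erasure_round_trip_sketch(n_validators: usize, data: &AvailableData) -> bool {
        let chunks = obtain_chunks_v1(n_validators, data).expect("valid parameters");
        let root = branches(&chunks).root();
        let threshold = recovery_threshold(n_validators).expect("valid parameters");

        // Reconstruct from the first `threshold` chunks only.
        let recovered = match polkadot_erasure_coding::reconstruct_v1(
            n_validators,
            chunks.iter().take(threshold).enumerate().map(|(i, c)| (&c[..], i)),
        ) {
            Ok(data) => data,
            Err(_) => return false,
        };

        reconstructed_data_matches_root(n_validators, &root, &recovered)
    }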
+ +struct State { + /// Each interaction is implemented as its own async task, + /// and these handles are for communicating with them. + interactions: HashMap<CandidateHash, InteractionHandle>, + + /// A recent block hash for which state should be available. + live_block_hash: Hash, + + /// We are waiting for these validators to connect; as soon as they do, + /// we request the chunks we are awaiting from them. + discovering_validators: HashMap<AuthorityDiscoveryId, Vec<AwaitedChunk>>, + + /// Requests that we have issued to the already connected validators + /// about the chunks we are interested in. + live_chunk_requests: HashMap<RequestId, (PeerId, AwaitedChunk)>, + + /// Derive request ids from this. + next_request_id: RequestId, + + connecting_validators: StreamUnordered<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>, + + /// Interaction communication. This is cloned and given to interactions that are spun up. + from_interaction_tx: mpsc::Sender<FromInteraction>, + + /// Receiver for messages from interactions. + from_interaction_rx: mpsc::Receiver<FromInteraction>, + + /// An LRU cache of recently recovered data. + availability_lru: LruCache<CandidateHash, Result<AvailableData, RecoveryError>>, +} + +impl Default for State { + fn default() -> Self { + let (from_interaction_tx, from_interaction_rx) = mpsc::channel(16); + + Self { + from_interaction_tx, + from_interaction_rx, + interactions: HashMap::new(), + live_block_hash: Hash::default(), + discovering_validators: HashMap::new(), + live_chunk_requests: HashMap::new(), + next_request_id: 0, + connecting_validators: StreamUnordered::new(), + availability_lru: LruCache::new(LRU_SIZE), + } + } +} + +impl<C> Subsystem<C> for AvailabilityRecoverySubsystem + where C: SubsystemContext<Message = AvailabilityRecoveryMessage> +{ + fn start(self, ctx: C) -> SpawnedSubsystem { + let future = self.run(ctx) + .map_err(|e| SubsystemError::with_origin("availability-recovery", e)) + .boxed(); + SpawnedSubsystem { + name: "availability-recovery-subsystem", + future, + } + } +} + +/// Handles a signal from the overseer. +async fn handle_signal( + state: &mut State, + signal: OverseerSignal, +) -> SubsystemResult<bool> { + match signal { + OverseerSignal::Conclude => Ok(true), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }) => { + // If `activated` is non-empty, set `state.live_block_hash` to the first activated block. + if let Some(hash) = activated.get(0) { + state.live_block_hash = hash.0; + } + + Ok(false) + } + OverseerSignal::BlockFinalized(_, _) => Ok(false) + } +} + +/// Report a reputation change for a peer. +async fn report_peer( + ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>, + peer: PeerId, + rep: Rep, +) { + ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await; +}
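The `State::availability_lru` field above is what bounds the memory spent on cached results; a sketch of the `lru` crate's eviction behaviour (illustrative only, not part of the diff; the test name is ours):

    #[test]
    fn lru_eviction_illustration() {
        // With capacity 2, inserting a third entry evicts the least
        // recently used one, so at most `LRU_SIZE` results are retained.
        let mut cache: LruCache<u8, u8> = LruCache::new(2);
        cache.put(1, 1);
        cache.put(2, 2);
        cache.put(3, 3); // evicts key 1
        assert!(cache.get(&1).is_none());
        assert_eq!(cache.get(&3), Some(&3));
    }

+ +/// Machinery around launching interactions into the background.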
+#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +async fn launch_interaction( + state: &mut State, + ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>, + session_index: SessionIndex, + session_info: SessionInfo, + receipt: CandidateReceipt, + response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>, +) -> error::Result<()> { + let threshold = recovery_threshold(session_info.validators.len())?; + let to_state = state.from_interaction_tx.clone(); + let candidate_hash = receipt.hash(); + let erasure_root = receipt.descriptor.erasure_root; + let validators = session_info.validators.clone(); + let validator_authority_keys = session_info.discovery_keys.clone(); + let mut shuffling: Vec<_> = (0..validators.len() as ValidatorIndex).collect(); + + state.interactions.insert( + candidate_hash.clone(), + InteractionHandle { + awaiting: vec![response_sender], + } + ); + + { + // make borrow checker happy. + let mut rng = thread_rng(); + shuffling.shuffle(&mut rng); + } + + let interaction = Interaction { + to_state, + validator_authority_keys, + validators, + shuffling, + threshold, + candidate_hash, + erasure_root, + received_chunks: HashMap::new(), + requesting_chunks: FuturesUnordered::new(), + }; + + let future = async move { + if let Err(e) = interaction.run().await { + tracing::debug!( + target: LOG_TARGET, + err = ?e, + "Interaction finished with an error", + ); + } + }.boxed(); + + if let Err(e) = ctx.spawn("recovery interaction", future).await { + tracing::warn!( + target: LOG_TARGET, + err = ?e, + "Failed to spawn a recovery interaction task", + ); + } + + Ok(()) +} + +/// Handles an availability recovery request. +#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +async fn handle_recover( + state: &mut State, + ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>, + receipt: CandidateReceipt, + session_index: SessionIndex, + response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>, +) -> error::Result<()> { + let candidate_hash = receipt.hash(); + + if let Some(result) = state.availability_lru.get(&candidate_hash) { + if let Err(e) = response_sender.send(result.clone()) { + tracing::warn!( + target: LOG_TARGET, + err = ?e, + "Error responding with an availability recovery result", + ); + } + return Ok(()); + } + + if let Some(interaction) = state.interactions.get_mut(&candidate_hash) { + interaction.awaiting.push(response_sender); + return Ok(()); + } + + let session_info = request_session_info_ctx( + state.live_block_hash, + session_index, + ctx, + ).await?.await.map_err(error::Error::CanceledSessionInfo)??; + + match session_info { + Some(session_info) => { + launch_interaction( + state, + ctx, + session_index, + session_info, + receipt, + response_sender, + ).await + } + None => { + tracing::warn!( + target: LOG_TARGET, + "SessionInfo is `None` at {}", state.live_block_hash, + ); + response_sender + .send(Err(RecoveryError::Unavailable)) + .map_err(|_| error::Error::CanceledResponseSender)?; + Ok(()) + } + } +} + +/// Queries a chunk from av-store. 
+#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] +async fn query_chunk( + ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>, + candidate_hash: CandidateHash, + validator_index: ValidatorIndex, +) -> error::Result<Option<ErasureChunk>> { + let (tx, rx) = oneshot::channel(); + ctx.send_message(AllMessages::AvailabilityStore( + AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx), + )).await; + + Ok(rx.await.map_err(error::Error::CanceledQueryChunk)?) +} + +/// Handles a message from an interaction. +#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +async fn handle_from_interaction( + state: &mut State, + ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>, + from_interaction: FromInteraction, +) -> error::Result<()> { + match from_interaction { + FromInteraction::Concluded(candidate_hash, result) => { + // Load the entry from the interactions map. + // It should always exist, barring logic errors. + if let Some(interaction) = state.interactions.remove(&candidate_hash) { + // Send the result to each member of awaiting. + for awaiting in interaction.awaiting { + if let Err(_) = awaiting.send(result.clone()) { + tracing::debug!( + target: LOG_TARGET, + "An awaiting side of the interaction has been canceled", + ); + } + } + } else { + tracing::warn!( + target: LOG_TARGET, + "Interaction under candidate hash {} is missing", + candidate_hash, + ); + } + + state.availability_lru.put(candidate_hash, result); + } + FromInteraction::MakeRequest(id, candidate_hash, validator_index, response) => { + let (tx, rx) = mpsc::channel(2); + + let message = NetworkBridgeMessage::ConnectToValidators { + validator_ids: vec![id.clone()], + connected: tx, + }; + + ctx.send_message(AllMessages::NetworkBridge(message)).await; + + let token = state.connecting_validators.push(rx); + + state.discovering_validators.entry(id).or_default().push(AwaitedChunk { + validator_index, + candidate_hash, + token, + response, + }); + } + FromInteraction::ReportPeer(peer_id, rep) => { + report_peer(ctx, peer_id, rep).await; + } + } + + Ok(()) +} + +/// Handles a network bridge update. +#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +async fn handle_network_update( + state: &mut State, + ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>, + update: NetworkBridgeEvent<protocol_v1::AvailabilityRecoveryMessage>, +) -> error::Result<()> { + match update { + NetworkBridgeEvent::PeerMessage(peer, message) => { + match message { + protocol_v1::AvailabilityRecoveryMessage::RequestChunk( + request_id, + candidate_hash, + validator_index, + ) => { + // Issue an + // AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx) + // message. + let chunk = query_chunk(ctx, candidate_hash, validator_index).await?; + + // Whatever the result, respond with an + // AvailabilityRecoveryMessage::Chunk(request_id, chunk) wire message. + let wire_message = protocol_v1::AvailabilityRecoveryMessage::Chunk( + request_id, + chunk, + ); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage( + vec![peer], + protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message), + ), + )).await; + } + protocol_v1::AvailabilityRecoveryMessage::Chunk(request_id, chunk) => { + match state.live_chunk_requests.remove(&request_id) { + None => { + // If no such live request exists, report the peer and return.
+ report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await; + } + Some((peer_id, awaited_chunk)) if peer_id == peer => { + // The entry under request_id existed and has been removed. + // Send the chunk response on the awaited_chunk for the interaction to handle. + if let Some(chunk) = chunk { + if awaited_chunk.response.send(Ok((peer_id, chunk))).is_err() { + tracing::debug!( + target: LOG_TARGET, + "A sending side of the recovery request is closed", + ); + } + } + } + Some(a) => { + // If the peer in the entry doesn't match the sending peer, + // reinstate the entry, report the peer, and return. + state.live_chunk_requests.insert(request_id, a); + report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await; + } + } + } + } + } + // We do not really need to track the peers' views in this subsystem + // since the peers are _required_ to have the data we are interested in. + NetworkBridgeEvent::PeerViewChange(_, _) => {} + NetworkBridgeEvent::OurViewChange(_) => {} + // All peer connections are handled via validator discovery API. + NetworkBridgeEvent::PeerConnected(_, _) => {} + NetworkBridgeEvent::PeerDisconnected(_) => {} + } + + Ok(()) +} + +/// Issues a chunk request to a validator that we have been waiting to connect to us. +async fn issue_chunk_request( + state: &mut State, + ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>, + peer_id: PeerId, + awaited_chunk: AwaitedChunk, +) -> error::Result<()> { + let request_id = state.next_request_id; + state.next_request_id += 1; + + let wire_message = protocol_v1::AvailabilityRecoveryMessage::RequestChunk( + request_id, + awaited_chunk.candidate_hash, + awaited_chunk.validator_index, + ); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage( + vec![peer_id.clone()], + protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message), + ), + )).await; + + state.live_chunk_requests.insert(request_id, (peer_id, awaited_chunk)); + + Ok(()) +} + +/// Handles a newly connected validator in the context of some relay leaf. +async fn handle_validator_connected( + state: &mut State, + ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>, + authority_id: AuthorityDiscoveryId, + peer_id: PeerId, +) -> error::Result<()> { + if let Some(discovering) = state.discovering_validators.remove(&authority_id) { + for chunk in discovering { + issue_chunk_request(state, ctx, peer_id.clone(), chunk).await?; + } + } + + Ok(()) +} + +/// The awaited-chunks info that `State` holds has to be cleaned up +/// periodically, since there is no way for an `Interaction` to communicate +/// a timed-out request. +fn cleanup_awaited_chunks(state: &mut State) { + let mut removed_tokens = Vec::new(); + + for (_, v) in state.discovering_validators.iter_mut() { + v.retain(|e| if e.response.is_canceled() { + removed_tokens.push(e.token); + false + } else { + true + }); + } + + for token in removed_tokens { + Pin::new(&mut state.connecting_validators).remove(token); + } + + state.discovering_validators.retain(|_, v| !v.is_empty()); + state.live_chunk_requests.retain(|_, v| !v.1.response.is_canceled()); +}
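The cleanup above hinges on `oneshot::Sender::is_canceled`: when an `Interaction`'s chunk request times out, the receiving end is dropped and the sender stored in `AwaitedChunk` starts reporting canceled. A sketch (illustrative only, not part of the diff; the test name is ours):

    #[test]
    fn oneshot_cancellation_illustration() {
        let (tx, rx) = futures::channel::oneshot::channel::<u8>();
        assert!(!tx.is_canceled());
        drop(rx); // what a timed-out request effectively does to the channel
        assert!(tx.is_canceled());
    }

+ +impl AvailabilityRecoverySubsystem { + /// Create a new instance of `AvailabilityRecoverySubsystem`.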
+ pub fn new() -> Self { + Self + } + + async fn run( + self, + mut ctx: impl SubsystemContext<Message = AvailabilityRecoveryMessage>, + ) -> SubsystemResult<()> { + let mut state = State::default(); + + let awaited_chunk_cleanup_interval = futures::stream::repeat(()).then(|_| async move { + Delay::new(AWAITED_CHUNKS_CLEANUP_INTERVAL).await; + }); + + futures::pin_mut!(awaited_chunk_cleanup_interval); + + loop { + futures::select_biased! { + _v = awaited_chunk_cleanup_interval.next() => { + cleanup_awaited_chunks(&mut state); + } + v = state.connecting_validators.next() => { + if let Some((yielded, token)) = v { + match yielded { + StreamYield::Item((authority_id, peer_id)) => { + if let Err(e) = handle_validator_connected( + &mut state, + &mut ctx, + authority_id, + peer_id, + ).await { + tracing::warn!( + target: LOG_TARGET, + err = ?e, + "Failed to handle a newly connected validator", + ); + } + } + StreamYield::Finished(_) => { + Pin::new(&mut state.connecting_validators).remove(token); + } + } + } + } + v = ctx.recv().fuse() => { + match v? { + FromOverseer::Signal(signal) => if handle_signal( + &mut state, + signal, + ).await? { + return Ok(()); + } + FromOverseer::Communication { msg } => { + match msg { + AvailabilityRecoveryMessage::RecoverAvailableData( + receipt, + session_index, + response_sender, + ) => { + if let Err(e) = handle_recover( + &mut state, + &mut ctx, + receipt, + session_index, + response_sender, + ).await { + tracing::warn!( + target: LOG_TARGET, + err = ?e, + "Error handling a recovery request", + ); + } + } + AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(event) => { + if let Err(e) = handle_network_update( + &mut state, + &mut ctx, + event, + ).await { + tracing::warn!( + target: LOG_TARGET, + err = ?e, + "Error handling a network bridge update", + ); + } + } + } + } + } + } + from_interaction = state.from_interaction_rx.next() => { + if let Some(from_interaction) = from_interaction { + if let Err(e) = handle_from_interaction( + &mut state, + &mut ctx, + from_interaction, + ).await { + tracing::warn!( + target: LOG_TARGET, + err = ?e, + "Error handling message from interaction", + ); + } + } + } + } + } + } +}
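From the outside, the subsystem is driven entirely by `AvailabilityRecoveryMessage::RecoverAvailableData`. A sketch of a caller (illustrative only; it is a fragment assuming a subsystem context `ctx`, a `receipt`, and a `session_index` in scope, and the `AllMessages::AvailabilityRecovery` variant this patch adds in `subsystem/src/messages.rs`):

    let (tx, rx) = oneshot::channel();
    ctx.send_message(AllMessages::AvailabilityRecovery(
        AvailabilityRecoveryMessage::RecoverAvailableData(receipt, session_index, tx),
    )).await;

    match rx.await? {
        Ok(available_data) => { /* PoV and validation data recovered */ }
        Err(RecoveryError::Unavailable) => { /* too few chunks could be fetched */ }
        Err(RecoveryError::Invalid) => { /* recovered data failed the erasure-root check */ }
    }

diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs new file mode 100644 index 00000000000..3833b128203 --- /dev/null +++ b/polkadot/node/network/availability-recovery/src/tests.rs @@ -0,0 +1,565 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.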
+ +use std::time::Duration; +use std::sync::Arc; + +use futures::{executor, future}; +use futures_timer::Delay; +use assert_matches::assert_matches; +use smallvec::smallvec; + +use super::*; + +use polkadot_primitives::v1::{ + AuthorityDiscoveryId, PersistedValidationData, PoV, BlockData, HeadData, +}; +use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks}; +use polkadot_node_subsystem_util::TimeoutExt; +use polkadot_subsystem_testhelpers as test_helpers; +use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, JaegerSpan}; + +type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>; + +struct TestHarness { + virtual_overseer: VirtualOverseer, +} + +fn test_harness<T: Future<Output = ()>>( + test: impl FnOnce(TestHarness) -> T, +) { + let _ = env_logger::builder() + .is_test(true) + .filter( + Some("polkadot_availability_recovery"), + log::LevelFilter::Trace, + ) + .try_init(); + + let pool = sp_core::testing::TaskExecutor::new(); + + let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); + + let subsystem = AvailabilityRecoverySubsystem::new(); + let subsystem = subsystem.run(context); + + let test_fut = test(TestHarness { virtual_overseer }); + + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); + + executor::block_on(future::select(test_fut, subsystem)); +} + +const TIMEOUT: Duration = Duration::from_millis(100); + +macro_rules! delay { + ($delay:expr) => { + Delay::new(Duration::from_millis($delay)).await; + }; +} + +async fn overseer_signal( + overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>, + signal: OverseerSignal, +) { + delay!(50); + overseer + .send(FromOverseer::Signal(signal)) + .timeout(TIMEOUT) + .await + .expect("TIMEOUT is more than enough for sending signals."); +} + +async fn overseer_send( + overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>, + msg: AvailabilityRecoveryMessage, +) { + tracing::trace!(msg = ?msg, "sending message"); + overseer + .send(FromOverseer::Communication { msg }) + .timeout(TIMEOUT) + .await + .expect("TIMEOUT is more than enough for sending messages."); +} + +async fn overseer_recv( + overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>, +) -> AllMessages { + tracing::trace!("waiting for message ..."); + let msg = overseer + .recv() + .timeout(TIMEOUT) + .await + .expect("TIMEOUT is enough to recv."); + tracing::trace!(msg = ?msg, "received message"); + msg +} + + +use sp_keyring::Sr25519Keyring; + +#[derive(Clone)] +struct TestState { + validators: Vec<Sr25519Keyring>, + validator_public: Vec<ValidatorId>, + validator_authority_id: Vec<AuthorityDiscoveryId>, + validator_peer_id: Vec<PeerId>, + current: Hash, + candidate: CandidateReceipt, + session_index: SessionIndex, + + + persisted_validation_data: PersistedValidationData, + + available_data: AvailableData, + chunks: Vec<ErasureChunk>, +} + +impl TestState { + async fn test_runtime_api( + &self, + virtual_overseer: &mut VirtualOverseer, + ) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo( + session_index, + tx, + ) + )) => { + assert_eq!(relay_parent, self.current); + assert_eq!(session_index, self.session_index); + + tx.send(Ok(Some(SessionInfo { + validators: self.validator_public.clone(), + discovery_keys: self.validator_authority_id.clone(),
..Default::default() + }))).unwrap(); + } + ); + } + + async fn test_connect_to_validators( + &self, + virtual_overseer: &mut VirtualOverseer, + ) { + // Channels by AuthorityDiscoveryId to send results to. + // Gather them here and send them in a batch after the loop, to avoid racing. + let mut results = HashMap::new(); + + for _ in 0..self.validator_public.len() { + // Connect to shuffled validators one by one. + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ConnectToValidators { + validator_ids, + connected, + .. + } + ) => { + for validator_id in validator_ids { + let idx = self.validator_authority_id + .iter() + .position(|x| *x == validator_id) + .unwrap(); + + results.insert( + ( + self.validator_authority_id[idx].clone(), + self.validator_peer_id[idx].clone(), + ), + connected.clone(), + ); + } + } + ); + } + + for (k, mut v) in results.into_iter() { + v.send(k).await.unwrap(); + } + } + + async fn test_chunk_requests( + &self, + candidate_hash: CandidateHash, + virtual_overseer: &mut VirtualOverseer, + ) { + for _ in 0..self.validator_public.len() { + // Receive a request for a chunk. + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage( + _peers, + protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message), + ) + ) => { + let (request_id, validator_index) = assert_matches!( + wire_message, + protocol_v1::AvailabilityRecoveryMessage::RequestChunk( + request_id, + candidate_hash_recvd, + validator_index, + ) => { + assert_eq!(candidate_hash_recvd, candidate_hash); + (request_id, validator_index) + } + ); + + overseer_send( + virtual_overseer, + AvailabilityRecoveryMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + self.validator_peer_id[validator_index as usize].clone(), + protocol_v1::AvailabilityRecoveryMessage::Chunk( + request_id, + Some(self.chunks[validator_index as usize].clone()), + ) + ) + ) + ).await; + } + ); + } + } + + async fn test_faulty_chunk_requests( + &self, + candidate_hash: CandidateHash, + virtual_overseer: &mut VirtualOverseer, + faulty: &[bool], + ) { + for _ in 0..self.validator_public.len() { + // Receive a request for a chunk. + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage( + _peers, + protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message), + ) + ) => { + let (request_id, validator_index) = assert_matches!( + wire_message, + protocol_v1::AvailabilityRecoveryMessage::RequestChunk( + request_id, + candidate_hash_recvd, + validator_index, + ) => { + assert_eq!(candidate_hash_recvd, candidate_hash); + (request_id, validator_index) + } + ); + + overseer_send( + virtual_overseer, + AvailabilityRecoveryMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + self.validator_peer_id[validator_index as usize].clone(), + protocol_v1::AvailabilityRecoveryMessage::Chunk( + request_id, + Some(self.chunks[validator_index as usize].clone()), + ) + ) + ) + ).await; + } + ); + } + + for i in 0..self.validator_public.len() { + if faulty[i] { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + peer, + rep, + ) + ) => { + assert_eq!(rep, COST_MERKLE_PROOF_INVALID); + + // These may arrive in any order since the interaction implementation + // uses `FuturesUnordered`.
+ assert!(self.validator_peer_id.iter().find(|p| **p == peer).is_some()); + } + ); + } + } + } +} + + +fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> { + val_ids.iter().map(|v| v.public().into()).collect() +} + +fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryId> { + val_ids.iter().map(|v| v.public().into()).collect() +} + +fn derive_erasure_chunks_with_proofs_and_root( + n_validators: usize, + available_data: &AvailableData, +) -> (Vec<ErasureChunk>, Hash) { + let chunks: Vec<Vec<u8>> = obtain_chunks(n_validators, available_data).unwrap(); + + // create proofs for each erasure chunk + let branches = branches(chunks.as_ref()); + + let root = branches.root(); + let erasure_chunks = branches + .enumerate() + .map(|(index, (proof, chunk))| ErasureChunk { + chunk: chunk.to_vec(), + index: index as _, + proof, + }) + .collect::<Vec<ErasureChunk>>(); + + (erasure_chunks, root) +} + +impl Default for TestState { + fn default() -> Self { + let validators = vec![ + Sr25519Keyring::Ferdie, // <- this node, role: validator + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Dave, + ]; + + let validator_public = validator_pubkeys(&validators); + let validator_authority_id = validator_authority_id(&validators); + let validator_peer_id = std::iter::repeat_with(|| PeerId::random()) + .take(validator_public.len()) + .collect(); + + let current = Hash::repeat_byte(1); + + let mut candidate = CandidateReceipt::default(); + + let session_index = 10; + + let persisted_validation_data = PersistedValidationData { + parent_head: HeadData(vec![7, 8, 9]), + block_number: Default::default(), + hrmp_mqc_heads: Vec::new(), + dmq_mqc_head: Default::default(), + max_pov_size: 1024, + relay_storage_root: Default::default(), + }; + + let pov = PoV { + block_data: BlockData(vec![42; 64]), + }; + + let available_data = AvailableData { + validation_data: persisted_validation_data.clone(), + pov: Arc::new(pov), + }; + + let (chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root( + validators.len(), + &available_data, + ); + + candidate.descriptor.erasure_root = erasure_root; + candidate.descriptor.relay_parent = Hash::repeat_byte(10); + + Self { + validators, + validator_public, + validator_authority_id, + validator_peer_id, + current, + candidate, + session_index, + persisted_validation_data, + available_data, + chunks, + } + } +} + +#[test] +fn availability_is_recovered() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))], + deactivated: smallvec![], + }), + ).await; + + let (tx, rx) = oneshot::channel(); + + overseer_send( + &mut virtual_overseer, + AvailabilityRecoveryMessage::RecoverAvailableData( + test_state.candidate.clone(), + test_state.session_index, + tx, + ) + ).await; + + test_state.test_runtime_api(&mut virtual_overseer).await; + + test_state.test_connect_to_validators(&mut virtual_overseer).await; + + let candidate_hash = test_state.candidate.hash(); + + test_state.test_chunk_requests(candidate_hash, &mut virtual_overseer).await; + + // Recovered data should match the original one. 
+ assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); + + let (tx, rx) = oneshot::channel(); + + // Test another candidate, send no chunks. + let mut new_candidate = CandidateReceipt::default(); + + new_candidate.descriptor.relay_parent = test_state.candidate.descriptor.relay_parent; + + overseer_send( + &mut virtual_overseer, + AvailabilityRecoveryMessage::RecoverAvailableData( + new_candidate, + test_state.session_index, + tx, + ) + ).await; + + test_state.test_runtime_api(&mut virtual_overseer).await; + + test_state.test_connect_to_validators(&mut virtual_overseer).await; + + // A request times out with `Unavailable` error. + assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable); + }); } + +#[test] +fn a_faulty_chunk_leads_to_recovery_error() { + let mut test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))], + deactivated: smallvec![], + }), + ).await; + + let (tx, rx) = oneshot::channel(); + + overseer_send( + &mut virtual_overseer, + AvailabilityRecoveryMessage::RecoverAvailableData( + test_state.candidate.clone(), + test_state.session_index, + tx, + ) + ).await; + + test_state.test_runtime_api(&mut virtual_overseer).await; + + test_state.test_connect_to_validators(&mut virtual_overseer).await; + + let candidate_hash = test_state.candidate.hash(); + + // Create some faulty chunks. + test_state.chunks[0].chunk = vec![1; 32]; + test_state.chunks[1].chunk = vec![2; 32]; + let mut faulty = vec![false; test_state.chunks.len()]; + faulty[0] = true; + faulty[1] = true; + + test_state.test_faulty_chunk_requests( + candidate_hash, + &mut virtual_overseer, + &faulty, + ).await; + + // Recovery fails with an `Invalid` error: the corrupted chunks are caught + // when the reconstructed data is re-encoded against the erasure root. + assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Invalid); + }); } + +#[test] +fn a_wrong_chunk_leads_to_recovery_error() { + let mut test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))], + deactivated: smallvec![], + }), + ).await; + + let (tx, rx) = oneshot::channel(); + + overseer_send( + &mut virtual_overseer, + AvailabilityRecoveryMessage::RecoverAvailableData( + test_state.candidate.clone(), + test_state.session_index, + tx, + ) + ).await; + + test_state.test_runtime_api(&mut virtual_overseer).await; + + test_state.test_connect_to_validators(&mut virtual_overseer).await; + + let candidate_hash = test_state.candidate.hash(); + + // Send a wrong chunk so it passes proof check but fails to reconstruct. + test_state.chunks[1] = test_state.chunks[0].clone(); + test_state.chunks[2] = test_state.chunks[0].clone(); + test_state.chunks[3] = test_state.chunks[0].clone(); + test_state.chunks[4] = test_state.chunks[0].clone(); + + let faulty = vec![false; test_state.chunks.len()]; + + test_state.test_faulty_chunk_requests( + candidate_hash, + &mut virtual_overseer, + &faulty, + ).await; + + // Recovery fails with an `Invalid` error.
+ assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Invalid); + }); +} diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index c833ba4b151..937c17981a1 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -282,7 +282,7 @@ impl View { pub mod v1 { use polkadot_primitives::v1::{ Hash, CollatorId, Id as ParaId, ErasureChunk, CandidateReceipt, - SignedAvailabilityBitfield, PoV, CandidateHash, + SignedAvailabilityBitfield, PoV, CandidateHash, ValidatorIndex, }; use polkadot_node_primitives::SignedFullStatement; use parity_scale_codec::{Encode, Decode}; @@ -297,6 +297,16 @@ pub mod v1 { Chunk(CandidateHash, ErasureChunk), } + /// Network messages used by the availability recovery subsystem. + #[derive(Debug, Clone, Encode, Decode, PartialEq)] + pub enum AvailabilityRecoveryMessage { + /// Request a chunk for a given candidate hash and validator index. + RequestChunk(RequestId, CandidateHash, ValidatorIndex), + /// Respond with chunk for a given candidate hash and validator index. + /// The response may be `None` if the requestee does not have the chunk. + Chunk(RequestId, Option<ErasureChunk>), + } + /// Network messages used by the bitfield distribution subsystem. #[derive(Debug, Clone, Encode, Decode, PartialEq)] pub enum BitfieldDistributionMessage { @@ -359,6 +369,9 @@ pub mod v1 { /// Statement distribution messages #[codec(index = "3")] StatementDistribution(StatementDistributionMessage), + /// Availability recovery messages + #[codec(index = "4")] + AvailabilityRecovery(AvailabilityRecoveryMessage), } impl_try_from!(ValidationProtocol, AvailabilityDistribution, AvailabilityDistributionMessage); diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index f274f24895c..5974ebc3e91 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -85,6 +85,7 @@ use polkadot_subsystem::messages::{ AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage, ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, CollatorProtocolMessage, + AvailabilityRecoveryMessage, }; pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, @@ -524,6 +525,9 @@ pub struct Overseer<S> { /// An availability distribution subsystem. availability_distribution_subsystem: OverseenSubsystem<AvailabilityDistributionMessage>, + /// An availability recovery subsystem. + availability_recovery_subsystem: OverseenSubsystem<AvailabilityRecoveryMessage>, + /// A bitfield signing subsystem. bitfield_signing_subsystem: OverseenSubsystem<BitfieldSigningMessage>, @@ -593,8 +597,8 @@ pub struct Overseer<S> { /// message type that is specific to this [`Subsystem`]. At the moment not all /// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`]. pub struct AllSubsystems< - CV = (), CB = (), CS = (), SD = (), AD = (), BS = (), BD = (), P = (), - PoVD = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = () + CV = (), CB = (), CS = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (), + PoVD = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), > { /// A candidate validation subsystem. pub candidate_validation: CV, @@ -606,6 +610,8 @@ pub struct AllSubsystems< pub statement_distribution: SD, /// An availability distribution subsystem. 
pub availability_distribution: AD, + /// An availability recovery subsystem. + pub availability_recovery: AR, /// A bitfield signing subsystem. pub bitfield_signing: BS, /// A bitfield distribution subsystem. @@ -628,8 +634,8 @@ pub struct AllSubsystems< pub collator_protocol: CP, } -impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> - AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> +impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> + AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { /// Create a new instance of [`AllSubsystems`]. /// @@ -658,6 +664,7 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> DummySubsystem, DummySubsystem, DummySubsystem, + DummySubsystem, DummySubsystem > { AllSubsystems { @@ -666,6 +673,7 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + availability_recovery: DummySubsystem, bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, @@ -683,13 +691,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> pub fn replace_candidate_validation<NEW>( self, candidate_validation: NEW, - ) -> AllSubsystems<NEW, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { + ) -> AllSubsystems<NEW, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { AllSubsystems { candidate_validation, candidate_backing: self.candidate_backing, candidate_selection: self.candidate_selection, statement_distribution: self.statement_distribution, availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, bitfield_signing: self.bitfield_signing, bitfield_distribution: self.bitfield_distribution, provisioner: self.provisioner, @@ -707,13 +716,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> pub fn replace_candidate_backing<NEW>( self, candidate_backing: NEW, - ) -> AllSubsystems<CV, NEW, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { + ) -> AllSubsystems<CV, NEW, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing, candidate_selection: self.candidate_selection, statement_distribution: self.statement_distribution, availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, bitfield_signing: self.bitfield_signing, bitfield_distribution: self.bitfield_distribution, provisioner: self.provisioner, @@ -731,13 +741,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> pub fn replace_candidate_selection<NEW>( self, candidate_selection: NEW, - ) -> AllSubsystems<CV, CB, NEW, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { + ) -> AllSubsystems<CV, CB, NEW, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, candidate_selection, statement_distribution: self.statement_distribution, availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, bitfield_signing: self.bitfield_signing, bitfield_distribution: self.bitfield_distribution, provisioner: self.provisioner, @@ -755,13 +766,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> pub fn 
replace_statement_distribution<NEW>( self, statement_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, NEW, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { + ) -> AllSubsystems<CV, CB, CS, NEW, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, candidate_selection: self.candidate_selection, statement_distribution, availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, bitfield_signing: self.bitfield_signing, bitfield_distribution: self.bitfield_distribution, provisioner: self.provisioner, @@ -779,13 +791,39 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> pub fn replace_availability_distribution<NEW>( self, availability_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, NEW, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { + ) -> AllSubsystems<CV, CB, CS, SD, NEW, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, candidate_selection: self.candidate_selection, statement_distribution: self.statement_distribution, availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + pov_distribution: self.pov_distribution, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + } + } + + /// Replace the `availability_recovery` instance in `self`. 
+ pub fn replace_availability_recovery<NEW>( + self, + availability_recovery: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, NEW, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery, bitfield_signing: self.bitfield_signing, bitfield_distribution: self.bitfield_distribution, provisioner: self.provisioner, @@ -803,13 +841,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> pub fn replace_bitfield_signing<NEW>( self, bitfield_signing: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, NEW, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, NEW, BD, P, PoVD, RA, AS, NB, CA, CG, CP> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, candidate_selection: self.candidate_selection, statement_distribution: self.statement_distribution, availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, bitfield_signing, bitfield_distribution: self.bitfield_distribution, provisioner: self.provisioner, @@ -827,13 +866,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> pub fn replace_bitfield_distribution<NEW>( self, bitfield_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, BS, NEW, P, PoVD, RA, AS, NB, CA, CG, CP> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, NEW, P, PoVD, RA, AS, NB, CA, CG, CP> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, candidate_selection: self.candidate_selection, statement_distribution: self.statement_distribution, availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, bitfield_signing: self.bitfield_signing, bitfield_distribution, provisioner: self.provisioner, @@ -851,13 +891,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> pub fn replace_provisioner<NEW>( self, provisioner: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, NEW, PoVD, RA, AS, NB, CA, CG, CP> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, NEW, PoVD, RA, AS, NB, CA, CG, CP> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, candidate_selection: self.candidate_selection, statement_distribution: self.statement_distribution, availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, bitfield_signing: self.bitfield_signing, bitfield_distribution: self.bitfield_distribution, provisioner, @@ -875,13 +916,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> pub fn replace_pov_distribution<NEW>( self, pov_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, NEW, RA, AS, NB, CA, CG, CP> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, NEW, RA, AS, NB, CA, CG, CP> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, candidate_selection: self.candidate_selection, statement_distribution: self.statement_distribution, availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, bitfield_signing: self.bitfield_signing, 
 			bitfield_distribution: self.bitfield_distribution,
 			provisioner: self.provisioner,
@@ -899,13 +941,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
 	pub fn replace_runtime_api<NEW>(
 		self,
 		runtime_api: NEW,
-	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, NEW, AS, NB, CA, CG, CP> {
+	) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, NEW, AS, NB, CA, CG, CP> {
 		AllSubsystems {
 			candidate_validation: self.candidate_validation,
 			candidate_backing: self.candidate_backing,
 			candidate_selection: self.candidate_selection,
 			statement_distribution: self.statement_distribution,
 			availability_distribution: self.availability_distribution,
+			availability_recovery: self.availability_recovery,
 			bitfield_signing: self.bitfield_signing,
 			bitfield_distribution: self.bitfield_distribution,
 			provisioner: self.provisioner,
@@ -923,13 +966,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
 	pub fn replace_availability_store<NEW>(
 		self,
 		availability_store: NEW,
-	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, NEW, NB, CA, CG, CP> {
+	) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, NEW, NB, CA, CG, CP> {
 		AllSubsystems {
 			candidate_validation: self.candidate_validation,
 			candidate_backing: self.candidate_backing,
 			candidate_selection: self.candidate_selection,
 			statement_distribution: self.statement_distribution,
 			availability_distribution: self.availability_distribution,
+			availability_recovery: self.availability_recovery,
 			bitfield_signing: self.bitfield_signing,
 			bitfield_distribution: self.bitfield_distribution,
 			provisioner: self.provisioner,
@@ -947,13 +991,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
 	pub fn replace_network_bridge<NEW>(
 		self,
 		network_bridge: NEW,
-	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NEW, CA, CG, CP> {
+	) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NEW, CA, CG, CP> {
 		AllSubsystems {
 			candidate_validation: self.candidate_validation,
 			candidate_backing: self.candidate_backing,
 			candidate_selection: self.candidate_selection,
 			statement_distribution: self.statement_distribution,
 			availability_distribution: self.availability_distribution,
+			availability_recovery: self.availability_recovery,
 			bitfield_signing: self.bitfield_signing,
 			bitfield_distribution: self.bitfield_distribution,
 			provisioner: self.provisioner,
@@ -971,13 +1016,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
 	pub fn replace_chain_api<NEW>(
 		self,
 		chain_api: NEW,
-	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, NEW, CG, CP> {
+	) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, NEW, CG, CP> {
 		AllSubsystems {
 			candidate_validation: self.candidate_validation,
 			candidate_backing: self.candidate_backing,
 			candidate_selection: self.candidate_selection,
 			statement_distribution: self.statement_distribution,
 			availability_distribution: self.availability_distribution,
+			availability_recovery: self.availability_recovery,
 			bitfield_signing: self.bitfield_signing,
 			bitfield_distribution: self.bitfield_distribution,
 			provisioner: self.provisioner,
@@ -995,13 +1041,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
 	pub fn replace_collation_generation<NEW>(
 		self,
 		collation_generation: NEW,
-	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, NEW, CP> {
+	) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, NEW, CP> {
 		AllSubsystems {
 			candidate_validation: self.candidate_validation,
 			candidate_backing: self.candidate_backing,
 			candidate_selection: self.candidate_selection,
 			statement_distribution: self.statement_distribution,
 			availability_distribution: self.availability_distribution,
+			availability_recovery: self.availability_recovery,
 			bitfield_signing: self.bitfield_signing,
 			bitfield_distribution: self.bitfield_distribution,
 			provisioner: self.provisioner,
@@ -1019,13 +1066,14 @@ impl<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>
 	pub fn replace_collator_protocol<NEW>(
 		self,
 		collator_protocol: NEW,
-	) -> AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, NEW> {
+	) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, NEW> {
 		AllSubsystems {
 			candidate_validation: self.candidate_validation,
 			candidate_backing: self.candidate_backing,
 			candidate_selection: self.candidate_selection,
 			statement_distribution: self.statement_distribution,
 			availability_distribution: self.availability_distribution,
+			availability_recovery: self.availability_recovery,
 			bitfield_signing: self.bitfield_signing,
 			bitfield_distribution: self.bitfield_distribution,
 			provisioner: self.provisioner,
@@ -1260,9 +1308,9 @@ where
 	/// #
 	/// # }); }
 	/// ```
-	pub fn new<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>(
+	pub fn new<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>(
 		leaves: impl IntoIterator<Item = BlockInfo>,
-		all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>,
+		all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>,
 		prometheus_registry: Option<&prometheus::Registry>,
 		mut s: S,
 	) -> SubsystemResult<(Self, OverseerHandler)>
@@ -1272,6 +1320,7 @@ where
 	CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send,
 	SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send,
 	AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send,
+	AR: Subsystem<OverseerSubsystemContext<AvailabilityRecoveryMessage>> + Send,
 	BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>> + Send,
 	BD: Subsystem<OverseerSubsystemContext<BitfieldDistributionMessage>> + Send,
 	P: Subsystem<OverseerSubsystemContext<ProvisionerMessage>> + Send,
@@ -1357,6 +1406,15 @@ where
 			&mut seed,
 		)?;
 
+		let availability_recovery_subsystem = spawn(
+			&mut s,
+			&mut running_subsystems,
+			metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx),
+			all_subsystems.availability_recovery,
+			&metrics,
+			&mut seed,
+		)?;
+
 		let bitfield_signing_subsystem = spawn(
 			&mut s,
 			&mut running_subsystems,
@@ -1462,6 +1520,7 @@ where
 			candidate_selection_subsystem,
 			statement_distribution_subsystem,
 			availability_distribution_subsystem,
+			availability_recovery_subsystem,
 			bitfield_signing_subsystem,
 			bitfield_distribution_subsystem,
 			provisioner_subsystem,
@@ -1493,6 +1552,7 @@ where
 		let _ = self.candidate_selection_subsystem.send_signal(OverseerSignal::Conclude).await;
 		let _ = self.statement_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
 		let _ = self.availability_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.availability_recovery_subsystem.send_signal(OverseerSignal::Conclude).await;
 		let _ = self.bitfield_signing_subsystem.send_signal(OverseerSignal::Conclude).await;
 		let _ = self.bitfield_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
 		let _ = self.provisioner_subsystem.send_signal(OverseerSignal::Conclude).await;
@@ -1660,6 +1720,7 @@ where
 		self.candidate_selection_subsystem.send_signal(signal.clone()).await?;
 		self.statement_distribution_subsystem.send_signal(signal.clone()).await?;
 		self.availability_distribution_subsystem.send_signal(signal.clone()).await?;
+		self.availability_recovery_subsystem.send_signal(signal.clone()).await?;
 		self.bitfield_signing_subsystem.send_signal(signal.clone()).await?;
 		self.bitfield_distribution_subsystem.send_signal(signal.clone()).await?;
 		self.provisioner_subsystem.send_signal(signal.clone()).await?;
@@ -1694,6 +1755,9 @@ where
 			AllMessages::AvailabilityDistribution(msg) => {
 				self.availability_distribution_subsystem.send_message(msg).await?;
 			},
+			AllMessages::AvailabilityRecovery(msg) => {
+				self.availability_recovery_subsystem.send_message(msg).await?;
+			},
 			AllMessages::BitfieldDistribution(msg) => {
 				self.bitfield_distribution_subsystem.send_message(msg).await?;
 			},
@@ -2538,6 +2602,15 @@ mod tests {
 		AvailabilityDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
 	}
 
+	fn test_availability_recovery_msg() -> AvailabilityRecoveryMessage {
+		let (sender, _) = oneshot::channel();
+		AvailabilityRecoveryMessage::RecoverAvailableData(
+			Default::default(),
+			Default::default(),
+			sender,
+		)
+	}
+
 	fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage {
 		BitfieldDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
 	}
@@ -2589,6 +2662,7 @@ mod tests {
 			collator_protocol: subsystem.clone(),
 			statement_distribution: subsystem.clone(),
 			availability_distribution: subsystem.clone(),
+			availability_recovery: subsystem.clone(),
 			bitfield_signing: subsystem.clone(),
 			bitfield_distribution: subsystem.clone(),
 			provisioner: subsystem.clone(),
@@ -2624,6 +2698,7 @@ mod tests {
 		handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
 		handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
 		handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await;
+		handler.send_msg(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await;
 		// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
 		handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
 		handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await;
@@ -2638,7 +2713,7 @@ mod tests {
 		select!
 		{
 			res = overseer_fut => {
-				const NUM_SUBSYSTEMS: usize = 15;
+				const NUM_SUBSYSTEMS: usize = 16;
 
 				assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
 				// x2 because of broadcast_signal on startup
diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml
index 8ee46f3b9f1..558cc0f4348 100644
--- a/polkadot/node/service/Cargo.toml
+++ b/polkadot/node/service/Cargo.toml
@@ -79,6 +79,7 @@ rococo-runtime = { path = "../../runtime/rococo" }
 # Polkadot Subsystems
 polkadot-availability-bitfield-distribution = { path = "../network/bitfield-distribution", optional = true }
 polkadot-availability-distribution = { path = "../network/availability-distribution", optional = true }
+polkadot-availability-recovery = { path = "../network/availability-recovery", optional = true }
 polkadot-collator-protocol = { path = "../network/collator-protocol", optional = true }
 polkadot-network-bridge = { path = "../network/bridge", optional = true }
 polkadot-node-collation-generation = { path = "../collation-generation", optional = true }
@@ -114,6 +115,7 @@ runtime-benchmarks = [
 real-overseer = [
 	"polkadot-availability-bitfield-distribution",
 	"polkadot-availability-distribution",
+	"polkadot-availability-recovery",
 	"polkadot-collator-protocol",
 	"polkadot-network-bridge",
 	"polkadot-node-collation-generation",
diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs
index 0b37e3ae48f..d16486e60a7 100644
--- a/polkadot/node/service/src/lib.rs
+++ b/polkadot/node/service/src/lib.rs
@@ -397,12 +397,15 @@ where
 	use polkadot_node_core_provisioner::ProvisioningSubsystem as ProvisionerSubsystem;
 	use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
 	use polkadot_statement_distribution::StatementDistribution as StatementDistributionSubsystem;
+	use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
 
 	let all_subsystems = AllSubsystems {
 		availability_distribution: AvailabilityDistributionSubsystem::new(
 			keystore.clone(),
 			Metrics::register(registry)?,
 		),
+		availability_recovery: AvailabilityRecoverySubsystem::new(
+		),
 		availability_store: AvailabilityStoreSubsystem::new_on_disk(
 			availability_config,
 			Metrics::register(registry)?,
diff --git a/polkadot/node/subsystem/src/errors.rs b/polkadot/node/subsystem/src/errors.rs
index 5af573c87fc..acd33cff1df 100644
--- a/polkadot/node/subsystem/src/errors.rs
+++ b/polkadot/node/subsystem/src/errors.rs
@@ -59,3 +59,21 @@ impl core::fmt::Display for ChainApiError {
 }
 
 impl std::error::Error for ChainApiError {}
+
+/// An error that may happen during the Availability Recovery process.
+#[derive(PartialEq, Debug, Clone)]
+pub enum RecoveryError {
+	/// A chunk is recovered but is invalid.
+	Invalid,
+
+	/// A requested chunk is unavailable.
+	Unavailable,
+}
+
+impl std::fmt::Display for RecoveryError {
+	fn fmt(&self, f: &mut core::fmt::Formatter) -> Result<(), core::fmt::Error> {
+		write!(f, "{:?}", self)
+	}
+}
+
+impl std::error::Error for RecoveryError {}
diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs
index 995256d5d40..e50b902366d 100644
--- a/polkadot/node/subsystem/src/messages.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -245,6 +245,19 @@ pub enum AvailabilityDistributionMessage {
 	NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>),
 }
 
+/// Availability Recovery Message.
+#[derive(Debug)]
+pub enum AvailabilityRecoveryMessage {
+	/// Recover available data from validators on the network.
+	RecoverAvailableData(
+		CandidateReceipt,
+		SessionIndex,
+		oneshot::Sender<Result<AvailableData, crate::errors::RecoveryError>>,
+	),
+	/// Event from the network bridge.
+	NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::AvailabilityRecoveryMessage>),
+}
+
 impl AvailabilityDistributionMessage {
 	/// If the current variant contains the relay parent hash, return it.
 	pub fn relay_parent(&self) -> Option<Hash> {
@@ -596,6 +609,8 @@ pub enum AllMessages {
 	StatementDistribution(StatementDistributionMessage),
 	/// Message for the availability distribution subsystem.
 	AvailabilityDistribution(AvailabilityDistributionMessage),
+	/// Message for the availability recovery subsystem.
+	AvailabilityRecovery(AvailabilityRecoveryMessage),
 	/// Message for the bitfield distribution subsystem.
 	BitfieldDistribution(BitfieldDistributionMessage),
 	/// Message for the bitfield signing subsystem.
-- 
GitLab
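The `replace_*` family extended above relies on a type-state builder pattern: each call consumes `AllSubsystems` and returns a value whose type-parameter list swaps exactly one slot (here the new `AR` slot) for `NEW`, so a test can stub a single subsystem without retyping the others. A minimal, self-contained analogue of that pattern (a toy `Pair` type, not the real `AllSubsystems` API):

// Toy analogue of AllSubsystems::replace_*: consume self and return a
// struct whose type parameter changed only in the replaced slot.
struct Pair<A, B> {
	left: A,
	right: B,
}

impl<A, B> Pair<A, B> {
	// Mirrors `replace_availability_recovery`: only `A` becomes `NEW`.
	fn replace_left<NEW>(self, left: NEW) -> Pair<NEW, B> {
		Pair { left, right: self.right }
	}
}

fn main() {
	let pair = Pair { left: 0u32, right: "untouched" };
	// The left slot changes type from u32 to &str; the right slot moves over.
	let pair = pair.replace_left("stub subsystem");
	assert_eq!(pair.left, "stub subsystem");
	assert_eq!(pair.right, "untouched");
}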
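The overseer changes add one more arm to the message fan-out: every `AllMessages::AvailabilityRecovery` is forwarded to the new subsystem's channel, exactly like its siblings. A sketch of that dispatch shape with plain `futures` channels (illustrative names and payloads, assuming only the `futures` crate; not the overseer's real types):

use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};

// Illustrative stand-in for the AllMessages fan-out enum.
enum AllMessages {
	AvailabilityDistribution(String),
	AvailabilityRecovery(String),
}

// One sender per subsystem; the dispatcher matches on the variant and
// forwards the payload to exactly one of them, propagating send errors.
async fn route(
	msg: AllMessages,
	distribution: &mut mpsc::Sender<String>,
	recovery: &mut mpsc::Sender<String>,
) -> Result<(), mpsc::SendError> {
	match msg {
		AllMessages::AvailabilityDistribution(m) => distribution.send(m).await?,
		AllMessages::AvailabilityRecovery(m) => recovery.send(m).await?,
	}
	Ok(())
}

fn main() {
	block_on(async {
		let (mut d_tx, mut d_rx) = mpsc::channel::<String>(4);
		let (mut r_tx, mut r_rx) = mpsc::channel::<String>(4);

		route(
			AllMessages::AvailabilityRecovery("recover candidate".into()),
			&mut d_tx,
			&mut r_tx,
		).await.unwrap();

		// Only the recovery channel received the message.
		assert_eq!(r_rx.next().await.unwrap(), "recover candidate");
		assert!(d_rx.try_next().is_err()); // distribution channel still empty
	});
}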
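One subtlety in the `RecoveryError` `Display` impl: formatting `self` with `{}` inside `Display::fmt` would invoke the very same impl again and recurse until the stack overflows, which is why the body delegates to the derived `Debug` formatter via `{:?}`. A standalone check (local copy of the enum, std only):

use std::fmt;

#[derive(PartialEq, Debug, Clone)]
enum RecoveryError {
	Invalid,
	Unavailable,
}

impl fmt::Display for RecoveryError {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		// `{:?}` routes through the derived Debug impl, so this terminates;
		// `{}` here would re-enter Display::fmt forever.
		write!(f, "{:?}", self)
	}
}

fn main() {
	assert_eq!(RecoveryError::Invalid.to_string(), "Invalid");
	assert_eq!(format!("{}", RecoveryError::Unavailable), "Unavailable");
}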
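`RecoverAvailableData` carries a `oneshot::Sender`, so each request gets exactly one reply: the reconstructed `AvailableData` or a `RecoveryError`. The caller side reduces to the shape below (local stand-ins for the Polkadot types, assuming the `futures` crate):

use futures::{channel::oneshot, executor::block_on};

// Stand-ins for the real polkadot-primitives / subsystem types.
#[derive(Debug, PartialEq)]
struct AvailableData(Vec<u8>);

#[derive(Debug, PartialEq)]
enum RecoveryError {
	Invalid,
	Unavailable,
}

fn main() {
	let (reply_tx, reply_rx) = oneshot::channel::<Result<AvailableData, RecoveryError>>();

	// In the subsystem, `reply_tx` arrives inside
	// AvailabilityRecoveryMessage::RecoverAvailableData and is answered once
	// reconstruction succeeds or is known to be impossible.
	reply_tx
		.send(Ok(AvailableData(vec![1, 2, 3])))
		.expect("receiver still alive");

	match block_on(reply_rx) {
		Ok(Ok(data)) => println!("recovered {} bytes", data.0.len()),
		Ok(Err(err)) => println!("recovery failed: {:?}", err),
		Err(_cancelled) => println!("recovery subsystem dropped the request"),
	}
}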