Unverified Commit 030502a0 authored by Fedor Sakharov, committed by GitHub

Availability recovery subsystem (#2122)

* Adds message types

* Add code skeleton

* Adds subsystem code.

* Adds a first test

* Adds interaction result to availability_lru

* Use LruCache instead of a HashMap

* Whitespaces to tabs

* Do not ignore errors

* Change error type

* Add a timeout to chunk requests

* Add custom errors and log them

* Adds replace_availability_recovery method

* recovery_threshold computed by erasure crate

* change core to std

* adds docs to error type

* Adds a test for invalid reconstruction

* refactors interaction run into multiple methods

* Cleanup AwaitedChunks

* Even more fixes

* Test that recovery with wrong root is an error

* Break to launch another request

* Styling fixes

* Add SessionIndex to API

* Proper relay parents for MakeRequest

* Remove validator_discovery and use message

* Remove a stream on exhaustion

* On cleanup free the request streams

* Fix merge and refactor
parent cd0e9186
Pipeline #120381 passed with stages in 25 minutes and 59 seconds
@@ -4942,6 +4942,33 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "polkadot-availability-recovery"
version = "0.1.0"
dependencies = [
"assert_matches",
"env_logger 0.8.2",
"futures 0.3.10",
"futures-timer 3.0.2",
"log",
"lru",
"polkadot-erasure-coding",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"rand 0.7.3",
"smallvec 1.6.1",
"sp-application-crypto",
"sp-core",
"sp-keyring",
"streamunordered",
"thiserror",
"tracing",
"tracing-futures",
]
[[package]]
name = "polkadot-cli"
version = "0.8.27"
@@ -5650,6 +5677,7 @@ dependencies = [
"pallet-transaction-payment-rpc-runtime-api",
"polkadot-availability-bitfield-distribution",
"polkadot-availability-distribution",
"polkadot-availability-recovery",
"polkadot-collator-protocol",
"polkadot-network-bridge",
"polkadot-node-collation-generation",
......
@@ -57,6 +57,7 @@ members = [
"node/network/statement-distribution",
"node/network/bitfield-distribution",
"node/network/availability-distribution",
"node/network/availability-recovery",
"node/network/collator-protocol",
"node/overseer",
"node/primitives",
......
@@ -119,6 +119,7 @@ impl CodeParams {
.expect("this struct is not created with invalid shard number; qed")
}
}
/// Returns the maximum number of faulty chunks that can be
/// tolerated without preventing recovery, given that all other
/// pieces are correct.
......
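For context, the arithmetic behind that threshold can be sketched as follows. This is an illustrative sketch only, assuming the usual Byzantine bound n >= 3f + 1; the helper name `recovery_threshold_sketch` is made up and is not the erasure-coding crate's API.

// Editor's sketch: with n validators and up to f = (n - 1) / 3 faulty chunks,
// any f + 1 correct chunks are enough to reconstruct the data.
fn recovery_threshold_sketch(n_validators: usize) -> Option<usize> {
	if n_validators <= 1 {
		return None;
	}
	let max_faulty = (n_validators - 1) / 3;
	Some(max_faulty + 1)
}

// For example, 10 validators tolerate 3 faulty chunks, so 4 correct chunks suffice:
// assert_eq!(recovery_threshold_sketch(10), Some(4));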
[package]
name = "polkadot-availability-recovery"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.3.8"
lru = "0.6.1"
rand = "0.7.3"
thiserror = "1.0.21"
tracing = "0.1.22"
tracing-futures = "0.2.4"
polkadot-erasure-coding = { path = "../../../erasure-coding" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
futures-timer = "3.0.2"
streamunordered = "0.5.1"
[dev-dependencies]
assert_matches = "1.4.0"
env_logger = "0.8.1"
futures-timer = "3.0.2"
log = "0.4.11"
smallvec = "1.5.1"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" }
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The `Error` and `Result` types used by the subsystem.
use futures::channel::{mpsc, oneshot};
use thiserror::Error;
/// Error type used by the Availability Recovery subsystem.
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Subsystem(#[from] polkadot_subsystem::SubsystemError),
#[error("failed to query a chunk from store")]
CanceledQueryChunk(#[source] oneshot::Canceled),
#[error("failed to query session info")]
CanceledSessionInfo(#[source] oneshot::Canceled),
#[error("failed to send response")]
CanceledResponseSender,
#[error("to_state channel is closed")]
ClosedToState(#[source] mpsc::SendError),
#[error(transparent)]
Runtime(#[from] polkadot_subsystem::errors::RuntimeApiError),
#[error(transparent)]
Erasure(#[from] polkadot_erasure_coding::Error),
#[error(transparent)]
Util(#[from] polkadot_node_subsystem_util::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
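As an illustration of how these variants compose (an editor's sketch, not part of the diff; `await_store_query` is a hypothetical helper), a cancelled `oneshot` response from the availability store maps onto `CanceledQueryChunk`, while the `#[from]` variants convert automatically through `?`:

// Hypothetical helper: await a oneshot response from the availability store,
// turning a dropped sender into the dedicated cancellation error.
async fn await_store_query<T>(rx: oneshot::Receiver<T>) -> Result<T> {
	rx.await.map_err(Error::CanceledQueryChunk)
}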
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Availability Recovery Subsystem of Polkadot.
#![warn(missing_docs)]
use std::collections::HashMap;
use std::time::Duration;
use std::pin::Pin;
use futures::{channel::{oneshot, mpsc}, prelude::*, stream::FuturesUnordered};
use futures_timer::Delay;
use lru::LruCache;
use rand::{seq::SliceRandom, thread_rng};
use streamunordered::{StreamUnordered, StreamYield};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, AvailableData, CandidateReceipt, CandidateHash,
Hash, ErasureChunk, ValidatorId, ValidatorIndex,
SessionInfo, SessionIndex, BlakeTwo256, HashT,
};
use polkadot_subsystem::{
SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer,
OverseerSignal, ActiveLeavesUpdate,
errors::RecoveryError,
messages::{
AvailabilityStoreMessage, AvailabilityRecoveryMessage, AllMessages, NetworkBridgeMessage,
},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, RequestId,
};
use polkadot_node_subsystem_util::{
Timeout, TimeoutExt,
request_session_info_ctx,
};
use polkadot_erasure_coding::{branches, branch_hash, recovery_threshold, obtain_chunks_v1};
mod error;
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "availability_recovery";
const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid");
const COST_UNEXPECTED_CHUNK: Rep = Rep::new(-100, "Peer has sent an unexpected chunk");
// How many parallel requests an interaction should have going at once.
const N_PARALLEL: usize = 50;
// Size of the LRU cache where we keep recovered data.
const LRU_SIZE: usize = 16;
// A timeout for a chunk request.
const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
// The interval at which awaited chunks are polled and cleaned up.
const AWAITED_CHUNKS_CLEANUP_INTERVAL: Duration = Duration::from_secs(1);
/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem;
type ChunkResponse = Result<(PeerId, ErasureChunk), RecoveryError>;
/// Data we keep around for every chunk that we are awaiting.
struct AwaitedChunk {
/// Index of the validator we have requested this chunk from.
validator_index: ValidatorIndex,
/// The hash of the candidate the chunk belongs to.
candidate_hash: CandidateHash,
/// Token to cancel the connection request to the validator.
token: usize,
/// Result sender.
response: oneshot::Sender<ChunkResponse>,
}
/// Accumulates all response senders awaiting some particular `AvailableData`.
struct InteractionHandle {
awaiting: Vec<oneshot::Sender<Result<AvailableData, RecoveryError>>>,
}
/// A message received by main code from an async `Interaction` task.
#[derive(Debug)]
enum FromInteraction {
/// An interaction concluded.
Concluded(CandidateHash, Result<AvailableData, RecoveryError>),
/// Make a request for a particular chunk from a particular validator.
MakeRequest(
AuthorityDiscoveryId,
CandidateHash,
ValidatorIndex,
oneshot::Sender<ChunkResponse>,
),
/// Report a peer.
ReportPeer(
PeerId,
Rep,
),
}
/// The state of a single interaction reconstructing available data.
struct Interaction {
/// A communication channel with the `State`.
to_state: mpsc::Sender<FromInteraction>,
/// Discovery ids of `validators`.
validator_authority_keys: Vec<AuthorityDiscoveryId>,
/// Validators relevant to this `Interaction`.
validators: Vec<ValidatorId>,
/// A random shuffling of the validators which indicates the order in which we connect
/// to the validators and request the chunk from them.
shuffling: Vec<ValidatorIndex>,
/// The number of pieces needed.
threshold: usize,
/// A hash of the relevant candidate.
candidate_hash: CandidateHash,
/// The root of the erasure encoding of the para block.
erasure_root: Hash,
/// The chunks that we have received from peers.
received_chunks: HashMap<PeerId, ErasureChunk>,
/// The chunk requests that are waiting to complete.
requesting_chunks: FuturesUnordered<Timeout<oneshot::Receiver<ChunkResponse>>>,
}
/// Returns `true` if the data cannot be recovered any more: even if every in-flight
/// request and every not-yet-requested validator delivered a chunk, the threshold
/// would still not be reached.
const fn is_unavailable(
received_chunks: usize,
requesting_chunks: usize,
unrequested_validators: usize,
threshold: usize,
) -> bool {
received_chunks + requesting_chunks + unrequested_validators < threshold
}
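A worked example of the check above (an editor's sketch, not part of the diff): with 10 validators the recovery threshold is 4 chunks, so one received chunk plus one request in flight plus one unrequested validator can never reach it, while one more received chunk keeps recovery possible.

#[cfg(test)]
mod is_unavailable_sketch {
	use super::is_unavailable;

	#[test]
	fn gives_up_only_when_threshold_is_out_of_reach() {
		// 10 validators imply a recovery threshold of 4 chunks.
		let threshold = 4;
		// 1 received + 1 in flight + 1 unrequested can never reach 4.
		assert!(is_unavailable(1, 1, 1, threshold));
		// 2 received + 1 in flight + 1 unrequested can still reach 4.
		assert!(!is_unavailable(2, 1, 1, threshold));
	}
}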
impl Interaction {
async fn launch_parallel_requests(&mut self) -> error::Result<()> {
while self.requesting_chunks.len() < N_PARALLEL {
if let Some(validator_index) = self.shuffling.pop() {
let (tx, rx) = oneshot::channel();
self.to_state.send(FromInteraction::MakeRequest(
self.validator_authority_keys[validator_index as usize].clone(),
self.candidate_hash.clone(),
validator_index,
tx,
)).await.map_err(error::Error::ClosedToState)?;
self.requesting_chunks.push(rx.timeout(CHUNK_REQUEST_TIMEOUT));
} else {
break;
}
}
Ok(())
}
async fn wait_for_chunks(&mut self) -> error::Result<()> {
// If there are no requests in flight, return early rather than polling an empty set to completion.
if self.requesting_chunks.is_empty() {
return Ok(());
}
// Poll for new updates from requesting_chunks.
while let Some(request_result) = self.requesting_chunks.next().await {
match request_result {
Some(Ok(Ok((peer_id, chunk)))) => {
// Check the merkle proof of the received chunk. Any failure leads to a
// FromInteraction::ReportPeer message and the chunk is discarded rather
// than stored.
let chunk_is_valid = match branch_hash(
&self.erasure_root,
&chunk.proof,
chunk.index as usize,
) {
Ok(anticipated_hash) => BlakeTwo256::hash(&chunk.chunk) == anticipated_hash,
Err(_) => false,
};
if chunk_is_valid {
self.received_chunks.insert(peer_id, chunk);
} else {
self.to_state.send(FromInteraction::ReportPeer(
peer_id,
COST_MERKLE_PROOF_INVALID,
)).await.map_err(error::Error::ClosedToState)?;
}
}
Some(Err(e)) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"A response channel was cacelled while waiting for a chunk",
);
}
Some(Ok(Err(e))) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"A chunk request ended with an error",
);
}
None => {
tracing::debug!(
target: LOG_TARGET,
"A chunk request has timed out",
);
// we break here to launch another request.
break;
}
}
}
Ok(())
}
async fn run(mut self) -> error::Result<()> {
loop {
if is_unavailable(
self.received_chunks.len(),
self.requesting_chunks.len(),
self.shuffling.len(),
self.threshold,
) {
self.to_state.send(FromInteraction::Concluded(
self.candidate_hash,
Err(RecoveryError::Unavailable),
)).await.map_err(error::Error::ClosedToState)?;
return Ok(());
}
self.launch_parallel_requests().await?;
self.wait_for_chunks().await?;
// If received_chunks has at least `threshold` entries, attempt to recover the data.
// If that fails, or a re-encoding of it doesn't match the expected erasure root,
// issue a FromInteraction::Concluded(RecoveryError::Invalid).
// Otherwise, issue a FromInteraction::Concluded(Ok(data)).
if self.received_chunks.len() >= self.threshold {
let concluded = match polkadot_erasure_coding::reconstruct_v1(
self.validators.len(),
self.received_chunks.values().map(|c| (&c.chunk[..], c.index as usize)),
) {
Ok(data) => {
if reconstructed_data_matches_root(self.validators.len(), &self.erasure_root, &data) {
FromInteraction::Concluded(self.candidate_hash.clone(), Ok(data))
} else {
FromInteraction::Concluded(
self.candidate_hash.clone(),
Err(RecoveryError::Invalid),
)
}
}
Err(_) => FromInteraction::Concluded(
self.candidate_hash.clone(),
Err(RecoveryError::Invalid),
),
};
self.to_state.send(concluded).await.map_err(error::Error::ClosedToState)?;
return Ok(());
}
}
}
}
fn reconstructed_data_matches_root(
n_validators: usize,
expected_root: &Hash,
data: &AvailableData,
) -> bool {
let chunks = match obtain_chunks_v1(n_validators, data) {
Ok(chunks) => chunks,
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"Failed to obtain chunks",
);
return false;
}
};
let branches = branches(&chunks);
branches.root() == *expected_root
}
struct State {
/// Each interaction is implemented as its own async task,
/// and these handles are for communicating with them.
interactions: HashMap<CandidateHash, InteractionHandle>,
/// A recent block hash for which state should be available.
live_block_hash: Hash,
/// Validators we are waiting to connect to; as soon as one of them connects
/// we request the chunks we are awaiting from it.
discovering_validators: HashMap<AuthorityDiscoveryId, Vec<AwaitedChunk>>,
/// Requests that we have issued to the already connected validators
/// about the chunks we are interested in.
live_chunk_requests: HashMap<RequestId, (PeerId, AwaitedChunk)>,
/// Derive request ids from this.
next_request_id: RequestId,
/// Streams of validators connecting in response to our connection requests.
connecting_validators: StreamUnordered<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>,
/// Sender for interaction communication. This is cloned and given to
/// interactions that are spun up.
from_interaction_tx: mpsc::Sender<FromInteraction>,
/// Receiver for messages from interactions.
from_interaction_rx: mpsc::Receiver<FromInteraction>,
/// An LRU cache of recently recovered data.
availability_lru: LruCache<CandidateHash, Result<AvailableData, RecoveryError>>,
}
impl Default for State {
fn default() -> Self {
let (from_interaction_tx, from_interaction_rx) = mpsc::channel(16);
Self {
from_interaction_tx,
from_interaction_rx,
interactions: HashMap::new(),
live_block_hash: Hash::default(),
discovering_validators: HashMap::new(),
live_chunk_requests: HashMap::new(),
next_request_id: 0,
connecting_validators: StreamUnordered::new(),
availability_lru: LruCache::new(LRU_SIZE),
}
}
}
impl<C> Subsystem<C> for AvailabilityRecoverySubsystem
where C: SubsystemContext<Message = AvailabilityRecoveryMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
let future = self.run(ctx)
.map_err(|e| SubsystemError::with_origin("availability-recovery", e))
.boxed();
SpawnedSubsystem {
name: "availability-recovery-subsystem",
future,
}
}
}
/// Handles a signal from the overseer.
async fn handle_signal(
state: &mut State,
signal: OverseerSignal,
) -> SubsystemResult<bool> {
match signal {
OverseerSignal::Conclude => Ok(true),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }) => {
// If `activated` is non-empty, set `state.live_block_hash` to the first activated block.
if let Some(hash) = activated.get(0) {
state.live_block_hash = hash.0;
}
Ok(false)
}
OverseerSignal::BlockFinalized(_, _) => Ok(false)
}
}
/// Report a reputation change for a peer.
async fn report_peer(
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
peer: PeerId,
rep: Rep,
) {
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await;
}
/// Machinery around launching interactions into the background.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn launch_interaction(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
session_index: SessionIndex,
session_info: SessionInfo,
receipt: CandidateReceipt,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
) -> error::Result<()> {
let threshold = recovery_threshold(session_info.validators.len())?;
let to_state = state.from_interaction_tx.clone();
let candidate_hash = receipt.hash();
let erasure_root = receipt.descriptor.erasure_root;
let validators = session_info.validators.clone();
let validator_authority_keys = session_info.discovery_keys.clone();
let mut shuffling: Vec<_> = (0..validators.len() as ValidatorIndex).collect();
state.interactions.insert(
candidate_hash.clone(),
InteractionHandle {
awaiting: vec![response_sender],
}
);
{
// make borrow checker happy.
let mut rng = thread_rng();
shuffling.shuffle(&mut rng);
}
let interaction = Interaction {
to_state,
validator_authority_keys,
validators,
shuffling,
threshold,
candidate_hash,
erasure_root,
received_chunks: HashMap::new(),
requesting_chunks: FuturesUnordered::new(),
};
let future = async move {
if let Err(e) = interaction.run().await {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"Interaction finished with an error",
);
}