Unverified Commit 8198a100 authored by Bernhard Schuster, committed by GitHub

impl availability distribution

Closes #1237
parent 84043d49
@@ -57,6 +57,7 @@ members = [
"node/network/pov-distribution",
"node/network/statement-distribution",
"node/network/bitfield-distribution",
"node/network/availability-distribution",
"node/overseer",
"node/primitives",
"node/service",
...
[package]
name = "polkadot-availability-distribution"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.3.5"
log = "0.4.11"
streamunordered = "0.5.1"
codec = { package="parity-scale-codec", version = "1.3.4", features = ["std"] }
node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-erasure-coding = { path = "../../../erasure-coding" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-network-bridge = { path = "../../network/bridge" }
polkadot-network = { path = "../../../network" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
derive_more = "0.99.9"
sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
[dev-dependencies]
polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" }
bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
parking_lot = "0.11.0"
futures-timer = "3.0.2"
smol-timeout = "0.1.0"
env_logger = "0.7.1"
assert_matches = "1.3.0"
smallvec = "1"
\ No newline at end of file
@@ -21,9 +21,9 @@ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
parking_lot = "0.10.0"
parking_lot = "0.11.0"
maplit = "1.0.2"
smol = "0.2.0"
smol = "0.3.3"
smol-timeout = "0.1.0"
env_logger = "0.7.1"
assert_matches = "1.3.0"
@@ -46,9 +46,9 @@ const COST_MESSAGE_NOT_DECODABLE: ReputationChange =
ReputationChange::new(-100, "Not interested in that parent hash");
const COST_PEER_DUPLICATE_MESSAGE: ReputationChange =
ReputationChange::new(-500, "Peer sent the same message multiple times");
-const GAIN_VALID_MESSAGE_FIRST: ReputationChange =
+const BENEFIT_VALID_MESSAGE_FIRST: ReputationChange =
ReputationChange::new(15, "Valid message with new information");
-const GAIN_VALID_MESSAGE: ReputationChange =
+const BENEFIT_VALID_MESSAGE: ReputationChange =
ReputationChange::new(10, "Valid message");
/// Checked signed availability bitfield that is distributed
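The `GAIN_*` to `BENEFIT_*` renames make these constants read consistently against the `COST_*` ones above, so the polarity of each reputation change is obvious at the call site. As a minimal sketch of how such a constant is applied, assuming the subsystem context's `send_message` and the `ReportPeer` variant shown later in this diff:

```rust
use polkadot_subsystem::{
	messages::{AllMessages, NetworkBridgeMessage},
	SubsystemContext, SubsystemResult,
};
use sc_network::{PeerId, ReputationChange};

/// Ask the network bridge to adjust a peer's reputation, e.g. with
/// `BENEFIT_VALID_MESSAGE_FIRST` or `COST_SIGNATURE_INVALID`.
async fn modify_reputation<C: SubsystemContext>(
	ctx: &mut C,
	peer: PeerId,
	rep: ReputationChange,
) -> SubsystemResult<()> {
	// The bridge owns the peer-set; subsystems merely report.
	ctx.send_message(AllMessages::NetworkBridge(
		NetworkBridgeMessage::ReportPeer(peer, rep),
	)).await
}
```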
@@ -396,14 +396,14 @@ where
"Already received a message for validator at index {}",
validator_index
);
-modify_reputation(ctx, origin, GAIN_VALID_MESSAGE).await?;
+modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?;
return Ok(());
}
one_per_validator.insert(validator.clone(), message.clone());
relay_message(ctx, job_data, &mut state.peer_views, validator, message).await?;
-modify_reputation(ctx, origin, GAIN_VALID_MESSAGE_FIRST).await
+modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await
} else {
modify_reputation(ctx, origin, COST_SIGNATURE_INVALID).await
}
@@ -479,14 +479,14 @@ where
{
let current = state.peer_views.entry(origin.clone()).or_default();
-let delta_vec: Vec<Hash> = (*current).difference(&view).cloned().collect();
+let added: Vec<Hash> = view.difference(&*current).cloned().collect();
*current = view;
// Send all messages we have seen before, which the peer is now
// interested in, to that peer.
-let delta_set: Vec<(ValidatorId, BitfieldGossipMessage)> = delta_vec
+let delta_set: Vec<(ValidatorId, BitfieldGossipMessage)> = added
.into_iter()
.filter_map(|new_relay_parent_interest| {
if let Some(job_data) = (&*state).per_relay_parent.get(&new_relay_parent_interest) {
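This hunk fixes an inverted set difference: `current.difference(&view)` yields the relay parents the peer just dropped, whereas `view.difference(&current)` yields the ones it became newly interested in, which is what the catch-up send needs. A self-contained illustration, with plain `u8` values standing in for relay-parent hashes:

```rust
use std::collections::HashSet;

fn main() {
	let current: HashSet<u8> = [1, 2, 3].iter().copied().collect();
	let view: HashSet<u8> = [2, 3, 4].iter().copied().collect();

	// Old, buggy direction: what the peer stopped caring about.
	let removed: Vec<u8> = current.difference(&view).copied().collect();
	// Fixed direction: what the peer is newly interested in.
	let added: Vec<u8> = view.difference(&current).copied().collect();

	assert_eq!(removed, vec![1]);
	assert_eq!(added, vec![4]);
}
```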
@@ -558,7 +558,7 @@ where
{
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem {
name: "bitfield-distribution",
name: "bitfield-distribution-subsystem",
future: Box::pin(async move { Self::run(ctx) }.map(|_| ())),
}
}
@@ -870,7 +870,7 @@ mod test {
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_b);
-assert_eq!(rep, GAIN_VALID_MESSAGE_FIRST)
+assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST)
}
);
@@ -887,7 +887,7 @@ mod test {
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_a);
-assert_eq!(rep, GAIN_VALID_MESSAGE)
+assert_eq!(rep, BENEFIT_VALID_MESSAGE)
}
);
@@ -993,7 +993,7 @@ mod test {
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_b);
-assert_eq!(rep, GAIN_VALID_MESSAGE_FIRST)
+assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST)
}
);
...
@@ -229,4 +229,4 @@ pub fn make_subsystem_context<M, S>(spawn: S)
rx: all_messages_rx
},
)
-}
+}
\ No newline at end of file
@@ -220,4 +220,4 @@ impl<C: SubsystemContext> Subsystem<C> for DummySubsystem {
future,
}
}
-}
+}
\ No newline at end of file
@@ -188,12 +188,6 @@ impl NetworkBridgeMessage {
/// Availability Distribution Message.
#[derive(Debug)]
pub enum AvailabilityDistributionMessage {
-/// Distribute an availability chunk to other validators.
-DistributeChunk(Hash, ErasureChunk),
-/// Fetch an erasure chunk from networking by candidate hash and chunk index.
-FetchChunk(Hash, u32),
/// Event from the network bridge.
NetworkBridgeUpdate(NetworkBridgeEvent),
}
@@ -202,8 +196,6 @@ impl AvailabilityDistributionMessage {
/// If the current variant contains the relay parent hash, return it.
pub fn relay_parent(&self) -> Option<Hash> {
match self {
-Self::DistributeChunk(hash, _) => Some(*hash),
-Self::FetchChunk(hash, _) => Some(*hash),
Self::NetworkBridgeUpdate(_) => None,
}
}
@@ -255,7 +247,7 @@ pub enum AvailabilityStoreMessage {
/// megabytes of data to get a single bit of information.
QueryDataAvailability(Hash, oneshot::Sender<bool>),
-/// Query an `ErasureChunk` from the AV store.
+/// Query an `ErasureChunk` from the AV store by the candidate hash and validator index.
QueryChunk(Hash, ValidatorIndex, oneshot::Sender<Option<ErasureChunk>>),
/// Query whether an `ErasureChunk` exists within the AV Store.
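The sharpened doc comment spells out `QueryChunk`'s lookup key: candidate hash plus validator index. A hedged sketch of issuing that query over the usual oneshot channel (the module paths and context bound are assumptions):

```rust
use futures::channel::oneshot;
use polkadot_subsystem::{
	messages::{AllMessages, AvailabilityStoreMessage},
	SubsystemContext, SubsystemResult,
};
use polkadot_primitives::v1::{ErasureChunk, Hash};

/// Fetch one validator's erasure chunk for a candidate from the AV store.
async fn query_chunk<C: SubsystemContext>(
	ctx: &mut C,
	candidate_hash: Hash,
	validator_index: u32,
) -> SubsystemResult<Option<ErasureChunk>> {
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::AvailabilityStore(
		AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
	)).await?;
	// `None` means the store holds no chunk for this (candidate, validator) pair.
	Ok(rx.await.ok().flatten())
}
```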
@@ -513,6 +505,6 @@ pub enum AllMessages {
AvailabilityStore(AvailabilityStoreMessage),
/// Message for the network bridge subsystem.
NetworkBridge(NetworkBridgeMessage),
-/// Message for the Chain API subsystem
+/// Message for the Chain API subsystem.
ChainApi(ChainApiMessage),
}
@@ -33,7 +33,7 @@ pub use polkadot_core_primitives::BlockNumber as RelayChainBlockNumber;
/// Parachain head data included in the chain.
#[derive(PartialEq, Eq, Clone, PartialOrd, Ord, Encode, Decode, RuntimeDebug)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Default))]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Default, Hash))]
pub struct HeadData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);
impl From<Vec<u8>> for HeadData {
@@ -44,7 +44,7 @@ impl From<Vec<u8>> for HeadData {
/// Parachain validation code.
#[derive(Default, PartialEq, Eq, Clone, Encode, Decode, RuntimeDebug)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Hash))]
pub struct ValidationCode(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);
impl From<Vec<u8>> for ValidationCode {
@@ -186,7 +186,7 @@ impl<T: Encode + Decode + Default> AccountIdConversion<T> for Id {
/// Which origin a parachain's message to the relay chain should be dispatched from.
#[derive(Clone, PartialEq, Eq, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Debug))]
#[cfg_attr(feature = "std", derive(Debug, Hash))]
#[repr(u8)]
pub enum ParachainDispatchOrigin {
/// As a simple `Origin::Signed`, using `ParaId::account_id` as its value. This is good when
@@ -215,7 +215,7 @@ impl sp_std::convert::TryFrom<u8> for ParachainDispatchOrigin {
/// A message from a parachain to its Relay Chain.
#[derive(Clone, PartialEq, Eq, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Debug))]
#[cfg_attr(feature = "std", derive(Debug, Hash))]
pub struct UpwardMessage {
/// The origin for the message to be sent from.
pub origin: ParachainDispatchOrigin,
...
@@ -612,7 +612,7 @@ pub struct AvailableData {
/// A chunk of erasure-encoded block data.
#[derive(PartialEq, Eq, Clone, Encode, Decode, Default)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash))]
pub struct ErasureChunk {
/// The erasure-encoded chunk of data belonging to the candidate block.
pub chunk: Vec<u8>,
@@ -624,8 +624,8 @@ pub struct ErasureChunk {
/// Statements that can be made about parachain candidates. These are the
/// actual values that are signed.
-#[derive(Clone, PartialEq, Eq, Encode, Decode, Hash)]
-#[cfg_attr(feature = "std", derive(Debug))]
+#[derive(Clone, PartialEq, Eq, Encode, Decode)]
+#[cfg_attr(feature = "std", derive(Debug, Hash))]
pub enum CompactStatement {
/// Proposal of a parachain candidate.
#[codec(index = "1")]
...
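The std-only `Hash` derives added across these primitives (`HeadData`, `ValidationCode`, `ErasureChunk`, and the candidate types below) are what let node-side code key `HashSet`s and `HashMap`s by these values. A self-contained sketch with a local stand-in for `ErasureChunk`:

```rust
use std::collections::HashSet;

// Local stand-in mirroring the `ErasureChunk` in this diff, with the
// newly derived `Hash` (std-only in the real crate).
#[derive(PartialEq, Eq, Hash, Clone)]
struct ErasureChunk {
	chunk: Vec<u8>,
	index: u32,
	proof: Vec<Vec<u8>>,
}

fn main() {
	// Deduplicate chunks gossiped to us by multiple peers.
	let mut seen: HashSet<ErasureChunk> = HashSet::new();
	let c = ErasureChunk { chunk: vec![0xde, 0xad], index: 0, proof: Vec::new() };
	assert!(seen.insert(c.clone())); // first sighting is new information
	assert!(!seen.insert(c)); // a duplicate changes nothing
}
```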
@@ -25,7 +25,7 @@ use runtime_primitives::traits::AppVerify;
use inherents::InherentIdentifier;
use sp_arithmetic::traits::{BaseArithmetic, Saturating, Zero};
-use runtime_primitives::traits::{BlakeTwo256, Hash as HashT};
+pub use runtime_primitives::traits::{BlakeTwo256, Hash as HashT};
// Export some core primitives.
pub use polkadot_core_primitives::v1::{
@@ -106,7 +106,7 @@ pub fn validation_data_hash<N: Encode>(
/// A unique descriptor of the candidate receipt.
#[derive(PartialEq, Eq, Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Debug, Default))]
#[cfg_attr(feature = "std", derive(Debug, Default, Hash))]
pub struct CandidateDescriptor<H = Hash> {
/// The ID of the para this is a candidate for.
pub para_id: Id,
@@ -176,7 +176,7 @@ pub struct FullCandidateReceipt<H = Hash, N = BlockNumber> {
/// A candidate-receipt with commitments directly included.
#[derive(PartialEq, Eq, Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Debug, Default))]
#[cfg_attr(feature = "std", derive(Debug, Default, Hash))]
pub struct CommittedCandidateReceipt<H = Hash> {
/// The descriptor of the candidate.
pub descriptor: CandidateDescriptor<H>,
@@ -266,7 +266,7 @@ pub struct GlobalValidationData<N = BlockNumber> {
/// Commitments made in a `CandidateReceipt`. Many of these are outputs of validation.
#[derive(PartialEq, Eq, Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Debug, Default))]
#[cfg_attr(feature = "std", derive(Debug, Default, Hash))]
pub struct CandidateCommitments {
/// Fees paid from the chain to the relay chain validators.
pub fees: Balance,
@@ -484,7 +484,7 @@ impl CoreAssignment {
/// Validation data omitted from most candidate descriptor structs, as it can be derived from the
/// relay-parent.
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq, Debug))]
#[cfg_attr(feature = "std", derive(PartialEq, Debug, Default))]
pub struct OmittedValidationData {
/// The global validation schedule.
pub global_validation: GlobalValidationData,
...
@@ -23,16 +23,17 @@ Output:
For each relay-parent in our local view update, look at all backed candidates pending availability. Distribute via gossip all erasure chunks for all candidates that we have to peers.
-We define an operation `live_candidates(relay_heads) -> Set<CommittedCandidateReceipt>` which returns a set of [`CommittedCandidateReceipt`s](../../types/candidate.md#committed-candidate-receipt) a given set of relay chain heads that implies a set of candidates whose availability chunks should be currently gossiped. This is defined as all candidates pending availability in any of those relay-chain heads or any of their last `K` ancestors. We assume that state is not pruned within `K` blocks of the chain-head.
+We define an operation `live_candidates(relay_heads) -> Set<CommittedCandidateReceipt>` which returns a set of [`CommittedCandidateReceipt`s](../../types/candidate.md#committed-candidate-receipt).
+This is defined as all candidates pending availability in any of those relay-chain heads or any of their last `K` ancestors in the same session. We assume that state is not pruned within `K` blocks of the chain-head. `K` is commonly small and is currently fixed to `K=3`.
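A compilable sketch of `live_candidates` under this definition; both helpers are hypothetical stand-ins for what the real subsystem asks of the Chain API (ancestry) and the runtime API (candidates pending availability):

```rust
use std::collections::HashSet;

// Placeholder types standing in for the real primitives.
type Hash = [u8; 32];
#[derive(PartialEq, Eq, Hash, Clone)]
struct CommittedCandidateReceipt;

const K: usize = 3;

// Hypothetical: last `k` ancestors of `head` within the same session.
fn ancestors_in_same_session(_head: Hash, _k: usize) -> Vec<Hash> { Vec::new() }
// Hypothetical: candidates pending availability at a relay-chain block.
fn pending_availability(_relay_parent: Hash) -> Vec<CommittedCandidateReceipt> { Vec::new() }

/// Candidates whose chunks should currently be gossiped: everything pending
/// availability at the given heads or any of their last `K` same-session
/// ancestors (state is assumed unpruned within `K` of the head).
fn live_candidates(relay_heads: &[Hash]) -> HashSet<CommittedCandidateReceipt> {
	let mut live = HashSet::new();
	for &head in relay_heads {
		live.extend(pending_availability(head));
		for ancestor in ancestors_in_same_session(head, K) {
			live.extend(pending_availability(ancestor));
		}
	}
	live
}
```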
-We will send any erasure-chunks that correspond to candidates in `live_candidates(peer_most_recent_view_update)`. Likewise, we only accept and forward messages pertaining to a candidate in `live_candidates(current_heads)`. Each erasure chunk should be accompanied by a merkle proof that it is committed to by the erasure trie root in the candidate receipt, and this gossip system is responsible for checking such proof.
+We will send any erasure-chunks that correspond to candidates in `live_candidates(peer_most_recent_view_update)`.
+Likewise, we only accept and forward messages pertaining to a candidate in `live_candidates(current_heads)`.
+Each erasure chunk should be accompanied by a merkle proof that it is committed to by the erasure trie root in the candidate receipt, and this gossip system is responsible for checking such proof.
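To make the proof obligation concrete, here is a self-contained toy branch check. The real scheme hashes with the erasure trie rather than `DefaultHasher`, but the shape is the same: hash the chunk, fold in the sibling nodes according to the chunk index, and accept only if the result equals the erasure root from the candidate receipt.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy 64-bit hashing, standing in for the trie hashing used in practice.
fn leaf_hash(data: &[u8]) -> u64 {
	let mut h = DefaultHasher::new();
	data.hash(&mut h);
	h.finish()
}

fn node_hash(left: u64, right: u64) -> u64 {
	let mut h = DefaultHasher::new();
	left.hash(&mut h);
	right.hash(&mut h);
	h.finish()
}

/// Recompute the root from a chunk and its merkle branch; gossip only
/// accepts and forwards the chunk if this matches the receipt's root.
fn verify_branch(root: u64, chunk: &[u8], mut index: u32, siblings: &[u64]) -> bool {
	let mut acc = leaf_hash(chunk);
	for &sibling in siblings {
		// The low bit of the index says whether we are the left or right child.
		acc = if index & 1 == 0 { node_hash(acc, sibling) } else { node_hash(sibling, acc) };
		index >>= 1;
	}
	acc == root
}

fn main() {
	let (c0, c1) = (b"chunk-0".to_vec(), b"chunk-1".to_vec());
	let root = node_hash(leaf_hash(&c0), leaf_hash(&c1));
	assert!(verify_branch(root, &c0, 0, &[leaf_hash(&c1)]));
	assert!(verify_branch(root, &c1, 1, &[leaf_hash(&c0)]));
	assert!(!verify_branch(root, b"tampered", 0, &[leaf_hash(&c1)]));
}
```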
We re-attempt to send anything live to a peer upon any view update from that peer.
-On our view change, for all live candidates, we will check if we have the PoV by issuing a `QueryPoV` message and waiting for the response. If the query returns `Some`, we will perform the erasure-coding and distribute all messages to peers that will accept them.
+On our view change, for all live candidates, we will check if we have the PoV by issuing a `QueryAvailableData` message and waiting for the response. If the query returns `Some`, we will perform the erasure-coding and distribute all messages to peers that will accept them.
If we are operating as a validator, we note our index `i` in the validator set and keep the `i`th availability chunk for any live candidate, as we receive it. We keep the chunk and its merkle proof in the [Availability Store](../utility/availability-store.md) by sending a `StoreChunk` command. This includes chunks and proofs generated as the result of a successful `QueryPoV`.
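Putting the message flows together, a self-contained sketch of the view-change handling for one live candidate; every helper here is a stand-in (the real code round-trips `QueryAvailableData` and `StoreChunk` through the availability store and uses the erasure-coding crate for chunking):

```rust
type Hash = [u8; 32];
type AvailableData = Vec<u8>;

#[derive(Clone)]
struct ErasureChunk { chunk: Vec<u8>, index: u32, proof: Vec<Vec<u8>> }

// Stand-in for the erasure-coding step: one chunk per validator, such that
// (in the real scheme) any third of the chunks can reconstruct the data.
fn obtain_chunks(n_validators: usize, data: &AvailableData) -> Vec<ErasureChunk> {
	(0..n_validators as u32)
		.map(|index| ErasureChunk { chunk: data.clone(), index, proof: Vec::new() })
		.collect()
}

// Stand-ins for the `QueryAvailableData` / `StoreChunk` round-trips and
// for gossiping to peers whose views accept the candidate.
fn query_available_data(_candidate: Hash) -> Option<AvailableData> { Some(vec![1, 2, 3]) }
fn store_chunk(_candidate: Hash, _chunk: &ErasureChunk) {}
fn distribute(_candidate: Hash, _chunks: &[ErasureChunk]) {}

fn on_view_change(candidate: Hash, n_validators: usize, our_index: u32) {
	// Only a node that actually holds the data can seed the chunks.
	if let Some(data) = query_available_data(candidate) {
		let chunks = obtain_chunks(n_validators, &data);
		// Keep our own chunk (and its proof) in the availability store...
		store_chunk(candidate, &chunks[our_index as usize]);
		// ...and gossip all chunks to peers that will accept them.
		distribute(candidate, &chunks);
	}
}

fn main() {
	on_view_change([0u8; 32], 10, 3);
}
```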
-> TODO: back-and-forth is kind of ugly but drastically simplifies the pruning in the availability store, as it creates an invariant that chunks are only stored if the candidate was actually backed
->
-> K=3?
+The back-and-forth seems suboptimal at first glance, but drastically simplifies the pruning in the availability store, as it creates an invariant that chunks are only stored if the candidate was actually backed.