Unverified Commit affa668a authored by Bastian Köcher, committed by GitHub

Fix bug and further optimizations in availability distribution (#2104)



* Fix bug and further optimizations in availability distribution

- There was a bug that resulted in only one candidate being kept per block,
because candidates were inserted into the hashmap with the relay block hash
alone as the key. The fix is to key them by candidate hash and relay block
hash (a sketch follows below).
- Received/sent messages were stored with the candidate hash and chunk index
as the key. The candidate hash is not required here, as the messages are
already stored per candidate.
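
To make the keying change concrete, here is a minimal sketch with hypothetical, simplified types (plain `u64` stand-ins for the real hash types, not the subsystem's actual code): keying a map by the relay block hash alone silently overwrites earlier candidates for the same block, while keying by relay block hash plus candidate hash keeps them all.

```rust
// Illustrative only -- simplified stand-in types, not the subsystem's real code.
use std::collections::HashMap;

type RelayBlockHash = u64;
type CandidateHash = u64;

fn main() {
    // Two candidates share relay block 1.
    let candidates = [(1u64, 10u64), (1, 11), (2, 20)];

    // Buggy keying: the relay block hash alone is the key, so the second
    // candidate for block 1 overwrites the first.
    let mut by_block: HashMap<RelayBlockHash, CandidateHash> = HashMap::new();
    for (block, candidate) in candidates {
        by_block.insert(block, candidate);
    }
    assert_eq!(by_block.len(), 2); // one candidate was silently dropped

    // Fixed keying: candidate hash + relay block hash together form the key,
    // so every candidate is retained.
    let mut by_block_and_candidate: HashMap<(RelayBlockHash, CandidateHash), ()> = HashMap::new();
    for (block, candidate) in candidates {
        by_block_and_candidate.insert((block, candidate), ());
    }
    assert_eq!(by_block_and_candidate.len(), 3);
}
```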

* Update node/core/bitfield-signing/src/lib.rs

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>

* Remove the reverse map

* major refactor of receipts & query_live

* finish refactoring

remove the ancestry mapping,

improve relay-parent cleanup & receipts-cache cleanup (see the sketch below),
add descriptor to `PerCandidate`
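
A rough sketch of the reworked cleanup, again with simplified stand-in types rather than the real subsystem code: each candidate records the relay parents it is live in, and its per-candidate state is pruned once the last such relay parent is removed from our view.

```rust
// Simplified model of the live_in-based pruning -- not the actual subsystem code.
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

type Hash = u64;
type CandidateHash = u64;

#[derive(Default)]
struct PerCandidate {
    // Relay parents that consider this candidate live.
    live_in: HashSet<Hash>,
}

#[derive(Default)]
struct State {
    per_relay_parent: HashMap<Hash, HashSet<CandidateHash>>,
    per_candidate: HashMap<CandidateHash, PerCandidate>,
}

impl State {
    fn add_relay_parent(&mut self, relay_parent: Hash, live: &[CandidateHash]) {
        for candidate in live {
            self.per_candidate.entry(*candidate).or_default().live_in.insert(relay_parent);
        }
        self.per_relay_parent.entry(relay_parent).or_default().extend(live.iter().copied());
    }

    fn remove_relay_parent(&mut self, relay_parent: &Hash) {
        if let Some(live) = self.per_relay_parent.remove(relay_parent) {
            for candidate in live {
                if let Entry::Occupied(mut occ) = self.per_candidate.entry(candidate) {
                    occ.get_mut().live_in.remove(relay_parent);
                    // Last referencing relay parent gone -> drop the candidate state.
                    if occ.get().live_in.is_empty() {
                        occ.remove();
                    }
                }
            }
        }
    }
}

fn main() {
    let mut state = State::default();
    state.add_relay_parent(1, &[10, 11]);
    state.add_relay_parent(2, &[11]);

    state.remove_relay_parent(&1);
    // Candidate 10 was only live in relay parent 1 and is pruned; 11 survives.
    assert!(!state.per_candidate.contains_key(&10));
    assert!(state.per_candidate.contains_key(&11));
}
```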

* rename and rewrite query_pending_availability

* add a bunch of consistency tests

* Add some last changes

* xy

* fz

* Make it compile again

* Fix one test

* Fix logging

* Remove some buggy code

* Make tests work again

* Move stuff around

* Remove dbg

* Remove state from test_harness

* More refactor and new test

* New test and fixes

* Move metric

* Remove "duplicated code"

* Fix tests

* New test

* Change break to continue

* Update node/core/av-store/src/lib.rs

* Update node/core/av-store/src/lib.rs

* Update node/core/bitfield-signing/src/lib.rs

Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>

* update guide to match live_candidates changes

* add comment

* fix bitfield signing

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>
Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
parent c429e15c
Pipeline #117513 passed in 27 minutes and 31 seconds
@@ -4914,10 +4914,8 @@ name = "polkadot-availability-distribution"
version = "0.1.0"
dependencies = [
"assert_matches",
"env_logger 0.8.2",
"futures 0.3.8",
"futures-timer 3.0.2",
"log",
"maplit",
"parity-scale-codec",
"polkadot-erasure-coding",
"polkadot-node-network-protocol",
@@ -4926,11 +4924,11 @@ dependencies = [
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sc-keystore",
"smallvec 1.5.1",
"sp-application-crypto",
"sp-core",
"sp-keyring",
"sp-keystore",
"sp-tracing",
"thiserror",
"tracing",
"tracing-futures",
@@ -713,25 +713,51 @@ where
match msg {
QueryAvailableData(hash, tx) => {
tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data))
.map_err(|_| oneshot::Canceled)?;
tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?;
}
QueryDataAvailability(hash, tx) => {
tx.send(available_data(&subsystem.inner, &hash).is_some())
.map_err(|_| oneshot::Canceled)?;
let result = available_data(&subsystem.inner, &hash).is_some();
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?hash,
availability = ?result,
"Queried data availability",
);
tx.send(result).map_err(|_| oneshot::Canceled)?;
}
QueryChunk(hash, id, tx) => {
tx.send(get_chunk(subsystem, &hash, id)?)
.map_err(|_| oneshot::Canceled)?;
tx.send(get_chunk(subsystem, &hash, id)?).map_err(|_| oneshot::Canceled)?;
}
QueryChunkAvailability(hash, id, tx) => {
tx.send(get_chunk(subsystem, &hash, id)?.is_some())
.map_err(|_| oneshot::Canceled)?;
let result = get_chunk(subsystem, &hash, id).map(|r| r.is_some());
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?hash,
availability = ?result,
"Queried chunk availability",
);
tx.send(result?).map_err(|_| oneshot::Canceled)?;
}
StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => {
let chunk_index = chunk.index;
// Current block number is relay_parent block number + 1.
let block_number = get_block_number(ctx, relay_parent).await? + 1;
match store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number) {
let result = store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number);
tracing::trace!(
target: LOG_TARGET,
%chunk_index,
?candidate_hash,
%block_number,
?result,
"Stored chunk",
);
match result {
Err(e) => {
tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
return Err(e);
@@ -742,7 +768,11 @@ where
}
}
StoreAvailableData(hash, id, n_validators, av_data, tx) => {
match store_available_data(subsystem, &hash, id, n_validators, av_data) {
let result = store_available_data(subsystem, &hash, id, n_validators, av_data);
tracing::trace!(target: LOG_TARGET, candidate_hash = ?hash, ?result, "Stored available data");
match result {
Err(e) => {
tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
return Err(e);
@@ -78,6 +78,8 @@ async fn get_core_availability(
) -> Result<bool, Error> {
let span = jaeger::hash_span(&relay_parent, "core_availability");
if let CoreState::Occupied(core) = core {
tracing::trace!(target: LOG_TARGET, para_id = %core.para_id, "Getting core availability");
let _span = span.child("occupied");
let (tx, rx) = oneshot::channel();
sender
@@ -93,7 +95,10 @@
let committed_candidate_receipt = match rx.await? {
Ok(Some(ccr)) => ccr,
Ok(None) => return Ok(false),
Ok(None) => {
tracing::trace!(target: LOG_TARGET, para_id = %core.para_id, "No committed candidate");
return Ok(false)
},
Err(e) => {
// Don't take down the node on runtime API errors.
tracing::warn!(target: LOG_TARGET, err = ?e, "Encountered a runtime API error");
@@ -103,6 +108,7 @@
drop(_span);
let _span = span.child("query chunk");
let candidate_hash = committed_candidate_receipt.hash();
let (tx, rx) = oneshot::channel();
sender
@@ -110,13 +116,24 @@
.await
.send(
AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability(
committed_candidate_receipt.hash(),
candidate_hash,
validator_idx,
tx,
)).into(),
)
.await?;
return rx.await.map_err(Into::into);
let res = rx.await.map_err(Into::into);
tracing::trace!(
target: LOG_TARGET,
para_id = %core.para_id,
availability = ?res,
?candidate_hash,
"Candidate availability",
);
return res;
}
Ok(false)
@@ -23,9 +23,7 @@ polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpe
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures-timer = "3.0.2"
env_logger = "0.8.2"
assert_matches = "1.4.0"
smallvec = "1.5.1"
log = "0.4.11"
maplit = "1.0"
@@ -38,6 +38,7 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{
BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash, HashT, Id as ParaId,
SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash,
CandidateDescriptor,
};
use polkadot_subsystem::messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
@@ -50,6 +51,7 @@ use polkadot_subsystem::{
SubsystemContext, SubsystemError,
};
use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::iter;
use thiserror::Error;
@@ -116,6 +118,12 @@ pub struct AvailabilityGossipMessage {
pub erasure_chunk: ErasureChunk,
}
impl From<AvailabilityGossipMessage> for protocol_v1::AvailabilityDistributionMessage {
fn from(message: AvailabilityGossipMessage) -> Self {
Self::Chunk(message.candidate_hash, message.erasure_chunk)
}
}
/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Clone, Debug)]
@@ -129,19 +137,8 @@ struct ProtocolState {
/// Caches a mapping of relay parents or ancestor to live candidate receipts.
/// Allows fast intersection of live candidates with views and consecutive unioning.
/// Maps relay parent / ancestor -> live candidate receipts + its hash.
receipts: HashMap<Hash, HashSet<(CandidateHash, CommittedCandidateReceipt)>>,
/// Allow reverse caching of view checks.
/// Maps candidate hash -> relay parent for extracting meta information from `PerRelayParent`.
/// Note that the presence of this is not sufficient to determine if deletion is OK, i.e.
/// two histories could cover this.
reverse: HashMap<CandidateHash, Hash>,
/// Keeps track of which candidate receipts are required due to ancestors of which relay parents
/// of our view.
/// Maps ancestor -> relay parents in view
ancestry: HashMap<Hash, HashSet<Hash>>,
/// Maps relay parent / ancestor -> candidate receipts.
receipts: HashMap<Hash, HashSet<CandidateHash>>,
/// Track things needed to start and stop work on a particular relay parent.
per_relay_parent: HashMap<Hash, PerRelayParent>,
@@ -157,24 +154,30 @@ struct PerCandidate {
/// candidate hash + erasure chunk index -> gossip message
message_vault: HashMap<u32, AvailabilityGossipMessage>,
/// Track received candidate hashes and validator indices from peers.
received_messages: HashMap<PeerId, HashSet<(CandidateHash, ValidatorIndex)>>,
/// Track received erasure chunk indices per peer.
received_messages: HashMap<PeerId, HashSet<ValidatorIndex>>,
/// Track already sent candidate hashes and the erasure chunk index to the peers.
sent_messages: HashMap<PeerId, HashSet<(CandidateHash, ValidatorIndex)>>,
/// Track sent erasure chunk indices per peer.
sent_messages: HashMap<PeerId, HashSet<ValidatorIndex>>,
/// The set of validators.
validators: Vec<ValidatorId>,
/// If this node is a validator, note the index in the validator set.
validator_index: Option<ValidatorIndex>,
/// The descriptor of this candidate.
descriptor: CandidateDescriptor,
/// The set of relay chain blocks this appears to be live in.
live_in: HashSet<Hash>,
}
impl PerCandidate {
/// Returns `true` iff the given `message` is required by the given `peer`.
fn message_required_by_peer(&self, peer: &PeerId, message: &(CandidateHash, ValidatorIndex)) -> bool {
self.received_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true)
&& self.sent_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true)
/// Returns `true` iff the given `validator_index` is required by the given `peer`.
fn message_required_by_peer(&self, peer: &PeerId, validator_index: &ValidatorIndex) -> bool {
self.received_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true)
&& self.sent_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true)
}
}
@@ -182,139 +185,85 @@ impl PerCandidate {
struct PerRelayParent {
/// Set of `K` ancestors for this relay parent.
ancestors: Vec<Hash>,
/// Live candidates, according to this relay parent.
live_candidates: HashSet<CandidateHash>,
}
impl ProtocolState {
/// Collects the ancestors of the given `relay_parents`, including the relay parents themselves.
#[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))]
fn extend_with_ancestors<'a>(
&'a self,
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
) -> HashSet<Hash> {
relay_parents
.into_iter()
.map(|relay_parent| {
self.per_relay_parent
.get(relay_parent)
.into_iter()
.map(|per_relay_parent| per_relay_parent.ancestors.iter().cloned())
.flatten()
.chain(iter::once(*relay_parent))
})
.flatten()
.collect::<HashSet<Hash>>()
}
/// Unionize all cached entries for the given relay parents and its ancestors.
/// Unionize all live candidate hashes of the given relay parents and their recent
/// ancestors.
///
/// Ignores all non-existent relay parents, so this can be used directly with a peer's view.
/// Returns a map from candidate hash -> receipt
/// Returns a set of candidate hashes.
#[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))]
fn cached_live_candidates_unioned<'a>(
&'a self,
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
) -> HashMap<CandidateHash, CommittedCandidateReceipt> {
let relay_parents_and_ancestors = self.extend_with_ancestors(relay_parents);
relay_parents_and_ancestors
) -> HashSet<CandidateHash> {
relay_parents
.into_iter()
.filter_map(|relay_parent_or_ancestor| self.receipts.get(&relay_parent_or_ancestor))
.map(|receipt_set| receipt_set.into_iter())
.filter_map(|r| self.per_relay_parent.get(r))
.map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned())
.flatten()
.map(|(receipt_hash, receipt)| (receipt_hash.clone(), receipt.clone()))
.collect()
}
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn add_relay_parent<Context>(
#[tracing::instrument(level = "trace", skip(candidates), fields(subsystem = LOG_TARGET))]
fn add_relay_parent(
&mut self,
ctx: &mut Context,
relay_parent: Hash,
validators: Vec<ValidatorId>,
validator_index: Option<ValidatorIndex>,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let candidates = query_live_candidates(ctx, self, std::iter::once(relay_parent)).await?;
candidates: HashMap<CandidateHash, FetchedLiveCandidate>,
ancestors: Vec<Hash>,
) {
let candidate_hashes: Vec<_> = candidates.keys().cloned().collect();
// register the relation of relay_parent to candidate..
// ..and the reverse association.
for (relay_parent_or_ancestor, (receipt_hash, receipt)) in candidates.clone() {
self.reverse
.insert(receipt_hash.clone(), relay_parent_or_ancestor.clone());
let per_candidate = self.per_candidate.entry(receipt_hash.clone()).or_default();
per_candidate.validator_index = validator_index.clone();
per_candidate.validators = validators.clone();
self.receipts
.entry(relay_parent_or_ancestor)
.or_default()
.insert((receipt_hash, receipt));
}
// collect the ancestors again from the hash map
let ancestors = candidates
.iter()
.filter_map(|(ancestor_or_relay_parent, _receipt)| {
if ancestor_or_relay_parent == &relay_parent {
None
} else {
Some(*ancestor_or_relay_parent)
}
})
.collect::<Vec<Hash>>();
// mark all the ancestors as "needed" by this newly added relay parent
for ancestor in ancestors.iter() {
self.ancestry
.entry(ancestor.clone())
.or_default()
.insert(relay_parent);
for (receipt_hash, fetched) in candidates {
let per_candidate = self.per_candidate.entry(receipt_hash).or_default();
// Cached candidates already have entries and thus don't need this
// information to be set.
if let FetchedLiveCandidate::Fresh(descriptor) = fetched {
per_candidate.validator_index = validator_index.clone();
per_candidate.validators = validators.clone();
per_candidate.descriptor = descriptor;
}
per_candidate.live_in.insert(relay_parent);
}
self.per_relay_parent
.entry(relay_parent)
.or_default()
.ancestors = ancestors;
Ok(())
let per_relay_parent = self.per_relay_parent.entry(relay_parent).or_default();
per_relay_parent.ancestors = ancestors;
per_relay_parent.live_candidates.extend(candidate_hashes);
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn remove_relay_parent(&mut self, relay_parent: &Hash) {
// we might be ancestor of some other relay_parent
if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) {
// if we were the last user, and it is
// not explicitly set to be worked on by the overseer
if descendants.is_empty() {
// remove from the ancestry index
self.ancestry.remove(relay_parent);
// and also remove the actual receipt
if let Some(candidates) = self.receipts.remove(relay_parent) {
candidates.into_iter().for_each(|c| { self.per_candidate.remove(&c.0); });
}
}
}
if let Some(per_relay_parent) = self.per_relay_parent.remove(relay_parent) {
// remove all "references" from the hash maps and sets for all ancestors
for ancestor in per_relay_parent.ancestors {
// one of our descendants might be an ancestor of some other relay_parent
if let Some(ref mut descendants) = self.ancestry.get_mut(&ancestor) {
// we do not need this descendant anymore
descendants.remove(&relay_parent);
// if we were the last user, and it is
// not explicitly set to be worked on by the overseer
if descendants.is_empty() && !self.per_relay_parent.contains_key(&ancestor) {
// remove from the ancestry index
self.ancestry.remove(&ancestor);
// and also remove the actual receipt
if let Some(candidates) = self.receipts.remove(&ancestor) {
candidates.into_iter().for_each(|c| { self.per_candidate.remove(&c.0); });
}
for candidate_hash in per_relay_parent.live_candidates {
// Prune the candidate if this was the last member of our view
// to consider it live (including its ancestors).
if let Entry::Occupied(mut occ) = self.per_candidate.entry(candidate_hash) {
occ.get_mut().live_in.remove(relay_parent);
if occ.get().live_in.is_empty() {
occ.remove();
}
}
}
}
}
// Removes all entries from receipts which aren't referenced in the ancestry of
// one of our live relay-chain heads.
fn clean_up_receipts_cache(&mut self) {
let extended_view: HashSet<_> = self.per_relay_parent.iter()
.map(|(r_hash, v)| v.ancestors.iter().cloned().chain(std::iter::once(*r_hash)))
.flatten()
.collect();
self.receipts.retain(|ancestor_hash, _| extended_view.contains(ancestor_hash));
}
}
/// Deal with network bridge updates and track what needs to be tracked
@@ -387,27 +336,30 @@ where
for added in view.difference(&old_view) {
let validators = query_validators(ctx, *added).await?;
let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await;
state
.add_relay_parent(ctx, *added, validators, validator_index)
.await?;
let (candidates, ancestors)
= query_live_candidates(ctx, &mut state.receipts, *added).await?;
state.add_relay_parent(
*added,
validators,
validator_index,
candidates,
ancestors,
);
}
// handle all candidates
for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(view.difference(&old_view)) {
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
// assure the node has the validator role
if per_candidate.validator_index.is_none() {
continue;
};
for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) {
// If we are not a validator for this candidate, let's skip it.
if state.per_candidate.entry(candidate_hash).or_default().validator_index.is_none() {
continue
}
// check if the availability data is present in the store
if !query_data_availability(ctx, candidate_hash).await? {
continue;
}
let validator_count = per_candidate.validators.len();
// obtain interested peers in the candidate hash
let peers: Vec<PeerId> = state
.peer_views
@@ -417,77 +369,64 @@ where
// collect all direct interests of a peer w/o ancestors
state
.cached_live_candidates_unioned(view.heads.iter())
.contains_key(&candidate_hash)
.contains(&candidate_hash)
})
.map(|(peer, _view)| peer.clone())
.collect();
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
let validator_count = per_candidate.validators.len();
// distribute all erasure messages to interested peers
for chunk_index in 0u32..(validator_count as u32) {
// only the peers which did not receive this particular erasure chunk
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
// obtain the chunks from the cache, if not fallback
// and query the availability store
let message_id = (candidate_hash, chunk_index);
let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index) {
message.erasure_chunk.clone()
let message = if let Some(message) = per_candidate.message_vault.get(&chunk_index) {
tracing::trace!(
target: LOG_TARGET,
%chunk_index,
?candidate_hash,
"Retrieved chunk from message vault",
);
message.clone()
} else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? {
erasure_chunk
tracing::trace!(
target: LOG_TARGET,
%chunk_index,
?candidate_hash,
"Retrieved chunk from availability storage",
);
AvailabilityGossipMessage {
candidate_hash,
erasure_chunk,
}
} else {
tracing::error!(
target: LOG_TARGET,
%chunk_index,
?candidate_hash,
"Availability store reported that we have the availability data, but we could not retrieve a chunk of it!",
);
continue;
};
debug_assert_eq!(erasure_chunk.index, chunk_index);
debug_assert_eq!(message.erasure_chunk.index, chunk_index);
let peers = peers
.iter()
.filter(|peer| per_candidate.message_required_by_peer(peer, &message_id))
.filter(|peer| per_candidate.message_required_by_peer(peer, &chunk_index))
.cloned()
.collect::<Vec<_>>();
let message = AvailabilityGossipMessage {
candidate_hash,
erasure_chunk,
};
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await;
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await;
}
}
// cleanup the removed relay parents and their states
let removed = old_view.difference(&view).collect::<Vec<_>>();
for removed in removed {
state.remove_relay_parent(&removed);
}
Ok(())
}
old_view.difference(&view).for_each(|r| state.remove_relay_parent(r));
state.clean_up_receipts_cache();
#[inline(always)]
async fn send_tracked_gossip_message_to_peers<Context>(
ctx: &mut Context,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peers: Vec<PeerId>,
message: AvailabilityGossipMessage,
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await
}
#[inline(always)]
async fn send_tracked_gossip_messages_to_peer<Context>(
ctx: &mut Context,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peer: PeerId,