Unverified Commit a891884e authored by asynchronous rob, committed by GitHub

Improve Network Spans (#2169)

* utility functions for erasure-coding threshold

* add candidate-hash tag to candidate jaeger spans

* debug implementation for jaeger span

* add a span to each live candidate in availability dist.

* availability span covers only our piece

* fix tests

* keep span alive slightly longer

* remove spammy bitfield-gossip-received log

* Revert "remove spammy bitfield-gossip-received log"

This reverts commit 831a2db5.

* add claimed validator to bitfield-gossip span

* add peer-id to handle-incoming span

* add peer-id to availability distribution span

* Update node/network/availability-distribution/src/lib.rs

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* Update erasure-coding/src/lib.rs

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* Update node/subsystem/src/jaeger.rs

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
parent 50538aa5
@@ -119,12 +119,18 @@ impl CodeParams {
 			.expect("this struct is not created with invalid shard number; qed")
 	}
 }
 
-fn code_params(n_validators: usize) -> Result<CodeParams, Error> {
+/// Returns the maximum number of allowed, faulty chunks
+/// which does not prevent recovery given all other pieces
+/// are correct.
+const fn n_faulty(n_validators: usize) -> Result<usize, Error> {
 	if n_validators > MAX_VALIDATORS { return Err(Error::TooManyValidators) }
 	if n_validators <= 1 { return Err(Error::NotEnoughValidators) }
 
-	let n_faulty = n_validators.saturating_sub(1) / 3;
+	Ok(n_validators.saturating_sub(1) / 3)
+}
+
+fn code_params(n_validators: usize) -> Result<CodeParams, Error> {
+	let n_faulty = n_faulty(n_validators)?;
 	let n_good = n_validators - n_faulty;
 
 	Ok(CodeParams {
@@ -133,6 +139,13 @@ fn code_params(n_validators: usize) -> Result<CodeParams, Error> {
 	})
 }
 
+/// Obtain a threshold of chunks that should be enough to recover the data.
+pub fn recovery_threshold(n_validators: usize) -> Result<usize, Error> {
+	let n_faulty = n_faulty(n_validators)?;
+
+	Ok(n_faulty + 1)
+}
+
 /// Obtain erasure-coded chunks for v0 `AvailableData`, one for each validator.
 ///
 /// Works only up to 65536 validators, and `n_validators` must be non-zero.
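The arithmetic above is the usual Byzantine fault-tolerance bound: with `n` validators, at most `f = (n - 1) / 3` (integer division) chunks may be faulty or missing, and since the code is constructed with `f + 1` data shards, any `f + 1` correct chunks recover the data. A standalone sketch of the same arithmetic (illustrative only, not the crate's API):

```rust
fn main() {
    for n in [4usize, 10, 100, 1024] {
        let f = n.saturating_sub(1) / 3; // maximum tolerated faulty chunks
        let threshold = f + 1;           // chunks needed for recovery
        println!("n_validators = {:4} -> n_faulty = {:3}, recovery_threshold = {:3}", n, f, threshold);
    }
}
```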
@@ -121,7 +121,7 @@ impl From<AvailabilityGossipMessage> for protocol_v1::AvailabilityDistributionMessage {
 /// Data used to track information of peers and relay parents the
 /// overseer ordered us to work on.
-#[derive(Default, Clone, Debug)]
+#[derive(Debug, Default)]
 struct ProtocolState {
 	/// Track all active peers and their views
 	/// to determine what is relevant to them.
@@ -142,7 +142,7 @@ struct ProtocolState {
 	per_candidate: HashMap<CandidateHash, PerCandidate>,
 }
 
-#[derive(Debug, Clone, Default)]
+#[derive(Debug)]
 struct PerCandidate {
 	/// A Candidate and a set of known erasure chunks in form of messages to be gossiped / distributed if the peer view wants that.
 	/// This is _across_ peers and not specific to a particular one.
@@ -166,13 +166,30 @@ struct PerCandidate {
 	/// The set of relay chain blocks this appears to be live in.
 	live_in: HashSet<Hash>,
+
+	/// A Jaeger span relating to this candidate.
+	span: jaeger::JaegerSpan,
 }
 
 impl PerCandidate {
 	/// Returns `true` iff the given `validator_index` is required by the given `peer`.
-	fn message_required_by_peer(&self, peer: &PeerId, validator_index: &ValidatorIndex) -> bool {
-		self.received_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true)
-			&& self.sent_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true)
+	fn message_required_by_peer(&self, peer: &PeerId, validator_index: ValidatorIndex) -> bool {
+		self.received_messages.get(peer).map(|v| !v.contains(&validator_index)).unwrap_or(true)
+			&& self.sent_messages.get(peer).map(|v| !v.contains(&validator_index)).unwrap_or(true)
 	}
+
+	/// Add a chunk to the message vault. Overwrites anything that was already present.
+	fn add_message(&mut self, chunk_index: u32, message: AvailabilityGossipMessage) {
+		let _ = self.message_vault.insert(chunk_index, message);
+	}
+
+	/// Clean up the span if we've got our own chunk.
+	fn drop_span_after_own_availability(&mut self) {
+		if let Some(validator_index) = self.validator_index {
+			if self.message_vault.contains_key(&validator_index) {
+				self.span = jaeger::JaegerSpan::Disabled;
+			}
+		}
+	}
 }
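`message_required_by_peer` gates gossip on two sets at once: a chunk is worth sending to a peer only if it was neither received from nor already sent to that peer. A self-contained sketch of that rule with simplified types (the real code keys by `PeerId` and `ValidatorIndex`):

```rust
use std::collections::{HashMap, HashSet};

struct Tracker {
    received: HashMap<String, HashSet<u32>>,
    sent: HashMap<String, HashSet<u32>>,
}

impl Tracker {
    // A chunk is required only if neither set for this peer contains it;
    // an unknown peer defaults to "required".
    fn message_required_by_peer(&self, peer: &str, chunk: u32) -> bool {
        self.received.get(peer).map(|v| !v.contains(&chunk)).unwrap_or(true)
            && self.sent.get(peer).map(|v| !v.contains(&chunk)).unwrap_or(true)
    }
}

fn main() {
    let mut t = Tracker { received: HashMap::new(), sent: HashMap::new() };
    assert!(t.message_required_by_peer("alice", 0));  // unknown peer: send
    t.sent.entry("alice".into()).or_default().insert(0);
    assert!(!t.message_required_by_peer("alice", 0)); // already sent: skip
}
```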
@@ -195,12 +212,10 @@ impl ProtocolState {
 		&'a self,
 		relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
 	) -> HashSet<CandidateHash> {
-		relay_parents
-			.into_iter()
-			.filter_map(|r| self.per_relay_parent.get(r))
-			.map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned())
-			.flatten()
-			.collect()
+		cached_live_candidates_unioned(
+			&self.per_relay_parent,
+			relay_parents
+		)
 	}
 
 	#[tracing::instrument(level = "trace", skip(candidates), fields(subsystem = LOG_TARGET))]
@@ -218,16 +233,32 @@ impl ProtocolState {
 		// register the relation of relay_parent to candidate..
 		for (receipt_hash, fetched) in candidates {
-			let per_candidate = self.per_candidate.entry(receipt_hash).or_default();
-
-			// Cached candidates already have entries and thus don't need this
-			// information to be set.
-			if let FetchedLiveCandidate::Fresh(descriptor) = fetched {
-				per_candidate.validator_index = validator_index.clone();
-				per_candidate.validators = validators.clone();
-				per_candidate.descriptor = descriptor;
-			}
-
-			per_candidate.live_in.insert(relay_parent);
+			let candidate_entry = match self.per_candidate.entry(receipt_hash) {
+				Entry::Occupied(e) => e.into_mut(),
+				Entry::Vacant(e) => {
+					if let FetchedLiveCandidate::Fresh(descriptor) = fetched {
+						e.insert(PerCandidate {
+							message_vault: HashMap::new(),
+							received_messages: HashMap::new(),
+							sent_messages: HashMap::new(),
+							validators: validators.clone(),
+							validator_index,
+							descriptor,
+							live_in: HashSet::new(),
+							span: if validator_index.is_some() {
+								jaeger::candidate_hash_span(&receipt_hash, "pending-availability")
+							} else {
+								jaeger::JaegerSpan::Disabled
+							},
+						})
+					} else {
+						tracing::warn!(target: LOG_TARGET, "No `per_candidate` but not fresh. logic error");
+						continue;
+					}
+				}
+			};
+
+			candidate_entry.live_in.insert(relay_parent);
 		}
 	}
@@ -259,6 +290,18 @@ impl ProtocolState {
 	}
 }
 
+fn cached_live_candidates_unioned<'a>(
+	per_relay_parent: &'a HashMap<Hash, PerRelayParent>,
+	relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
+) -> HashSet<CandidateHash> {
+	relay_parents
+		.into_iter()
+		.filter_map(|r| per_relay_parent.get(r))
+		.map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned())
+		.flatten()
+		.collect()
+}
+
 /// Deal with network bridge updates and track what needs to be tracked
 /// which depends on the message type received.
 #[tracing::instrument(level = "trace", skip(ctx, keystore, metrics), fields(subsystem = LOG_TARGET))]
@@ -297,8 +340,6 @@ where
 			}
 		};
 
-		let mut _span = jaeger::hash_span(&gossiped_availability.candidate_hash.0, "availability-message-received");
-
 		process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics)
 			.await?;
 	}
@@ -344,9 +385,11 @@ where
 	// handle all candidates
 	for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) {
 		// If we are not a validator for this candidate, let's skip it.
-		if state.per_candidate.entry(candidate_hash).or_default().validator_index.is_none() {
-			continue
-		}
+		match state.per_candidate.get(&candidate_hash) {
+			None => continue,
+			Some(c) if c.validator_index.is_none() => continue,
+			Some(_) => {},
+		};
 
 		// check if the availability is present in the store
 		if !query_data_availability(ctx, candidate_hash).await? {
@@ -367,12 +410,18 @@ where
 			.map(|(peer, _view)| peer.clone())
 			.collect();
 
-		let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
+		let per_candidate = state.per_candidate.get_mut(&candidate_hash)
+			.expect("existence checked above; qed");
+
 		let validator_count = per_candidate.validators.len();
 
 		// distribute all erasure messages to interested peers
 		for chunk_index in 0u32..(validator_count as u32) {
+			let _span = {
+				let mut span = per_candidate.span.child("load-and-distribute");
+				span.add_string_tag("chunk-index", &format!("{}", chunk_index));
+				span
+			};
+
 			let message = if let Some(message) = per_candidate.message_vault.get(&chunk_index) {
 				tracing::trace!(
 					target: LOG_TARGET,
@@ -389,10 +438,15 @@ where
 					"Retrieved chunk from availability storage",
 				);
 
-				AvailabilityGossipMessage {
+				let msg = AvailabilityGossipMessage {
 					candidate_hash,
 					erasure_chunk,
-				}
+				};
+
+				per_candidate.add_message(chunk_index, msg.clone());
+
+				msg
 			} else {
 				tracing::error!(
 					target: LOG_TARGET,
@@ -407,12 +461,15 @@ where
 			let peers = peers
 				.iter()
-				.filter(|peer| per_candidate.message_required_by_peer(peer, &chunk_index))
+				.filter(|peer| per_candidate.message_required_by_peer(peer, chunk_index))
 				.cloned()
 				.collect::<Vec<_>>();
 
 			send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await;
 		}
+
+		// traces are better if we wait until the loop is done to drop.
+		per_candidate.drop_span_after_own_availability();
 	}
 
 	// cleanup the removed relay parents and their states
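The deferred drop matches the commit bullet "keep span alive slightly longer": a span covers the interval from creation to drop, so ending the candidate span only after all per-chunk child spans have closed yields one parent trace that encloses them. A toy sketch of that RAII behaviour (hypothetical `Span` type, not the jaeger API):

```rust
struct Span(&'static str);

impl Drop for Span {
    fn drop(&mut self) { println!("span '{}' ended", self.0); }
}

fn main() {
    let parent = Span("pending-availability");
    for chunk_index in 0..3 {
        let _child = Span("load-and-distribute");
        println!("distributing chunk {}", chunk_index);
        // `_child` is dropped here, closing the per-chunk span
    }
    // `parent` is dropped last, so its trace spans the whole loop.
    drop(parent);
}
```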
@@ -442,10 +499,6 @@ where
 			.insert(message.erasure_chunk.index);
 	}
 
-	per_candidate
-		.message_vault
-		.insert(message.erasure_chunk.index, message.clone());
-
 	if !peers.is_empty() {
 		ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
 			peers.clone(),
@@ -482,7 +535,10 @@ where
 	// Send all messages we've seen before and the peer is now interested in.
 	for candidate_hash in added_candidates {
-		let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
+		let per_candidate = match state.per_candidate.get_mut(&candidate_hash) {
+			Some(p) => p,
+			None => continue,
+		};
 
 		// obtain the relevant chunk indices not sent yet
 		let messages = ((0 as ValidatorIndex)..(per_candidate.validators.len() as ValidatorIndex))
@@ -493,7 +549,7 @@ where
 				per_candidate
 					.message_vault
 					.get(&erasure_chunk_index)
-					.filter(|_| per_candidate.message_required_by_peer(&origin, &erasure_chunk_index))
+					.filter(|_| per_candidate.message_required_by_peer(&origin, erasure_chunk_index))
 			})
 			.cloned()
 			.collect::<HashSet<_>>();
@@ -540,12 +596,10 @@ where
 	let live_candidates = state.cached_live_candidates_unioned(state.view.heads.iter());
 
 	// check if the candidate is of interest
-	let descriptor = if live_candidates.contains(&message.candidate_hash) {
+	let candidate_entry = if live_candidates.contains(&message.candidate_hash) {
 		state.per_candidate
-			.get(&message.candidate_hash)
+			.get_mut(&message.candidate_hash)
 			.expect("All live candidates are contained in per_candidate; qed")
-			.descriptor
-			.clone()
 	} else {
 		tracing::trace!(
 			target: LOG_TARGET,
@@ -557,105 +611,140 @@ where
 		return Ok(())
 	};
 
+	// Handle a duplicate before doing expensive checks.
+	if let Some(existing) = candidate_entry.message_vault.get(&message.erasure_chunk.index) {
+		let span = candidate_entry.span.child("handle-duplicate");
+
+		// check if this particular erasure chunk was already sent by that peer before
+		{
+			let _span = span.child("check-entry");
+			let received_set = candidate_entry
+				.received_messages
+				.entry(origin.clone())
+				.or_default();
+
+			if !received_set.insert(message.erasure_chunk.index) {
+				modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
+				return Ok(());
+			}
+		}
+
+		// check that the message content matches what we have already before rewarding
+		// the peer.
+		{
+			let _span = span.child("check-accurate");
+			if existing == &message {
+				modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
+			} else {
+				modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
+			}
+		}
+
+		return Ok(());
+	}
+
+	let span = {
+		let mut span = candidate_entry.span.child("process-new-chunk");
+		span.add_string_tag("peer-id", &origin.to_base58());
+		span
+	};
+
 	// check the merkle proof against the erasure root in the candidate descriptor.
-	let anticipated_hash = match branch_hash(
-		&descriptor.erasure_root,
-		&message.erasure_chunk.proof,
-		message.erasure_chunk.index as usize,
-	) {
-		Ok(hash) => hash,
-		Err(e) => {
-			tracing::trace!(
-				target: LOG_TARGET,
-				candidate_hash = ?message.candidate_hash,
-				peer = %origin,
-				error = ?e,
-				"Failed to calculate chunk merkle proof",
-			);
-			modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
-			return Ok(());
-		},
-	};
+	let anticipated_hash = {
+		let _span = span.child("check-merkle-root");
+		match branch_hash(
+			&candidate_entry.descriptor.erasure_root,
+			&message.erasure_chunk.proof,
+			message.erasure_chunk.index as usize,
+		) {
+			Ok(hash) => hash,
+			Err(e) => {
+				tracing::trace!(
+					target: LOG_TARGET,
+					candidate_hash = ?message.candidate_hash,
+					peer = %origin,
+					error = ?e,
+					"Failed to calculate chunk merkle proof",
+				);
+				modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
+				return Ok(());
+			},
+		}
+	};
 
-	let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk);
-	if anticipated_hash != erasure_chunk_hash {
-		tracing::trace!(
-			target: LOG_TARGET,
-			candidate_hash = ?message.candidate_hash,
-			peer = %origin,
-			"Peer send chunk with invalid merkle proof",
-		);
-		modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
-		return Ok(());
-	}
+	{
+		let _span = span.child("check-chunk-hash");
+		let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk);
+		if anticipated_hash != erasure_chunk_hash {
+			tracing::trace!(
+				target: LOG_TARGET,
+				candidate_hash = ?message.candidate_hash,
+				peer = %origin,
+				"Peer sent chunk with invalid merkle proof",
+			);
+			modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
+			return Ok(());
+		}
+	}
 
-	let erasure_chunk_index = &message.erasure_chunk.index;
-
 	{
-		let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default();
-
-		// check if this particular erasure chunk was already sent by that peer before
-		{
-			let received_set = per_candidate
-				.received_messages
-				.entry(origin.clone())
-				.or_default();
-
-			if !received_set.insert(*erasure_chunk_index) {
-				modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
-				return Ok(());
-			}
-		}
+		// insert into known messages and change reputation. we've guaranteed
+		// above that the message vault doesn't contain any message under this
+		// chunk index already.
+		candidate_entry
+			.received_messages
+			.entry(origin.clone())
+			.or_default()
+			.insert(message.erasure_chunk.index);
 
-		// insert into known messages and change reputation
-		if per_candidate
-			.message_vault
-			.insert(*erasure_chunk_index, message.clone())
-			.is_some()
-		{
-			modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
-		} else {
-			modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await;
+		modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await;
 
-			// save the chunk for our index
-			if Some(*erasure_chunk_index) == per_candidate.validator_index {
-				if store_chunk(
-					ctx,
-					message.candidate_hash,
-					descriptor.relay_parent,
-					message.erasure_chunk.index,
-					message.erasure_chunk.clone(),
-				).await?.is_err() {
-					tracing::warn!(
-						target: LOG_TARGET,
-						"Failed to store erasure chunk to availability store"
-					);
-				}
+		// save the chunk for our index
+		if Some(message.erasure_chunk.index) == candidate_entry.validator_index {
+			let _span = span.child("store-our-chunk");
+			if store_chunk(
+				ctx,
+				message.candidate_hash,
+				candidate_entry.descriptor.relay_parent,
+				message.erasure_chunk.index,
+				message.erasure_chunk.clone(),
+			).await?.is_err() {
+				tracing::warn!(
+					target: LOG_TARGET,
+					"Failed to store erasure chunk to availability store"
+				);
 			}
-		};
+		}
+
+		candidate_entry.add_message(message.erasure_chunk.index, message.clone());
+
+		candidate_entry.drop_span_after_own_availability();
 	}
 
 	// condense the peers to the peers with interest on the candidate
-	let peers = state
-		.peer_views
-		.clone()
-		.into_iter()
-		.filter(|(_, view)| {
-			// peers view must contain the candidate hash too
-			state
-				.cached_live_candidates_unioned(view.heads.iter())
-				.contains(&message.candidate_hash)
-		})
-		.map(|(peer, _)| -> PeerId { peer.clone() })
-		.collect::<Vec<_>>();
-
-	let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default();
-
-	let peers = peers
-		.into_iter()
-		.filter(|peer| per_candidate.message_required_by_peer(peer, erasure_chunk_index))
-		.collect::<Vec<_>>();
+	let peers = {
+		let _span = span.child("determine-recipient-peers");
+		let per_relay_parent = &state.per_relay_parent;
+
+		state
+			.peer_views
+			.clone()
+			.into_iter()
+			.filter(|(_, view)| {
+				// peers view must contain the candidate hash too
+				cached_live_candidates_unioned(
+					per_relay_parent,
+					view.heads.iter(),
+				).contains(&message.candidate_hash)
+			})
+			.map(|(peer, _)| -> PeerId { peer.clone() })
+			.filter(|peer| candidate_entry.message_required_by_peer(peer, message.erasure_chunk.index))
+			.collect::<Vec<_>>()
	};
+
+	drop(span);
 
 	// gossip that message to interested peers
-	send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await;
+	send_tracked_gossip_messages_to_peers(ctx, candidate_entry, metrics, peers, iter::once(message)).await;
 
 	Ok(())
 }
@@ -681,7 +770,14 @@ impl AvailabilityDistributionSubsystem {
 	where
 		Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
 	{
-		let mut state = ProtocolState::default();
+		let mut state = ProtocolState {
+			peer_views: HashMap::new(),
+			view: Default::default(),
+			live_under: HashMap::new(),
+			per_relay_parent: HashMap::new(),
+			per_candidate: HashMap::new(),
+		};
+
 		self.run_inner(ctx, &mut state).await
 	}
@@ -50,6 +50,19 @@ fn chunk_protocol_message(
 	)
 }
 
+fn make_per_candidate() -> PerCandidate {
+	PerCandidate {
+		live_in: HashSet::new(),
+		message_vault: HashMap::new(),
+		received_messages: HashMap::new(),
+		sent_messages: HashMap::new(),
+		validators: Vec::new(),
+		validator_index: None,
+		descriptor: Default::default(),
+		span: jaeger::JaegerSpan::Disabled,
+	}
+}
+
 struct TestHarness {
 	virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
 }
@@ -1024,9 +1037,10 @@ fn remove_relay_parent_only_removes_per_candidate_if_final() {
 		live_candidates: std::iter::once(candidate_hash_a).collect(),
 	});
 
-	state.per_candidate.insert(candidate_hash_a, PerCandidate {
-		live_in: vec![hash_a, hash_b].into_iter().collect(),
-		..Default::default()
+	state.per_candidate.insert(candidate_hash_a, {
+		let mut per_candidate = make_per_candidate();
+		per_candidate.live_in = vec![hash_a, hash_b].into_iter().collect();
+		per_candidate
 	});
 
 	state.remove_relay_parent(&hash_a);
@@ -1052,6 +1066,8 @@ fn add_relay_parent_includes_all_live_candidates() {
 	let candidate_hash_a = CandidateHash([10u8; 32].into());
 	let candidate_hash_b = CandidateHash([11u8; 32].into());
 
+	state.per_candidate.insert(candidate_hash_b, make_per_candidate());
+
 	let candidates = vec![
 		(candidate_hash_a, FetchedLiveCandidate::Fresh(Default::default())),
 		(candidate_hash_b, FetchedLiveCandidate::Cached),
@@ -495,8 +495,16 @@ where
 		NetworkBridgeEvent::PeerMessage(remote, message) => {
 			match message {
 				protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => {
-					let mut _span = jaeger::hash_span(&relay_parent, "bitfield-gossip-received");
-					_span.add_string_tag("peer-id", &remote.to_base58());
+					let mut _span = {
+						let mut span = jaeger::hash_span(&relay_parent, "bitfield-gossip-received");
+						span.add_string_tag("peer-id", &remote.to_base58());
+						span.add_string_tag(
+							"claimed-validator",
+							&format!("{}", bitfield.validator_index()),
+						);
+						span
+					};
 					tracing::trace!(target: LOG_TARGET, peer_id = %remote, "received bitfield gossip from peer");
 					let gossiped_bitfield = BitfieldGossipMessage {
 						relay_parent,
@@ -731,6 +731,10 @@ async fn handle_incoming_message<'a>(
 			"candidate-hash",
 			&format!("{:?}", candidate_hash.0),
 		);
+		span.add_string_tag(
+			"peer-id",
+			&peer.to_base58(),
+		);
 		span
 	};
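Each of these call sites uses the same shape: build the span in a block expression, tag it while it is locally mutable, then bind the result immutably. A reduced sketch of the idiom (hypothetical `Span` type, not the jaeger API):

```rust
struct Span { tags: Vec<(String, String)> }

impl Span {
    fn new() -> Self { Span { tags: Vec::new() } }
    fn add_string_tag(&mut self, key: &str, value: &str) {
        self.tags.push((key.to_string(), value.to_string()));
    }
}

fn main() {
    // `span` is mutable only inside the block; the final binding is immutable.
    let span = {
        let mut span = Span::new();
        span.add_string_tag("peer-id", "QmExamplePeer");
        span.add_string_tag("claimed-validator", "42");
        span
    };
    println!("span carries {} tags", span.tags.len());
}
```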
@@ -129,6 +129,12 @@ impl JaegerSpan {
 	}
 }
 
+impl std::fmt::Debug for JaegerSpan {
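The diff is truncated at this point. The motivation is visible above: `PerCandidate` switches to `#[derive(Debug)]` while holding a `jaeger::JaegerSpan`, so the span type needs a manual `Debug` impl. A hypothetical sketch of what such an impl can look like (an assumption, not the commit's actual body):

```rust
// Hypothetical stand-in: the real type wraps an inner tracing span
// that does not implement Debug itself.
enum JaegerSpan { Enabled(String), Disabled }

impl std::fmt::Debug for JaegerSpan {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            JaegerSpan::Enabled(_) => write!(f, "JaegerSpan::Enabled"),
            JaegerSpan::Disabled => write!(f, "JaegerSpan::Disabled"),
        }
    }
}

fn main() {
    println!("{:?}", JaegerSpan::Disabled);
    println!("{:?}", JaegerSpan::Enabled("inner".to_string()));
}
```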