Unverified Commit c58edafe authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

address review comments from 2104 (#2136)

parent e28bcbd2
Pipeline #117556 passed with stages
in 24 minutes and 40 seconds
......@@ -135,10 +135,10 @@ struct ProtocolState {
/// Our own view.
view: View,
/// Caches a mapping of relay parents or ancestor to live candidate receipts.
/// Caches a mapping of relay parents or ancestor to live candidate hashes.
/// Allows fast intersection of live candidates with views and consecutive unioning.
/// Maps relay parent / ancestor -> candidate receipts.
receipts: HashMap<Hash, HashSet<CandidateHash>>,
/// Maps relay parent / ancestor -> candidate hashes.
live_under: HashMap<Hash, HashSet<CandidateHash>>,
/// Track things needed to start and stop work on a particular relay parent.
per_relay_parent: HashMap<Hash, PerRelayParent>,
......@@ -217,7 +217,9 @@ impl ProtocolState {
candidates: HashMap<CandidateHash, FetchedLiveCandidate>,
ancestors: Vec<Hash>,
) {
let candidate_hashes: Vec<_> = candidates.keys().cloned().collect();
let per_relay_parent = self.per_relay_parent.entry(relay_parent).or_default();
per_relay_parent.ancestors = ancestors;
per_relay_parent.live_candidates.extend(candidates.keys().cloned());
// register the relation of relay_parent to candidate..
for (receipt_hash, fetched) in candidates {
......@@ -232,10 +234,6 @@ impl ProtocolState {
}
per_candidate.live_in.insert(relay_parent);
}
let per_relay_parent = self.per_relay_parent.entry(relay_parent).or_default();
per_relay_parent.ancestors = ancestors;
per_relay_parent.live_candidates.extend(candidate_hashes);
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
......@@ -254,15 +252,15 @@ impl ProtocolState {
}
}
// Removes all entries from receipts which aren't referenced in the ancestry of
// Removes all entries from live_under which aren't referenced in the ancestry of
// one of our live relay-chain heads.
fn clean_up_receipts_cache(&mut self) {
fn clean_up_live_under_cache(&mut self) {
let extended_view: HashSet<_> = self.per_relay_parent.iter()
.map(|(r_hash, v)| v.ancestors.iter().cloned().chain(std::iter::once(*r_hash)))
.map(|(r_hash, v)| v.ancestors.iter().cloned().chain(iter::once(*r_hash)))
.flatten()
.collect();
self.receipts.retain(|ancestor_hash, _| extended_view.contains(ancestor_hash));
self.live_under.retain(|ancestor_hash, _| extended_view.contains(ancestor_hash));
}
}
......@@ -337,7 +335,7 @@ where
let validators = query_validators(ctx, *added).await?;
let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await;
let (candidates, ancestors)
= query_live_candidates(ctx, &mut state.receipts, *added).await?;
= query_live_candidates(ctx, &mut state.live_under, *added).await?;
state.add_relay_parent(
*added,
......@@ -424,7 +422,7 @@ where
// cleanup the removed relay parents and their states
old_view.difference(&view).for_each(|r| state.remove_relay_parent(r));
state.clean_up_receipts_cache();
state.clean_up_live_under_cache();
Ok(())
}
......@@ -772,13 +770,13 @@ enum FetchedLiveCandidate {
/// This returns a set of all candidate hashes pending availability within the state
/// of the explicitly referenced relay heads.
///
/// This also queries the provided `receipts` cache before reaching into the
/// This also queries the provided `live_under` cache before reaching into the
/// runtime and updates it with the information learned.
#[tracing::instrument(level = "trace", skip(ctx, relay_blocks, receipts), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, relay_blocks, live_under), fields(subsystem = LOG_TARGET))]
async fn query_pending_availability_at<Context>(
ctx: &mut Context,
relay_blocks: impl IntoIterator<Item = Hash>,
receipts: &mut HashMap<Hash, HashSet<CandidateHash>>,
live_under: &mut HashMap<Hash, HashSet<CandidateHash>>,
) -> Result<HashMap<CandidateHash, FetchedLiveCandidate>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
......@@ -787,7 +785,7 @@ where
// fetch and fill out cache for each of these
for relay_parent in relay_blocks {
let receipts_for = match receipts.entry(relay_parent) {
let receipts_for = match live_under.entry(relay_parent) {
Entry::Occupied(e) => {
live_candidates.extend(
e.get().iter().cloned().map(|c| (c, FetchedLiveCandidate::Cached))
......@@ -819,14 +817,15 @@ where
/// Obtain all live candidates under a particular relay head. This implicitly includes
/// `K` ancestors of the head, such that the candidates pending availability in all of
/// the states of the head and the ancestors are unioned together to produce the
/// return type of this function. Each candidate hash is paired.
/// return type of this function. Each candidate hash is paired with information about
/// from where it was fetched.
///
/// This also updates all `receipts` cached by the protocol state and returns a list
/// This also updates all `live_under` cached by the protocol state and returns a list
/// of up to `K` ancestors of the relay-parent.
#[tracing::instrument(level = "trace", skip(ctx, receipts), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, live_under), fields(subsystem = LOG_TARGET))]
async fn query_live_candidates<Context>(
ctx: &mut Context,
receipts: &mut HashMap<Hash, HashSet<CandidateHash>>,
live_under: &mut HashMap<Hash, HashSet<CandidateHash>>,
relay_parent: Hash,
) -> Result<(HashMap<CandidateHash, FetchedLiveCandidate>, Vec<Hash>)>
where
......@@ -840,12 +839,12 @@ where
)
.await?;
// query the ones that were not present in the receipts cache and add them
// query the ones that were not present in the live_under cache and add them
// to it.
let live_candidates = query_pending_availability_at(
ctx,
ancestors.iter().cloned().chain(std::iter::once(relay_parent)),
receipts,
ancestors.iter().cloned().chain(iter::once(relay_parent)),
live_under,
).await?;
Ok((live_candidates, ancestors))
......
......@@ -1010,10 +1010,10 @@ fn clean_up_receipts_cache_unions_ancestors_and_view() {
let hash_c = [2u8; 32].into();
let hash_d = [3u8; 32].into();
state.receipts.insert(hash_a, HashSet::new());
state.receipts.insert(hash_b, HashSet::new());
state.receipts.insert(hash_c, HashSet::new());
state.receipts.insert(hash_d, HashSet::new());
state.live_under.insert(hash_a, HashSet::new());
state.live_under.insert(hash_b, HashSet::new());
state.live_under.insert(hash_c, HashSet::new());
state.live_under.insert(hash_d, HashSet::new());
state.per_relay_parent.insert(hash_a, PerRelayParent {
ancestors: vec![hash_b],
......@@ -1022,13 +1022,13 @@ fn clean_up_receipts_cache_unions_ancestors_and_view() {
state.per_relay_parent.insert(hash_c, PerRelayParent::default());
state.clean_up_receipts_cache();
state.clean_up_live_under_cache();
assert_eq!(state.receipts.len(), 3);
assert!(state.receipts.contains_key(&hash_a));
assert!(state.receipts.contains_key(&hash_b));
assert!(state.receipts.contains_key(&hash_c));
assert!(!state.receipts.contains_key(&hash_d));
assert_eq!(state.live_under.len(), 3);
assert!(state.live_under.contains_key(&hash_a));
assert!(state.live_under.contains_key(&hash_b));
assert!(state.live_under.contains_key(&hash_c));
assert!(!state.live_under.contains_key(&hash_d));
}
#[test]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment