Unverified commit af0aff96, authored by asynchronous rob, committed by GitHub
Browse files

Add candidate info to OccupiedCore (#2134)

* guide: add candidate information to OccupiedCore

* add descriptor and hash to occupied core type

* guide: add candidate hash to inclusion

* runtime: return candidate info in core state

* bitfield signing: stop querying runtime as much

* minimize going to runtime in availability distribution

* fix availability distribution tests

* guide: remove para ID from Occupied core

* get all crates compiling
parent c58edafe
Pipeline #117578 passed with stages
in 31 minutes and 50 seconds
...@@ -78,37 +78,7 @@ async fn get_core_availability( ...@@ -78,37 +78,7 @@ async fn get_core_availability(
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let span = jaeger::hash_span(&relay_parent, "core-availability"); let span = jaeger::hash_span(&relay_parent, "core-availability");
if let CoreState::Occupied(core) = core { if let CoreState::Occupied(core) = core {
tracing::trace!(target: LOG_TARGET, para_id = %core.para_id, "Getting core availability");
let _span = span.child("occupied");
let (tx, rx) = oneshot::channel();
sender
.lock()
.await
.send(
AllMessages::from(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidatePendingAvailability(core.para_id, tx),
)).into(),
)
.await?;
let committed_candidate_receipt = match rx.await? {
Ok(Some(ccr)) => ccr,
Ok(None) => {
tracing::trace!(target: LOG_TARGET, para_id = %core.para_id, "No committed candidate");
return Ok(false)
},
Err(e) => {
// Don't take down the node on runtime API errors.
tracing::warn!(target: LOG_TARGET, err = ?e, "Encountered a runtime API error");
return Ok(false);
}
};
drop(_span);
let _span = span.child("query chunk"); let _span = span.child("query chunk");
let candidate_hash = committed_candidate_receipt.hash();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
sender sender
...@@ -116,7 +86,7 @@ async fn get_core_availability( ...@@ -116,7 +86,7 @@ async fn get_core_availability(
.await .await
.send( .send(
AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability( AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability(
candidate_hash, core.candidate_hash,
validator_idx, validator_idx,
tx, tx,
)).into(), )).into(),
...@@ -127,9 +97,9 @@ async fn get_core_availability( ...@@ -127,9 +97,9 @@ async fn get_core_availability(
tracing::trace!( tracing::trace!(
target: LOG_TARGET, target: LOG_TARGET,
para_id = %core.para_id, para_id = %core.para_id(),
availability = ?res, availability = ?res,
?candidate_hash, ?core.candidate_hash,
"Candidate availability", "Candidate availability",
); );
...@@ -325,17 +295,18 @@ pub type BitfieldSigningSubsystem<Spawner, Context> = JobManager<Spawner, Contex ...@@ -325,17 +295,18 @@ pub type BitfieldSigningSubsystem<Spawner, Context> = JobManager<Spawner, Contex
mod tests { mod tests {
use super::*; use super::*;
use futures::{pin_mut, executor::block_on}; use futures::{pin_mut, executor::block_on};
use polkadot_primitives::v1::OccupiedCore; use polkadot_primitives::v1::{CandidateHash, OccupiedCore};
fn occupied_core(para_id: u32) -> CoreState { fn occupied_core(para_id: u32, candidate_hash: CandidateHash) -> CoreState {
CoreState::Occupied(OccupiedCore { CoreState::Occupied(OccupiedCore {
para_id: para_id.into(),
group_responsible: para_id.into(), group_responsible: para_id.into(),
next_up_on_available: None, next_up_on_available: None,
occupied_since: 100_u32, occupied_since: 100_u32,
time_out_at: 200_u32, time_out_at: 200_u32,
next_up_on_time_out: None, next_up_on_time_out: None,
availability: Default::default(), availability: Default::default(),
candidate_hash,
candidate_descriptor: Default::default(),
}) })
} }
...@@ -354,6 +325,9 @@ mod tests { ...@@ -354,6 +325,9 @@ mod tests {
).fuse(); ).fuse();
pin_mut!(future); pin_mut!(future);
let hash_a = CandidateHash(Hash::repeat_byte(1));
let hash_b = CandidateHash(Hash::repeat_byte(2));
loop { loop {
futures::select! { futures::select! {
m = receiver.next() => match m.unwrap() { m = receiver.next() => match m.unwrap() {
...@@ -363,29 +337,16 @@ mod tests { ...@@ -363,29 +337,16 @@ mod tests {
), ),
) => { ) => {
assert_eq!(relay_parent, rp); assert_eq!(relay_parent, rp);
tx.send(Ok(vec![CoreState::Free, occupied_core(1), occupied_core(2)])).unwrap(); tx.send(Ok(vec![CoreState::Free, occupied_core(1, hash_a), occupied_core(2, hash_b)])).unwrap();
},
FromJobCommand::SendMessage(
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(rp, RuntimeApiRequest::CandidatePendingAvailability(para_id, tx)),
),
) => {
assert_eq!(relay_parent, rp);
if para_id == 1.into() {
tx.send(Ok(Some(Default::default()))).unwrap();
} else {
tx.send(Ok(None)).unwrap();
}
}, },
FromJobCommand::SendMessage( FromJobCommand::SendMessage(
AllMessages::AvailabilityStore( AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunkAvailability(_, vidx, tx), AvailabilityStoreMessage::QueryChunkAvailability(c_hash, vidx, tx),
), ),
) => { ) => {
assert_eq!(validator_index, vidx); assert_eq!(validator_index, vidx);
tx.send(true).unwrap(); tx.send(c_hash == hash_a).unwrap();
}, },
o => panic!("Unknown message: {:?}", o), o => panic!("Unknown message: {:?}", o),
}, },
......
...@@ -4,13 +4,14 @@ use polkadot_primitives::v1::{OccupiedCore, ScheduledCore}; ...@@ -4,13 +4,14 @@ use polkadot_primitives::v1::{OccupiedCore, ScheduledCore};
pub fn occupied_core(para_id: u32) -> CoreState { pub fn occupied_core(para_id: u32) -> CoreState {
CoreState::Occupied(OccupiedCore { CoreState::Occupied(OccupiedCore {
para_id: para_id.into(),
group_responsible: para_id.into(), group_responsible: para_id.into(),
next_up_on_available: None, next_up_on_available: None,
occupied_since: 100_u32, occupied_since: 100_u32,
time_out_at: 200_u32, time_out_at: 200_u32,
next_up_on_time_out: None, next_up_on_time_out: None,
availability: bitvec![bitvec::order::Lsb0, u8; 0; 32], availability: bitvec![bitvec::order::Lsb0, u8; 0; 32],
candidate_descriptor: Default::default(),
candidate_hash: Default::default(),
}) })
} }
......
...@@ -36,7 +36,7 @@ use polkadot_node_network_protocol::{ ...@@ -36,7 +36,7 @@ use polkadot_node_network_protocol::{
}; };
use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash, HashT, Id as ParaId, BlakeTwo256, CoreState, ErasureChunk, Hash, HashT,
SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash, SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash,
CandidateDescriptor, CandidateDescriptor,
}; };
...@@ -62,11 +62,6 @@ const LOG_TARGET: &'static str = "availability_distribution"; ...@@ -62,11 +62,6 @@ const LOG_TARGET: &'static str = "availability_distribution";
#[derive(Debug, Error)] #[derive(Debug, Error)]
enum Error { enum Error {
#[error("Response channel to obtain PendingAvailability failed")]
QueryPendingAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain PendingAvailability failed")]
QueryPendingAvailability(#[source] RuntimeApiError),
#[error("Response channel to obtain StoreChunk failed")] #[error("Response channel to obtain StoreChunk failed")]
StoreChunkResponseChannel(#[source] oneshot::Canceled), StoreChunkResponseChannel(#[source] oneshot::Canceled),
...@@ -795,19 +790,12 @@ where ...@@ -795,19 +790,12 @@ where
e => e.or_default(), e => e.or_default(),
}; };
for para in query_para_ids(ctx, relay_parent).await? { for (receipt_hash, descriptor) in query_pending_availability(ctx, relay_parent).await? {
if let Some(ccr) = query_pending_availability(ctx, relay_parent, para).await? { // unfortunately we have no good way of telling the candidate was
let receipt_hash = ccr.hash(); // cached until now. But we don't clobber a `Cached` entry if there
let descriptor = ccr.descriptor().clone(); // is one already.
live_candidates.entry(receipt_hash).or_insert(FetchedLiveCandidate::Fresh(descriptor));
// unfortunately we have no good way of telling the candidate was receipts_for.insert(receipt_hash);
// cached until now. But we don't clobber a `Cached` entry if there
// is one already.
live_candidates.entry(receipt_hash)
.or_insert(FetchedLiveCandidate::Fresh(descriptor));
receipts_for.insert(receipt_hash);
}
} }
} }
...@@ -850,9 +838,10 @@ where ...@@ -850,9 +838,10 @@ where
Ok((live_candidates, ancestors)) Ok((live_candidates, ancestors))
} }
/// Query all para IDs that are occupied under a given relay-parent. /// Query all hashes and descriptors of candidates pending availability at a particular block.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_para_ids<Context>(ctx: &mut Context, relay_parent: Hash) -> Result<Vec<ParaId>> async fn query_pending_availability<Context>(ctx: &mut Context, relay_parent: Hash)
-> Result<Vec<(CandidateHash, CandidateDescriptor)>>
where where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>, Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{ {
...@@ -863,22 +852,18 @@ where ...@@ -863,22 +852,18 @@ where
))) )))
.await; .await;
let all_para_ids = rx let cores: Vec<_> = rx
.await .await
.map_err(|e| Error::AvailabilityCoresResponseChannel(e))? .map_err(|e| Error::AvailabilityCoresResponseChannel(e))?
.map_err(|e| Error::AvailabilityCores(e))?; .map_err(|e| Error::AvailabilityCores(e))?;
let occupied_para_ids = all_para_ids Ok(cores.into_iter()
.into_iter() .filter_map(|core_state| if let CoreState::Occupied(occupied) = core_state {
.filter_map(|core_state| { Some((occupied.candidate_hash, occupied.candidate_descriptor))
if let CoreState::Occupied(occupied) = core_state { } else {
Some(occupied.para_id) None
} else {
None
}
}) })
.collect(); .collect())
Ok(occupied_para_ids)
} }
/// Modify the reputation of a peer based on its behavior. /// Modify the reputation of a peer based on its behavior.
...@@ -954,27 +939,6 @@ where ...@@ -954,27 +939,6 @@ where
rx.await.map_err(|e| Error::StoreChunkResponseChannel(e)) rx.await.map_err(|e| Error::StoreChunkResponseChannel(e))
} }
/// Request the candidate pending availability for a particular para.
///
/// Sends a `RuntimeApiRequest::CandidatePendingAvailability` request for `para`,
/// addressed with `relay_parent`, through `ctx` and waits for the reply on a
/// oneshot channel.
///
/// Returns the `Option<CommittedCandidateReceipt>` delivered by the runtime API;
/// presumably `None` means no candidate of `para` is pending availability at
/// `relay_parent` (TODO confirm against the runtime API).
///
/// # Errors
///
/// - `Error::QueryPendingAvailabilityResponseChannel` if the response channel
///   is canceled before a reply arrives.
/// - `Error::QueryPendingAvailability` if the runtime API reply is itself an error.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_pending_availability<Context>(
ctx: &mut Context,
relay_parent: Hash,
para: ParaId,
) -> Result<Option<CommittedCandidateReceipt>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
// `tx` travels with the request; the reply is read back from `rx`.
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidatePendingAvailability(para, tx),
))).await;
// First `map_err` covers a canceled channel, the second the runtime API error.
rx.await
.map_err(|e| Error::QueryPendingAvailabilityResponseChannel(e))?
.map_err(|e| Error::QueryPendingAvailability(e))
}
/// Query the validator set. /// Query the validator set.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_validators<Context>( async fn query_validators<Context>(
......
...@@ -21,7 +21,8 @@ use polkadot_node_network_protocol::{view, ObservedRole}; ...@@ -21,7 +21,8 @@ use polkadot_node_network_protocol::{view, ObservedRole};
use polkadot_node_subsystem_util::TimeoutExt; use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GroupIndex, AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GroupIndex,
GroupRotationInfo, HeadData, OccupiedCore, PersistedValidationData, PoV, ScheduledCore, GroupRotationInfo, HeadData, OccupiedCore, PersistedValidationData, PoV, ScheduledCore, Id as ParaId,
CommittedCandidateReceipt,
}; };
use polkadot_subsystem_testhelpers as test_helpers; use polkadot_subsystem_testhelpers as test_helpers;
...@@ -29,6 +30,7 @@ use futures::{executor, future, Future}; ...@@ -29,6 +30,7 @@ use futures::{executor, future, Future};
use sc_keystore::LocalKeystore; use sc_keystore::LocalKeystore;
use sp_application_crypto::AppKey; use sp_application_crypto::AppKey;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_keyring::Sr25519Keyring;
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use maplit::hashmap; use maplit::hashmap;
...@@ -95,20 +97,19 @@ async fn overseer_recv( ...@@ -95,20 +97,19 @@ async fn overseer_recv(
msg msg
} }
fn dummy_occupied_core(para: ParaId) -> CoreState { fn occupied_core_from_candidate(receipt: &CommittedCandidateReceipt) -> CoreState {
CoreState::Occupied(OccupiedCore { CoreState::Occupied(OccupiedCore {
para_id: para,
next_up_on_available: None, next_up_on_available: None,
occupied_since: 0, occupied_since: 0,
time_out_at: 5, time_out_at: 5,
next_up_on_time_out: None, next_up_on_time_out: None,
availability: Default::default(), availability: Default::default(),
group_responsible: GroupIndex::from(0), group_responsible: GroupIndex::from(0),
candidate_hash: receipt.hash(),
candidate_descriptor: receipt.descriptor().clone(),
}) })
} }
use sp_keyring::Sr25519Keyring;
#[derive(Clone)] #[derive(Clone)]
struct TestState { struct TestState {
chain_ids: Vec<ParaId>, chain_ids: Vec<ParaId>,
...@@ -388,7 +389,6 @@ async fn change_our_view( ...@@ -388,7 +389,6 @@ async fn change_our_view(
ancestors: Vec<Hash>, ancestors: Vec<Hash>,
session_per_relay_parent: HashMap<Hash, SessionIndex>, session_per_relay_parent: HashMap<Hash, SessionIndex>,
availability_cores_per_relay_parent: HashMap<Hash, Vec<CoreState>>, availability_cores_per_relay_parent: HashMap<Hash, Vec<CoreState>>,
candidate_pending_availabilities_per_relay_parent: HashMap<Hash, Vec<CommittedCandidateReceipt>>,
data_availability: HashMap<CandidateHash, bool>, data_availability: HashMap<CandidateHash, bool>,
chunk_data_per_candidate: HashMap<CandidateHash, (PoV, PersistedValidationData)>, chunk_data_per_candidate: HashMap<CandidateHash, (PoV, PersistedValidationData)>,
send_chunks_to: HashMap<CandidateHash, Vec<PeerId>>, send_chunks_to: HashMap<CandidateHash, Vec<PeerId>>,
...@@ -436,7 +436,7 @@ async fn change_our_view( ...@@ -436,7 +436,7 @@ async fn change_our_view(
} }
for _ in 0..availability_cores_per_relay_parent.len() { for _ in 0..availability_cores_per_relay_parent.len() {
let relay_parent = assert_matches!( assert_matches!(
overseer_recv(virtual_overseer).await, overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request( AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent, relay_parent,
...@@ -446,30 +446,8 @@ async fn change_our_view( ...@@ -446,30 +446,8 @@ async fn change_our_view(
.expect(&format!("Availability core for relay parent {:?} does not exist", relay_parent)); .expect(&format!("Availability core for relay parent {:?} does not exist", relay_parent));
tx.send(Ok(cores.clone())).unwrap(); tx.send(Ok(cores.clone())).unwrap();
relay_parent
} }
); );
let pending_availability = candidate_pending_availabilities_per_relay_parent.get(&relay_parent)
.expect(&format!("Candidate pending availability for relay parent {:?} does not exist", relay_parent));
for _ in 0..pending_availability.len() {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::CandidatePendingAvailability(para, tx)
)) => {
assert_eq!(relay_parent, hash);
let candidate = pending_availability.iter()
.find(|c| c.descriptor.para_id == para)
.expect(&format!("Pending candidate for para {} does not exist", para));
tx.send(Ok(Some(candidate.clone()))).unwrap();
}
);
}
} }
for _ in 0..data_availability.len() { for _ in 0..data_availability.len() {
...@@ -571,7 +549,6 @@ fn check_views() { ...@@ -571,7 +549,6 @@ fn check_views() {
let mut virtual_overseer = test_harness.virtual_overseer; let mut virtual_overseer = test_harness.virtual_overseer;
let TestState { let TestState {
chain_ids,
validator_public, validator_public,
relay_parent: current, relay_parent: current,
ancestors, ancestors,
...@@ -588,36 +565,37 @@ fn check_views() { ...@@ -588,36 +565,37 @@ fn check_views() {
vec![ancestors[0], genesis], vec![ancestors[0], genesis],
hashmap! { current => 1, genesis => 1 }, hashmap! { current => 1, genesis => 1 },
hashmap! { hashmap! {
ancestors[0] => vec![dummy_occupied_core(chain_ids[0]), dummy_occupied_core(chain_ids[1])], ancestors[0] => vec![
occupied_core_from_candidate(&candidates[0]),
occupied_core_from_candidate(&candidates[1]),
],
current => vec![ current => vec![
CoreState::Occupied(OccupiedCore { CoreState::Occupied(OccupiedCore {
para_id: chain_ids[0].clone(),
next_up_on_available: None, next_up_on_available: None,
occupied_since: 0, occupied_since: 0,
time_out_at: 10, time_out_at: 10,
next_up_on_time_out: None, next_up_on_time_out: None,
availability: Default::default(), availability: Default::default(),
group_responsible: GroupIndex::from(0), group_responsible: GroupIndex::from(0),
candidate_hash: candidates[0].hash(),
candidate_descriptor: candidates[0].descriptor().clone(),
}), }),
CoreState::Free, CoreState::Free,
CoreState::Free, CoreState::Free,
CoreState::Occupied(OccupiedCore { CoreState::Occupied(OccupiedCore {
para_id: chain_ids[1].clone(),
next_up_on_available: None, next_up_on_available: None,
occupied_since: 1, occupied_since: 1,
time_out_at: 7, time_out_at: 7,
next_up_on_time_out: None, next_up_on_time_out: None,
availability: Default::default(), availability: Default::default(),
group_responsible: GroupIndex::from(0), group_responsible: GroupIndex::from(0),
candidate_hash: candidates[1].hash(),
candidate_descriptor: candidates[1].descriptor().clone(),
}), }),
CoreState::Free, CoreState::Free,
CoreState::Free, CoreState::Free,
] ]
}, },
hashmap! {
ancestors[0] => vec![candidates[0].clone(), candidates[1].clone()],
current => vec![candidates[0].clone(), candidates[1].clone()],
},
hashmap! { hashmap! {
candidates[0].hash() => true, candidates[0].hash() => true,
candidates[1].hash() => false, candidates[1].hash() => false,
...@@ -690,11 +668,10 @@ fn reputation_verification() { ...@@ -690,11 +668,10 @@ fn reputation_verification() {
hashmap! { current => 1 }, hashmap! { current => 1 },
hashmap! { hashmap! {
current => vec![ current => vec![
dummy_occupied_core(candidates[0].descriptor.para_id), occupied_core_from_candidate(&candidates[0]),
dummy_occupied_core(candidates[1].descriptor.para_id) occupied_core_from_candidate(&candidates[1]),
], ],
}, },
hashmap! { current => vec![candidates[0].clone(), candidates[1].clone()] },
hashmap! { candidates[0].hash() => true, candidates[1].hash() => false }, hashmap! { candidates[0].hash() => true, candidates[1].hash() => false },
hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())},
hashmap! {}, hashmap! {},
...@@ -783,10 +760,9 @@ fn not_a_live_candidate_is_detected() { ...@@ -783,10 +760,9 @@ fn not_a_live_candidate_is_detected() {
hashmap! { current => 1 }, hashmap! { current => 1 },
hashmap! { hashmap! {
current => vec![ current => vec![
dummy_occupied_core(candidates[0].descriptor.para_id), occupied_core_from_candidate(&candidates[0]),
], ],
}, },
hashmap! { current => vec![candidates[0].clone()] },
hashmap! { candidates[0].hash() => true }, hashmap! { candidates[0].hash() => true },
hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())}, hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())},
hashmap! {}, hashmap! {},
...@@ -832,10 +808,9 @@ fn peer_change_view_before_us() { ...@@ -832,10 +808,9 @@ fn peer_change_view_before_us() {
hashmap! { current => 1 }, hashmap! { current => 1 },
hashmap! { hashmap! {
current => vec![ current => vec![
dummy_occupied_core(candidates[0].descriptor.para_id), occupied_core_from_candidate(&candidates[0]),
], ],
}, },
hashmap! { current => vec![candi