Unverified Commit 5b5bf207 authored by asynchronous rob, committed by GitHub

Avoid querying the local validator in availability recovery (#2792)

* guide: don't request availability data from ourselves

* add QueryAllChunks message

* implement QueryAllChunks

* remove unused relay_parent from StoreChunk

* test QueryAllChunks

* fast paths make short roads

* test early exit behavior
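In rough terms, the change makes recovery consult the local availability store before going to the network: first ask for the full available data (the fast path), and otherwise drop any chunk indices already held locally from the set of validators to query. Below is a minimal, self-contained sketch of that filtering step; the names filter_out_local_chunks, shuffling and local_chunk_indices are illustrative stand-ins, not the subsystem's actual items.

// Sketch only: drop chunk indices we already hold locally from the list of
// validators we still intend to query. Runs with plain std.
fn filter_out_local_chunks(shuffling: &mut Vec<u32>, local_chunk_indices: &[u32]) {
    shuffling.retain(|i| !local_chunk_indices.contains(i));
}

fn main() {
    let mut shuffling: Vec<u32> = (0..10).collect();
    let local_chunk_indices = vec![0]; // e.g. our own chunk is already in the store
    filter_out_local_chunks(&mut shuffling, &local_chunk_indices);
    assert!(!shuffling.contains(&0));
    println!("validators left to query: {:?}", shuffling);
}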
......@@ -980,6 +980,33 @@ fn process_message(
let _timer = subsystem.metrics.time_get_chunk();
let _ = tx.send(load_chunk(&subsystem.db, &candidate, validator_index)?);
}
AvailabilityStoreMessage::QueryAllChunks(candidate, tx) => {
match load_meta(&subsystem.db, &candidate)? {
None => {
let _ = tx.send(Vec::new());
}
Some(meta) => {
let mut chunks = Vec::new();
for (index, _) in meta.chunks_stored.iter().enumerate().filter(|(_, b)| **b) {
let _timer = subsystem.metrics.time_get_chunk();
match load_chunk(&subsystem.db, &candidate, ValidatorIndex(index as _))? {
Some(c) => chunks.push(c),
None => {
tracing::warn!(
target: LOG_TARGET,
?candidate,
index,
"No chunk found for set bit in meta"
);
}
}
}
let _ = tx.send(chunks);
}
}
}
AvailabilityStoreMessage::QueryChunkAvailability(candidate, validator_index, tx) => {
let a = load_meta(&subsystem.db, &candidate)?
.map_or(false, |m|
......@@ -989,7 +1016,6 @@ fn process_message(
}
AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent: _,
chunk,
tx,
} => {
......
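For orientation, here is a self-contained approximation of what the new QueryAllChunks handler above does: walk the set bits of the stored-chunks bitmap and load each corresponding chunk, warning on any inconsistency. A plain Vec<bool> and HashMap stand in for the bitvec and the database column, and eprintln! stands in for the tracing warning; this is a sketch, not the subsystem's code.

use std::collections::HashMap;

// Sketch: `chunks_stored` marks which validator indices have a chunk on disk;
// `db` is a stand-in for the on-disk column keyed by validator index.
fn all_stored_chunks(chunks_stored: &[bool], db: &HashMap<usize, Vec<u8>>) -> Vec<Vec<u8>> {
    let mut chunks = Vec::new();
    for (index, _) in chunks_stored.iter().enumerate().filter(|(_, b)| **b) {
        match db.get(&index) {
            Some(c) => chunks.push(c.clone()),
            // A set bit without a stored chunk is an inconsistency; the real
            // handler logs a warning and keeps going rather than failing.
            None => eprintln!("no chunk found for set bit in meta: {}", index),
        }
    }
    chunks
}

fn main() {
    let mut db = HashMap::new();
    db.insert(1usize, vec![1, 2, 3]);
    let stored = vec![false, true, false];
    assert_eq!(all_stored_chunks(&stored, &db).len(), 1);
}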
......@@ -284,7 +284,6 @@ fn store_chunk_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
test_harness(TestState::default(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let relay_parent = Hash::repeat_byte(32);
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
let validator_index = ValidatorIndex(5);
let n_validators = 10;
......@@ -309,7 +308,6 @@ fn store_chunk_works() {
let chunk_msg = AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
chunk: chunk.clone(),
tx,
};
......@@ -336,7 +334,6 @@ fn store_chunk_does_nothing_if_no_entry_already() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
test_harness(TestState::default(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let relay_parent = Hash::repeat_byte(32);
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
let validator_index = ValidatorIndex(5);
......@@ -350,7 +347,6 @@ fn store_chunk_does_nothing_if_no_entry_already() {
let chunk_msg = AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
chunk: chunk.clone(),
tx,
};
......@@ -510,6 +506,98 @@ fn store_pov_and_query_chunk_works() {
});
}
#[test]
fn query_all_chunks_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
// all chunks for hash 1.
// 1 chunk for hash 2.
// 0 chunks for hash 3.
let candidate_hash_1 = CandidateHash(Hash::repeat_byte(1));
let candidate_hash_2 = CandidateHash(Hash::repeat_byte(2));
let candidate_hash_3 = CandidateHash(Hash::repeat_byte(3));
let n_validators = 10;
let pov = PoV {
block_data: BlockData(vec![4, 5, 6]),
};
let available_data = AvailableData {
pov: Arc::new(pov),
validation_data: test_state.persisted_validation_data.clone(),
};
{
let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
candidate_hash_1,
None,
n_validators,
available_data,
tx,
);
virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
assert_eq!(rx.await.unwrap(), Ok(()));
}
{
with_tx(&store, |tx| {
super::write_meta(tx, &candidate_hash_2, &CandidateMeta {
data_available: false,
chunks_stored: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators as _],
state: State::Unavailable(BETimestamp(0)),
});
});
let chunk = ErasureChunk {
chunk: vec![1, 2, 3],
index: ValidatorIndex(1),
proof: vec![vec![3, 4, 5]],
};
let (tx, rx) = oneshot::channel();
let store_chunk_msg = AvailabilityStoreMessage::StoreChunk {
candidate_hash: candidate_hash_2,
chunk,
tx,
};
virtual_overseer.send(FromOverseer::Communication { msg: store_chunk_msg }).await;
assert_eq!(rx.await.unwrap(), Ok(()));
}
{
let (tx, rx) = oneshot::channel();
let msg = AvailabilityStoreMessage::QueryAllChunks(candidate_hash_1, tx);
virtual_overseer.send(FromOverseer::Communication { msg }).await;
assert_eq!(rx.await.unwrap().len(), n_validators as usize);
}
{
let (tx, rx) = oneshot::channel();
let msg = AvailabilityStoreMessage::QueryAllChunks(candidate_hash_2, tx);
virtual_overseer.send(FromOverseer::Communication { msg }).await;
assert_eq!(rx.await.unwrap().len(), 1);
}
{
let (tx, rx) = oneshot::channel();
let msg = AvailabilityStoreMessage::QueryAllChunks(candidate_hash_3, tx);
virtual_overseer.send(FromOverseer::Communication { msg }).await;
assert_eq!(rx.await.unwrap().len(), 0);
}
});
}
#[test]
fn stored_but_not_included_data_is_pruned() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
......@@ -625,7 +713,7 @@ fn stored_data_kept_until_finalized() {
);
assert!(
query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await
has_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await
);
overseer_signal(
......@@ -644,7 +732,7 @@ fn stored_data_kept_until_finalized() {
);
assert!(
query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await
has_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, true).await
);
// Wait until it definitely should be gone.
......@@ -657,7 +745,7 @@ fn stored_data_kept_until_finalized() {
);
assert!(
query_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, false).await
has_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, false).await
);
});
}
......@@ -781,11 +869,11 @@ fn forkfullness_works() {
);
assert!(
query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await,
has_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await,
);
assert!(
query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, true).await,
has_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, true).await,
);
// Candidate 2 should now be considered unavailable and will be pruned.
......@@ -802,11 +890,11 @@ fn forkfullness_works() {
);
assert!(
query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await,
has_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, true).await,
);
assert!(
query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await,
has_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await,
);
// Wait for longer than finalized blocks should be kept for
......@@ -823,11 +911,11 @@ fn forkfullness_works() {
);
assert!(
query_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, false).await,
has_all_chunks(&mut virtual_overseer, candidate_1_hash, n_validators, false).await,
);
assert!(
query_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await,
has_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await,
);
});
}
......@@ -857,7 +945,7 @@ async fn query_chunk(
rx.await.unwrap()
}
async fn query_all_chunks(
async fn has_all_chunks(
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
candidate_hash: CandidateHash,
n_validators: u32,
......
......@@ -27,7 +27,7 @@ use polkadot_node_network_protocol::request_response::{
v1::{ChunkFetchingRequest, ChunkFetchingResponse},
};
use polkadot_primitives::v1::{AuthorityDiscoveryId, BlakeTwo256, CandidateHash, GroupIndex, Hash, HashT, OccupiedCore, SessionIndex};
use polkadot_node_primitives::ErasureChunk;
use polkadot_subsystem::messages::{
AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage, IfDisconnected,
};
......@@ -405,7 +405,6 @@ impl RunningTask {
.send(FromFetchTask::Message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreChunk {
candidate_hash: self.request.candidate_hash,
relay_parent: self.relay_parent,
chunk,
tx,
},
......
......@@ -67,7 +67,7 @@ pub struct TestState {
pub relay_chain: Vec<Hash>,
/// Whenever the subsystem tries to fetch an erasure chunk one item of the given vec will be
/// popped. So you can experiment with serving invalid chunks or no chunks on request and see
/// whether the subsystem still succeeds with its goal.
pub chunks: HashMap<(CandidateHash, ValidatorIndex), Vec<Option<ErasureChunk>>>,
/// All chunks that are valid and should be accepted.
pub valid_chunks: HashSet<(CandidateHash, ValidatorIndex)>,
......
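The doc comment above describes a pop-per-request mock: each time the subsystem asks for a chunk, the next entry of the corresponding vector is consumed, so a test can script a sequence of missing, invalid, or valid responses. A rough, self-contained illustration of that mechanism, with byte vectors standing in for ErasureChunk and a simple tuple standing in for (CandidateHash, ValidatorIndex):

use std::collections::HashMap;

type Key = (u8, u32); // stand-in for (CandidateHash, ValidatorIndex)

// Sketch: pop the next scripted response for a (candidate, validator) pair.
// `None` means "serve no chunk for this request".
fn next_response(chunks: &mut HashMap<Key, Vec<Option<Vec<u8>>>>, key: Key) -> Option<Vec<u8>> {
    chunks.get_mut(&key).and_then(|queue| queue.pop()).flatten()
}

fn main() {
    let mut chunks = HashMap::new();
    // First request gets nothing, the retry gets a valid chunk (popped from the back).
    chunks.insert((1, 0), vec![Some(vec![9, 9, 9]), None]);
    assert_eq!(next_response(&mut chunks, (1, 0)), None);
    assert_eq!(next_response(&mut chunks, (1, 0)), Some(vec![9, 9, 9]));
}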
......@@ -334,6 +334,34 @@ impl RequestChunksPhase {
params: &InteractionParams,
sender: &mut impl SubsystemSender,
) -> Result<AvailableData, RecoveryError> {
// First query the store for any chunks we've got.
{
let (tx, rx) = oneshot::channel();
sender.send_message(
AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx).into()
).await;
match rx.await {
Ok(chunks) => {
// This should either be length 1 or 0. If we had the whole data,
// we wouldn't have reached this stage.
let chunk_indices: Vec<_> = chunks.iter().map(|c| c.index).collect();
self.shuffling.retain(|i| !chunk_indices.contains(i));
for chunk in chunks {
self.received_chunks.insert(chunk.index, chunk);
}
}
Err(oneshot::Canceled) => {
tracing::warn!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
"Failed to reach the availability store"
);
}
}
}
loop {
if self.is_unavailable(&params) {
tracing::debug!(
......@@ -431,6 +459,26 @@ fn reconstructed_data_matches_root(
impl<S: SubsystemSender> Interaction<S> {
async fn run(mut self) -> Result<AvailableData, RecoveryError> {
// First just see if we have the data available locally.
{
let (tx, rx) = oneshot::channel();
self.sender.send_message(
AvailabilityStoreMessage::QueryAvailableData(self.params.candidate_hash, tx).into()
).await;
match rx.await {
Ok(Some(data)) => return Ok(data),
Ok(None) => {}
Err(oneshot::Canceled) => {
tracing::warn!(
target: LOG_TARGET,
candidate_hash = ?self.params.candidate_hash,
"Failed to reach the availability store",
)
}
}
}
loop {
// These only fail if we cannot reach the underlying subsystem, in which case there is
// nothing meaningful we can do.
......
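The fast path added to Interaction::run above returns early when the store already holds the full available data and only falls back to chunk recovery otherwise. The following is a self-contained sketch of that query pattern, assuming the futures crate's oneshot channel; the real code sends the query through the subsystem sender rather than answering it inline, and AvailableData here is a dummy stand-in type.

use futures::channel::oneshot;
use futures::executor::block_on;

type AvailableData = Vec<u8>; // stand-in for the real type

fn main() {
    block_on(async {
        let (tx, rx) = oneshot::channel::<Option<AvailableData>>();
        // Stand-in for the availability store answering the query.
        let _ = tx.send(Some(vec![1, 2, 3]));

        match rx.await {
            // Data is already local: return it without touching the network.
            Ok(Some(data)) => println!("fast path, {} bytes available locally", data.len()),
            // Not stored locally: fall through to chunk recovery.
            Ok(None) => println!("falling back to chunk recovery"),
            // The store could not be reached; the real code logs a warning and continues.
            Err(oneshot::Canceled) => eprintln!("failed to reach the availability store"),
        }
    });
}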
......@@ -207,6 +207,46 @@ impl TestState {
);
}
async fn respond_to_available_data_query(
&self,
virtual_overseer: &mut VirtualOverseer,
with_data: bool,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryAvailableData(_, tx)
) => {
let _ = tx.send(if with_data {
Some(self.available_data.clone())
} else {
println!("SENDING NONE");
None
});
}
)
}
async fn respond_to_query_all_request(
&self,
virtual_overseer: &mut VirtualOverseer,
send_chunk: impl Fn(usize) -> bool
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryAllChunks(_, tx)
) => {
let v = self.chunks.iter()
.filter(|c| send_chunk(c.index.0 as usize))
.cloned()
.collect();
let _ = tx.send(v);
}
)
}
async fn test_chunk_requests(
&self,
candidate_hash: CandidateHash,
......@@ -430,6 +470,9 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() {
let candidate_hash = test_state.candidate.hash();
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
......@@ -459,6 +502,9 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() {
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
test_state.test_chunk_requests(
new_candidate.hash(),
&mut virtual_overseer,
......@@ -506,6 +552,9 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk
let candidate_hash = test_state.candidate.hash();
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
......@@ -535,6 +584,9 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
test_state.test_chunk_requests(
new_candidate.hash(),
&mut virtual_overseer,
......@@ -589,6 +641,9 @@ fn bad_merkle_path_leads_to_recovery_error() {
test_state.chunks[3].chunk = vec![3; 32];
test_state.chunks[4].chunk = vec![4; 32];
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
......@@ -642,6 +697,9 @@ fn wrong_chunk_index_leads_to_recovery_error() {
test_state.chunks[3] = test_state.chunks[0].clone();
test_state.chunks[4] = test_state.chunks[0].clone();
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
......@@ -705,6 +763,9 @@ fn invalid_erasure_coding_leads_to_invalid_error() {
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
......@@ -757,6 +818,8 @@ fn fast_path_backing_group_recovers() {
_ => Has::No,
};
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state.test_full_data_requests(
candidate_hash,
&mut virtual_overseer,
......@@ -808,12 +871,17 @@ fn no_answers_in_fast_path_causes_chunk_requests() {
0 | 3 => Has::No,
_ => Has::timeout(),
};
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state.test_full_data_requests(
candidate_hash,
&mut virtual_overseer,
who_has,
).await;
test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
......@@ -905,6 +973,9 @@ fn chunks_retry_until_all_nodes_respond() {
let candidate_hash = test_state.candidate.hash();
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
......@@ -925,3 +996,105 @@ fn chunks_retry_until_all_nodes_respond() {
assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
});
}
#[test]
fn returns_early_if_we_have_the_data() {
let test_state = TestState::default();
test_harness_chunks_only(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![ActivatedLeaf {
hash: test_state.current.clone(),
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}],
deactivated: smallvec![],
}),
).await;
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
None,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.respond_to_available_data_query(&mut virtual_overseer, true).await;
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
});
}
#[test]
fn does_not_query_local_validator() {
let test_state = TestState::default();
test_harness_chunks_only(|test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![ActivatedLeaf {
hash: test_state.current.clone(),
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}],
deactivated: smallvec![],
}),
).await;
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
AvailabilityRecoveryMessage::RecoverAvailableData(
test_state.candidate.clone(),
test_state.session_index,
None,
tx,
)
).await;
test_state.test_runtime_api(&mut virtual_overseer).await;
test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
test_state.respond_to_query_all_request(&mut virtual_overseer, |i| i == 0).await;
let candidate_hash = test_state.candidate.hash();
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
test_state.validators.len(),
|i| if i == 0 {
panic!("requested from local validator")
} else {
Has::timeout()
},
).await;
// second round, make sure it uses the local chunk.
test_state.test_chunk_requests(
candidate_hash,
&mut virtual_overseer,
test_state.threshold() - 1,
|i| if i == 0 {
panic!("requested from local validator")
} else {
Has::Yes
},
).await;
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
});
}
......@@ -353,6 +353,9 @@ pub enum AvailabilityStoreMessage {
/// Query an `ErasureChunk` from the AV store by the candidate hash and validator index.
QueryChunk(CandidateHash, ValidatorIndex, oneshot::Sender<Option<ErasureChunk>>),
/// Query all chunks that we have for the given candidate hash.
QueryAllChunks(CandidateHash, oneshot::Sender<Vec<ErasureChunk>>),
/// Query whether an `ErasureChunk` exists within the AV Store.
///
/// This is useful in cases like bitfield signing, when existence
......@@ -366,8 +369,6 @@ pub enum AvailabilityStoreMessage {
StoreChunk {
/// A hash of the candidate this chunk belongs to.
candidate_hash: CandidateHash,