Unverified Commit 726db12a authored by Robert Klotzner, committed by GitHub

availability-distribution: Retry failed fetches on next block. (#2762)



* availability-distribution: Retry on fail on next block.

Retry failed fetches on next block when still pending availability.

* Update node/network/availability-distribution/src/requester/fetch_task/mod.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* Fix existing tests.

* Add test for trying all validators.

* Add test for testing retries.

Co-authored-by: Andronik Ordian <write@reusable.software>
parent 37181aeb
Pipeline #131675 failed with stages in 25 minutes and 26 seconds
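The mechanics of the retry, reduced to a minimal sketch: a fetch task that exhausts its backing group reports `Failed(candidate_hash)`, the requester drops the corresponding entry from its `fetches` map, and the deduplication that normally prevents re-fetching no longer applies on the next active leaf. The types below are simplified stand-ins, not the subsystem's real API:

```rust
use std::collections::{HashMap, HashSet};

// Stand-in: the real code uses polkadot's `CandidateHash` and a `FetchTask`
// carrying live state; both are simplified here.
type CandidateHash = u64;

enum FromFetchTask {
	/// Task finished (successfully, or after reporting bad validators).
	Concluded,
	/// Task could not fetch the chunk for this candidate.
	Failed(CandidateHash),
}

#[derive(Default)]
struct Requester {
	/// Candidates we are currently fetching; an entry here deduplicates
	/// fetches across blocks while the candidate stays pending.
	fetches: HashMap<CandidateHash, ()>,
}

impl Requester {
	/// On every new active leaf: start a fetch for each candidate pending
	/// availability that we are not already fetching.
	fn on_new_leaf(&mut self, pending: &HashSet<CandidateHash>) {
		for candidate in pending {
			self.fetches.entry(*candidate).or_insert(());
		}
	}

	/// Handle a message coming back from a fetch task.
	fn on_task_message(&mut self, msg: FromFetchTask) {
		if let FromFetchTask::Failed(candidate) = msg {
			// Removing the entry is the whole retry mechanism: if the
			// candidate is still pending availability on a later block,
			// `on_new_leaf` will spawn a fresh task for it.
			self.fetches.remove(&candidate);
		}
	}
}

fn main() {
	let mut requester = Requester::default();
	let pending: HashSet<CandidateHash> = HashSet::from([1]);

	requester.on_new_leaf(&pending); // task spawned for candidate 1
	requester.on_task_message(FromFetchTask::Failed(1)); // whole group failed
	requester.on_new_leaf(&pending); // still pending -> fetch is retried
	assert!(requester.fetches.contains_key(&1));

	requester.on_task_message(FromFetchTask::Concluded); // success: entry stays
	assert!(requester.fetches.contains_key(&1));
}
```

The diff below implements exactly this shape: a new `Failed` variant on `FromFetchTask`, a `conclude_fail` sender on the task side, and a `fetches.remove` in the requester's `Stream` implementation.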
@@ -26,10 +26,7 @@ use polkadot_node_network_protocol::request_response::{
 	request::{OutgoingRequest, RequestError, Requests, Recipient},
 	v1::{ChunkFetchingRequest, ChunkFetchingResponse},
 };
-use polkadot_primitives::v1::{
-	AuthorityDiscoveryId, BlakeTwo256, GroupIndex, Hash, HashT, OccupiedCore,
-	SessionIndex,
-};
+use polkadot_primitives::v1::{AuthorityDiscoveryId, BlakeTwo256, CandidateHash, GroupIndex, Hash, HashT, OccupiedCore, SessionIndex};
 use polkadot_node_primitives::ErasureChunk;
 use polkadot_subsystem::messages::{
 	AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage, IfDisconnected,
@@ -89,6 +86,9 @@ pub enum FromFetchTask {
 	/// In case of `None` everything was fine, in case of `Some`, some validators in the group
 	/// did not serve us our chunk as expected.
 	Concluded(Option<BadValidators>),
+
+	/// We were not able to fetch the desired chunk for the given `CandidateHash`.
+	Failed(CandidateHash),
 }

 /// Information a running task needs.
@@ -262,7 +262,7 @@ impl RunningTask {
 	/// Try validators in backing group in order.
 	async fn run_inner(mut self) {
 		let mut bad_validators = Vec::new();
-		let mut label = FAILED;
+		let mut succeeded = false;
 		let mut count: u32 = 0;
 		let mut _span = self.span.child("fetch-task")
 			.with_chunk_index(self.request.index.0)
@@ -315,13 +315,18 @@ impl RunningTask {
 			// Ok, let's store it and be happy:
 			self.store_chunk(chunk).await;
-			label = SUCCEEDED;
+			succeeded = true;
 			_span.add_string_tag("success", "true");
 			break;
 		}
 		_span.add_int_tag("tries", count as _);
-		self.metrics.on_fetch(label);
-		self.conclude(bad_validators).await;
+		if succeeded {
+			self.metrics.on_fetch(SUCCEEDED);
+			self.conclude(bad_validators).await;
+		} else {
+			self.metrics.on_fetch(FAILED);
+			self.conclude_fail().await
+		}
 	}

 	/// Do request and return response, if successful.
@@ -434,4 +439,14 @@ impl RunningTask {
 			);
 		}
 	}
+
+	async fn conclude_fail(&mut self) {
+		if let Err(err) = self.sender.send(FromFetchTask::Failed(self.request.candidate_hash)).await {
+			tracing::warn!(
+				target: LOG_TARGET,
+				?err,
+				"Sending `Failed` message for task failed"
+			);
+		}
+	}
 }
...
@@ -228,6 +228,7 @@ impl TestRun {
 			);
 			match msg {
 				FromFetchTask::Concluded(_) => break,
+				FromFetchTask::Failed(_) => break,
 				FromFetchTask::Message(msg) =>
					end_ok = self.handle_message(msg).await,
 			}
...
@@ -54,6 +54,8 @@ pub struct Requester {
 	///
 	/// We keep those around as long as a candidate is pending availability on some leaf, so we
 	/// won't fetch chunks multiple times.
+	///
+	/// We remove them on failure, so we get retries on the next block still pending availability.
 	fetches: HashMap<CandidateHash, FetchTask>,

 	/// Localized information about sessions we are currently interested in.
@@ -76,10 +78,7 @@ impl Requester {
 	/// by advancing the stream.
 	#[tracing::instrument(level = "trace", skip(keystore, metrics), fields(subsystem = LOG_TARGET))]
 	pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
-		// All we do is forwarding messages, no need to make this big.
-		// Each sender will get one slot, see
-		// [here](https://docs.rs/futures/0.3.13/futures/channel/mpsc/fn.channel.html).
-		let (tx, rx) = mpsc::channel(0);
+		let (tx, rx) = mpsc::channel(1);
 		Requester {
 			fetches: HashMap::new(),
 			session_cache: SessionCache::new(keystore),
@@ -214,6 +213,10 @@ impl Stream for Requester {
 				}
 				Poll::Ready(Some(FromFetchTask::Concluded(None))) =>
 					continue,
+				Poll::Ready(Some(FromFetchTask::Failed(candidate_hash))) => {
+					// Make sure we retry on next block still pending availability.
+					self.fetches.remove(&candidate_hash);
+				}
 				Poll::Ready(None) =>
 					return Poll::Ready(None),
 				Poll::Pending =>
...
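The `mpsc::channel(0)` → `mpsc::channel(1)` bump above is small but easy to misread: in `futures`, a bounded channel's capacity is `buffer + num_senders`, with every sender holding one guaranteed slot on top of the shared buffer (this is the property the deleted comment's docs.rs link described). A minimal, self-contained illustration of that rule; the values are illustrative, not from the commit:

```rust
use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};

fn main() {
	block_on(async {
		// Capacity of a `futures` bounded channel is `buffer + num_senders`:
		// one sender plus buffer = 1 gives two slots in total.
		let (mut tx, mut rx) = mpsc::channel::<u32>(1);

		tx.send(1).await.unwrap(); // occupies the shared buffer slot
		tx.send(2).await.unwrap(); // occupies the sender's guaranteed slot
		assert!(tx.try_send(3).is_err()); // channel is now full

		assert_eq!(rx.next().await, Some(1));
		assert_eq!(rx.next().await, Some(2));
	});
}
```

Under that rule the old `channel(0)` already gave each fetch task's sender one guaranteed slot; the new `channel(1)` just adds one shared slot of headroom, presumably so tasks reporting `Failed` are less likely to block on the feedback channel.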
@@ -139,6 +139,7 @@ impl TestCandidateBuilder {
 	}
 }

+// Get chunk for index 0
 pub fn get_valid_chunk_data(pov: PoV) -> (Hash, ErasureChunk) {
 	let fake_validator_count = 10;
 	let persisted = PersistedValidationData {
...
@@ -14,8 +14,11 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

+use std::collections::HashSet;
+
 use futures::{executor, future, Future};
+use polkadot_primitives::v1::CoreState;
 use sp_keystore::SyncCryptoStorePtr;

 use polkadot_subsystem_testhelpers as test_helpers;
@@ -61,3 +64,64 @@ fn check_basic() {
 		state.run(harness)
 	});
 }
+
+/// Check whether requester tries all validators in group.
+#[test]
+fn check_fetch_tries_all() {
+	let mut state = TestState::default();
+	for (_, v) in state.chunks.iter_mut() {
+		// 4 validators in group, so this should still succeed:
+		v.push(None);
+		v.push(None);
+		v.push(None);
+	}
+	test_harness(state.keystore.clone(), move |harness| {
+		state.run(harness)
+	});
+}
+
+/// Check that requester will retry the fetch on error on the next block still pending
+/// availability.
+#[test]
+fn check_fetch_retry() {
+	let mut state = TestState::default();
+	state.cores.insert(
+		state.relay_chain[2],
+		state.cores.get(&state.relay_chain[1]).unwrap().clone(),
+	);
+	// We only care about the first three blocks:
+	// 1. scheduled
+	// 2. occupied
+	// 3. still occupied
+	state.relay_chain.truncate(3);
+
+	// Get rid of unused valid chunks:
+	let valid_candidate_hashes: HashSet<_> = state.cores
+		.get(&state.relay_chain[1])
+		.iter()
+		.map(|v| v.iter())
+		.flatten()
+		.filter_map(|c| {
+			match c {
+				CoreState::Occupied(core) => Some(core.candidate_hash),
+				_ => None,
+			}
+		})
+		.collect();
+	state.valid_chunks.retain(|(ch, _)| valid_candidate_hashes.contains(ch));
+
+	for (_, v) in state.chunks.iter_mut() {
+		// This should still succeed as cores are still pending availability on the next block.
+		v.push(None);
+		v.push(None);
+		v.push(None);
+		v.push(None);
+		v.push(None);
+	}
+	test_harness(state.keystore.clone(), move |harness| {
+		state.run(harness)
+	});
+}
@@ -63,9 +63,12 @@ pub struct TestHarness {
 /// `valid_chunks`.
 #[derive(Clone)]
 pub struct TestState {
-	// Simulated relay chain heads:
+	/// Simulated relay chain heads:
 	pub relay_chain: Vec<Hash>,
-	pub chunks: HashMap<(CandidateHash, ValidatorIndex), ErasureChunk>,
+	/// Whenever the subsystem tries to fetch an erasure chunk, one item of the given vec will be
+	/// popped. So you can experiment with serving invalid chunks or no chunks on request and see
+	/// whether the subsystem still succeeds with its goal.
+	pub chunks: HashMap<(CandidateHash, ValidatorIndex), Vec<Option<ErasureChunk>>>,
 	/// All chunks that are valid and should be accepted.
 	pub valid_chunks: HashSet<(CandidateHash, ValidatorIndex)>,
 	pub session_info: SessionInfo,
@@ -125,7 +128,7 @@ impl Default for TestState {
 		let mut chunks_other_groups = p_chunks.into_iter();
 		chunks_other_groups.next();
 		for (validator_index, chunk) in chunks_other_groups {
-			chunks.insert((validator_index, chunk.index), chunk);
+			chunks.insert((validator_index, chunk.index), vec![Some(chunk)]);
 		}
 	}
 	(cores, chunks)
@@ -158,7 +161,7 @@ impl TestState {
 	///
 	/// We try to be as agnostic about details as possible, how the subsystem achieves those goals
 	/// should not be a matter to this test suite.
-	async fn run_inner(self, executor: TaskExecutor, virtual_overseer: TestSubsystemContextHandle<AvailabilityDistributionMessage>) {
+	async fn run_inner(mut self, executor: TaskExecutor, virtual_overseer: TestSubsystemContextHandle<AvailabilityDistributionMessage>) {
 		// We skip genesis here (in reality ActiveLeavesUpdate can also skip a block:
 		let updates = {
 			let mut advanced = self.relay_chain.iter();
@@ -217,8 +220,8 @@ impl TestState {
 				}
 			}
 			AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx)) => {
-				let chunk = self.chunks.get(&(candidate_hash, validator_index));
-				tx.send(chunk.map(Clone::clone))
+				let chunk = self.chunks.get_mut(&(candidate_hash, validator_index)).map(Vec::pop).flatten().flatten();
+				tx.send(chunk)
					.expect("Receiver is expected to be alive");
 			}
 			AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreChunk{candidate_hash, chunk, tx, ..}) => {
...
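The reworked mock makes failure injection declarative: each chunk query pops one element off the stored vec, and a popped `None` simulates a validator that does not serve the chunk, which is what the new tests rely on when they `push(None)`. A condensed sketch of the same pattern with stand-in types (the real map is keyed by `(CandidateHash, ValidatorIndex)` and stores `ErasureChunk`s):

```rust
use std::collections::HashMap;

// Stand-ins for (CandidateHash, ValidatorIndex) and ErasureChunk.
type Key = (u64, u32);
type Chunk = &'static str;

struct MockStore {
	chunks: HashMap<Key, Vec<Option<Chunk>>>,
}

impl MockStore {
	fn query(&mut self, key: &Key) -> Option<Chunk> {
		// Equivalent to the diff's `get_mut(..).map(Vec::pop).flatten().flatten()`.
		self.chunks.get_mut(key).and_then(Vec::pop).flatten()
	}
}

fn main() {
	let mut store = MockStore { chunks: HashMap::new() };
	// `Vec::pop` takes from the back, so the trailing `None` (a validator
	// failing to serve the chunk) is consumed before the valid chunk.
	store.chunks.insert((1, 0), vec![Some("chunk"), None]);

	assert_eq!(store.query(&(1, 0)), None);          // first attempt fails
	assert_eq!(store.query(&(1, 0)), Some("chunk")); // retry succeeds
}
```

This is why `check_fetch_tries_all` pushes three `None`s (the fourth group member still serves the chunk) while `check_fetch_retry` pushes five, exhausting the group once so success can only come from the retry on the next block.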