From 2ca3750f0fe09dafd9f9c5825c7be8ba0ab5c9ee Mon Sep 17 00:00:00 2001
From: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
Date: Fri, 5 May 2023 12:56:54 +0300
Subject: [PATCH] Prefer fetching small PoVs from backing group (#7173)

* impl QueryChunkSize

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* QueryChunkSize message

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* enable fetching from backing group for small pov

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review feedback

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Refactor `bypass_availability_store`

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review feedback

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
---
 polkadot/node/core/av-store/src/lib.rs        |  19 ++
 polkadot/node/core/av-store/src/tests.rs      |  48 +++++
 .../network/availability-recovery/src/lib.rs  | 113 ++++++++--
 .../availability-recovery/src/tests.rs        | 203 +++++++++++++++++-
 polkadot/node/service/src/overseer.rs         |   2 +-
 polkadot/node/subsystem-types/src/messages.rs |   3 +
 6 files changed, 370 insertions(+), 18 deletions(-)

diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs
index 4a6d1d464d0..1c0c8c5e7fe 100644
--- a/polkadot/node/core/av-store/src/lib.rs
+++ b/polkadot/node/core/av-store/src/lib.rs
@@ -1052,6 +1052,25 @@ fn process_message(
 			let _ =
 				tx.send(load_chunk(&subsystem.db, &subsystem.config, &candidate, validator_index)?);
 		},
+		AvailabilityStoreMessage::QueryChunkSize(candidate, tx) => {
+			let meta = load_meta(&subsystem.db, &subsystem.config, &candidate)?;
+
+			let validator_index = meta.map_or(None, |meta| meta.chunks_stored.first_one());
+
+			let maybe_chunk_size = if let Some(validator_index) = validator_index {
+				load_chunk(
+					&subsystem.db,
+					&subsystem.config,
+					&candidate,
+					ValidatorIndex(validator_index as u32),
+				)?
+				.map(|erasure_chunk| erasure_chunk.chunk.len())
+			} else {
+				None
+			};
+
+			let _ = tx.send(maybe_chunk_size);
+		},
 		AvailabilityStoreMessage::QueryAllChunks(candidate, tx) => {
 			match load_meta(&subsystem.db, &subsystem.config, &candidate)? {
 				None => {
diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs
index f3ffc7a80e0..8c4ddc69483 100644
--- a/polkadot/node/core/av-store/src/tests.rs
+++ b/polkadot/node/core/av-store/src/tests.rs
@@ -1153,3 +1153,51 @@ async fn import_leaf(
 
 	new_leaf
 }
+
+#[test]
+fn query_chunk_size_works() {
+	let store = test_store();
+
+	test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move {
+		let candidate_hash = CandidateHash(Hash::repeat_byte(33));
+		let validator_index = ValidatorIndex(5);
+		let n_validators = 10;
+
+		let chunk = ErasureChunk {
+			chunk: vec![1, 2, 3],
+			index: validator_index,
+			proof: Proof::try_from(vec![vec![3, 4, 5]]).unwrap(),
+		};
+
+		// Ensure an entry already exists. In reality this would come from watching
+		// chain events.
+		with_tx(&store, |tx| {
+			super::write_meta(
+				tx,
+				&TEST_CONFIG,
+				&candidate_hash,
+				&CandidateMeta {
+					data_available: false,
+					chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; n_validators],
+					state: State::Unavailable(BETimestamp(0)),
+				},
+			);
+		});
+
+		let (tx, rx) = oneshot::channel();
+
+		let chunk_msg =
+			AvailabilityStoreMessage::StoreChunk { candidate_hash, chunk: chunk.clone(), tx };
+
+		overseer_send(&mut virtual_overseer, chunk_msg).await;
+		assert_eq!(rx.await.unwrap(), Ok(()));
+
+		let (tx, rx) = oneshot::channel();
+		let query_chunk_size = AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx);
+
+		overseer_send(&mut virtual_overseer, query_chunk_size).await;
+
+		assert_eq!(rx.await.unwrap().unwrap(), chunk.chunk.len());
+		virtual_overseer
+	});
+}
diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs
index f028080d10e..e4d76dcfdaa 100644
--- a/polkadot/node/network/availability-recovery/src/lib.rs
+++ b/polkadot/node/network/availability-recovery/src/lib.rs
@@ -99,15 +99,47 @@ const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
 #[cfg(test)]
 const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);
 
-/// The Availability Recovery Subsystem.
-pub struct AvailabilityRecoverySubsystem {
+/// PoV size limit in bytes below which we prefer fetching from backers.
+const SMALL_POV_LIMIT: usize = 128 * 1024;
+
+#[derive(Clone, PartialEq)]
+/// The strategy we use to recover the PoV.
+pub enum RecoveryStrategy {
+	/// We always try the backing group first, then fall back to validator chunks.
+	BackersFirstAlways,
+	/// We try the backing group first if the PoV size is lower than specified, then fall back to validator chunks.
+	BackersFirstIfSizeLower(usize),
+	/// We always recover using validator chunks.
+	ChunksAlways,
 	/// Do not request data from the availability store.
 	/// This is the useful for nodes where the
 	/// availability-store subsystem is not expected to run,
 	/// such as collators.
-	bypass_availability_store: bool,
+	BypassAvailabilityStore,
+}
+
+impl RecoveryStrategy {
+	/// Returns `true` if the strategy needs the backing group index.
+	pub fn needs_backing_group(&self) -> bool {
+		match self {
+			RecoveryStrategy::BackersFirstAlways | RecoveryStrategy::BackersFirstIfSizeLower(_) =>
+				true,
+			_ => false,
+		}
+	}
 
-	fast_path: bool,
+	/// Returns the PoV size limit in bytes for the `BackersFirstIfSizeLower` strategy, otherwise `None`.
+	pub fn pov_size_limit(&self) -> Option<usize> {
+		match *self {
+			RecoveryStrategy::BackersFirstIfSizeLower(limit) => Some(limit),
+			_ => None,
+		}
+	}
+}
+/// The Availability Recovery Subsystem.
+pub struct AvailabilityRecoverySubsystem {
+	/// PoV recovery strategy to use.
+	recovery_strategy: RecoveryStrategy,
 	/// Receiver for available data requests.
 	req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 	/// Metrics for this subsystem.
@@ -863,10 +895,10 @@ async fn launch_recovery_task<Context>(
 	ctx: &mut Context,
 	session_info: SessionInfo,
 	receipt: CandidateReceipt,
-	backing_group: Option<GroupIndex>,
+	mut backing_group: Option<GroupIndex>,
 	response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
-	bypass_availability_store: bool,
 	metrics: &Metrics,
+	recovery_strategy: &RecoveryStrategy,
 ) -> error::Result<()> {
 	let candidate_hash = receipt.hash();
 
@@ -877,9 +909,33 @@ async fn launch_recovery_task<Context>(
 		candidate_hash,
 		erasure_root: receipt.descriptor.erasure_root,
 		metrics: metrics.clone(),
-		bypass_availability_store,
+		bypass_availability_store: recovery_strategy == &RecoveryStrategy::BypassAvailabilityStore,
 	};
 
+	if let Some(small_pov_limit) = recovery_strategy.pov_size_limit() {
+		// Get our own chunk size to get an estimate of the PoV size.
+		let chunk_size: Result<Option<usize>, error::Error> =
+			query_chunk_size(ctx, candidate_hash).await;
+		if let Ok(Some(chunk_size)) = chunk_size {
+			let pov_size_estimate = chunk_size.saturating_mul(session_info.validators.len()) / 3;
+			let prefer_backing_group = pov_size_estimate < small_pov_limit;
+
+			gum::trace!(
+				target: LOG_TARGET,
+				?candidate_hash,
+				pov_size_estimate,
+				small_pov_limit,
+				enabled = prefer_backing_group,
+				"Prefer fetch from backing group",
+			);
+
+			backing_group = backing_group.filter(|_| {
+				// We keep the backing group only if `1/3` of chunks sum up to less than `small_pov_limit`.
+				prefer_backing_group
+			});
+		}
+	}
+
 	let phase = backing_group
 		.and_then(|g| session_info.validator_groups.get(g))
 		.map(|group| Source::RequestFromBackers(RequestFromBackers::new(group.clone())))
@@ -917,8 +973,8 @@ async fn handle_recover<Context>(
 	session_index: SessionIndex,
 	backing_group: Option<GroupIndex>,
 	response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
-	bypass_availability_store: bool,
 	metrics: &Metrics,
+	recovery_strategy: &RecoveryStrategy,
 ) -> error::Result<()> {
 	let candidate_hash = receipt.hash();
 
@@ -961,8 +1017,8 @@ async fn handle_recover<Context>(
 				receipt,
 				backing_group,
 				response_sender,
-				bypass_availability_store,
 				metrics,
+				recovery_strategy,
 			)
 			.await,
 		None => {
@@ -988,6 +1044,18 @@ async fn query_full_data<Context>(
 	rx.await.map_err(error::Error::CanceledQueryFullData)
 }
 
+/// Queries the chunk size from the av-store.
+#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
+async fn query_chunk_size<Context>(
+	ctx: &mut Context,
+	candidate_hash: CandidateHash,
+) -> error::Result<Option<usize>> {
+	let (tx, rx) = oneshot::channel();
+	ctx.send_message(AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx))
+		.await;
+
+	rx.await.map_err(error::Error::CanceledQueryFullData)
+}
 #[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
 impl AvailabilityRecoverySubsystem {
 	/// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
@@ -996,7 +1064,7 @@ impl AvailabilityRecoverySubsystem {
 		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 		metrics: Metrics,
 	) -> Self {
-		Self { fast_path: false, bypass_availability_store: true, req_receiver, metrics }
+		Self { recovery_strategy: RecoveryStrategy::BypassAvailabilityStore, req_receiver, metrics }
 	}
 
 	/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
@@ -1005,7 +1073,7 @@ impl AvailabilityRecoverySubsystem {
 		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 		metrics: Metrics,
 	) -> Self {
-		Self { fast_path: true, bypass_availability_store: false, req_receiver, metrics }
+		Self { recovery_strategy: RecoveryStrategy::BackersFirstAlways, req_receiver, metrics }
 	}
 
 	/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
@@ -1013,12 +1081,25 @@ impl AvailabilityRecoverySubsystem {
 		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
 		metrics: Metrics,
 	) -> Self {
-		Self { fast_path: false, bypass_availability_store: false, req_receiver, metrics }
+		Self { recovery_strategy: RecoveryStrategy::ChunksAlways, req_receiver, metrics }
+	}
+
+	/// Create a new instance of `AvailabilityRecoverySubsystem` which requests chunks if PoV is
+	/// above a threshold.
+	pub fn with_chunks_if_pov_large(
+		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
+		metrics: Metrics,
+	) -> Self {
+		Self {
+			recovery_strategy: RecoveryStrategy::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
+			req_receiver,
+			metrics,
+		}
 	}
 
 	async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
 		let mut state = State::default();
-		let Self { fast_path, mut req_receiver, metrics, bypass_availability_store } = self;
+		let Self { recovery_strategy, mut req_receiver, metrics } = self;
 
 		loop {
 			let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
@@ -1045,10 +1126,10 @@ impl AvailabilityRecoverySubsystem {
 							&mut ctx,
 							receipt,
 							session_index,
-							maybe_backing_group.filter(|_| fast_path),
+							maybe_backing_group.filter(|_| recovery_strategy.needs_backing_group()),
 							response_sender,
-							bypass_availability_store,
 							&metrics,
+							&recovery_strategy,
 						).await {
 							gum::warn!(
 								target: LOG_TARGET,
@@ -1064,7 +1145,7 @@ impl AvailabilityRecoverySubsystem {
 				in_req = recv_req => {
 					match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))?
 					{
 						Ok(req) => {
-							if bypass_availability_store {
+							if recovery_strategy == RecoveryStrategy::BypassAvailabilityStore {
 								gum::debug!(
 									target: LOG_TARGET,
 									"Skipping request to availability-store.",
diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs
index b47a033f0d1..5a2f70149f4 100644
--- a/polkadot/node/network/availability-recovery/src/tests.rs
+++ b/polkadot/node/network/availability-recovery/src/tests.rs
@@ -117,6 +117,44 @@ fn test_harness_chunks_only<T: Future<Output = (VirtualOverseer, RequestResponse
 		.unwrap();
 }
 
+fn test_harness_chunks_if_pov_large<
+	T: Future<Output = (VirtualOverseer, RequestResponseConfig)>,
+>(
+	test: impl FnOnce(VirtualOverseer, RequestResponseConfig) -> T,
+) {
+	let _ = env_logger::builder()
+		.is_test(true)
+		.filter(Some("polkadot_availability_recovery"), log::LevelFilter::Trace)
+		.try_init();
+
+	let pool = sp_core::testing::TaskExecutor::new();
+
+	let (context, virtual_overseer) = make_subsystem_context(pool.clone());
+
+	let (collation_req_receiver, req_cfg) =
+		IncomingRequest::get_config_receiver(&ReqProtocolNames::new(&GENESIS_HASH, None));
+	let subsystem = AvailabilityRecoverySubsystem::with_chunks_if_pov_large(
+		collation_req_receiver,
+		Metrics::new_dummy(),
+	);
+	let subsystem = subsystem.run(context);
+
+	let test_fut = test(virtual_overseer, req_cfg);
+
+	futures::pin_mut!(test_fut);
+	futures::pin_mut!(subsystem);
+
+	executor::block_on(future::join(
+		async move {
+			let (mut overseer, _req_cfg) = test_fut.await;
+			overseer_signal(&mut overseer, OverseerSignal::Conclude).await;
+		},
+		subsystem,
+	))
+	.1
+	.unwrap();
+}
+
 const TIMEOUT: Duration = Duration::from_millis(300);
 
 macro_rules! delay {
@@ -249,7 +287,7 @@ impl TestState {
 			let _ = tx.send(if with_data {
 				Some(self.available_data.clone())
 			} else {
-				println!("SENDING NONE");
+				gum::debug!("Sending None");
 				None
 			});
 		}
@@ -914,6 +952,169 @@ fn fast_path_backing_group_recovers() {
 	});
 }
 
+#[test]
+fn recovers_from_only_chunks_if_pov_large() {
+	let test_state = TestState::default();
+
+	test_harness_chunks_if_pov_large(|mut virtual_overseer, req_cfg| async move {
+		overseer_signal(
+			&mut virtual_overseer,
+			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
+				hash: test_state.current.clone(),
+				number: 1,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			})),
+		)
+		.await;
+
+		let (tx, rx) = oneshot::channel();
+
+		overseer_send(
+			&mut virtual_overseer,
+			AvailabilityRecoveryMessage::RecoverAvailableData(
+				test_state.candidate.clone(),
+				test_state.session_index,
+				Some(GroupIndex(0)),
+				tx,
+			),
+		)
+		.await;
+
+		test_state.test_runtime_api(&mut virtual_overseer).await;
+
+		let candidate_hash = test_state.candidate.hash();
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::AvailabilityStore(
+				AvailabilityStoreMessage::QueryChunkSize(_, tx)
+			) => {
+				let _ = tx.send(Some(1000000));
+			}
+		);
+
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
+		test_state
+			.test_chunk_requests(
+				candidate_hash,
+				&mut virtual_overseer,
+				test_state.threshold(),
+				|_| Has::Yes,
+			)
+			.await;
+
+		// Recovered data should match the original one.
+		assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
+
+		let (tx, rx) = oneshot::channel();
+
+		// Test another candidate, send no chunks.
+		let mut new_candidate = dummy_candidate_receipt(dummy_hash());
+
+		new_candidate.descriptor.relay_parent = test_state.candidate.descriptor.relay_parent;
+
+		overseer_send(
+			&mut virtual_overseer,
+			AvailabilityRecoveryMessage::RecoverAvailableData(
+				new_candidate.clone(),
+				test_state.session_index,
+				None,
+				tx,
+			),
+		)
+		.await;
+
+		test_state.test_runtime_api(&mut virtual_overseer).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::AvailabilityStore(
+				AvailabilityStoreMessage::QueryChunkSize(_, tx)
+			) => {
+				let _ = tx.send(Some(1000000));
+			}
+		);
+
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+		test_state.respond_to_query_all_request(&mut virtual_overseer, |_| false).await;
+
+		test_state
+			.test_chunk_requests(
+				new_candidate.hash(),
+				&mut virtual_overseer,
+				test_state.impossibility_threshold(),
+				|_| Has::No,
+			)
+			.await;
+
+		// Recovery fails with an `Unavailable` error, since no validator has a chunk.
+		assert_eq!(rx.await.unwrap().unwrap_err(), RecoveryError::Unavailable);
+		(virtual_overseer, req_cfg)
+	});
+}
+
+#[test]
+fn fast_path_backing_group_recovers_if_pov_small() {
+	let test_state = TestState::default();
+
+	test_harness_chunks_if_pov_large(|mut virtual_overseer, req_cfg| async move {
+		overseer_signal(
+			&mut virtual_overseer,
+			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
+				hash: test_state.current.clone(),
+				number: 1,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			})),
+		)
+		.await;
+
+		let (tx, rx) = oneshot::channel();
+
+		overseer_send(
+			&mut virtual_overseer,
+			AvailabilityRecoveryMessage::RecoverAvailableData(
+				test_state.candidate.clone(),
+				test_state.session_index,
+				Some(GroupIndex(0)),
+				tx,
+			),
+		)
+		.await;
+
+		test_state.test_runtime_api(&mut virtual_overseer).await;
+
+		let candidate_hash = test_state.candidate.hash();
+
+		let who_has = |i| match i {
+			3 => Has::Yes,
+			_ => Has::No,
+		};
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::AvailabilityStore(
+				AvailabilityStoreMessage::QueryChunkSize(_, tx)
+			) => {
+				let _ = tx.send(Some(100));
+			}
+		);
+
+		test_state.respond_to_available_data_query(&mut virtual_overseer, false).await;
+
+		test_state
+			.test_full_data_requests(candidate_hash, &mut virtual_overseer, who_has)
+			.await;
+
+		// Recovered data should match the original one.
+		assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data);
+		(virtual_overseer, req_cfg)
+	});
+}
+
 #[test]
 fn no_answers_in_fast_path_causes_chunk_requests() {
 	let test_state = TestState::default();
diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs
index d669a41f6f8..afb7ec998b4 100644
--- a/polkadot/node/service/src/overseer.rs
+++ b/polkadot/node/service/src/overseer.rs
@@ -224,7 +224,7 @@ where
 			IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
 			Metrics::register(registry)?,
 		))
-		.availability_recovery(AvailabilityRecoverySubsystem::with_chunks_only(
+		.availability_recovery(AvailabilityRecoverySubsystem::with_chunks_if_pov_large(
 			available_data_req_receiver,
 			Metrics::register(registry)?,
 		))
diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs
index 0784e397592..689d210096a 100644
--- a/polkadot/node/subsystem-types/src/messages.rs
+++ b/polkadot/node/subsystem-types/src/messages.rs
@@ -429,6 +429,9 @@ pub enum AvailabilityStoreMessage {
 	/// Query an `ErasureChunk` from the AV store by the candidate hash and validator index.
 	QueryChunk(CandidateHash, ValidatorIndex, oneshot::Sender<Option<ErasureChunk>>),
 
+	/// Get the size of an `ErasureChunk` from the AV store by the candidate hash.
+	QueryChunkSize(CandidateHash, oneshot::Sender<Option<usize>>),
+
 	/// Query all chunks that we have for the given candidate hash.
 	QueryAllChunks(CandidateHash, oneshot::Sender<Vec<ErasureChunk>>),
 
-- 
GitLab
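
The size heuristic added to `launch_recovery_task` above can be restated in isolation: one locally stored erasure chunk is roughly a third-of-n fraction of the PoV, because any ~n/3 of the n chunks are enough to reconstruct the data, so `chunk_size * n_validators / 3` estimates the PoV size and the backing-group fast path is kept only when that estimate is below the limit. The following is a minimal, standalone Rust sketch of that check; the free function `prefer_backing_group_for` is illustrative and does not exist in the patch, and the constant simply mirrors `SMALL_POV_LIMIT` from `availability-recovery/src/lib.rs` above.

// Illustrative sketch (not part of the patch): the small-PoV decision made in
// `launch_recovery_task` after the `QueryChunkSize` av-store query returns.

/// PoV size limit in bytes below which fetching the full data from backers is preferred.
const SMALL_POV_LIMIT: usize = 128 * 1024;

/// Estimate the PoV size from a single erasure chunk and decide whether to keep the
/// backing-group fast path. Roughly n/3 of the n chunks reconstruct the data, so the
/// PoV is approximately `chunk_size * n_validators / 3` bytes.
fn prefer_backing_group_for(chunk_size: usize, n_validators: usize) -> bool {
	let pov_size_estimate = chunk_size.saturating_mul(n_validators) / 3;
	pov_size_estimate < SMALL_POV_LIMIT
}

fn main() {
	// A 1 KiB chunk with 300 validators implies a ~100 KiB PoV: fetch it whole from a backer.
	assert!(prefer_backing_group_for(1024, 300));
	// A 10 KiB chunk implies a ~1 MiB PoV: fall back to recovering individual chunks.
	assert!(!prefer_backing_group_for(10 * 1024, 300));
}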