diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs
index 5f1e1c2c786fecc0dc9e71d30b3944b4ec552719..375030eedc485e012e00aea517d2f14c51eb7cab 100644
--- a/polkadot/node/core/approval-voting/src/lib.rs
+++ b/polkadot/node/core/approval-voting/src/lib.rs
@@ -21,6 +21,7 @@
 //! of others. It uses this information to determine when candidates and blocks have
 //! been sufficiently approved to finalize.

+use futures_timer::Delay;
 use itertools::Itertools;
 use jaeger::{hash_to_trace_identifier, PerLeafSpan};
 use polkadot_node_jaeger as jaeger;
@@ -124,6 +125,9 @@ const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
 const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
 const APPROVAL_CACHE_SIZE: u32 = 1024;

+/// The maximum number of times we retry to approve a block if it is still needed.
+const MAX_APPROVAL_RETRIES: u32 = 16;
+
 const APPROVAL_DELAY: Tick = 2;
 pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";

@@ -166,6 +170,10 @@ pub struct ApprovalVotingSubsystem {
 	mode: Mode,
 	metrics: Metrics,
 	clock: Box<dyn Clock + Send + Sync>,
+	/// The maximum number of times we retry to approve a block if it is still needed and the PoV fetch failed.
+	max_approval_retries: u32,
+	/// The backoff before we retry the approval.
+	retry_backoff: Duration,
 }

 #[derive(Clone)]
@@ -492,6 +500,8 @@ impl ApprovalVotingSubsystem {
 			sync_oracle,
 			metrics,
 			Box::new(SystemClock {}),
+			MAX_APPROVAL_RETRIES,
+			APPROVAL_CHECKING_TIMEOUT / 2,
 		)
 	}

@@ -503,6 +513,8 @@ impl ApprovalVotingSubsystem {
 		sync_oracle: Box<dyn SyncOracle + Send>,
 		metrics: Metrics,
 		clock: Box<dyn Clock + Send + Sync>,
+		max_approval_retries: u32,
+		retry_backoff: Duration,
 	) -> Self {
 		ApprovalVotingSubsystem {
 			keystore,
@@ -512,6 +524,8 @@ impl ApprovalVotingSubsystem {
 			mode: Mode::Syncing(sync_oracle),
 			metrics,
 			clock,
+			max_approval_retries,
+			retry_backoff,
 		}
 	}

@@ -701,18 +715,53 @@ enum ApprovalOutcome {
 	TimedOut,
 }

+#[derive(Clone)]
+struct RetryApprovalInfo {
+	candidate: CandidateReceipt,
+	backing_group: GroupIndex,
+	executor_params: ExecutorParams,
+	core_index: Option<CoreIndex>,
+	session_index: SessionIndex,
+	attempts_remaining: u32,
+	backoff: Duration,
+}
+
 struct ApprovalState {
 	validator_index: ValidatorIndex,
 	candidate_hash: CandidateHash,
 	approval_outcome: ApprovalOutcome,
+	retry_info: Option<RetryApprovalInfo>,
 }

 impl ApprovalState {
 	fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
-		Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Approved }
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Approved,
+			retry_info: None,
+		}
 	}
 	fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
-		Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Failed }
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Failed,
+			retry_info: None,
+		}
+	}
+
+	fn failed_with_retry(
+		validator_index: ValidatorIndex,
+		candidate_hash: CandidateHash,
+		retry_info: Option<RetryApprovalInfo>,
+	) -> Self {
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Failed,
+			retry_info,
+		}
 	}
 }

@@ -752,6 +801,7 @@ impl CurrentlyCheckingSet {
 						candidate_hash,
 						validator_index,
 						approval_outcome: ApprovalOutcome::TimedOut,
+						retry_info: None,
 					},
 				Some(approval_state) => approval_state,
 			}
@@ -1236,18 +1286,19 @@ where
 						validator_index,
 						candidate_hash,
 						approval_outcome,
+						retry_info,
 					}
 				) = approval_state;

 				if matches!(approval_outcome, ApprovalOutcome::Approved) {
 					let mut approvals: Vec<Action> = relay_block_hashes
-						.into_iter()
+						.iter()
 						.map(|block_hash| Action::IssueApproval(
 							candidate_hash,
 							ApprovalVoteRequest {
 								validator_index,
-								block_hash,
+								block_hash: *block_hash,
 							},
 						)
 						)
@@ -1255,6 +1306,49 @@ where
 					actions.append(&mut approvals);
 				}

+				if let Some(retry_info) = retry_info {
+					for block_hash in relay_block_hashes {
+						if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) {
+							let ctx = &mut ctx;
+							let metrics = subsystem.metrics.clone();
+							let retry_info = retry_info.clone();
+							let executor_params = retry_info.executor_params.clone();
+							let candidate = retry_info.candidate.clone();
+							let launch_approval_span = state
+								.spans
+								.get(&block_hash)
+								.map(|span| span.child("launch-approval"))
+								.unwrap_or_else(|| jaeger::Span::new(candidate_hash, "launch-approval"))
+								.with_trace_id(candidate_hash)
+								.with_candidate(candidate_hash)
+								.with_stage(jaeger::Stage::ApprovalChecking);
+							currently_checking_set
+								.insert_relay_block_hash(
+									candidate_hash,
+									validator_index,
+									block_hash,
+									async move {
+										launch_approval(
+											ctx,
+											metrics,
+											retry_info.session_index,
+											candidate,
+											validator_index,
+											block_hash,
+											retry_info.backing_group,
+											executor_params,
+											retry_info.core_index,
+											&launch_approval_span,
+											retry_info,
+										)
+										.await
+									},
+								)
+								.await?;
+						}
+					}
+				}
+
 				actions
 			},
 			(block_hash, validator_index) = delayed_approvals_timers.select_next_some() => {
@@ -1302,6 +1396,8 @@ where
 				&mut approvals_cache,
 				&mut subsystem.mode,
 				actions,
+				subsystem.max_approval_retries,
+				subsystem.retry_backoff,
 			)
 			.await?
 			{
@@ -1350,6 +1446,8 @@ async fn handle_actions<Context>(
 	approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
 	mode: &mut Mode,
 	actions: Vec<Action>,
+	max_approval_retries: u32,
+	retry_backoff: Duration,
 ) -> SubsystemResult<bool> {
 	let mut conclude = false;
 	let mut actions_iter = actions.into_iter();
@@ -1442,6 +1540,16 @@ async fn handle_actions<Context>(
 					None => {
 						let ctx = &mut *ctx;

+						let retry = RetryApprovalInfo {
+							candidate: candidate.clone(),
+							backing_group,
+							executor_params: executor_params.clone(),
+							core_index,
+							session_index: session,
+							attempts_remaining: max_approval_retries,
+							backoff: retry_backoff,
+						};
+
 						currently_checking_set
 							.insert_relay_block_hash(
 								candidate_hash,
@@ -1459,6 +1567,7 @@ async fn handle_actions<Context>(
 									executor_params,
 									core_index,
 									&launch_approval_span,
+									retry,
 								)
 								.await
 							},
@@ -3321,6 +3430,7 @@ async fn launch_approval<Context>(
 	executor_params: ExecutorParams,
 	core_index: Option<CoreIndex>,
 	span: &jaeger::Span,
+	retry: RetryApprovalInfo,
 ) -> SubsystemResult<RemoteHandle<ApprovalState>> {
 	let (a_tx, a_rx) = oneshot::channel();
 	let (code_tx, code_rx) = oneshot::channel();
@@ -3352,6 +3462,7 @@ async fn launch_approval<Context>(

 	let candidate_hash = candidate.hash();
 	let para_id = candidate.descriptor.para_id;
+	let mut next_retry = None;
 	gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");

 	let request_validation_data_span = span
@@ -3390,7 +3501,6 @@ async fn launch_approval<Context>(
 	let background = async move {
 		// Force the move of the timer into the background task.
 		let _timer = timer;
-
 		let available_data = match a_rx.await {
 			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
 			Ok(Ok(a)) => a,
@@ -3401,10 +3511,27 @@ async fn launch_approval<Context>(
 						target: LOG_TARGET,
 						?para_id,
 						?candidate_hash,
+						attempts_remaining = retry.attempts_remaining,
 						"Data unavailable for candidate {:?}",
 						(candidate_hash, candidate.descriptor.para_id),
 					);
-					// do nothing. we'll just be a no-show and that'll cause others to rise up.
+					// Availability could fail if we have not discovered much of the network yet,
+					// so back off and ask the subsystem to retry at a later point if the approval
+					// is still needed, because the no-show has not been covered yet.
+					if retry.attempts_remaining > 0 {
+						Delay::new(retry.backoff).await;
+						next_retry = Some(RetryApprovalInfo {
+							candidate,
+							backing_group,
+							executor_params,
+							core_index,
+							session_index,
+							attempts_remaining: retry.attempts_remaining - 1,
+							backoff: retry.backoff,
+						});
+					} else {
+						next_retry = None;
+					}
 					metrics_guard.take().on_approval_unavailable();
 				},
 				&RecoveryError::ChannelClosed => {
@@ -3435,7 +3562,7 @@ async fn launch_approval<Context>(
 					metrics_guard.take().on_approval_invalid();
 				},
 			}
-			return ApprovalState::failed(validator_index, candidate_hash)
+			return ApprovalState::failed_with_retry(validator_index, candidate_hash, next_retry)
 		},
 	};
 	drop(request_validation_data_span);
diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs
index 561f5fbbdf2c00928dd9d1c8f6d7389ee5a034c2..65334b71580af2268511b59934fbe4ab2a6adb2c 100644
--- a/polkadot/node/core/approval-voting/src/tests.rs
+++ b/polkadot/node/core/approval-voting/src/tests.rs
@@ -75,6 +75,9 @@ const SLOT_DURATION_MILLIS: u64 = 5000;

 const TIMEOUT: Duration = Duration::from_millis(2000);

+const NUM_APPROVAL_RETRIES: u32 = 3;
+const RETRY_BACKOFF: Duration = Duration::from_millis(300);
+
 #[derive(Clone)]
 struct TestSyncOracle {
 	flag: Arc<AtomicBool>,
@@ -263,7 +266,8 @@ where
 		_relay_vrf_story: polkadot_node_primitives::approval::v1::RelayVRFStory,
 		_assignment: &polkadot_node_primitives::approval::v2::AssignmentCertV2,
 		_backing_groups: Vec<polkadot_primitives::GroupIndex>,
-	) -> Result<polkadot_node_primitives::approval::v1::DelayTranche, criteria::InvalidAssignment> {
+	) -> Result<polkadot_node_primitives::approval::v1::DelayTranche, criteria::InvalidAssignment>
+	{
 		self.1(validator_index)
 	}
 }
@@ -575,6 +579,8 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
 			sync_oracle,
 			Metrics::default(),
 			clock.clone(),
+			NUM_APPROVAL_RETRIES,
+			RETRY_BACKOFF,
 		),
 		assignment_criteria,
 		backend,
@@ -3202,6 +3208,20 @@ async fn recover_available_data(virtual_overseer: &mut VirtualOverseer) {
 	);
 }

+async fn recover_available_data_failure(virtual_overseer: &mut VirtualOverseer) {
+	let available_data = RecoveryError::Unavailable;
+
+	assert_matches!(
+		virtual_overseer.recv().await,
+		AllMessages::AvailabilityRecovery(
+			AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, _, tx)
+		) => {
+			tx.send(Err(available_data)).unwrap();
+		},
+		"overseer did not receive recover available data message",
+	);
+}
+
 struct TriggersAssignmentConfig<F1, F2> {
 	our_assigned_tranche: DelayTranche,
 	assign_validator_tranche: F1,
@@ -4791,6 +4811,133 @@ fn subsystem_relaunches_approval_work_on_restart() {
 	});
 }

+/// Test that we retry the approval of a candidate on availability failure, up to max retries.
+#[test]
+fn subsystem_relaunches_approval_work_on_availability_failure() {
+	let assignment_criteria = Box::new(MockAssignmentCriteria(
+		|| {
+			let mut assignments = HashMap::new();
+
+			let _ = assignments.insert(
+				CoreIndex(0),
+				approval_db::v2::OurAssignment {
+					cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact {
+						core_bitfield: vec![CoreIndex(0), CoreIndex(2)].try_into().unwrap(),
+					}),
+					tranche: 0,
+					validator_index: ValidatorIndex(0),
+					triggered: false,
+				}
+				.into(),
+			);
+
+			let _ = assignments.insert(
+				CoreIndex(1),
+				approval_db::v2::OurAssignment {
+					cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFDelay {
+						core_index: CoreIndex(1),
+					}),
+					tranche: 0,
+					validator_index: ValidatorIndex(0),
+					triggered: false,
+				}
+				.into(),
+			);
+			assignments
+		},
+		|_| Ok(0),
+	));
+	let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
+	let store = config.backend();
+
+	test_harness(config, |test_harness| async move {
+		let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;
+
+		setup_overseer_with_blocks_with_two_assignments_triggered(
+			&mut virtual_overseer,
+			store,
+			&clock,
+			sync_oracle_handle,
+		)
+		.await;
+
+		// We have two candidates: for one we are going to fail the availability recovery up to
+		// max_retries, and for the other we are going to succeed on the last retry, so we should
+		// see the approval being distributed.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
+				_,
+				_,
+			)) => {
+			}
+		);
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
+				_,
+				_
+			)) => {
+			}
+		);
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
+				exec_kind,
+				response_sender,
+				..
+			}) if exec_kind == PvfExecKind::Approval => {
+				response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
+					.unwrap();
+			}
+		);
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
+				let _ = sender.send(Ok(ApprovalVotingParams {
+					max_approval_coalesce_count: 1,
+				}));
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
+		);
+
+		// Assert that there are no more messages being sent by the subsystem.
+		assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());
+
+		virtual_overseer
+	});
+}
+
 // Test that cached approvals, which are candidates that we approved but we didn't issue
 // the signature yet because we want to coalesce it with more candidate are sent after restart.
 #[test]
diff --git a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
index f05d061f3fdee63984d1411e1a10809912128095..f5de4d6d0d163125862330cfba8548ed4c924708 100644
--- a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
+++ b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
@@ -815,6 +815,8 @@ fn build_overseer(
 		Box::new(TestSyncOracle {}),
 		state.approval_voting_metrics.clone(),
 		Box::new(system_clock.clone()),
+		1,
+		Duration::from_secs(1),
 	);

 	let approval_distribution = ApprovalDistribution::new_with_clock(
diff --git a/prdoc/pr_6807.prdoc b/prdoc/pr_6807.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..b9564dfb2fe26fd1ec399f6c490f6bcf86de3c5d
--- /dev/null
+++ b/prdoc/pr_6807.prdoc
@@ -0,0 +1,19 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Retry approval on availability failure if the check is still needed
+
+doc:
+  - audience: Node Dev
+    description: |
+      Recovering the PoV can fail in situations where the node has just restarted and the DHT
+      topology has not been fully discovered yet, so the node cannot connect to most of its peers.
+      This is bad because gossiping the assignment only requires a few connected peers, so the
+      assignment still goes out, but since we cannot approve the candidate, other nodes will see
+      us as a no-show. Fix this by retrying the approval of a candidate for a fixed number of
+      attempts if the block is still needed.
+
+crates:
+  - name: polkadot-node-core-approval-voting
+    bump: minor
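
As an illustration of the retry flow described in the prdoc above, here is a minimal, self-contained Rust sketch (not part of the patch) of retrying PoV recovery a bounded number of times with a fixed backoff. The `Recovery` enum and `try_recover_pov` helper are hypothetical stand-ins for the real availability-recovery request; in the patch itself the failed attempt is handed back to the subsystem as a `RetryApprovalInfo` and re-launched only while the block entry is still present in the database, rather than being looped locally.

use futures_timer::Delay;
use std::time::Duration;

/// Hypothetical recovery outcome, standing in for `Result<AvailableData, RecoveryError>`.
enum Recovery {
    Available(Vec<u8>),
    Unavailable,
}

/// Hypothetical fetch helper; in the subsystem this is a message to availability-recovery.
async fn try_recover_pov() -> Recovery {
    Recovery::Unavailable
}

/// Retry recovery up to `max_retries` extra times, sleeping `backoff` between attempts,
/// mirroring the roles of MAX_APPROVAL_RETRIES and the APPROVAL_CHECKING_TIMEOUT / 2 backoff.
async fn recover_with_retries(max_retries: u32, backoff: Duration) -> Option<Vec<u8>> {
    let mut attempts_remaining = max_retries;
    loop {
        match try_recover_pov().await {
            Recovery::Available(pov) => return Some(pov),
            Recovery::Unavailable if attempts_remaining > 0 => {
                // Back off before retrying so the node has time to discover more peers.
                attempts_remaining -= 1;
                Delay::new(backoff).await;
            },
            // Attempts exhausted: give up and let the usual no-show handling take over.
            Recovery::Unavailable => return None,
        }
    }
}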