From d38bb9533b70abb7eff4e8770177d7840899ca86 Mon Sep 17 00:00:00 2001
From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com>
Date: Tue, 14 Jan 2025 19:10:27 +0200
Subject: [PATCH] approval-voting: Fix sending of assignments after restart
 (#6973)

There is a problem on restart where nodes will not trigger their needed
assignment if they were offline while the time of the assignment passed.

That happens because after restart we will hit this condition
https://github.com/paritytech/polkadot-sdk/blob/4e805ca05067f6ed970f33f9be51483185b0cc0b/polkadot/node/core/approval-voting/src/lib.rs#L2495
and considered will be `tick_now` which is already higher than the tick
of our assignment.

The fix is to schedule a wakeup for untriggered assignments at restart
and let the logic of processing an wakeup decide if it needs to trigger
the assignment or not.

One thing that we need to be careful here is to make sure we don't
schedule the wake up immediately after restart because, the node would
still be behind with all the assignments that should have received and
might make it wrongfully decide it needs to trigger its assignment, so I
added a `RESTART_WAKEUP_DELAY: Tick = 12` which should be more than
enough for the node to catch up.

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Co-authored-by: ordian <write@reusable.software>
Co-authored-by: Andrei Eres <eresav@me.com>
---
 polkadot/node/core/approval-voting/src/lib.rs |  25 +-
 .../node/core/approval-voting/src/tests.rs    | 246 ++++++++++++++++++
 prdoc/pr_6973.prdoc                           |  16 ++
 3 files changed, 286 insertions(+), 1 deletion(-)
 create mode 100644 prdoc/pr_6973.prdoc

diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs
index 27361df3731..b4c2a6afee0 100644
--- a/polkadot/node/core/approval-voting/src/lib.rs
+++ b/polkadot/node/core/approval-voting/src/lib.rs
@@ -132,6 +132,16 @@ pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";
 // The max number of ticks we delay sending the approval after we are ready to issue the approval
 const MAX_APPROVAL_COALESCE_WAIT_TICKS: Tick = 12;
 
+// If the node restarted and the tranche has passed without the assignment
+// being trigger, we won't trigger the assignment at restart because we don't have
+// an wakeup schedule for it.
+// The solution, is to always schedule a wake up after the restart and let the
+// process_wakeup to decide if the assignment needs to be triggered.
+// We need to have a delay after restart to give time to the node to catch up with
+// messages and not trigger its assignment unnecessarily, because it hasn't seen
+// the assignments from the other validators.
+const RESTART_WAKEUP_DELAY: Tick = 12;
+
 /// Configuration for the approval voting subsystem
 #[derive(Debug, Clone)]
 pub struct Config {
@@ -1837,7 +1847,20 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi
 			match candidate_entry.approval_entry(&block_hash) {
 				Some(approval_entry) => {
 					match approval_entry.local_statements() {
-						(None, None) | (None, Some(_)) => {}, // second is impossible case.
+						(None, None) =>
+							if approval_entry
+								.our_assignment()
+								.map(|assignment| !assignment.triggered())
+								.unwrap_or(false)
+							{
+								actions.push(Action::ScheduleWakeup {
+									block_hash,
+									block_number: block_entry.block_number(),
+									candidate_hash: *candidate_hash,
+									tick: state.clock.tick_now() + RESTART_WAKEUP_DELAY,
+								})
+							},
+						(None, Some(_)) => {}, // second is impossible case.
 						(Some(assignment), None) => {
 							let claimed_core_indices =
 								get_core_indices_on_startup(&assignment.cert().kind, *core_index);
diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs
index b72993fe1a9..9fe716833b8 100644
--- a/polkadot/node/core/approval-voting/src/tests.rs
+++ b/polkadot/node/core/approval-voting/src/tests.rs
@@ -5380,6 +5380,252 @@ fn subsystem_sends_assignment_approval_in_correct_order_on_approval_restart() {
 	});
 }
 
+// Test that if the subsystem missed the triggering of some tranches because it was not running
+// it launches the missed assignements on restart.
+#[test]
+fn subsystem_launches_missed_assignments_on_restart() {
+	let test_tranche = 20;
+	let assignment_criteria = Box::new(MockAssignmentCriteria(
+		move || {
+			let mut assignments = HashMap::new();
+			let _ = assignments.insert(
+				CoreIndex(0),
+				approval_db::v2::OurAssignment {
+					cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFDelay {
+						core_index: CoreIndex(0),
+					}),
+					tranche: test_tranche,
+					validator_index: ValidatorIndex(0),
+					triggered: false,
+				}
+				.into(),
+			);
+
+			assignments
+		},
+		|_| Ok(0),
+	));
+	let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
+	let store = config.backend();
+	let store_clone = config.backend();
+
+	test_harness(config, |test_harness| async move {
+		let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
+				rx.send(Ok(0)).unwrap();
+			}
+		);
+
+		let block_hash = Hash::repeat_byte(0x01);
+		let fork_block_hash = Hash::repeat_byte(0x02);
+		let candidate_commitments = CandidateCommitments::default();
+		let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
+		candidate_receipt.commitments_hash = candidate_commitments.hash();
+		let candidate_hash = candidate_receipt.hash();
+		let slot = Slot::from(1);
+		let (chain_builder, _session_info) = build_chain_with_two_blocks_with_one_candidate_each(
+			block_hash,
+			fork_block_hash,
+			slot,
+			sync_oracle_handle,
+			candidate_receipt,
+		)
+		.await;
+		chain_builder.build(&mut virtual_overseer).await;
+
+		assert!(!clock.inner.lock().current_wakeup_is(1));
+		clock.inner.lock().wakeup_all(1);
+
+		assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot) + test_tranche as u64));
+		clock.inner.lock().wakeup_all(slot_to_tick(slot));
+
+		futures_timer::Delay::new(Duration::from_millis(200)).await;
+
+		clock.inner.lock().wakeup_all(slot_to_tick(slot + 2));
+
+		assert_eq!(clock.inner.lock().wakeups.len(), 0);
+
+		futures_timer::Delay::new(Duration::from_millis(200)).await;
+
+		let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap();
+		let our_assignment =
+			candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap();
+		assert!(!our_assignment.triggered());
+
+		// Assignment is not triggered because its tranches has not been reached.
+		virtual_overseer
+	});
+
+	// Restart a new approval voting subsystem with the same database and major syncing true until
+	// the last leaf.
+	let config = HarnessConfigBuilder::default().backend(store_clone).major_syncing(true).build();
+
+	test_harness(config, |test_harness| async move {
+		let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;
+		let slot = Slot::from(1);
+		// 1. Set the clock to the to a tick past the tranche where the assignment should be
+		//    triggered.
+		clock.inner.lock().set_tick(slot_to_tick(slot) + 2 * test_tranche as u64);
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
+				rx.send(Ok(0)).unwrap();
+			}
+		);
+
+		let block_hash = Hash::repeat_byte(0x01);
+		let fork_block_hash = Hash::repeat_byte(0x02);
+		let candidate_commitments = CandidateCommitments::default();
+		let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
+		candidate_receipt.commitments_hash = candidate_commitments.hash();
+		let (chain_builder, session_info) = build_chain_with_two_blocks_with_one_candidate_each(
+			block_hash,
+			fork_block_hash,
+			slot,
+			sync_oracle_handle,
+			candidate_receipt,
+		)
+		.await;
+
+		chain_builder.build(&mut virtual_overseer).await;
+
+		futures_timer::Delay::new(Duration::from_millis(2000)).await;
+
+		// On major syncing ending Approval voting should send all the necessary messages for a
+		// candidate to be approved.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks(
+				_,
+			)) => {
+			}
+		);
+
+		clock
+			.inner
+			.lock()
+			.wakeup_all(slot_to_tick(slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY - 1);
+
+		// Subsystem should not send any messages because the assignment is not triggered yet.
+		assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());
+
+		// Set the clock to the tick where the assignment should be triggered.
+		clock
+			.inner
+			.lock()
+			.wakeup_all(slot_to_tick(slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(
+				RuntimeApiMessage::Request(
+					_,
+					RuntimeApiRequest::SessionInfo(_, si_tx),
+				)
+			) => {
+				si_tx.send(Ok(Some(session_info.clone()))).unwrap();
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(
+				RuntimeApiMessage::Request(
+					_,
+					RuntimeApiRequest::SessionExecutorParams(_, si_tx),
+				)
+			) => {
+				// Make sure all SessionExecutorParams calls are not made for the leaf (but for its relay parent)
+				si_tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(
+				RuntimeApiMessage::Request(_, RuntimeApiRequest::NodeFeatures(_, si_tx), )
+			) => {
+				si_tx.send(Ok(NodeFeatures::EMPTY)).unwrap();
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
+				_,
+				_,
+			)) => {
+			}
+		);
+
+		// Guarantees the approval work has been relaunched.
+		recover_available_data(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
+				exec_kind,
+				response_sender,
+				..
+			}) if exec_kind == PvfExecKind::Approval => {
+				response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
+					.unwrap();
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
+				let _ = sender.send(Ok(ApprovalVotingParams {
+					max_approval_coalesce_count: 1,
+				}));
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
+		);
+
+		clock
+			.inner
+			.lock()
+			.wakeup_all(slot_to_tick(slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
+				_,
+				_,
+			)) => {
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
+				let _ = sender.send(Ok(ApprovalVotingParams {
+					max_approval_coalesce_count: 1,
+				}));
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
+		);
+
+		// Assert that there are no more messages being sent by the subsystem
+		assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());
+
+		virtual_overseer
+	});
+}
+
 // Test we correctly update the timer when we mark the beginning of gathering assignments.
 #[test]
 fn test_gathering_assignments_statements() {
diff --git a/prdoc/pr_6973.prdoc b/prdoc/pr_6973.prdoc
new file mode 100644
index 00000000000..416789b9171
--- /dev/null
+++ b/prdoc/pr_6973.prdoc
@@ -0,0 +1,16 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: approval-voting fix sending of assignments after restart
+
+doc:
+  - audience: Node Dev
+    description: |
+      There is a problem on restart where nodes will not trigger their needed assignment if 
+      they were offline and the time of the assignment passed, so after restart always 
+      schedule a wakeup so that nodes a have the opportunity of triggering their assignments
+      if they are still needed.
+
+crates:
+  - name: polkadot-node-core-approval-voting
+    bump: minor
-- 
GitLab