From 806cb541b72f2be9574668b09edcd2712428282f Mon Sep 17 00:00:00 2001
From: sandreim <54316454+sandreim@users.noreply.github.com>
Date: Wed, 26 Jan 2022 17:17:46 +0200
Subject: [PATCH] Refactor and fix usage of `get_session_index()` and
 `get_session_info_by_index()` (#4735)

* Rename/refactor around get_session_index

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* choose proper head for fetching session

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* revert rename

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix comments

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* renaming and more comments

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review feedback

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Run Fetch task in correct session

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Log warning when ancestors unavailable

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Fixes

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
---
 .../src/requester/mod.rs                      | 38 +++++++++++--------
 .../src/requester/session_cache.rs            |  8 ++--
 .../src/requester/tests.rs                    |  1 +
 .../src/collator_side/mod.rs                  |  2 +-
 .../dispute-distribution/src/sender/mod.rs    | 29 +++++++++++---
 .../src/sender/send_task.rs                   |  6 ++-
 .../network/statement-distribution/src/lib.rs |  7 ++--
 polkadot/node/subsystem-types/src/lib.rs      |  2 +-
 .../src/rolling_session_window.rs             | 11 ++++--
 .../node/subsystem-util/src/runtime/mod.rs    | 19 +++++-----
 10 files changed, 81 insertions(+), 42 deletions(-)

diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs
index 2f9a9069cd4..6a9a86321b1 100644
--- a/polkadot/node/network/availability-distribution/src/requester/mod.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs
@@ -106,14 +106,13 @@ impl Requester {
 	{
 		tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
 		let ActiveLeavesUpdate { activated, deactivated } = update;
-		// Order important! We need to handle activated, prior to deactivated, otherwise we might
-		// cancel still needed jobs.
-		if let Some(activated) = activated {
-			// Stale leaves happen after a reversion - we don't want to re-run availability there.
-			if let LeafStatus::Fresh = activated.status {
-				self.start_requesting_chunks(ctx, runtime, activated).await?;
-			}
+		// Stale leaves happen after a reversion - we don't want to re-run availability there.
+		if let Some(leaf) = activated.filter(|leaf| leaf.status == LeafStatus::Fresh) {
+			// Order important! We need to handle activated, prior to deactivated, otherwise we might
+			// cancel still needed jobs.
+			self.start_requesting_chunks(ctx, runtime, leaf).await?;
 		}
+
 		self.stop_requesting_chunks(deactivated.into_iter());
 		Ok(())
 	}
@@ -212,15 +211,24 @@ impl Requester {
 						.with_session_info(
 							ctx,
 							runtime,
-							// We use leaf here, as relay_parent must be in the same session as the
-							// leaf. (Cores are dropped at session boundaries.) At the same time,
-							// only leaves are guaranteed to be fetchable by the state trie.
+							// We use leaf here, the relay_parent must be in the same session as the
+							// leaf. This is guaranteed by runtime which ensures that cores are cleared
+							// at session boundaries. At the same time, only leaves are guaranteed to
+							// be fetchable by the state trie.
 							leaf,
 							|info| FetchTaskConfig::new(leaf, &core, tx, metrics, info),
 						)
-						.await?;
-
-					if let Some(task_cfg) = task_cfg {
+						.await
+						.map_err(|err| {
+							tracing::warn!(
+								target: LOG_TARGET,
+								error = ?err,
+								"Failed to spawn a fetch task"
+							);
+							err
+						});
+
+					if let Ok(Some(task_cfg)) = task_cfg {
 						e.insert(FetchTask::start(task_cfg, ctx).await?);
 					}
 					// Not a validator, nothing to do.
@@ -274,7 +282,7 @@ where
 
 	// `head` is the child of the first block in `ancestors`, request its session index.
 	let head_session_index = match ancestors_iter.next() {
-		Some(parent) => runtime.get_session_index(ctx.sender(), *parent).await?,
+		Some(parent) => runtime.get_session_index_for_child(ctx.sender(), *parent).await?,
 		None => {
 			// No first element, i.e. empty.
 			return Ok(ancestors)
@@ -285,7 +293,7 @@ where
 	// The first parent is skipped.
 	for parent in ancestors_iter {
 		// Parent is the i-th ancestor, request session index for its child -- (i-1)th element.
-		let session_index = runtime.get_session_index(ctx.sender(), *parent).await?;
+		let session_index = runtime.get_session_index_for_child(ctx.sender(), *parent).await?;
 		if session_index == head_session_index {
 			session_ancestry_len += 1;
 		} else {
diff --git a/polkadot/node/network/availability-distribution/src/requester/session_cache.rs b/polkadot/node/network/availability-distribution/src/requester/session_cache.rs
index 1dbab38cfff..10fda8cd9c6 100644
--- a/polkadot/node/network/availability-distribution/src/requester/session_cache.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/session_cache.rs
@@ -105,7 +105,7 @@ impl SessionCache {
 		Context: SubsystemContext,
 		F: FnOnce(&SessionInfo) -> R,
 	{
-		let session_index = runtime.get_session_index(ctx.sender(), parent).await?;
+		let session_index = runtime.get_session_index_for_child(ctx.sender(), parent).await?;
 
 		if let Some(o_info) = self.session_info_cache.get(&session_index) {
 			tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
@@ -177,13 +177,15 @@ impl SessionCache {
 		&self,
 		ctx: &mut Context,
 		runtime: &mut RuntimeInfo,
-		parent: Hash,
+		relay_parent: Hash,
 		session_index: SessionIndex,
 	) -> Result<Option<SessionInfo>, Error>
 	where
 		Context: SubsystemContext,
 	{
-		let info = runtime.get_session_info_by_index(ctx.sender(), parent, session_index).await?;
+		let info = runtime
+			.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
+			.await?;
 
 		let discovery_keys = info.session_info.discovery_keys.clone();
 		let mut validator_groups = info.session_info.validator_groups.clone();
diff --git a/polkadot/node/network/availability-distribution/src/requester/tests.rs b/polkadot/node/network/availability-distribution/src/requester/tests.rs
index e75c1bd8bb0..3f6d284d9b9 100644
--- a/polkadot/node/network/availability-distribution/src/requester/tests.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/tests.rs
@@ -25,6 +25,7 @@ use polkadot_primitives::{
 	v1::{BlockNumber, CoreState, GroupIndex, Hash, Id, ScheduledCore, SessionIndex},
 	v2::SessionInfo,
 };
+
 use polkadot_subsystem::{
 	messages::{
 		AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
index 758a978cc90..58d6898bc2a 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
@@ -434,7 +434,7 @@ where
 	Context: SubsystemContext<Message = CollatorProtocolMessage>,
 	Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
 {
-	let session_index = runtime.get_session_index(ctx.sender(), relay_parent).await?;
+	let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
 	let info = &runtime
 		.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
 		.await?
diff --git a/polkadot/node/network/dispute-distribution/src/sender/mod.rs b/polkadot/node/network/dispute-distribution/src/sender/mod.rs
index 55d88f1310a..ee2e15b5764 100644
--- a/polkadot/node/network/dispute-distribution/src/sender/mod.rs
+++ b/polkadot/node/network/dispute-distribution/src/sender/mod.rs
@@ -190,8 +190,26 @@ impl DisputeSender {
 		dispute: (SessionIndex, CandidateHash),
 	) -> Result<()> {
 		let (session_index, candidate_hash) = dispute;
-		// We need some relay chain head for context for receiving session info information:
-		let ref_head = self.active_sessions.values().next().ok_or(NonFatal::NoActiveHeads)?;
+		// A relay chain head is required as context for receiving session info information from runtime and
+		// storage. We will iterate `active_sessions` to find a suitable head. We assume that there is at
+		// least one active head which, by `session_index`, is at least as recent as the `dispute` passed in.
+		// We need to avoid picking an older one from a session that might not yet exist in storage.
+		// Related to <https://github.com/paritytech/polkadot/issues/4730> .
+		let ref_head = self
+			.active_sessions
+			.iter()
+			.find_map(|(active_session_index, head_hash)| {
+				// There might be more than one session index that is at least as recent as the dispute
+				// so we just pick the first one. Keep in mind we are talking about the session index for the
+				// child of block identified by `head_hash` and not the session index for the block.
+				if active_session_index >= &session_index {
+					Some(head_hash)
+				} else {
+					None
+				}
+			})
+			.ok_or(NonFatal::NoActiveHeads)?;
+
 		let info = runtime
 			.get_session_info_by_index(ctx.sender(), *ref_head, session_index)
 			.await?;
@@ -293,7 +311,7 @@ impl DisputeSender {
 		ctx: &mut Context,
 		runtime: &mut RuntimeInfo,
 	) -> Result<bool> {
-		let new_sessions = get_active_session_indeces(ctx, runtime, &self.active_heads).await?;
+		let new_sessions = get_active_session_indices(ctx, runtime, &self.active_heads).await?;
 		let new_sessions_raw: HashSet<_> = new_sessions.keys().collect();
 		let old_sessions_raw: HashSet<_> = self.active_sessions.keys().collect();
 		let updated = new_sessions_raw != old_sessions_raw;
@@ -306,14 +324,15 @@ impl DisputeSender {
 /// Retrieve the currently active sessions.
 ///
 /// List is all indices of all active sessions together with the head that was used for the query.
-async fn get_active_session_indeces<Context: SubsystemContext>(
+async fn get_active_session_indices<Context: SubsystemContext>(
 	ctx: &mut Context,
 	runtime: &mut RuntimeInfo,
 	active_heads: &Vec<Hash>,
 ) -> Result<HashMap<SessionIndex, Hash>> {
 	let mut indeces = HashMap::new();
+	// Iterate all heads we track as active and fetch the child' session indices.
 	for head in active_heads {
-		let session_index = runtime.get_session_index(ctx.sender(), *head).await?;
+		let session_index = runtime.get_session_index_for_child(ctx.sender(), *head).await?;
 		indeces.insert(session_index, *head);
 	}
 	Ok(indeces)
diff --git a/polkadot/node/network/dispute-distribution/src/sender/send_task.rs b/polkadot/node/network/dispute-distribution/src/sender/send_task.rs
index 8a71a0cacbf..ed4aab74939 100644
--- a/polkadot/node/network/dispute-distribution/src/sender/send_task.rs
+++ b/polkadot/node/network/dispute-distribution/src/sender/send_task.rs
@@ -204,7 +204,8 @@ impl SendTask {
 		active_sessions: &HashMap<SessionIndex, Hash>,
 	) -> Result<HashSet<AuthorityDiscoveryId>> {
 		let ref_head = self.request.0.candidate_receipt.descriptor.relay_parent;
-		// Parachain validators:
+		// Retrieve all authorities which participated in the parachain consensus of the session
+		// in which the candidate was backed.
 		let info = runtime
 			.get_session_info_by_index(ctx.sender(), ref_head, self.request.0.session_index)
 			.await?;
@@ -219,7 +220,8 @@ impl SendTask {
 			.map(|(_, v)| v.clone())
 			.collect();
 
-		// Current authorities:
+		// Retrieve all authorities for the current session as indicated by the active
+		// heads we are tracking.
 		for (session_index, head) in active_sessions.iter() {
 			let info =
 				runtime.get_session_info_by_index(ctx.sender(), *head, *session_index).await?;
diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs
index 2194aa806a6..2c522a340a2 100644
--- a/polkadot/node/network/statement-distribution/src/lib.rs
+++ b/polkadot/node/network/statement-distribution/src/lib.rs
@@ -624,9 +624,9 @@ struct ActiveHeadData {
 	statements: IndexMap<StoredStatementComparator, SignedFullStatement>,
 	/// Large statements we are waiting for with associated meta data.
 	waiting_large_statements: HashMap<CandidateHash, LargeStatementStatus>,
-	/// The validators at this head.
+	/// The parachain validators at the head's child session index.
 	validators: Vec<ValidatorId>,
-	/// The session index this head is at.
+	/// The current session index of this fork.
 	session_index: sp_staking::SessionIndex,
 	/// How many `Seconded` statements we've seen per validator.
 	seconded_counts: HashMap<ValidatorIndex, usize>,
@@ -1798,8 +1798,9 @@ impl StatementDistributionSubsystem {
 						"New active leaf",
 					);
 
+					// Retrieve the parachain validators at the child of the head we track.
 					let session_index =
-						runtime.get_session_index(ctx.sender(), relay_parent).await?;
+						runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
 					let info = runtime
 						.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
 						.await?;
diff --git a/polkadot/node/subsystem-types/src/lib.rs b/polkadot/node/subsystem-types/src/lib.rs
index 797e195a585..e2150e14abe 100644
--- a/polkadot/node/subsystem-types/src/lib.rs
+++ b/polkadot/node/subsystem-types/src/lib.rs
@@ -40,7 +40,7 @@ pub use polkadot_node_jaeger as jaeger;
 const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8;
 
 /// The status of an activated leaf.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Clone, Debug, PartialEq)]
 pub enum LeafStatus {
 	/// A leaf is fresh when it's the first time the leaf has been encountered.
 	/// Most leaves should be fresh.
diff --git a/polkadot/node/subsystem-util/src/rolling_session_window.rs b/polkadot/node/subsystem-util/src/rolling_session_window.rs
index 5e11d2fe544..1f204d30267 100644
--- a/polkadot/node/subsystem-util/src/rolling_session_window.rs
+++ b/polkadot/node/subsystem-util/src/rolling_session_window.rs
@@ -102,7 +102,7 @@ impl RollingSessionWindow {
 		window_size: SessionWindowSize,
 		block_hash: Hash,
 	) -> Result<Self, SessionsUnavailable> {
-		let session_index = get_session_index_for_head(ctx, block_hash).await?;
+		let session_index = get_session_index_for_child(ctx, block_hash).await?;
 
 		let window_start = session_index.saturating_sub(window_size.get() - 1);
 
@@ -160,7 +160,7 @@ impl RollingSessionWindow {
 		ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
 		block_hash: Hash,
 	) -> Result<SessionWindowUpdate, SessionsUnavailable> {
-		let session_index = get_session_index_for_head(ctx, block_hash).await?;
+		let session_index = get_session_index_for_child(ctx, block_hash).await?;
 
 		let old_window_start = self.earliest_session;
 
@@ -212,7 +212,12 @@ impl RollingSessionWindow {
 	}
 }
 
-async fn get_session_index_for_head(
+// Returns the session index expected at any child of the `parent` block.
+//
+// Note: We could use `RuntimeInfo::get_session_index_for_child` here but it's
+// cleaner to just call the runtime API directly without needing to create an instance
+// of `RuntimeInfo`.
+async fn get_session_index_for_child(
 	ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
 	block_hash: Hash,
 ) -> Result<SessionIndex, SessionsUnavailable> {
diff --git a/polkadot/node/subsystem-util/src/runtime/mod.rs b/polkadot/node/subsystem-util/src/runtime/mod.rs
index fc5a3446495..d7afac0b58c 100644
--- a/polkadot/node/subsystem-util/src/runtime/mod.rs
+++ b/polkadot/node/subsystem-util/src/runtime/mod.rs
@@ -117,8 +117,9 @@ impl RuntimeInfo {
 		}
 	}
 
-	/// Retrieve the current session index.
-	pub async fn get_session_index<Sender>(
+	/// Returns the session index expected at any child of the `parent` block.
+	/// This does not return the session index for the `parent` block.
+	pub async fn get_session_index_for_child<Sender>(
 		&mut self,
 		sender: &mut Sender,
 		parent: Hash,
@@ -141,14 +142,14 @@ impl RuntimeInfo {
 	pub async fn get_session_info<'a, Sender>(
 		&'a mut self,
 		sender: &mut Sender,
-		parent: Hash,
+		relay_parent: Hash,
 	) -> Result<&'a ExtendedSessionInfo>
 	where
 		Sender: SubsystemSender,
 	{
-		let session_index = self.get_session_index(sender, parent).await?;
+		let session_index = self.get_session_index_for_child(sender, relay_parent).await?;
 
-		self.get_session_info_by_index(sender, parent, session_index).await
+		self.get_session_info_by_index(sender, relay_parent, session_index).await
 	}
 
 	/// Get `ExtendedSessionInfo` by session index.
@@ -185,7 +186,7 @@ impl RuntimeInfo {
 	pub async fn check_signature<Sender, Payload, RealPayload>(
 		&mut self,
 		sender: &mut Sender,
-		parent: Hash,
+		relay_parent: Hash,
 		signed: UncheckedSigned<Payload, RealPayload>,
 	) -> Result<
 		std::result::Result<Signed<Payload, RealPayload>, UncheckedSigned<Payload, RealPayload>>,
@@ -195,9 +196,9 @@ impl RuntimeInfo {
 		Payload: EncodeAs<RealPayload> + Clone,
 		RealPayload: Encode + Clone,
 	{
-		let session_index = self.get_session_index(sender, parent).await?;
-		let info = self.get_session_info_by_index(sender, parent, session_index).await?;
-		Ok(check_signature(session_index, &info.session_info, parent, signed))
+		let session_index = self.get_session_index_for_child(sender, relay_parent).await?;
+		let info = self.get_session_info_by_index(sender, relay_parent, session_index).await?;
+		Ok(check_signature(session_index, &info.session_info, relay_parent, signed))
 	}
 
 	/// Build `ValidatorInfo` for the current session.
-- 
GitLab