diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index 2f9a9069cd46edf356ecb21f869e27fbfcf9dd22..6a9a86321b12f76becb975fac4c07b21d89ff82c 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -106,14 +106,13 @@ impl Requester { { tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads"); let ActiveLeavesUpdate { activated, deactivated } = update; - // Order important! We need to handle activated, prior to deactivated, otherwise we might - // cancel still needed jobs. - if let Some(activated) = activated { - // Stale leaves happen after a reversion - we don't want to re-run availability there. - if let LeafStatus::Fresh = activated.status { - self.start_requesting_chunks(ctx, runtime, activated).await?; - } + // Stale leaves happen after a reversion - we don't want to re-run availability there. + if let Some(leaf) = activated.filter(|leaf| leaf.status == LeafStatus::Fresh) { + // Order important! We need to handle activated, prior to deactivated, otherwise we might + // cancel still needed jobs. + self.start_requesting_chunks(ctx, runtime, leaf).await?; } + self.stop_requesting_chunks(deactivated.into_iter()); Ok(()) } @@ -212,15 +211,24 @@ impl Requester { .with_session_info( ctx, runtime, - // We use leaf here, as relay_parent must be in the same session as the - // leaf. (Cores are dropped at session boundaries.) At the same time, - // only leaves are guaranteed to be fetchable by the state trie. + // We use leaf here, the relay_parent must be in the same session as the + // leaf. This is guaranteed by runtime which ensures that cores are cleared + // at session boundaries. At the same time, only leaves are guaranteed to + // be fetchable by the state trie. leaf, |info| FetchTaskConfig::new(leaf, &core, tx, metrics, info), ) - .await?; - - if let Some(task_cfg) = task_cfg { + .await + .map_err(|err| { + tracing::warn!( + target: LOG_TARGET, + error = ?err, + "Failed to spawn a fetch task" + ); + err + }); + + if let Ok(Some(task_cfg)) = task_cfg { e.insert(FetchTask::start(task_cfg, ctx).await?); } // Not a validator, nothing to do. @@ -274,7 +282,7 @@ where // `head` is the child of the first block in `ancestors`, request its session index. let head_session_index = match ancestors_iter.next() { - Some(parent) => runtime.get_session_index(ctx.sender(), *parent).await?, + Some(parent) => runtime.get_session_index_for_child(ctx.sender(), *parent).await?, None => { // No first element, i.e. empty. return Ok(ancestors) @@ -285,7 +293,7 @@ where // The first parent is skipped. for parent in ancestors_iter { // Parent is the i-th ancestor, request session index for its child -- (i-1)th element. - let session_index = runtime.get_session_index(ctx.sender(), *parent).await?; + let session_index = runtime.get_session_index_for_child(ctx.sender(), *parent).await?; if session_index == head_session_index { session_ancestry_len += 1; } else { diff --git a/polkadot/node/network/availability-distribution/src/requester/session_cache.rs b/polkadot/node/network/availability-distribution/src/requester/session_cache.rs index 1dbab38cfff0418e11ab50d02088e180367e482c..10fda8cd9c6ad2492c3330b453a6de0439264327 100644 --- a/polkadot/node/network/availability-distribution/src/requester/session_cache.rs +++ b/polkadot/node/network/availability-distribution/src/requester/session_cache.rs @@ -105,7 +105,7 @@ impl SessionCache { Context: SubsystemContext, F: FnOnce(&SessionInfo) -> R, { - let session_index = runtime.get_session_index(ctx.sender(), parent).await?; + let session_index = runtime.get_session_index_for_child(ctx.sender(), parent).await?; if let Some(o_info) = self.session_info_cache.get(&session_index) { tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru"); @@ -177,13 +177,15 @@ impl SessionCache { &self, ctx: &mut Context, runtime: &mut RuntimeInfo, - parent: Hash, + relay_parent: Hash, session_index: SessionIndex, ) -> Result<Option<SessionInfo>, Error> where Context: SubsystemContext, { - let info = runtime.get_session_info_by_index(ctx.sender(), parent, session_index).await?; + let info = runtime + .get_session_info_by_index(ctx.sender(), relay_parent, session_index) + .await?; let discovery_keys = info.session_info.discovery_keys.clone(); let mut validator_groups = info.session_info.validator_groups.clone(); diff --git a/polkadot/node/network/availability-distribution/src/requester/tests.rs b/polkadot/node/network/availability-distribution/src/requester/tests.rs index e75c1bd8bb0162b1ad3287360188a8a08940c258..3f6d284d9b921a96c75be262400575cf77ac4fe5 100644 --- a/polkadot/node/network/availability-distribution/src/requester/tests.rs +++ b/polkadot/node/network/availability-distribution/src/requester/tests.rs @@ -25,6 +25,7 @@ use polkadot_primitives::{ v1::{BlockNumber, CoreState, GroupIndex, Hash, Id, ScheduledCore, SessionIndex}, v2::SessionInfo, }; + use polkadot_subsystem::{ messages::{ AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage, diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 758a978cc9083d42c25000c766e220ba553ab295..58d6898bc2ac5208e72a50c3a7a3d71513c3c2b0 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -434,7 +434,7 @@ where Context: SubsystemContext<Message = CollatorProtocolMessage>, Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>, { - let session_index = runtime.get_session_index(ctx.sender(), relay_parent).await?; + let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?; let info = &runtime .get_session_info_by_index(ctx.sender(), relay_parent, session_index) .await? diff --git a/polkadot/node/network/dispute-distribution/src/sender/mod.rs b/polkadot/node/network/dispute-distribution/src/sender/mod.rs index 55d88f1310ae1b8349cf44fb91b8b287863f572f..ee2e15b576485ef3bb90415fae31f45490e6125d 100644 --- a/polkadot/node/network/dispute-distribution/src/sender/mod.rs +++ b/polkadot/node/network/dispute-distribution/src/sender/mod.rs @@ -190,8 +190,26 @@ impl DisputeSender { dispute: (SessionIndex, CandidateHash), ) -> Result<()> { let (session_index, candidate_hash) = dispute; - // We need some relay chain head for context for receiving session info information: - let ref_head = self.active_sessions.values().next().ok_or(NonFatal::NoActiveHeads)?; + // A relay chain head is required as context for receiving session info information from runtime and + // storage. We will iterate `active_sessions` to find a suitable head. We assume that there is at + // least one active head which, by `session_index`, is at least as recent as the `dispute` passed in. + // We need to avoid picking an older one from a session that might not yet exist in storage. + // Related to <https://github.com/paritytech/polkadot/issues/4730> . + let ref_head = self + .active_sessions + .iter() + .find_map(|(active_session_index, head_hash)| { + // There might be more than one session index that is at least as recent as the dispute + // so we just pick the first one. Keep in mind we are talking about the session index for the + // child of block identified by `head_hash` and not the session index for the block. + if active_session_index >= &session_index { + Some(head_hash) + } else { + None + } + }) + .ok_or(NonFatal::NoActiveHeads)?; + let info = runtime .get_session_info_by_index(ctx.sender(), *ref_head, session_index) .await?; @@ -293,7 +311,7 @@ impl DisputeSender { ctx: &mut Context, runtime: &mut RuntimeInfo, ) -> Result<bool> { - let new_sessions = get_active_session_indeces(ctx, runtime, &self.active_heads).await?; + let new_sessions = get_active_session_indices(ctx, runtime, &self.active_heads).await?; let new_sessions_raw: HashSet<_> = new_sessions.keys().collect(); let old_sessions_raw: HashSet<_> = self.active_sessions.keys().collect(); let updated = new_sessions_raw != old_sessions_raw; @@ -306,14 +324,15 @@ impl DisputeSender { /// Retrieve the currently active sessions. /// /// List is all indices of all active sessions together with the head that was used for the query. -async fn get_active_session_indeces<Context: SubsystemContext>( +async fn get_active_session_indices<Context: SubsystemContext>( ctx: &mut Context, runtime: &mut RuntimeInfo, active_heads: &Vec<Hash>, ) -> Result<HashMap<SessionIndex, Hash>> { let mut indeces = HashMap::new(); + // Iterate all heads we track as active and fetch the child' session indices. for head in active_heads { - let session_index = runtime.get_session_index(ctx.sender(), *head).await?; + let session_index = runtime.get_session_index_for_child(ctx.sender(), *head).await?; indeces.insert(session_index, *head); } Ok(indeces) diff --git a/polkadot/node/network/dispute-distribution/src/sender/send_task.rs b/polkadot/node/network/dispute-distribution/src/sender/send_task.rs index 8a71a0cacbfe930b65c3d577e34a3574491b28f0..ed4aab74939a34bc8603daead15c39276136368a 100644 --- a/polkadot/node/network/dispute-distribution/src/sender/send_task.rs +++ b/polkadot/node/network/dispute-distribution/src/sender/send_task.rs @@ -204,7 +204,8 @@ impl SendTask { active_sessions: &HashMap<SessionIndex, Hash>, ) -> Result<HashSet<AuthorityDiscoveryId>> { let ref_head = self.request.0.candidate_receipt.descriptor.relay_parent; - // Parachain validators: + // Retrieve all authorities which participated in the parachain consensus of the session + // in which the candidate was backed. let info = runtime .get_session_info_by_index(ctx.sender(), ref_head, self.request.0.session_index) .await?; @@ -219,7 +220,8 @@ impl SendTask { .map(|(_, v)| v.clone()) .collect(); - // Current authorities: + // Retrieve all authorities for the current session as indicated by the active + // heads we are tracking. for (session_index, head) in active_sessions.iter() { let info = runtime.get_session_info_by_index(ctx.sender(), *head, *session_index).await?; diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 2194aa806a604fa033e9bc096b15ab8088f6ebba..2c522a340a2a6ce1c9e0a7f29d0c144a44eae59f 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -624,9 +624,9 @@ struct ActiveHeadData { statements: IndexMap<StoredStatementComparator, SignedFullStatement>, /// Large statements we are waiting for with associated meta data. waiting_large_statements: HashMap<CandidateHash, LargeStatementStatus>, - /// The validators at this head. + /// The parachain validators at the head's child session index. validators: Vec<ValidatorId>, - /// The session index this head is at. + /// The current session index of this fork. session_index: sp_staking::SessionIndex, /// How many `Seconded` statements we've seen per validator. seconded_counts: HashMap<ValidatorIndex, usize>, @@ -1798,8 +1798,9 @@ impl StatementDistributionSubsystem { "New active leaf", ); + // Retrieve the parachain validators at the child of the head we track. let session_index = - runtime.get_session_index(ctx.sender(), relay_parent).await?; + runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?; let info = runtime .get_session_info_by_index(ctx.sender(), relay_parent, session_index) .await?; diff --git a/polkadot/node/subsystem-types/src/lib.rs b/polkadot/node/subsystem-types/src/lib.rs index 797e195a5854559f2200ee4618c2149eb8e13f20..e2150e14abe5bd3412e45376dfff3fa04087e16b 100644 --- a/polkadot/node/subsystem-types/src/lib.rs +++ b/polkadot/node/subsystem-types/src/lib.rs @@ -40,7 +40,7 @@ pub use polkadot_node_jaeger as jaeger; const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8; /// The status of an activated leaf. -#[derive(Debug, Clone, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum LeafStatus { /// A leaf is fresh when it's the first time the leaf has been encountered. /// Most leaves should be fresh. diff --git a/polkadot/node/subsystem-util/src/rolling_session_window.rs b/polkadot/node/subsystem-util/src/rolling_session_window.rs index 5e11d2fe54469b513e0c4a3cac387e886d2793e6..1f204d3026772212b1ca7e509f1aa3e08fd7914a 100644 --- a/polkadot/node/subsystem-util/src/rolling_session_window.rs +++ b/polkadot/node/subsystem-util/src/rolling_session_window.rs @@ -102,7 +102,7 @@ impl RollingSessionWindow { window_size: SessionWindowSize, block_hash: Hash, ) -> Result<Self, SessionsUnavailable> { - let session_index = get_session_index_for_head(ctx, block_hash).await?; + let session_index = get_session_index_for_child(ctx, block_hash).await?; let window_start = session_index.saturating_sub(window_size.get() - 1); @@ -160,7 +160,7 @@ impl RollingSessionWindow { ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), block_hash: Hash, ) -> Result<SessionWindowUpdate, SessionsUnavailable> { - let session_index = get_session_index_for_head(ctx, block_hash).await?; + let session_index = get_session_index_for_child(ctx, block_hash).await?; let old_window_start = self.earliest_session; @@ -212,7 +212,12 @@ impl RollingSessionWindow { } } -async fn get_session_index_for_head( +// Returns the session index expected at any child of the `parent` block. +// +// Note: We could use `RuntimeInfo::get_session_index_for_child` here but it's +// cleaner to just call the runtime API directly without needing to create an instance +// of `RuntimeInfo`. +async fn get_session_index_for_child( ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), block_hash: Hash, ) -> Result<SessionIndex, SessionsUnavailable> { diff --git a/polkadot/node/subsystem-util/src/runtime/mod.rs b/polkadot/node/subsystem-util/src/runtime/mod.rs index fc5a3446495662eb7839dee282e50809978a793e..d7afac0b58c2b67ae55698407efbe09dc7c22cfc 100644 --- a/polkadot/node/subsystem-util/src/runtime/mod.rs +++ b/polkadot/node/subsystem-util/src/runtime/mod.rs @@ -117,8 +117,9 @@ impl RuntimeInfo { } } - /// Retrieve the current session index. - pub async fn get_session_index<Sender>( + /// Returns the session index expected at any child of the `parent` block. + /// This does not return the session index for the `parent` block. + pub async fn get_session_index_for_child<Sender>( &mut self, sender: &mut Sender, parent: Hash, @@ -141,14 +142,14 @@ impl RuntimeInfo { pub async fn get_session_info<'a, Sender>( &'a mut self, sender: &mut Sender, - parent: Hash, + relay_parent: Hash, ) -> Result<&'a ExtendedSessionInfo> where Sender: SubsystemSender, { - let session_index = self.get_session_index(sender, parent).await?; + let session_index = self.get_session_index_for_child(sender, relay_parent).await?; - self.get_session_info_by_index(sender, parent, session_index).await + self.get_session_info_by_index(sender, relay_parent, session_index).await } /// Get `ExtendedSessionInfo` by session index. @@ -185,7 +186,7 @@ impl RuntimeInfo { pub async fn check_signature<Sender, Payload, RealPayload>( &mut self, sender: &mut Sender, - parent: Hash, + relay_parent: Hash, signed: UncheckedSigned<Payload, RealPayload>, ) -> Result< std::result::Result<Signed<Payload, RealPayload>, UncheckedSigned<Payload, RealPayload>>, @@ -195,9 +196,9 @@ impl RuntimeInfo { Payload: EncodeAs<RealPayload> + Clone, RealPayload: Encode + Clone, { - let session_index = self.get_session_index(sender, parent).await?; - let info = self.get_session_info_by_index(sender, parent, session_index).await?; - Ok(check_signature(session_index, &info.session_info, parent, signed)) + let session_index = self.get_session_index_for_child(sender, relay_parent).await?; + let info = self.get_session_info_by_index(sender, relay_parent, session_index).await?; + Ok(check_signature(session_index, &info.session_info, relay_parent, signed)) } /// Build `ValidatorInfo` for the current session.