Commit 2661930b authored by sandreim, committed by GitHub

`relay chain selection` and `dispute-coordinator` fixes and improvements (#4752)


* Don't error in finality_target_with_longest_chain

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Add error flag

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Add error flag in dispute-coordinator

Make sure to send errors to subsystems requesting data that depends on missing session info

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Scrape ancestors

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fmt

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Fix naming

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review feedback

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fmt

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* 💬 fixes

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* consume

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* typo

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review fixes

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Bump scraped blocks LRU capacity

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* 🧯 🔥

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* remove prints

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Increase scraped blocks cache size

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* more review fixes

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* another fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix target_ancestor

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Scrape up to max finalized block

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* undo comment change

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Limit ancestry lookup to last finalized block or max finality lag

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* debug damage

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
parent 109f7309
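The core of the change is that on-chain vote scraping is now bounded and cached: on every new leaf the coordinator walks ancestors back no further than the last finalized block, and never more than a fixed batch below the leaf. A minimal sketch of that bound, assuming simplified numeric types (the constant name is taken from the diff below):

```rust
// Sketch of the scraping bound: stop at the last finalized block, but never
// walk more than `MAX_BATCH_SCRAPE_ANCESTORS` blocks below the activated
// leaf, so a syncing node is never asked to scrape an enormous ancestry.
const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 500;

fn scrape_target(last_finalized: u32, leaf_number: u32) -> u32 {
    std::cmp::max(last_finalized, leaf_number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS))
}

fn main() {
    // A synced node stops at the finalized block just below the leaf.
    assert_eq!(scrape_target(990, 1_000), 990);
    // A syncing node (finality far behind) is capped at 500 ancestors.
    assert_eq!(scrape_target(0, 10_000), 9_500);
}
```

The cap is deliberately kept equal to `MAX_FINALITY_LAG` in relay chain selection, as the comments in the diff note.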
@@ -16,12 +16,16 @@
//! Dispute coordinator subsystem in initialized state (after first active leaf is received).
-use std::{collections::HashSet, sync::Arc};
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};
use futures::{
channel::{mpsc, oneshot},
FutureExt, StreamExt,
};
use lru::LruCache;
use sc_keystore::LocalKeystore;
@@ -37,7 +41,7 @@ use polkadot_node_subsystem::{
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SubsystemContext,
};
use polkadot_node_subsystem_util::rolling_session_window::{
-RollingSessionWindow, SessionWindowUpdate,
RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable,
};
use polkadot_primitives::{
v1::{
@@ -48,11 +52,12 @@ use polkadot_primitives::{
v2::SessionInfo,
};
-use crate::{metrics::Metrics, real::DisputeCoordinatorSubsystem, LOG_TARGET};
use crate::{
-error::{log_error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
error::{log_error, Error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
metrics::Metrics,
real::{ordering::get_finalized_block_number, DisputeCoordinatorSubsystem},
status::{get_active_with_status, Clock, DisputeStatus, Timestamp},
LOG_TARGET,
};
use super::{
@@ -66,6 +71,11 @@
OverlayedBackend,
};
// The capacity and scrape depth are equal to the maximum allowed unfinalized depth.
const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 500;
// This is in sync with `MAX_FINALITY_LAG` in relay chain selection.
const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 500;
/// After the first active leaves update we transition to `Initialized` state.
///
/// Before the first active leaves update we can't really do much. We cannot check incoming
@@ -80,6 +90,11 @@ pub struct Initialized {
ordering_provider: OrderingProvider,
participation_receiver: WorkerMessageReceiver,
metrics: Metrics,
// This tracks only rolling session window failures.
// It can be a `Vec` if the need to track more arises.
error: Option<SessionsUnavailable>,
/// Latest relay blocks that have been successfully scraped.
last_scraped_blocks: LruCache<Hash, ()>,
}
impl Initialized {
@@ -105,6 +120,8 @@ impl Initialized {
participation,
participation_receiver,
metrics,
error: None,
last_scraped_blocks: LruCache::new(LRU_SCRAPED_BLOCKS_CAPACITY),
}
}
@@ -245,22 +262,26 @@
.await?;
self.participation.process_active_leaves_update(ctx, &update).await?;
-let new_activations = update.activated.into_iter().map(|a| a.hash);
-for new_leaf in new_activations {
-match self.rolling_session_window.cache_session_info_for_head(ctx, new_leaf).await {
if let Some(new_leaf) = update.activated {
match self
.rolling_session_window
.cache_session_info_for_head(ctx, new_leaf.hash)
.await
{
Err(e) => {
tracing::warn!(
-target: LOG_TARGET,
-err = ?e,
-"Failed to update session cache for disputes",
target: LOG_TARGET,
err = ?e,
"Failed to update session cache for disputes",
);
-continue
self.error = Some(e);
},
Ok(SessionWindowUpdate::Advanced {
new_window_end: window_end,
new_window_start,
..
}) => {
self.error = None;
let session = window_end;
if self.highest_session < session {
tracing::trace!(
@@ -277,7 +298,82 @@
},
Ok(SessionWindowUpdate::Unchanged) => {},
};
-self.scrape_on_chain_votes(ctx, overlay_db, new_leaf, now).await?;
// Scrape the head if the rolling session update above went well.
if self.error.is_none() {
let _ = self
.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now)
.await
.map_err(|err| {
tracing::warn!(
target: LOG_TARGET,
"Skipping scraping block #{}({}) due to error: {}",
new_leaf.number,
new_leaf.hash,
err
);
});
}
// Try to scrape any blocks for which we could not get the current session or did not receive an
// active leaves update.
let ancestors = match get_finalized_block_number(ctx.sender()).await {
Ok(block_number) => {
// Limit our search to last finalized block, or up to max finality lag.
let block_number = std::cmp::max(
block_number,
new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS),
);
// Fetch ancestry up to and including the last finalized block.
// `get_block_ancestors()` doesn't include the target block in the ancestry, so we'll need to
// pass in its parent.
OrderingProvider::get_block_ancestors(
ctx.sender(),
new_leaf.hash,
new_leaf.number,
block_number.saturating_sub(1),
&mut self.last_scraped_blocks,
)
.await
.unwrap_or_else(|err| {
tracing::debug!(
target: LOG_TARGET,
activated_leaf = ?new_leaf,
error = ?err,
"Skipping leaf ancestors due to an error",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
})
},
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
activated_leaf = ?new_leaf,
error = ?err,
"Skipping leaf ancestors scraping",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
},
};
// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel.
for ancestor in ancestors {
let _ = self.scrape_on_chain_votes(ctx, overlay_db, ancestor, now).await.map_err(
|err| {
tracing::warn!(
target: LOG_TARGET,
hash = ?ancestor,
error = ?err,
"Skipping scraping block due to error",
);
},
);
}
}
Ok(())
@@ -293,6 +389,11 @@
new_leaf: Hash,
now: u64,
) -> Result<()> {
// Avoid scraping twice.
if self.last_scraped_blocks.get(&new_leaf).is_some() {
return Ok(())
}
// obtain the concluded disputes as well as the candidate backing votes
// from the new leaf
let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = {
@@ -331,6 +432,9 @@
};
if backing_validators_per_candidate.is_empty() && disputes.is_empty() {
// This block is not interesting as it doesn't contain any backing votes or disputes. We'll
// mark it here as scraped to prevent further processing.
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}
@@ -413,6 +517,7 @@
}
if disputes.is_empty() {
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}
@@ -490,6 +595,8 @@
"Attempted import of on-chain statement of concluded dispute failed"),
}
}
self.last_scraped_blocks.put(new_leaf, ());
Ok(())
}
@@ -533,18 +640,39 @@
}
},
DisputeCoordinatorMessage::RecentDisputes(tx) => {
-let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
// Return error if session information is missing.
self.ensure_available_session_info()?;
let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
} else {
BTreeMap::new()
};
let _ = tx.send(recent_disputes.keys().cloned().collect());
},
DisputeCoordinatorMessage::ActiveDisputes(tx) => {
-let recent_disputes =
-overlay_db.load_recent_disputes()?.unwrap_or_default().into_iter();
-let _ =
-tx.send(get_active_with_status(recent_disputes, now).map(|(k, _)| k).collect());
// Return error if session information is missing.
self.ensure_available_session_info()?;
let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
} else {
BTreeMap::new()
};
let _ = tx.send(
get_active_with_status(recent_disputes.into_iter(), now)
.map(|(k, _)| k)
.collect(),
);
},
DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => {
// Return error if session information is missing.
self.ensure_available_session_info()?;
let mut query_output = Vec::new();
-for (session_index, candidate_hash) in query.into_iter() {
for (session_index, candidate_hash) in query {
if let Some(v) =
overlay_db.load_candidate_votes(session_index, &candidate_hash)?
{
@@ -581,6 +709,9 @@
block_descriptions,
tx,
} => {
// Return error if session information is missing.
self.ensure_available_session_info()?;
let undisputed_chain = determine_undisputed_chain(
overlay_db,
base_number,
@@ -595,6 +726,15 @@
Ok(Box::new(|| Ok(())))
}
// Helper function for checking subsystem errors in message processing.
fn ensure_available_session_info(&self) -> Result<()> {
if let Some(subsystem_error) = self.error.clone() {
return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error)))
}
Ok(())
}
async fn handle_import_statements(
&mut self,
ctx: &mut impl SubsystemContext,
......
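The `initialized.rs` changes above boil down to a small error-flag protocol: remember the most recent rolling-session-window failure, clear it once a window update succeeds, and make queries fail fast while it is set. A toy model of that protocol, with types simplified and none of the real subsystem plumbing:

```rust
// Minimal sketch of the error-flag pattern: stash the last session-window
// failure and surface it to any subsystem that queries dispute data while
// session info is missing, instead of answering from stale state.
#[derive(Debug, Clone)]
struct SessionsUnavailable;

#[derive(Default)]
struct Initialized {
    error: Option<SessionsUnavailable>,
}

impl Initialized {
    fn on_session_window_update(&mut self, result: Result<(), SessionsUnavailable>) {
        match result {
            Ok(()) => self.error = None,     // window updated, clear the flag
            Err(e) => self.error = Some(e),  // remember the failure
        }
    }

    // Mirrors `ensure_available_session_info` in the diff: every query
    // handler calls this first and propagates the stored error.
    fn ensure_available_session_info(&self) -> Result<(), SessionsUnavailable> {
        match &self.error {
            Some(e) => Err(e.clone()),
            None => Ok(()),
        }
    }
}

fn main() {
    let mut state = Initialized::default();
    state.on_session_window_update(Err(SessionsUnavailable));
    assert!(state.ensure_available_session_info().is_err());
    state.on_session_window_update(Ok(()));
    assert!(state.ensure_available_session_info().is_ok());
}
```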
@@ -203,6 +203,10 @@ impl DisputeCoordinatorSubsystem {
},
};
// Before we move to the initialized state we need to check if we got at
// least one finality notification to prevent large ancestry block scraping
// when the node is syncing.
let mut overlay_db = OverlayedBackend::new(&mut backend);
let (participations, spam_slots, ordering_provider) = match self
.handle_startup(
......
@@ -184,19 +184,43 @@ impl OrderingProvider {
update: &ActiveLeavesUpdate,
) -> Result<()> {
if let Some(activated) = update.activated.as_ref() {
-// Fetch ancestors of the activated leaf.
-let ancestors = self
-.get_block_ancestors(sender, activated.hash, activated.number)
-.await
-.unwrap_or_else(|err| {
// Fetch last finalized block.
let ancestors = match get_finalized_block_number(sender).await {
Ok(block_number) => {
// Fetch ancestry up to last finalized block.
Self::get_block_ancestors(
sender,
activated.hash,
activated.number,
block_number,
&mut self.last_observed_blocks,
)
.await
.unwrap_or_else(|err| {
tracing::debug!(
target: LOG_TARGET,
activated_leaf = ?activated,
error = ?err,
"Skipping leaf ancestors due to an error",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
})
},
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
activated_leaf = ?activated,
"Skipping leaf ancestors due to an error: {}",
err
error = ?err,
"Failed to retrieve last finalized block number",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
-});
},
};
// Ancestors block numbers are consecutive in the descending order.
let earliest_block_number = activated.number - ancestors.len() as u32;
let block_numbers = (earliest_block_number..=activated.number).rev();
@@ -242,23 +266,22 @@
}
/// Returns ancestors of `head` in the descending order, stopping
-/// either at the block present in cache or the latest finalized block.
/// either at the block present in cache or at `target_ancestor`.
///
/// Suited specifically for querying non-finalized chains, thus
/// doesn't rely on block numbers.
///
/// Both `head` and the last block are **not** included in the result.
-async fn get_block_ancestors<Sender: SubsystemSender>(
-&mut self,
pub async fn get_block_ancestors<Sender: SubsystemSender>(
sender: &mut Sender,
mut head: Hash,
mut head_number: BlockNumber,
target_ancestor: BlockNumber,
lookup_cache: &mut LruCache<Hash, ()>,
) -> Result<Vec<Hash>> {
let mut ancestors = Vec::new();
-let finalized_block_number = get_finalized_block_number(sender).await?;
-if self.last_observed_blocks.get(&head).is_some() || head_number <= finalized_block_number {
if lookup_cache.get(&head).is_some() || head_number <= target_ancestor {
return Ok(ancestors)
}
@@ -297,10 +320,10 @@ impl OrderingProvider {
let block_numbers = (earliest_block_number..head_number).rev();
for (block_number, hash) in block_numbers.zip(&hashes) {
-// Return if we either met finalized/cached block or
// Return if we either met target/cached block or
// hit the size limit for the returned ancestry of head.
-if self.last_observed_blocks.get(hash).is_some() ||
-block_number <= finalized_block_number ||
if lookup_cache.get(hash).is_some() ||
block_number <= target_ancestor ||
ancestors.len() >= Self::ANCESTRY_SIZE_LIMIT
{
return Ok(ancestors)
@@ -345,7 +368,9 @@ async fn get_block_number(
send_message_fatal(sender, ChainApiMessage::BlockNumber(relay_parent, tx), rx).await
}
-async fn get_finalized_block_number(sender: &mut impl SubsystemSender) -> FatalResult<BlockNumber> {
pub async fn get_finalized_block_number(
sender: &mut impl SubsystemSender,
) -> FatalResult<BlockNumber> {
let (number_tx, number_rx) = oneshot::channel();
send_message_fatal(sender, ChainApiMessage::FinalizedBlockNumber(number_tx), number_rx).await
}
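For reference, the stopping rules of the reworked `get_block_ancestors` can be modeled in isolation. The sketch below is a single-batch, in-memory version: numeric stand-ins for hashes, a hypothetical `parent_of` closure in place of the real ChainApi calls, a made-up size limit, and the `lru` 0.7-style `LruCache::new(usize)` constructor the repo used at the time:

```rust
// In-memory model of the stopping rules: walk down from `head` and stop at a
// cached block, at `target_ancestor`, or at the ancestry size limit.
// Neither `head` nor the block at `target_ancestor` is included.
use lru::LruCache;

const ANCESTRY_SIZE_LIMIT: usize = 10; // stand-in for the real constant

fn get_block_ancestors(
    parent_of: impl Fn(u32) -> u32, // stand-in for the ChainApi ancestor lookup
    head: u32,
    head_number: u32,
    target_ancestor: u32,
    lookup_cache: &mut LruCache<u32, ()>,
) -> Vec<u32> {
    let mut ancestors = Vec::new();
    // Nothing to do if the head is already known or at/below the target.
    if lookup_cache.get(&head).is_some() || head_number <= target_ancestor {
        return ancestors;
    }
    let (mut block, mut number) = (head, head_number);
    loop {
        block = parent_of(block);
        number -= 1;
        // Stop on a cached/target block or when the batch is large enough.
        if lookup_cache.get(&block).is_some()
            || number <= target_ancestor
            || ancestors.len() >= ANCESTRY_SIZE_LIMIT
        {
            return ancestors;
        }
        ancestors.push(block);
    }
}

fn main() {
    // Toy chain where block n's parent is n - 1.
    let mut cache = LruCache::new(500);
    let ancestors = get_block_ancestors(|n| n - 1, 100, 100, 95, &mut cache);
    // Blocks 96..=99 are returned; neither 100 (head) nor 95 (target) are.
    assert_eq!(ancestors, vec![99, 98, 97, 96]);
}
```

Making the cache a parameter (rather than a field, as before) is what lets the dispute coordinator reuse the same walk with its own `last_scraped_blocks` cache.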
@@ -54,7 +54,7 @@ use std::sync::Arc;
///
/// This is a safety net that should be removed at some point in the future.
// Until it's not, make sure to also update `MAX_HEADS_LOOK_BACK` in `approval-voting`
-// when changing its value.
// and `MAX_BATCH_SCRAPE_ANCESTORS` in `dispute-coordinator` when changing its value.
const MAX_FINALITY_LAG: polkadot_primitives::v1::BlockNumber = 500;
const LOG_TARGET: &str = "parachain::chain-selection";
@@ -522,15 +522,32 @@
std::any::type_name::<Self>(),
)
.await;
-let (subchain_number, subchain_head) = rx
-.await
-.map_err(Error::DetermineUndisputedChainCanceled)
-.map_err(|e| ConsensusError::Other(Box::new(e)))?;
-// The the total lag accounting for disputes.
-let lag_disputes = initial_leaf_number.saturating_sub(subchain_number);
-self.metrics.note_disputes_finality_lag(lag_disputes);
-(lag_disputes, subchain_head)
// Try to fetch response from `dispute-coordinator`. If an error occurs we just log it
// and return `target_hash` as maximal vote. It is safer to contain this error here
// and not push it up the stack to cause additional issues in GRANDPA/BABE.
let (lag, subchain_head) =
match rx.await.map_err(Error::DetermineUndisputedChainCanceled) {
// If the request succeeded we will receive (block number, block hash).
Ok((subchain_number, subchain_head)) => {
// The total lag accounting for disputes.
let lag_disputes = initial_leaf_number.saturating_sub(subchain_number);
self.metrics.note_disputes_finality_lag(lag_disputes);
(lag_disputes, subchain_head)
},
Err(e) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Call to `DetermineUndisputedChain` failed",
);
// We need to return a sane finality target. But, we are unable to ensure we are not
// finalizing something that is being disputed or has been concluded as invalid. We will be
// conservative here and not vote for finality above the ancestor passed in.
return Ok(target_hash)
},
};
(lag, subchain_head)
} else {
(lag, subchain_head)
};
......
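The relay-chain-selection hunk above has one observable behavior change: a failed `DetermineUndisputedChain` request is now contained instead of surfaced to GRANDPA. A distilled sketch with hand-rolled types (the real code returns early with `target_hash` and logs through `tracing`):

```rust
// If the dispute-coordinator request fails, fall back to the `target_hash`
// we were given rather than propagating an error into finality.
type Hash = [u8; 32];

fn undisputed_target(
    target_hash: Hash,
    initial_leaf_number: u32,
    response: Result<(u32, Hash), String>, // oneshot result from dispute-coordinator
) -> (u32, Hash) {
    match response {
        Ok((subchain_number, subchain_head)) => {
            // The total lag accounting for disputes.
            let lag_disputes = initial_leaf_number.saturating_sub(subchain_number);
            (lag_disputes, subchain_head)
        },
        Err(e) => {
            eprintln!("Call to `DetermineUndisputedChain` failed: {e}");
            // Conservative fallback: never vote for finality above the
            // ancestor passed in, since live disputes cannot be ruled out.
            (0, target_hash)
        },
    }
}

fn main() {
    let target = [0u8; 32];
    assert_eq!(undisputed_target(target, 42, Err("canceled".into())), (0, target));
}
```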
@@ -34,8 +34,8 @@ use polkadot_node_subsystem::{
use thiserror::Error;
/// Sessions unavailable in state to cache.
-#[derive(Debug)]
-pub enum SessionsUnavailableKind {
#[derive(Debug, Clone)]
pub enum SessionsUnavailableReason {
/// Runtime API subsystem was unavailable.
RuntimeApiUnavailable(oneshot::Canceled),
/// The runtime API itself returned an error.
@@ -45,7 +45,7 @@ pub enum SessionsUnavailableKind {
}
/// Information about the sessions being fetched.
-#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SessionsUnavailableInfo {
/// The desired window start.
pub window_start: SessionIndex,
@@ -56,10 +56,10 @@ pub struct SessionsUnavailableInfo {
}
/// Sessions were unavailable to fetch from the state for some reason.
-#[derive(Debug, Error)]
#[derive(Debug, Error, Clone)]
pub struct SessionsUnavailable {
/// The error kind.
-kind: SessionsUnavailableKind,
kind: SessionsUnavailableReason,
/// The info about the session window, if any.
info: Option<SessionsUnavailableInfo>,
}
@@ -229,12 +229,12 @@ async fn get_session_index_for_head(
Ok(Ok(s)) => Ok(s),
Ok(Err(e)) =>
return Err(SessionsUnavailable {
-kind: SessionsUnavailableKind::RuntimeApi(e),
kind: SessionsUnavailableReason::RuntimeApi(e),
info: None,
}),
Err(e) =>
return Err(SessionsUnavailable {
-kind: SessionsUnavailableKind::RuntimeApiUnavailable(e),
kind: SessionsUnavailableReason::RuntimeApiUnavailable(e),
info: None,
}),
}
@@ -245,7 +245,7 @@ async fn load_all_sessions(
block_hash: Hash,
start: SessionIndex,
end_inclusive: SessionIndex,
-) -> Result<Vec<SessionInfo>, SessionsUnavailableKind> {
) -> Result<Vec<SessionInfo>, SessionsUnavailableReason> {
let mut v = Vec::new();
for i in start..=end_inclusive {
let (tx, rx) = oneshot::channel();
@@ -257,9 +257,9 @@
let session_info = match rx.await {
Ok(Ok(Some(s))) => s,
-Ok(Ok(None)) => return Err(SessionsUnavailableKind::Missing(i)),
-Ok(Err(e)) => return Err(SessionsUnavailableKind::RuntimeApi(e)),
-Err(canceled) => return Err(SessionsUnavailableKind::RuntimeApiUnavailable(canceled)),
Ok(Ok(None)) => return Err(SessionsUnavailableReason::Missing(i)),
Ok(Err(e)) => return Err(SessionsUnavailableReason::RuntimeApi(e)),
Err(canceled) => return Err(SessionsUnavailableReason::RuntimeApiUnavailable(canceled)),
};
v.push(session_info);
......
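The rename to `SessionsUnavailableReason` and the added `Clone` derives are what allow the dispute coordinator to stash a `SessionsUnavailable` in its `error` field and hand a copy to every query that fails while session data is missing; `Clone` has to propagate through the whole error chain for that to work. A toy illustration, with the real variants and fields elided:

```rust
// Without `Clone` on the reason, the outer error could not be cloned, and a
// stored error could be surfaced at most once.
#[derive(Debug, Clone)]
enum SessionsUnavailableReason {
    Missing(u32), // session index; stand-in for the real variants
}

#[derive(Debug, Clone)]
struct SessionsUnavailable {
    kind: SessionsUnavailableReason,
}

fn main() {
    let stored = Some(SessionsUnavailable {
        kind: SessionsUnavailableReason::Missing(7),
    });
    // Each failed query gets its own copy; the stored flag itself stays set.
    let first = stored.clone();
    let second = stored.clone();
    assert!(first.is_some() && stored.is_some());
    if let Some(e) = &second {
        // The reason survives the clone.
        assert!(matches!(e.kind, SessionsUnavailableReason::Missing(7)));
    }
}
```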