diff --git a/polkadot/node/core/dispute-coordinator/src/initialized.rs b/polkadot/node/core/dispute-coordinator/src/initialized.rs
index cd94770da8886a27140a1b8b1cb84e8d7b31d36d..c0eb029f4d0f534862ca35a45c390cecf1ede94b 100644
--- a/polkadot/node/core/dispute-coordinator/src/initialized.rs
+++ b/polkadot/node/core/dispute-coordinator/src/initialized.rs
@@ -16,7 +16,10 @@
 
 //! Dispute coordinator subsystem in initialized state (after first active leaf is received).
 
-use std::{collections::BTreeMap, sync::Arc};
+use std::{
+	collections::{BTreeMap, VecDeque},
+	sync::Arc,
+};
 
 use futures::{
 	channel::{mpsc, oneshot},
@@ -65,6 +68,12 @@ use super::{
 	OverlayedBackend,
 };
 
+/// How many blocks we import votes from per leaf update.
+///
+/// Since vote import is relatively slow, we have to limit the maximum amount of work we do on leaf
+/// updates (and especially on startup) so the dispute coordinator won't be considered stalling.
+const CHAIN_IMPORT_MAX_BATCH_SIZE: usize = 8;
+
 // Initial data for `dispute-coordinator`. It is provided only at first start.
 pub struct InitialData {
 	pub participations: Vec<(ParticipationPriority, ParticipationRequest)>,
@@ -89,6 +98,17 @@ pub(crate) struct Initialized {
 	participation: Participation,
 	scraper: ChainScraper,
 	participation_receiver: WorkerMessageReceiver,
+	/// Backlog of still to be imported votes from chain.
+	///
+	/// For some reason importing votes is relatively slow, if there is a large finality lag (~50
+	/// blocks) we will be too slow importing all votes from unfinalized chains on startup
+	/// (dispute-coordinator gets killed because of unresponsiveness).
+	///
+	/// https://github.com/paritytech/polkadot/issues/6912
+	///
+	/// To resolve this, we limit the amount of votes imported at once to
+	/// `CHAIN_IMPORT_MAX_BATCH_SIZE` and put the rest here for later processing.
+	chain_import_backlog: VecDeque<ScrapedOnChainVotes>,
 	metrics: Metrics,
 }
 
@@ -117,6 +137,7 @@ impl Initialized {
 			scraper,
 			participation,
 			participation_receiver,
+			chain_import_backlog: VecDeque::new(),
 			metrics,
 		}
 	}
@@ -168,24 +189,16 @@ impl Initialized {
 		}
 		let mut overlay_db = OverlayedBackend::new(backend);
-		for votes in on_chain_votes {
-			let _ = self
-				.process_on_chain_votes(
-					ctx,
-					&mut overlay_db,
-					votes,
-					clock.now(),
-					first_leaf.hash,
-				)
-				.await
-				.map_err(|error| {
-					gum::warn!(
-						target: LOG_TARGET,
-						?error,
-						"Skipping scraping block due to error",
-					);
-				});
-		}
+
+		self.process_chain_import_backlog(
+			ctx,
+			&mut overlay_db,
+			on_chain_votes,
+			clock.now(),
+			first_leaf.hash,
+		)
+		.await;
+
 		if !overlay_db.is_empty() {
 			let ops = overlay_db.into_write_ops();
 			backend.write(ops)?;
 		}
@@ -344,26 +357,49 @@ impl Initialized {
 				scraped_updates.on_chain_votes.len()
 			);
 
-			// The `runtime-api` subsystem has an internal queue which serializes the execution,
-			// so there is no point in running these in parallel
-			for votes in scraped_updates.on_chain_votes {
-				let _ = self
-					.process_on_chain_votes(ctx, overlay_db, votes, now, new_leaf.hash)
-					.await
-					.map_err(|error| {
-						gum::warn!(
-							target: LOG_TARGET,
-							?error,
-							"Skipping scraping block due to error",
-						);
-					});
-			}
+			self.process_chain_import_backlog(
+				ctx,
+				overlay_db,
+				scraped_updates.on_chain_votes,
+				now,
+				new_leaf.hash,
+			)
+			.await;
 		}
 
 		gum::trace!(target: LOG_TARGET, timestamp = now, "Done processing ActiveLeavesUpdate");
 
 		Ok(())
 	}
+	/// Process one batch of our `chain_import_backlog`.
+	///
+	/// `new_votes` will be appended beforehand.
+	async fn process_chain_import_backlog<Context>(
+		&mut self,
+		ctx: &mut Context,
+		overlay_db: &mut OverlayedBackend<'_, impl Backend>,
+		new_votes: Vec<ScrapedOnChainVotes>,
+		now: u64,
+		block_hash: Hash,
+	) {
+		let mut chain_import_backlog = std::mem::take(&mut self.chain_import_backlog);
+		chain_import_backlog.extend(new_votes);
+		let import_range =
+			0..std::cmp::min(CHAIN_IMPORT_MAX_BATCH_SIZE, chain_import_backlog.len());
+		// The `runtime-api` subsystem has an internal queue which serializes the execution,
+		// so there is no point in running these in parallel
+		for votes in chain_import_backlog.drain(import_range) {
+			let res = self.process_on_chain_votes(ctx, overlay_db, votes, now, block_hash).await;
+			match res {
+				Ok(()) => {},
+				Err(error) => {
+					gum::warn!(target: LOG_TARGET, ?error, "Skipping scraping block due to error",);
+				},
+			};
+		}
+		self.chain_import_backlog = chain_import_backlog;
+	}
+
 	/// Scrapes on-chain votes (backing votes and concluded disputes) for a active leaf of the
 	/// relay chain.
 	async fn process_on_chain_votes<Context>(