diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index d145a10a0843fdfb5eb6cc6ecd28143f53b70507..94fcffe990ed8325956f9d4ee74e5715ef5e182c 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6958,6 +6958,7 @@ name = "polkadot-node-core-provisioner" version = "0.9.19" dependencies = [ "bitvec", + "fatality", "futures 0.3.21", "futures-timer", "polkadot-node-primitives", diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index 836f6c70a9b1c2537e761ca6c25acc97c716c60a..ebda68090af6df435ba6d7452a13cceb2dd18ab6 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -203,6 +203,7 @@ try-runtime = [ "polkadot-cli/try-runtime" ] fast-runtime = [ "polkadot-cli/fast-runtime" ] runtime-metrics = [ "polkadot-cli/runtime-metrics" ] pyroscope = ["polkadot-cli/pyroscope"] +staging-client = ["polkadot-cli/staging-client"] # Configuration for building a .deb package - for use with `cargo-deb` [package.metadata.deb] diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml index 16f72886c5d921c88473e004643fe7a095f93e55..7271b2f23bff1ca41ea92999578c4dbe458ead5f 100644 --- a/polkadot/cli/Cargo.toml +++ b/polkadot/cli/Cargo.toml @@ -73,3 +73,4 @@ rococo-native = ["service/rococo-native"] malus = ["full-node", "service/malus"] runtime-metrics = ["service/runtime-metrics", "polkadot-node-metrics/runtime-metrics"] +staging-client = ["service/staging-client"] diff --git a/polkadot/node/core/provisioner/Cargo.toml b/polkadot/node/core/provisioner/Cargo.toml index dc4ee7e295675e6a18ee6b587f9d70d8ecc4b029..17aaf7da96653f0bca8bcd9265e09bf5cc285f20 100644 --- a/polkadot/node/core/provisioner/Cargo.toml +++ b/polkadot/node/core/provisioner/Cargo.toml @@ -15,9 +15,13 @@ polkadot-node-subsystem = { path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } futures-timer = "3.0.2" rand = "0.8.5" +fatality = "0.0.6" [dev-dependencies] sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" } + +[features] +staging-client = [] diff --git a/polkadot/node/core/provisioner/src/error.rs b/polkadot/node/core/provisioner/src/error.rs new file mode 100644 index 0000000000000000000000000000000000000000..7f5807c7c7a34bd8ae92fd6dd244fe68257a6f59 --- /dev/null +++ b/polkadot/node/core/provisioner/src/error.rs @@ -0,0 +1,83 @@ +// Copyright 2017-2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +///! Error types for provisioner module +use fatality; +use futures::channel::{mpsc, oneshot}; +use polkadot_node_subsystem::errors::{ChainApiError, RuntimeApiError}; +use polkadot_node_subsystem_util as util; +use polkadot_primitives::v2::Hash; +use thiserror::Error; + +/// Errors in the provisioner. +#[derive(Debug, Error)] +#[allow(missing_docs)] +pub enum Error { + #[error(transparent)] + Util(#[from] util::Error), + + #[error("failed to get availability cores")] + CanceledAvailabilityCores(#[source] oneshot::Canceled), + + #[error("failed to get persisted validation data")] + CanceledPersistedValidationData(#[source] oneshot::Canceled), + + #[error("failed to get block number")] + CanceledBlockNumber(#[source] oneshot::Canceled), + + #[error("failed to get backed candidates")] + CanceledBackedCandidates(#[source] oneshot::Canceled), + + #[error("failed to get votes on dispute")] + CanceledCandidateVotes(#[source] oneshot::Canceled), + + #[error(transparent)] + ChainApi(#[from] ChainApiError), + + #[error(transparent)] + Runtime(#[from] RuntimeApiError), + + #[error("failed to send message to ChainAPI")] + ChainApiMessageSend(#[source] mpsc::SendError), + + #[error("failed to send message to CandidateBacking to get backed candidates")] + GetBackedCandidatesSend(#[source] mpsc::SendError), + + #[error("failed to send return message with Inherents")] + InherentDataReturnChannel, + + #[error( + "backed candidate does not correspond to selected candidate; check logic in provisioner" + )] + BackedCandidateOrderingProblem, +} + +/// Used by `get_onchain_disputes` to represent errors related to fetching on-chain disputes from the Runtime +#[allow(dead_code)] // Remove when promoting to stable +#[fatality::fatality] +pub enum GetOnchainDisputesError { + #[fatal] + #[error("runtime subsystem is down")] + Channel, + + #[error("runtime execution error occurred while fetching onchain disputes for parent {1}")] + Execution(#[source] RuntimeApiError, Hash), + + #[error( + "runtime doesn't support RuntimeApiRequest::Disputes/RuntimeApiRequest::StagingDisputes for parent {1}" + )] + NotSupported(#[source] RuntimeApiError, Hash), +} diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 35170bcfe4c762c656d5a55aecd782b9b75a1e70..99c0ed4fac9dd0624d49ec6637d4662a43bd31b2 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -27,7 +27,6 @@ use futures::{ use futures_timer::Delay; use polkadot_node_primitives::CandidateVotes; use polkadot_node_subsystem::{ - errors::{ChainApiError, RuntimeApiError}, jaeger, messages::{ CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage, ProvisionableData, @@ -36,23 +35,25 @@ use polkadot_node_subsystem::{ ActivatedLeaf, LeafStatus, PerLeafSpan, SubsystemSender, }; use polkadot_node_subsystem_util::{ - self as util, request_availability_cores, request_persisted_validation_data, JobSender, - JobSubsystem, JobTrait, + request_availability_cores, request_persisted_validation_data, JobSender, JobSubsystem, + JobTrait, }; use polkadot_primitives::v2::{ - BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeStatement, - DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption, SessionIndex, - SignedAvailabilityBitfield, ValidatorIndex, + BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeState, + DisputeStatement, DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption, + SessionIndex, SignedAvailabilityBitfield, ValidatorIndex, }; use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, pin::Pin, }; -use thiserror::Error; +mod error; mod metrics; +mod onchain_disputes; pub use self::metrics::*; +use error::Error; #[cfg(test)] mod tests; @@ -105,49 +106,6 @@ pub struct ProvisionerJob { awaiting_inherent: Vec<oneshot::Sender<ProvisionerInherentData>>, } -/// Errors in the provisioner. -#[derive(Debug, Error)] -#[allow(missing_docs)] -pub enum Error { - #[error(transparent)] - Util(#[from] util::Error), - - #[error("failed to get availability cores")] - CanceledAvailabilityCores(#[source] oneshot::Canceled), - - #[error("failed to get persisted validation data")] - CanceledPersistedValidationData(#[source] oneshot::Canceled), - - #[error("failed to get block number")] - CanceledBlockNumber(#[source] oneshot::Canceled), - - #[error("failed to get backed candidates")] - CanceledBackedCandidates(#[source] oneshot::Canceled), - - #[error("failed to get votes on dispute")] - CanceledCandidateVotes(#[source] oneshot::Canceled), - - #[error(transparent)] - ChainApi(#[from] ChainApiError), - - #[error(transparent)] - Runtime(#[from] RuntimeApiError), - - #[error("failed to send message to ChainAPI")] - ChainApiMessageSend(#[source] mpsc::SendError), - - #[error("failed to send message to CandidateBacking to get backed candidates")] - GetBackedCandidatesSend(#[source] mpsc::SendError), - - #[error("failed to send return message with Inherents")] - InherentDataReturnChannel, - - #[error( - "backed candidate does not correspond to selected candidate; check logic in provisioner" - )] - BackedCandidateOrderingProblem, -} - /// Provisioner run arguments. #[derive(Debug, Clone, Copy)] pub struct ProvisionerConfig; @@ -325,7 +283,7 @@ async fn send_inherent_data( .await .map_err(|err| Error::CanceledAvailabilityCores(err))??; - let disputes = select_disputes(from_job, metrics).await?; + let disputes = select_disputes(from_job, metrics, leaf).await?; // Only include bitfields on fresh leaves. On chain reversions, we want to make sure that // there will be at least one block, which cannot get disputed, so the chain can make progress. @@ -700,11 +658,80 @@ fn extend_by_random_subset_without_repetition( acc.sort_unstable_by(|a, b| a.0.cmp(&b.0)); } +/// The maximum number of disputes Provisioner will include in the inherent data. +/// Serves as a protection not to flood the Runtime with excessive data. +const MAX_DISPUTES_FORWARDED_TO_RUNTIME: usize = 1_000; + async fn select_disputes( sender: &mut impl SubsystemSender, metrics: &metrics::Metrics, + _leaf: &ActivatedLeaf, ) -> Result<MultiDisputeStatementSet, Error> { - const MAX_DISPUTES_FORWARDED_TO_RUNTIME: usize = 1_000; + // Helper lambda + // Gets the active disputes as input and partitions it in seen and unseen disputes by the Runtime + // Returns as much unseen disputes as possible and optionally some seen disputes up to `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit. + let generate_unseen_active_subset = + |active: Vec<(SessionIndex, CandidateHash)>, + onchain: HashMap<(SessionIndex, CandidateHash), DisputeState>| + -> Vec<(SessionIndex, CandidateHash)> { + let (seen_onchain, mut unseen_onchain): ( + Vec<(SessionIndex, CandidateHash)>, + Vec<(SessionIndex, CandidateHash)>, + ) = active.into_iter().partition(|d| onchain.contains_key(d)); + + if unseen_onchain.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { + // Even unseen on-chain don't fit within the limit. Add as many as possible. + let mut unseen_subset = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME); + extend_by_random_subset_without_repetition( + &mut unseen_subset, + unseen_onchain, + MAX_DISPUTES_FORWARDED_TO_RUNTIME, + ); + unseen_subset + } else { + // Add all unseen onchain disputes and as much of the seen ones as there is space. + let n_unseen_onchain = unseen_onchain.len(); + extend_by_random_subset_without_repetition( + &mut unseen_onchain, + seen_onchain, + MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_unseen_onchain), + ); + unseen_onchain + } + }; + + // Helper lambda + // Extends the active disputes with recent ones up to `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit. Unseen recent disputes are prioritised. + let generate_active_and_unseen_recent_subset = + |recent: Vec<(SessionIndex, CandidateHash)>, + mut active: Vec<(SessionIndex, CandidateHash)>, + onchain: HashMap<(SessionIndex, CandidateHash), DisputeState>| + -> Vec<(SessionIndex, CandidateHash)> { + let mut n_active = active.len(); + // All active disputes can be sent. Fill the rest of the space with recent ones. + // We assume there is not enough space for all recent disputes. So we prioritise the unseen ones. + let (seen_onchain, unseen_onchain): ( + Vec<(SessionIndex, CandidateHash)>, + Vec<(SessionIndex, CandidateHash)>, + ) = recent.into_iter().partition(|d| onchain.contains_key(d)); + + extend_by_random_subset_without_repetition( + &mut active, + unseen_onchain, + MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active), + ); + n_active = active.len(); + + if n_active < MAX_DISPUTES_FORWARDED_TO_RUNTIME { + // Looks like we can add some of the seen disputes too + extend_by_random_subset_without_repetition( + &mut active, + seen_onchain, + MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active), + ); + } + active + }; // We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine. // It's heavier than `ActiveDisputes` but ensures that everything from the dispute @@ -713,6 +740,22 @@ async fn select_disputes( // upper bound of disputes to pass to wasm `fn create_inherent_data`. // If the active ones are already exceeding the bounds, randomly select a subset. let recent = request_disputes(sender, RequestType::Recent).await; + + // On chain disputes are fetched from the runtime. We want to prioritise the inclusion of unknown + // disputes in the inherent data. The call relies on staging Runtime API. If the staging API is not + // enabled in the binary an empty set is generated which doesn't affect the rest of the logic. + let onchain = match onchain_disputes::get_onchain_disputes(sender, _leaf.hash.clone()).await { + Ok(r) => r, + Err(e) => { + gum::debug!( + target: LOG_TARGET, + ?e, + "Can't fetch onchain disputes. Will continue with empty onchain disputes set.", + ); + HashMap::new() + }, + }; + let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { gum::warn!( target: LOG_TARGET, @@ -720,25 +763,12 @@ async fn select_disputes( recent.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME ); - let mut active = request_disputes(sender, RequestType::Active).await; - let n_active = active.len(); - let active = if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { - let mut picked = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME); - extend_by_random_subset_without_repetition( - &mut picked, - active, - MAX_DISPUTES_FORWARDED_TO_RUNTIME, - ); - picked + let active = request_disputes(sender, RequestType::Active).await; + if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { + generate_unseen_active_subset(active, onchain) } else { - extend_by_random_subset_without_repetition( - &mut active, - recent, - MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active), - ); - active - }; - active + generate_active_and_unseen_recent_subset(recent, active, onchain) + } } else { recent }; diff --git a/polkadot/node/core/provisioner/src/metrics.rs b/polkadot/node/core/provisioner/src/metrics.rs index e082f41dbb61a5485e34ccdfceca2995d814d800..bda0a560979de946d61c20a53f98ba6ee732f022 100644 --- a/polkadot/node/core/provisioner/src/metrics.rs +++ b/polkadot/node/core/provisioner/src/metrics.rs @@ -34,6 +34,12 @@ struct MetricsInner { pub struct Metrics(Option<MetricsInner>); impl Metrics { + /// Creates new dummy `Metrics` instance. Used for testing only. + #[cfg(test)] + pub fn new_dummy() -> Metrics { + Metrics(None) + } + pub(crate) fn on_inherent_data_request(&self, response: Result<(), ()>) { if let Some(metrics) = &self.0 { match response { diff --git a/polkadot/node/core/provisioner/src/onchain_disputes.rs b/polkadot/node/core/provisioner/src/onchain_disputes.rs new file mode 100644 index 0000000000000000000000000000000000000000..e5b736b600b3b93f705049f838513c806e66cf70 --- /dev/null +++ b/polkadot/node/core/provisioner/src/onchain_disputes.rs @@ -0,0 +1,74 @@ +// Copyright 2017-2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +use crate::error::GetOnchainDisputesError; +use polkadot_node_subsystem::SubsystemSender; +use polkadot_primitives::v2::{CandidateHash, DisputeState, Hash, SessionIndex}; +use std::collections::HashMap; + +pub async fn get_onchain_disputes( + _sender: &mut impl SubsystemSender, + _relay_parent: Hash, +) -> Result<HashMap<(SessionIndex, CandidateHash), DisputeState>, GetOnchainDisputesError> { + let _onchain = Result::< + HashMap<(SessionIndex, CandidateHash), DisputeState>, + GetOnchainDisputesError, + >::Ok(HashMap::new()); + #[cfg(feature = "staging-client")] + let _onchain = self::staging_impl::get_onchain_disputes(_sender, _relay_parent).await; + + _onchain +} + +// Merge this module with the outer (current one) when promoting to stable +#[cfg(feature = "staging-client")] +mod staging_impl { + use super::*; // remove this when promoting to stable + use crate::LOG_TARGET; + use futures::channel::oneshot; + use polkadot_node_subsystem::{ + errors::RuntimeApiError, + messages::{RuntimeApiMessage, RuntimeApiRequest}, + SubsystemSender, + }; + + /// Gets the on-chain disputes at a given block number and returns them as a `HashSet` so that searching in them is cheap. + pub async fn get_onchain_disputes( + sender: &mut impl SubsystemSender, + relay_parent: Hash, + ) -> Result<HashMap<(SessionIndex, CandidateHash), DisputeState>, GetOnchainDisputesError> { + gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching on-chain disputes"); + let (tx, rx) = oneshot::channel(); + sender + .send_message( + RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::StagingDisputes(tx)) + .into(), + ) + .await; + + rx.await + .map_err(|_| GetOnchainDisputesError::Channel) + .and_then(|res| { + res.map_err(|e| match e { + RuntimeApiError::Execution { .. } => + GetOnchainDisputesError::Execution(e, relay_parent), + RuntimeApiError::NotSupported { .. } => + GetOnchainDisputesError::NotSupported(e, relay_parent), + }) + }) + .map(|v| v.into_iter().map(|e| ((e.0, e.1), e.2)).collect()) + } +} diff --git a/polkadot/node/core/provisioner/src/tests.rs b/polkadot/node/core/provisioner/src/tests.rs index 2cbfa97e1785c33bf2e9fb01a4c68fb09791079d..f87fbb8ce16adc19d01f49b81a3d43d0fceb0137 100644 --- a/polkadot/node/core/provisioner/src/tests.rs +++ b/polkadot/node/core/provisioner/src/tests.rs @@ -195,23 +195,12 @@ mod select_availability_bitfields { } } -mod select_candidates { - use super::{super::*, build_occupied_core, default_bitvec, occupied_core, scheduled_core}; - use ::test_helpers::{dummy_candidate_descriptor, dummy_hash}; - use polkadot_node_subsystem::messages::{ - AllMessages, RuntimeApiMessage, - RuntimeApiRequest::{ - AvailabilityCores, PersistedValidationData as PersistedValidationDataReq, - }, - }; +mod common { + use super::super::*; + use polkadot_node_subsystem::messages::AllMessages; use polkadot_node_subsystem_test_helpers::TestSubsystemSender; - use polkadot_primitives::v2::{ - BlockNumber, CandidateCommitments, CommittedCandidateReceipt, PersistedValidationData, - }; - const BLOCK_UNDER_PRODUCTION: BlockNumber = 128; - - fn test_harness<OverseerFactory, Overseer, TestFactory, Test>( + pub fn test_harness<OverseerFactory, Overseer, TestFactory, Test>( overseer_factory: OverseerFactory, test_factory: TestFactory, ) where @@ -228,6 +217,26 @@ mod select_candidates { let _ = futures::executor::block_on(future::join(overseer, test)); } +} + +mod select_candidates { + use super::{ + super::*, build_occupied_core, common::test_harness, default_bitvec, occupied_core, + scheduled_core, + }; + use ::test_helpers::{dummy_candidate_descriptor, dummy_hash}; + use polkadot_node_subsystem::messages::{ + AllMessages, RuntimeApiMessage, + RuntimeApiRequest::{ + AvailabilityCores, PersistedValidationData as PersistedValidationDataReq, + }, + }; + use polkadot_node_subsystem_test_helpers::TestSubsystemSender; + use polkadot_primitives::v2::{ + BlockNumber, CandidateCommitments, CommittedCandidateReceipt, PersistedValidationData, + }; + + const BLOCK_UNDER_PRODUCTION: BlockNumber = 128; // For test purposes, we always return this set of availability cores: // @@ -486,3 +495,403 @@ mod select_candidates { ) } } + +mod select_disputes { + + use super::{super::*, common::test_harness}; + use polkadot_node_subsystem::{ + messages::{AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest}, + RuntimeApiError, + }; + use polkadot_node_subsystem_test_helpers::TestSubsystemSender; + use polkadot_primitives::v2::DisputeState; + use std::sync::Arc; + use test_helpers; + + // Global Test Data + fn recent_disputes(len: usize) -> Vec<(SessionIndex, CandidateHash)> { + let mut res = Vec::with_capacity(len); + for _ in 0..len { + res.push((0, CandidateHash(Hash::random()))); + } + + res + } + + // same as recent_disputes() but with SessionIndex set to 1 + fn active_disputes(len: usize) -> Vec<(SessionIndex, CandidateHash)> { + let mut res = Vec::with_capacity(len); + for _ in 0..len { + res.push((1, CandidateHash(Hash::random()))); + } + + res + } + + fn leaf() -> ActivatedLeaf { + ActivatedLeaf { + hash: Hash::repeat_byte(0xAA), + number: 0xAA, + status: LeafStatus::Fresh, + span: Arc::new(jaeger::Span::Disabled), + } + } + + async fn mock_overseer( + leaf: ActivatedLeaf, + mut receiver: mpsc::UnboundedReceiver<AllMessages>, + onchain_disputes: Result<Vec<(SessionIndex, CandidateHash, DisputeState)>, RuntimeApiError>, + recent_disputes: Vec<(SessionIndex, CandidateHash)>, + active_disputes: Vec<(SessionIndex, CandidateHash)>, + ) { + while let Some(from_job) = receiver.next().await { + match from_job { + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::StagingDisputes(sender), + )) => { + let _ = sender.send(onchain_disputes.clone()); + }, + AllMessages::RuntimeApi(_) => panic!("Unexpected RuntimeApi request"), + AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::RecentDisputes( + sender, + )) => { + let _ = sender.send(recent_disputes.clone()); + }, + AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ActiveDisputes( + sender, + )) => { + let _ = sender.send(active_disputes.clone()); + }, + AllMessages::DisputeCoordinator( + DisputeCoordinatorMessage::QueryCandidateVotes(disputes, sender), + ) => { + let mut res = Vec::new(); + let v = CandidateVotes { + candidate_receipt: test_helpers::dummy_candidate_receipt(leaf.hash.clone()), + valid: vec![], + invalid: vec![], + }; + for r in disputes.iter() { + res.push((r.0, r.1, v.clone())); + } + + let _ = sender.send(res); + }, + _ => panic!("Unexpected message: {:?}", from_job), + } + } + } + + #[test] + fn recent_disputes_are_withing_onchain_limit() { + const RECENT_DISPUTES_SIZE: usize = 10; + let metrics = metrics::Metrics::new_dummy(); + let onchain_disputes = Ok(Vec::new()); + let active_disputes = Vec::new(); + let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); + + let recent_disputes_overseer = recent_disputes.clone(); + test_harness( + |r| { + mock_overseer( + leaf(), + r, + onchain_disputes, + recent_disputes_overseer, + active_disputes, + ) + }, + |mut tx: TestSubsystemSender| async move { + let lf = leaf(); + let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); + + assert!(!disputes.is_empty()); + + let result = disputes.iter().zip(recent_disputes.iter()); + // We should get all recent disputes. + for (d, r) in result { + assert_eq!(d.session, r.0); + assert_eq!(d.candidate_hash, r.1); + } + }, + ) + } + + #[test] + fn recent_disputes_are_too_much_but_active_are_within_limit() { + const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; + const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME; + let metrics = metrics::Metrics::new_dummy(); + let onchain_disputes = Ok(Vec::new()); + let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); + let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE); + + let active_disputes_overseer = active_disputes.clone(); + test_harness( + |r| { + mock_overseer( + leaf(), + r, + onchain_disputes, + recent_disputes, + active_disputes_overseer, + ) + }, + |mut tx: TestSubsystemSender| async move { + let lf = leaf(); + let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); + + assert!(!disputes.is_empty()); + + let result = disputes.iter().zip(active_disputes.iter()); + // We should get all active disputes. + for (d, r) in result { + assert_eq!(d.session, r.0); + assert_eq!(d.candidate_hash, r.1); + } + }, + ) + } + + #[test] + fn recent_disputes_are_too_much_but_active_are_less_than_the_limit() { + // In this case all active disputes + a random set of recent disputes should be returned + const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; + const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME - 10; + let metrics = metrics::Metrics::new_dummy(); + let onchain_disputes = Ok(Vec::new()); + let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); + let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE); + + let active_disputes_overseer = active_disputes.clone(); + test_harness( + |r| { + mock_overseer( + leaf(), + r, + onchain_disputes, + recent_disputes, + active_disputes_overseer, + ) + }, + |mut tx: TestSubsystemSender| async move { + let lf = leaf(); + let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); + + assert!(!disputes.is_empty()); + + // Recent disputes are generated with `SessionIndex` = 0 + let (res_recent, res_active): (Vec<DisputeStatementSet>, Vec<DisputeStatementSet>) = + disputes.into_iter().partition(|d| d.session == 0); + + // It should be good enough the count the number of active disputes and not compare them one by one. Checking the exact values is already covered by the previous tests. + assert_eq!(res_active.len(), active_disputes.len()); // We have got all active disputes + assert_eq!(res_active.len() + res_recent.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME); + // And some recent ones. + }, + ) + } + + //These tests rely on staging Runtime functions so they are separated and compiled conditionally. + #[cfg(feature = "staging-client")] + mod staging_tests { + use super::*; + + fn dummy_dispute_state() -> DisputeState { + DisputeState { + validators_for: BitVec::new(), + validators_against: BitVec::new(), + start: 0, + concluded_at: None, + } + } + + #[test] + fn recent_disputes_are_too_much_active_fits_test_recent_prioritisation() { + // In this case recent disputes are above `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit and the active ones are below it. + // The expected behaviour is to send all active disputes and extend the set with recent ones. During the extension the disputes unknown for the Runtime are added with priority. + const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; + const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME - 10; + const ONCHAIN_DISPUTE_SIZE: usize = RECENT_DISPUTES_SIZE - 9; + let metrics = metrics::Metrics::new_dummy(); + let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); + let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE); + let onchain_disputes: Result< + Vec<(SessionIndex, CandidateHash, DisputeState)>, + RuntimeApiError, + > = Ok(Vec::from(&recent_disputes[0..ONCHAIN_DISPUTE_SIZE]) + .iter() + .map(|(session_index, candidate_hash)| { + (*session_index, candidate_hash.clone(), dummy_dispute_state()) + }) + .collect()); + let active_disputes_overseer = active_disputes.clone(); + let recent_disputes_overseer = recent_disputes.clone(); + test_harness( + |r| { + mock_overseer( + leaf(), + r, + onchain_disputes, + recent_disputes_overseer, + active_disputes_overseer, + ) + }, + |mut tx: TestSubsystemSender| async move { + let lf = leaf(); + let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); + + assert!(!disputes.is_empty()); + + // Recent disputes are generated with `SessionIndex` = 0 + let (res_recent, res_active): ( + Vec<DisputeStatementSet>, + Vec<DisputeStatementSet>, + ) = disputes.into_iter().partition(|d| d.session == 0); + + // It should be good enough the count the number of the disputes and not compare them one by one as this was already covered in other tests. + assert_eq!(res_active.len(), active_disputes.len()); // We've got all active disputes. + assert_eq!( + res_recent.len(), + MAX_DISPUTES_FORWARDED_TO_RUNTIME - active_disputes.len() + ); // And some recent ones. + + // Check if the recent disputes were unknown for the Runtime. + let expected_recent_disputes = + Vec::from(&recent_disputes[ONCHAIN_DISPUTE_SIZE..]); + let res_recent_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter( + res_recent.iter().map(|d| (d.session, d.candidate_hash)), + ); + + // Explicitly check that all unseen disputes are sent to the Runtime. + for d in &expected_recent_disputes { + assert_eq!(res_recent_set.contains(d), true); + } + }, + ) + } + + #[test] + fn active_disputes_are_too_much_test_active_prioritisation() { + // In this case the active disputes are above the `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit so the unseen ones should be prioritised. + const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; + const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; + const ONCHAIN_DISPUTE_SIZE: usize = ACTIVE_DISPUTES_SIZE - 9; + + let metrics = metrics::Metrics::new_dummy(); + let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); + let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE); + let onchain_disputes: Result< + Vec<(SessionIndex, CandidateHash, DisputeState)>, + RuntimeApiError, + > = Ok(Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE]) + .iter() + .map(|(session_index, candidate_hash)| { + (*session_index, candidate_hash.clone(), dummy_dispute_state()) + }) + .collect()); + let active_disputes_overseer = active_disputes.clone(); + let recent_disputes_overseer = recent_disputes.clone(); + test_harness( + |r| { + mock_overseer( + leaf(), + r, + onchain_disputes, + recent_disputes_overseer, + active_disputes_overseer, + ) + }, + |mut tx: TestSubsystemSender| async move { + let lf = leaf(); + let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); + + assert!(!disputes.is_empty()); + + // Recent disputes are generated with `SessionIndex` = 0 + let (res_recent, res_active): ( + Vec<DisputeStatementSet>, + Vec<DisputeStatementSet>, + ) = disputes.into_iter().partition(|d| d.session == 0); + + // It should be good enough the count the number of the disputes and not compare them one by one + assert_eq!(res_recent.len(), 0); // We expect no recent disputes + assert_eq!(res_active.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME); + + let expected_active_disputes = + Vec::from(&active_disputes[ONCHAIN_DISPUTE_SIZE..]); + let res_active_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter( + res_active.iter().map(|d| (d.session, d.candidate_hash)), + ); + + // Explicitly check that the unseen disputes are delivered to the Runtime. + for d in &expected_active_disputes { + assert_eq!(res_active_set.contains(d), true); + } + }, + ) + } + + #[test] + fn active_disputes_are_too_much_and_are_all_unseen() { + // In this case there are a lot of active disputes unseen by the Runtime. The focus of the test is to verify that in such cases known disputes are NOT sent to the Runtime. + const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; + const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 5; + const ONCHAIN_DISPUTE_SIZE: usize = 5; + + let metrics = metrics::Metrics::new_dummy(); + let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); + let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE); + let onchain_disputes: Result< + Vec<(SessionIndex, CandidateHash, DisputeState)>, + RuntimeApiError, + > = Ok(Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE]) + .iter() + .map(|(session_index, candidate_hash)| { + (*session_index, candidate_hash.clone(), dummy_dispute_state()) + }) + .collect()); + let active_disputes_overseer = active_disputes.clone(); + let recent_disputes_overseer = recent_disputes.clone(); + test_harness( + |r| { + mock_overseer( + leaf(), + r, + onchain_disputes, + recent_disputes_overseer, + active_disputes_overseer, + ) + }, + |mut tx: TestSubsystemSender| async move { + let lf = leaf(); + let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); + assert!(!disputes.is_empty()); + + // Recent disputes are generated with `SessionIndex` = 0 + let (res_recent, res_active): ( + Vec<DisputeStatementSet>, + Vec<DisputeStatementSet>, + ) = disputes.into_iter().partition(|d| d.session == 0); + + // It should be good enough the count the number of the disputes and not compare them one by one + assert_eq!(res_recent.len(), 0); + assert_eq!(res_active.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME); + + // For sure we don't want to see any of this disputes in the result + let unexpected_active_disputes = + Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE]); + let res_active_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter( + res_active.iter().map(|d| (d.session, d.candidate_hash)), + ); + + // Verify that the result DOESN'T contain known disputes (because there is an excessive number of unknown onces). + for d in &unexpected_active_disputes { + assert_eq!(res_active_set.contains(d), false); + } + }, + ) + } + } +} diff --git a/polkadot/node/core/runtime-api/src/cache.rs b/polkadot/node/core/runtime-api/src/cache.rs index 3dab90e4c74e026fddf158b805bdeb9c8d19fd35..6f5fdc5d4657074679573d1f5ab838b2e3bb3c6f 100644 --- a/polkadot/node/core/runtime-api/src/cache.rs +++ b/polkadot/node/core/runtime-api/src/cache.rs @@ -21,8 +21,8 @@ use parity_util_mem::{MallocSizeOf, MallocSizeOfExt}; use sp_consensus_babe::Epoch; use polkadot_primitives::v2::{ - AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent, - CommittedCandidateReceipt, CoreState, GroupRotationInfo, Hash, Id as ParaId, + AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, + CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, @@ -47,6 +47,7 @@ const ON_CHAIN_VOTES_CACHE_SIZE: usize = 3 * 1024; const PVFS_REQUIRE_PRECHECK_SIZE: usize = 1024; const VALIDATION_CODE_HASH_CACHE_SIZE: usize = 64 * 1024; const VERSION_CACHE_SIZE: usize = 4 * 1024; +const DISPUTES_CACHE_SIZE: usize = 64 * 1024; struct ResidentSizeOf<T>(T); @@ -115,6 +116,10 @@ pub(crate) struct RequestResultCache { ResidentSizeOf<Option<ValidationCodeHash>>, >, version: MemoryLruCache<Hash, ResidentSizeOf<u32>>, + disputes: MemoryLruCache< + Hash, + ResidentSizeOf<Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>>, + >, } impl Default for RequestResultCache { @@ -142,6 +147,7 @@ impl Default for RequestResultCache { pvfs_require_precheck: MemoryLruCache::new(PVFS_REQUIRE_PRECHECK_SIZE), validation_code_hash: MemoryLruCache::new(VALIDATION_CODE_HASH_CACHE_SIZE), version: MemoryLruCache::new(VERSION_CACHE_SIZE), + disputes: MemoryLruCache::new(DISPUTES_CACHE_SIZE), } } } @@ -407,6 +413,21 @@ impl RequestResultCache { pub(crate) fn cache_version(&mut self, key: Hash, value: u32) { self.version.insert(key, ResidentSizeOf(value)); } + + pub(crate) fn disputes( + &mut self, + relay_parent: &Hash, + ) -> Option<&Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>> { + self.disputes.get(relay_parent).map(|v| &v.0) + } + + pub(crate) fn cache_disputes( + &mut self, + relay_parent: Hash, + value: Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>, + ) { + self.disputes.insert(relay_parent, ResidentSizeOf(value)); + } } pub(crate) enum RequestResult { @@ -442,4 +463,5 @@ pub(crate) enum RequestResult { SubmitPvfCheckStatement(Hash, PvfCheckStatement, ValidatorSignature, ()), ValidationCodeHash(Hash, ParaId, OccupiedCoreAssumption, Option<ValidationCodeHash>), Version(Hash, u32), + StagingDisputes(Hash, Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>), } diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index 9f8377f0d71329c2d7e66d4be0bd84c0f2b532e2..d10483c800105f4bceac8287d6a71494731f32dc 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -169,6 +169,8 @@ where .cache_validation_code_hash((relay_parent, para_id, assumption), hash), Version(relay_parent, version) => self.requests_cache.cache_version(relay_parent, version), + StagingDisputes(relay_parent, disputes) => + self.requests_cache.cache_disputes(relay_parent, disputes), } } @@ -270,6 +272,8 @@ where Request::ValidationCodeHash(para, assumption, sender) => query!(validation_code_hash(para, assumption), sender) .map(|sender| Request::ValidationCodeHash(para, assumption, sender)), + Request::StagingDisputes(sender) => + query!(disputes(), sender).map(|sender| Request::StagingDisputes(sender)), } } @@ -526,5 +530,7 @@ where }, Request::ValidationCodeHash(para, assumption, sender) => query!(ValidationCodeHash, validation_code_hash(para, assumption), ver = 2, sender), + Request::StagingDisputes(sender) => + query!(StagingDisputes, staging_get_disputes(), ver = 2, sender), } } diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index f2c237e393e636881a7791dd3831f6411c097fe5..15b8521cd0e422a728ffcdbf92223a69840d1682 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -198,3 +198,5 @@ runtime-metrics = [ "polkadot-runtime/runtime-metrics", "polkadot-runtime-parachains/runtime-metrics" ] + +staging-client = ["polkadot-node-core-provisioner/staging-client"] \ No newline at end of file diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 9394416d32f37db5627ea9146ea6382b48a577cd..db74ab11cd4dc8b55b47370a9b9eba23302cc51b 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -40,12 +40,12 @@ use polkadot_node_primitives::{ }; use polkadot_primitives::v2::{ AuthorityDiscoveryId, BackedCandidate, BlockNumber, CandidateEvent, CandidateHash, - CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreState, GroupIndex, - GroupRotationInfo, Hash, Header as BlockHeader, Id as ParaId, InboundDownwardMessage, - InboundHrmpMessage, MultiDisputeStatementSet, OccupiedCoreAssumption, PersistedValidationData, - PvfCheckStatement, SessionIndex, SessionInfo, SignedAvailabilityBitfield, - SignedAvailabilityBitfields, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, - ValidatorSignature, + CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreState, + DisputeState, GroupIndex, GroupRotationInfo, Hash, Header as BlockHeader, Id as ParaId, + InboundDownwardMessage, InboundHrmpMessage, MultiDisputeStatementSet, OccupiedCoreAssumption, + PersistedValidationData, PvfCheckStatement, SessionIndex, SessionInfo, + SignedAvailabilityBitfield, SignedAvailabilityBitfields, ValidationCode, ValidationCodeHash, + ValidatorId, ValidatorIndex, ValidatorSignature, }; use polkadot_statement_table::v2::Misbehavior; use std::{ @@ -693,6 +693,10 @@ pub enum RuntimeApiRequest { OccupiedCoreAssumption, RuntimeApiSender<Option<ValidationCodeHash>>, ), + /// Returns all on-chain disputes at given block number. + StagingDisputes( + RuntimeApiSender<Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>>, + ), } /// A message to the Runtime API subsystem. diff --git a/polkadot/primitives/src/v2/mod.rs b/polkadot/primitives/src/v2/mod.rs index 57e19c68693e437a7d9fd4220fc1faef87fa1e2a..d462a87217be1f4dbdad67c1510982d089677ebe 100644 --- a/polkadot/primitives/src/v2/mod.rs +++ b/polkadot/primitives/src/v2/mod.rs @@ -1403,6 +1403,22 @@ pub struct DisputeState<N = BlockNumber> { pub concluded_at: Option<N>, } +#[cfg(feature = "std")] +impl MallocSizeOf for DisputeState { + fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize { + // destructuring to make sure no new fields are added to the struct without modifying this function + let Self { validators_for, validators_against, start, concluded_at } = self; + + // According to the documentation `.capacity()` might not return a byte aligned value, so just in case: + let align_eight = |d: usize| (d + 7) / 8; + + align_eight(validators_for.capacity()) + + align_eight(validators_against.capacity()) + + start.size_of(ops) + + concluded_at.size_of(ops) + } +} + /// Parachains inherent-data passed into the runtime by a block author #[derive(Encode, Decode, Clone, PartialEq, RuntimeDebug, TypeInfo)] pub struct InherentData<HDR: HeaderT = Header> {