Unverified Commit 7aed7a2e authored by Bernhard Schuster's avatar Bernhard Schuster Committed by GitHub
Browse files

integrate dispute finality (#3484)



* finality_target adjustments

* fn finality_target

* partially address review comments

* fixins

* more rustic if condition

* fix tests

* fixins

* Update node/core/approval-voting/src/lib.rs

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* Update node/core/approval-voting/src/lib.rs

Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>

* review comments part one

* rename candidates -> block_descriptions

* testing outline (incomplete, WIP)

* test foo

* split RelayChainSelection into RelayChainSelection{,WithFallback}, introduce HeaderProvider{,Provider}

* make some stuff public (revert this soon™)

* some test improvements

* slips of pens

* test fixins

* add another trait abstraction

* pending edge case tests + warnings fixes

* more test cases

* fin

* chore fmt

* fix cargo.lock

* Undo obsolete changes

* // comments

* make mod pub(crate)

* fix

* minimize static bounds

* resolve number() as before

* fmt

* post merge fix

* address some nits

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>
parent 2d197804
Pipeline #149054 passed with stages
in 39 minutes and 51 seconds
......@@ -1763,6 +1763,19 @@ dependencies = [
"termcolor",
]
[[package]]
name = "env_logger"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3"
dependencies = [
"atty",
"humantime 2.0.1",
"log",
"regex",
"termcolor",
]
[[package]]
name = "environmental"
version = "1.1.3"
......@@ -6946,16 +6959,18 @@ dependencies = [
name = "polkadot-service"
version = "0.9.8"
dependencies = [
"assert_matches",
"async-trait",
"beefy-gadget",
"beefy-primitives",
"env_logger 0.8.4",
"env_logger 0.9.0",
"frame-system-rpc-runtime-api",
"futures 0.3.15",
"hex-literal",
"kusama-runtime",
"kvdb",
"kvdb-rocksdb",
"log",
"pallet-babe",
"pallet-im-online",
"pallet-mmr-primitives",
......@@ -6985,6 +7000,7 @@ dependencies = [
"polkadot-node-core-runtime-api",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-overseer",
"polkadot-parachain",
......
......@@ -27,7 +27,7 @@ use polkadot_node_subsystem::{
ApprovalVotingMessage, RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage,
ApprovalDistributionMessage, CandidateValidationMessage,
AvailabilityRecoveryMessage, ChainSelectionMessage, DisputeCoordinatorMessage,
ImportStatementsResult,
ImportStatementsResult, HighestApprovedAncestorBlock, BlockDescription,
},
errors::RecoveryError,
overseer::{self, SubsystemSender as _}, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem,
......@@ -1180,7 +1180,7 @@ async fn handle_approved_ancestor(
target: Hash,
lower_bound: BlockNumber,
wakeups: &Wakeups,
) -> SubsystemResult<Option<(Hash, BlockNumber)>> {
) -> SubsystemResult<Option<HighestApprovedAncestorBlock>> {
const MAX_TRACING_WINDOW: usize = 200;
const ABNORMAL_DEPTH_THRESHOLD: usize = 5;
......@@ -1228,6 +1228,8 @@ async fn handle_approved_ancestor(
Vec::new()
};
let mut block_descriptions = Vec::new();
let mut bits: BitVec<Lsb0, u8> = Default::default();
for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
// Block entries should be present as the assumption is that
......@@ -1259,8 +1261,10 @@ async fn handle_approved_ancestor(
}
} else if bits.len() <= ABNORMAL_DEPTH_THRESHOLD {
all_approved_max = None;
block_descriptions.clear();
} else {
all_approved_max = None;
block_descriptions.clear();
let unapproved: Vec<_> = entry.unapproved_candidates().collect();
tracing::debug!(
......@@ -1338,6 +1342,11 @@ async fn handle_approved_ancestor(
}
}
}
block_descriptions.push(BlockDescription {
block_hash,
session: entry.session(),
candidates: entry.candidates().iter().map(|(_idx, candidate_hash)| *candidate_hash ).collect(),
});
}
tracing::trace!(
......@@ -1366,8 +1375,19 @@ async fn handle_approved_ancestor(
},
);
// `reverse()` to obtain the ascending order from lowest to highest
// block within the candidates, which is the expected order
block_descriptions.reverse();
let all_approved_max = all_approved_max.map(|(hash, block_number)| {
HighestApprovedAncestorBlock{
hash,
number: block_number,
descriptions: block_descriptions,
}
});
match all_approved_max {
Some((ref hash, ref number)) => {
Some(HighestApprovedAncestorBlock { ref hash, ref number, .. }) => {
span.add_uint_tag("approved-number", *number as u64);
span.add_string_fmt_debug_tag("approved-hash", hash);
}
......
......@@ -26,7 +26,7 @@ use polkadot_node_primitives::approval::{
RELAY_VRF_MODULO_CONTEXT, DelayTranche,
};
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_node_subsystem::messages::{AllMessages, HighestApprovedAncestorBlock};
use sp_core::testing::TaskExecutor;
use parking_lot::Mutex;
......@@ -1612,10 +1612,12 @@ fn approved_ancestor_all_approved() {
let test_fut = Box::pin(async move {
let overlay_db = OverlayedBackend::new(&db);
assert_eq!(
assert_matches!(
handle_approved_ancestor(&mut ctx, &overlay_db, block_hash_4, 0, &Default::default())
.await.unwrap(),
Some((block_hash_4, 4)),
.await,
Ok(Some(HighestApprovedAncestorBlock { hash, number, .. } )) => {
assert_eq!((block_hash_4, 4), (hash, number));
}
)
});
......@@ -1686,10 +1688,12 @@ fn approved_ancestor_missing_approval() {
let test_fut = Box::pin(async move {
let overlay_db = OverlayedBackend::new(&db);
assert_eq!(
assert_matches!(
handle_approved_ancestor(&mut ctx, &overlay_db, block_hash_4, 0, &Default::default())
.await.unwrap(),
Some((block_hash_2, 2)),
.await,
Ok(Some(HighestApprovedAncestorBlock { hash, number, .. })) => {
assert_eq!((block_hash_2, 2), (hash, number));
}
)
});
......
......@@ -35,7 +35,7 @@ use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
messages::{
ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage,
DisputeParticipationMessage, ImportStatementsResult,
DisputeParticipationMessage, ImportStatementsResult, BlockDescription,
}
};
use polkadot_node_subsystem_util::rolling_session_window::{
......@@ -961,14 +961,16 @@ fn make_dispute_message(
).map_err(MakeDisputeMessageError::InvalidStatementCombination)
}
/// Determine the the best block and its block number.
/// Assumes `block_descriptions` are sorted from the one
/// with the lowest `BlockNumber` to the highest.
fn determine_undisputed_chain(
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
base_number: BlockNumber,
block_descriptions: Vec<(Hash, SessionIndex, Vec<CandidateHash>)>,
block_descriptions: Vec<BlockDescription>,
) -> Result<Option<(BlockNumber, Hash)>, Error> {
let last = block_descriptions.last()
.map(|e| (base_number + block_descriptions.len() as BlockNumber, e.0));
.map(|e| (base_number + block_descriptions.len() as BlockNumber, e.block_hash));
// Fast path for no disputes.
let recent_disputes = match overlay_db.load_recent_disputes()? {
......@@ -984,14 +986,14 @@ fn determine_undisputed_chain(
)
};
for (i, (_, session, candidates)) in block_descriptions.iter().enumerate() {
for (i, BlockDescription { session, candidates, .. }) in block_descriptions.iter().enumerate() {
if candidates.iter().any(|c| is_possibly_invalid(*session, *c)) {
if i == 0 {
return Ok(None);
} else {
return Ok(Some((
base_number + i as BlockNumber,
block_descriptions[i - 1].0,
block_descriptions[i - 1].block_hash,
)));
}
}
......
......@@ -22,6 +22,7 @@ use polkadot_primitives::v1::{BlakeTwo256, HashT, ValidatorId, Header, SessionIn
use polkadot_node_subsystem::{jaeger, ActiveLeavesUpdate, ActivatedLeaf, LeafStatus};
use polkadot_node_subsystem::messages::{
AllMessages, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest,
BlockDescription,
};
use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TestSubsystemContextHandle};
use sp_core::testing::TaskExecutor;
......@@ -672,10 +673,10 @@ fn finality_votes_ignore_disputed_candidates() {
let block_hash_b = Hash::repeat_byte(0x0b);
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
msg: DisputeCoordinatorMessage::DetermineUndisputedChain{
base_number: 10,
block_descriptions: vec![
(block_hash_a, session, vec![candidate_hash]),
BlockDescription { block_hash: block_hash_a, session, candidates: vec![candidate_hash] },
],
tx,
},
......@@ -688,8 +689,8 @@ fn finality_votes_ignore_disputed_candidates() {
msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
base_number: 10,
block_descriptions: vec![
(block_hash_a, session, vec![]),
(block_hash_b, session, vec![candidate_hash]),
BlockDescription { block_hash: block_hash_a, session, candidates: vec![] },
BlockDescription { block_hash: block_hash_b, session, candidates: vec![candidate_hash] },
],
tx,
},
......@@ -804,7 +805,7 @@ fn supermajority_valid_dispute_may_be_finalized() {
msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
base_number: 10,
block_descriptions: vec![
(block_hash_a, session, vec![candidate_hash]),
BlockDescription { block_hash: block_hash_a, session, candidates: vec![candidate_hash] },
],
tx,
},
......@@ -817,8 +818,8 @@ fn supermajority_valid_dispute_may_be_finalized() {
msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
base_number: 10,
block_descriptions: vec![
(block_hash_a, session, vec![]),
(block_hash_b, session, vec![candidate_hash]),
BlockDescription { block_hash: block_hash_a, session, candidates: vec![] },
BlockDescription { block_hash: block_hash_b, session, candidates: vec![candidate_hash] },
],
tx,
},
......
......@@ -112,7 +112,10 @@ polkadot-statement-distribution = { path = "../network/statement-distribution",
[dev-dependencies]
polkadot-test-client = { path = "../test/client" }
env_logger = "0.8.4"
polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
env_logger = "0.9.0"
log = "0.4.14"
assert_matches = "1.5.0"
[features]
default = ["db", "full-node"]
......
......@@ -18,23 +18,24 @@
use std::sync::Arc;
use sp_runtime::traits::{Block as BlockT, NumberFor};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::Header as _;
use sp_runtime::traits::{Block as BlockT, NumberFor};
use crate::HeaderProvider;
#[cfg(feature = "full-node")]
use polkadot_primitives::v1::Hash;
/// Returns the block hash of the block at the given `target_number` by walking
/// backwards from the given `current_header`.
pub(super) fn walk_backwards_to_target_block<Block, B>(
backend: &B,
pub(super) fn walk_backwards_to_target_block<Block, HP>(
backend: &HP,
target_number: NumberFor<Block>,
current_header: &Block::Header,
) -> Result<(Block::Hash, NumberFor<Block>), sp_blockchain::Error>
where
Block: BlockT,
B: sp_blockchain::HeaderBackend<Block>,
HP: HeaderProvider<Block>,
{
let mut target_hash = current_header.hash();
let mut target_header = current_header.clone();
......@@ -54,7 +55,7 @@ where
target_hash = *target_header.parent_hash();
target_header = backend
.header(BlockId::Hash(target_hash))?
.header(target_hash)?
.expect("Header known to exist due to the existence of one of its descendants; qed");
}
}
......@@ -69,7 +70,7 @@ pub(crate) struct PauseAfterBlockFor<N>(pub(crate) N, pub(crate) N);
impl<Block, B> grandpa::VotingRule<Block, B> for PauseAfterBlockFor<NumberFor<Block>>
where
Block: BlockT,
B: sp_blockchain::HeaderBackend<Block>,
B: sp_blockchain::HeaderBackend<Block> + 'static,
{
fn restrict_vote(
&self,
......
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// Copyright 2017-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
......@@ -34,6 +34,9 @@ pub use self::overseer::{
create_default_subsystems,
};
#[cfg(test)]
mod tests;
#[cfg(feature = "full-node")]
use {
tracing::info,
......@@ -51,7 +54,6 @@ use {
sp_trie::PrefixedMemoryDB,
sc_client_api::ExecutorProvider,
grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider},
sp_runtime::traits::Header as HeaderT,
};
#[cfg(feature = "full-node")]
......@@ -103,7 +105,18 @@ pub use service::{
};
pub use service::config::{DatabaseConfig, PrometheusConfig};
pub use sp_api::{ApiRef, Core as CoreApi, ConstructRuntimeApi, ProvideRuntimeApi, StateBackend};
pub use sp_runtime::traits::{DigestFor, HashFor, NumberFor, Block as BlockT, self as runtime_traits, BlakeTwo256};
pub use sp_runtime::{
generic,
traits::{
DigestFor,
HashFor,
NumberFor,
Block as BlockT,
Header as HeaderT,
self as runtime_traits,
BlakeTwo256,
},
};
#[cfg(feature = "kusama-native")]
pub use kusama_runtime;
......@@ -114,9 +127,76 @@ pub use rococo_runtime;
pub use westend_runtime;
/// The maximum number of active leaves we forward to the [`Overseer`] on startup.
#[cfg(any(test,feature = "full-node"))]
#[cfg(any(test, feature = "full-node"))]
const MAX_ACTIVE_LEAVES: usize = 4;
/// Provides the header and block number for a hash.
///
/// Decouples `sc_client_api::Backend` and `sp_blockchain::HeaderBackend`.
pub trait HeaderProvider<Block, Error = sp_blockchain::Error>: Send + Sync + 'static
where
Block: BlockT,
Error: std::fmt::Debug + Send + Sync + 'static,
{
/// Obtain the header for a hash.
fn header(
&self,
hash: <Block as BlockT>::Hash,
) -> Result<Option<<Block as BlockT>::Header>, Error>;
/// Obtain the block number for a hash.
fn number(
&self,
hash: <Block as BlockT>::Hash,
) -> Result<Option<<<Block as BlockT>::Header as HeaderT>::Number>, Error>;
}
impl<Block, T> HeaderProvider<Block> for T
where
Block: BlockT,
T: sp_blockchain::HeaderBackend<Block> + 'static,
{
fn header(
&self,
hash: Block::Hash,
) -> sp_blockchain::Result<Option<<Block as BlockT>::Header>> {
<Self as sp_blockchain::HeaderBackend<Block>>::header(
self,
generic::BlockId::<Block>::Hash(hash),
)
}
fn number(
&self,
hash: Block::Hash,
) -> sp_blockchain::Result<Option<<<Block as BlockT>::Header as HeaderT>::Number>> {
<Self as sp_blockchain::HeaderBackend<Block>>::number(self, hash)
}
}
/// Decoupling the provider.
///
/// Mandated since `trait HeaderProvider` can only be
/// implemented once for a generic `T`.
pub trait HeaderProviderProvider<Block>: Send + Sync + 'static
where
Block: BlockT,
{
type Provider: HeaderProvider<Block> + 'static;
fn header_provider(&self) -> &Self::Provider;
}
impl<Block, T> HeaderProviderProvider<Block> for T
where
Block: BlockT,
T: sc_client_api::Backend<Block> + 'static,
{
type Provider = <T as sc_client_api::Backend<Block>>::Blockchain;
fn header_provider(&self) -> &Self::Provider {
self.blockchain()
}
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
......@@ -205,8 +285,12 @@ fn set_prometheus_registry(config: &mut Configuration) -> Result<(), Error> {
/// Initialize the `Jeager` collector. The destination must listen
/// on the given address and port for `UDP` packets.
#[cfg(any(test,feature = "full-node"))]
fn jaeger_launch_collector_with_agent(spawner: impl SpawnNamed, config: &Configuration, agent: Option<std::net::SocketAddr>) -> Result<(), Error> {
#[cfg(any(test, feature = "full-node"))]
fn jaeger_launch_collector_with_agent(
spawner: impl SpawnNamed,
config: &Configuration,
agent: Option<std::net::SocketAddr>,
) -> Result<(), Error> {
if let Some(agent) = agent {
let cfg = jaeger::JaegerConfig::builder()
.agent(agent)
......@@ -219,7 +303,7 @@ fn jaeger_launch_collector_with_agent(spawner: impl SpawnNamed, config: &Configu
}
#[cfg(feature = "full-node")]
type FullSelectChain = relay_chain_selection::SelectRelayChain<FullBackend>;
type FullSelectChain = relay_chain_selection::SelectRelayChainWithFallback<FullBackend>;
#[cfg(feature = "full-node")]
type FullGrandpaBlockImport<RuntimeApi, Executor> = grandpa::GrandpaBlockImport<
FullBackend, Block, FullClient<RuntimeApi, Executor>, FullSelectChain
......@@ -303,7 +387,7 @@ fn new_partial<RuntimeApi, Executor>(
jaeger_launch_collector_with_agent(task_manager.spawn_handle(), &*config, jaeger_agent)?;
let select_chain = relay_chain_selection::SelectRelayChain::new(
let select_chain = relay_chain_selection::SelectRelayChainWithFallback::new(
backend.clone(),
Handle::new_disconnected(),
polkadot_node_subsystem_util::metrics::Metrics::register(config.prometheus_registry())?,
......@@ -506,7 +590,7 @@ where
.unwrap_or_default()
.into_iter()
.filter_map(|hash| {
let number = client.number(hash).ok()??;
let number = HeaderBackend::number(client, hash).ok()??;
// Only consider leaves that are in maximum an uncle of the best block.
if number < best_block.number().saturating_sub(1) {
......
......@@ -35,19 +35,16 @@
#![cfg(feature = "full-node")]
use {
polkadot_primitives::v1::{
Hash, BlockNumber, Block as PolkadotBlock, Header as PolkadotHeader,
},
polkadot_subsystem::messages::{ApprovalVotingMessage, ChainSelectionMessage},
polkadot_node_subsystem_util::metrics::{self, prometheus},
polkadot_overseer::{Handle, OverseerHandle},
futures::channel::oneshot,
consensus_common::{Error as ConsensusError, SelectChain},
sp_blockchain::HeaderBackend,
sp_runtime::generic::BlockId,
std::sync::Arc,
use polkadot_primitives::v1::{
Hash, BlockNumber, Block as PolkadotBlock, Header as PolkadotHeader,
};
use polkadot_subsystem::messages::{ApprovalVotingMessage, HighestApprovedAncestorBlock, ChainSelectionMessage, DisputeCoordinatorMessage};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use futures::channel::oneshot;
use consensus_common::{Error as ConsensusError, SelectChain};
use std::sync::Arc;
use polkadot_overseer::{AllMessages, Handle, OverseerHandle};
use super::{HeaderProvider, HeaderProviderProvider};
/// The maximum amount of unfinalized blocks we are willing to allow due to approval checking
/// or disputes.
......@@ -109,25 +106,120 @@ impl Metrics {
}
/// A chain-selection implementation which provides safety for relay chains.
pub struct SelectRelayChain<B> {
backend: Arc<B>,
overseer: Handle,
pub struct SelectRelayChainWithFallback<
B: sc_client_api::Backend<PolkadotBlock>,
> {
// A fallback to use in case the overseer is disconnected.
//
// This is used on relay chains which have not yet enabled
// parachains as well as situations where the node is offline.
fallback: sc_consensus::LongestChain<B, PolkadotBlock>,
selection: SelectRelayChain<
B,
Handle,
>,
}
impl<B> Clone for SelectRelayChainWithFallback<B>
where
B: sc_client_api::Backend<PolkadotBlock>,
SelectRelayChain<
B,
Handle,
>: Clone,
{
fn clone(&self) -> Self {
Self {
fallback: self.fallback.clone(),
selection: self.selection.clone(),
}
}
}
impl<B> SelectRelayChainWithFallback<B>
where
B: sc_client_api::Backend<PolkadotBlock> + 'static,
{
/// Create a new [`SelectRelayChainWithFallback`] wrapping the given chain backend
/// and a handle to the overseer.
pub fn new(backend: Arc<B>, overseer: Handle, metrics: Metrics) -> Self {
SelectRelayChainWithFallback {
fallback: sc_consensus::LongestChain::new(backend.clone()),
selection: SelectRelayChain::new(
backend,
overseer,
metrics,
),
}
}
}
impl<B> SelectRelayChainWithFallback<B>
where
B: sc_client_api::Backend<PolkadotBlock> + 'static,
{
/// Given an overseer handle, this connects the [`SelectRelayChainWithFallback`]'s
/// internal handle and its clones to the same overseer.
pub fn connect_to_overseer(
&mut self,
handle: OverseerHandle,
) {
self.selection.overseer.connect_to_overseer(handle);
}
}
#[async_trait::async_trait]
impl<B> SelectChain<PolkadotBlock> for SelectRelayChainWithFallback<B>
where
B: sc_client_api::Backend<PolkadotBlock> + 'static,
{
async fn leaves(&self) -> Result<Vec<Hash>, ConsensusError> {
if self.selection.overseer.is_disconnected() {
return self.fallback.leaves().await
}
self.selection.leaves().await
}
async fn best_chain(&self) -> Result<PolkadotHeader, ConsensusError> {
if self.selection.overseer.is_disconnected() {
return self.fallback.best_chain().await
}
self.selection.best_chain().await
}
async fn finality_target(
&self,
target_hash: Hash,
maybe_max_number: Option<BlockNumber>,
) -> Result<Option<Hash>, ConsensusError> {
if self.selection.overseer.is_disconnected() {
return self.fallback.finality_target(target_hash, maybe_max_number).await
}
self.selection.finality_target(target_hash, maybe_max_number).await