Unverified Commit 19c1d29d authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Dispute Coordinator Subsystem (#3150)



* skeleton for dispute-coordinator

* add coordinator and participation message types

* begin dispute-coordinator DB

* functions for loading

* implement strongly-typed DB transaction

* add some tests for DB transaction

* core logic for pruning

* guide: update candidate-votes key for coordinator

* update candidate-votes key

* use big-endian encoding for session, and implement upper bound generator

* finish implementing pruning

* add a test for note_current_session

* define state of the subsystem itself

* barebones subsystem definition

* control flow

* more control flow

* implement session-updating logic

* trace

* control flow for message handling

* Update node/core/dispute-coordinator/src/lib.rs
Co-authored-by: default avatarAndré Silva <123550+andresilva@users.noreply.github.com>

* Update node/subsystem/src/messages.rs
Co-authored-by: default avatarAndré Silva <123550+andresilva@users.noreply.github.com>

* some more control flow

* guide: remove overlay

* more control flow

* implement some DB getters

* make progress on importing statements

* add SignedDisputeStatement struct

* move ApprovalVote to shared primitives

* add a signing-payload API to explicit dispute statements

* add signing-payload to CompactStatement

* add relay-parent hash to seconded/valid dispute variatns

* correct import

* type-safe wrapper around dispute statements

* use checked dispute statement in message type

* extract rolling session window cache to subsystem-util

* extract session window tests

* approval-voting: use rolling session info cache

* reduce dispute window to match runtime in practice

* add byzantine_threshold and supermajority_threshold utilities to primitives

* integrate rolling session window

* Add PartialOrd to CandidateHash

* add Ord to CandidateHash

* implement active dispute update

* add dispute messages to AllMessages

* add dispute stubs to overseer

* inform dispute participation to participate

* implement issue_local_statement

* implement `determine_undisputed_chain`

* fix warnings

* test harness for dispute coordinator tests

* add more helpers to test harness

* add some more helpers

* some tests for dispute coordinator

* ignore wrong validator indices

* test finality voting rule constraint

* add more tests

* add variants to network bridge

* fix test compilation

* remove most dispute coordinator functionality

as of #3222 we can do most of the work within the approval voting subsystem

* Revert "remove most dispute coordinator functionality"

This reverts commit 9cd615e8

.

* Use thiserror
Co-authored-by: default avatarBernhard Schuster <bernhard@ahoi.io>

* Update node/core/dispute-coordinator/src/lib.rs
Co-authored-by: default avatarBernhard Schuster <bernhard@ahoi.io>

* extract tests to separate module

* address nit

* adjust run_iteration API
Co-authored-by: default avatarAndré Silva <123550+andresilva@users.noreply.github.com>
Co-authored-by: default avatarBernhard Schuster <bernhard@ahoi.io>
parent 8bf6cc87
Pipeline #142025 canceled with stages
in 20 minutes and 3 seconds
......@@ -6091,6 +6091,30 @@ dependencies = [
"tracing",
]
[[package]]
name = "polkadot-node-core-dispute-coordinator"
version = "0.1.0"
dependencies = [
"assert_matches",
"bitvec",
"derive_more",
"futures 0.3.15",
"kvdb",
"kvdb-memorydb",
"parity-scale-codec",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sc-keystore",
"sp-core",
"sp-keyring",
"sp-keystore",
"thiserror",
"tracing",
]
[[package]]
name = "polkadot-node-core-parachains-inherent"
version = "0.1.0"
......@@ -6221,6 +6245,7 @@ dependencies = [
"sp-consensus-babe",
"sp-consensus-vrf",
"sp-core",
"sp-keystore",
"sp-maybe-compressed-blob",
"sp-runtime",
"thiserror",
......
......@@ -48,6 +48,7 @@ members = [
"node/core/bitfield-signing",
"node/core/candidate-validation",
"node/core/chain-api",
"node/core/dispute-coordinator",
"node/core/parachains-inherent",
"node/core/provisioner",
"node/core/pvf",
......
......@@ -60,7 +60,7 @@ pub type Hash = sp_core::H256;
/// This type is produced by [`CandidateReceipt::hash`].
///
/// This type makes it easy to enforce that a hash is a candidate hash on the type level.
#[derive(Clone, Copy, Encode, Decode, Hash, Eq, PartialEq, Default)]
#[derive(Clone, Copy, Encode, Decode, Hash, Eq, PartialEq, Default, PartialOrd, Ord)]
#[cfg_attr(feature = "std", derive(MallocSizeOf))]
pub struct CandidateHash(pub Hash);
......
......@@ -34,8 +34,11 @@ use polkadot_node_subsystem::{
},
SubsystemContext, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::rolling_session_window::{
RollingSessionWindow, SessionWindowUpdate,
};
use polkadot_primitives::v1::{
Hash, SessionIndex, SessionInfo, CandidateEvent, Header, CandidateHash,
Hash, SessionIndex, CandidateEvent, Header, CandidateHash,
CandidateReceipt, CoreIndex, GroupIndex, BlockNumber, ConsensusLog,
};
use polkadot_node_primitives::approval::{
......@@ -58,32 +61,7 @@ use crate::persisted_entries::CandidateEntry;
use crate::criteria::{AssignmentCriteria, OurAssignment};
use crate::time::{slot_number_to_tick, Tick};
use super::{APPROVAL_SESSIONS, LOG_TARGET, State, DBReader};
/// A rolling window of sessions.
#[derive(Default)]
pub struct RollingSessionWindow {
pub earliest_session: Option<SessionIndex>,
pub session_info: Vec<SessionInfo>,
}
impl RollingSessionWindow {
pub fn session_info(&self, index: SessionIndex) -> Option<&SessionInfo> {
self.earliest_session.and_then(|earliest| {
if index < earliest {
None
} else {
self.session_info.get((index - earliest) as usize)
}
})
}
pub fn latest_session(&self) -> Option<SessionIndex> {
self.earliest_session
.map(|earliest| earliest + (self.session_info.len() as SessionIndex).saturating_sub(1))
}
}
use super::{LOG_TARGET, State, DBReader};
// Given a new chain-head hash, this determines the hashes of all new blocks we should track
// metadata for, given this head. The list will typically include the `head` hash provided unless
......@@ -191,153 +169,6 @@ async fn determine_new_blocks(
Ok(ancestry)
}
// Sessions unavailable in state to cache.
#[derive(Debug)]
struct SessionsUnavailable;
async fn load_all_sessions(
ctx: &mut impl SubsystemContext,
block_hash: Hash,
start: SessionIndex,
end_inclusive: SessionIndex,
) -> Result<Vec<SessionInfo>, SessionsUnavailable> {
let mut v = Vec::new();
for i in start..=end_inclusive {
let (tx, rx)= oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::SessionInfo(i, tx),
).into()).await;
let session_info = match rx.await {
Ok(Ok(Some(s))) => s,
Ok(Ok(None)) => {
tracing::debug!(
target: LOG_TARGET,
"Session {} is missing from session-info state of block {}",
i,
block_hash,
);
return Err(SessionsUnavailable);
}
Ok(Err(_)) | Err(_) => return Err(SessionsUnavailable),
};
v.push(session_info);
}
Ok(v)
}
// When inspecting a new import notification, updates the session info cache to match
// the session of the imported block.
//
// this only needs to be called on heads where we are directly notified about import, as sessions do
// not change often and import notifications are expected to be typically increasing in session number.
//
// some backwards drift in session index is acceptable.
async fn cache_session_info_for_head(
ctx: &mut impl SubsystemContext,
session_window: &mut RollingSessionWindow,
block_hash: Hash,
block_header: &Header,
) -> Result<(), SessionsUnavailable> {
let session_index = {
let (s_tx, s_rx) = oneshot::channel();
// The genesis is guaranteed to be at the beginning of the session and its parent state
// is non-existent. Therefore if we're at the genesis, we request using its state and
// not the parent.
ctx.send_message(RuntimeApiMessage::Request(
if block_header.number == 0 { block_hash } else { block_header.parent_hash },
RuntimeApiRequest::SessionIndexForChild(s_tx),
).into()).await;
match s_rx.await {
Ok(Ok(s)) => s,
Ok(Err(_)) | Err(_) => return Err(SessionsUnavailable),
}
};
match session_window.earliest_session {
None => {
// First block processed on start-up.
let window_start = session_index.saturating_sub(APPROVAL_SESSIONS - 1);
tracing::debug!(
target: LOG_TARGET, "Loading approval window from session {}..={}",
window_start, session_index,
);
match load_all_sessions(ctx, block_hash, window_start, session_index).await {
Err(SessionsUnavailable) => {
tracing::debug!(
target: LOG_TARGET,
"Could not load sessions {}..={} from block {:?} in session {}",
window_start, session_index, block_hash, session_index,
);
return Err(SessionsUnavailable);
},
Ok(s) => {
session_window.earliest_session = Some(window_start);
session_window.session_info = s;
}
}
}
Some(old_window_start) => {
let latest = session_window.latest_session().expect("latest always exists if earliest does; qed");
// Either cached or ancient.
if session_index <= latest { return Ok(()) }
let old_window_end = latest;
let window_start = session_index.saturating_sub(APPROVAL_SESSIONS - 1);
tracing::info!(
target: LOG_TARGET, "Moving approval window from session {}..={} to {}..={}",
old_window_start, old_window_end,
window_start, session_index,
);
// keep some of the old window, if applicable.
let overlap_start = window_start.saturating_sub(old_window_start);
let fresh_start = if latest < window_start {
window_start
} else {
latest + 1
};
match load_all_sessions(ctx, block_hash, fresh_start, session_index).await {
Err(SessionsUnavailable) => {
tracing::warn!(
target: LOG_TARGET,
"Could not load sessions {}..={} from block {:?} in session {}",
latest + 1, session_index, block_hash, session_index,
);
return Err(SessionsUnavailable);
}
Ok(s) => {
let outdated = std::cmp::min(overlap_start as usize, session_window.session_info.len());
session_window.session_info.drain(..outdated);
session_window.session_info.extend(s);
// we need to account for this case:
// window_start ................................... session_index
// old_window_start ........... latest
let new_earliest = std::cmp::max(window_start, old_window_start);
session_window.earliest_session = Some(new_earliest);
}
}
}
}
Ok(())
}
struct ImportedBlockInfo {
included_candidates: Vec<(CandidateHash, CandidateReceipt, CoreIndex, GroupIndex)>,
session_index: SessionIndex,
......@@ -401,7 +232,7 @@ async fn imported_block_info(
Err(_) => return Ok(None),
};
if env.session_window.earliest_session.as_ref().map_or(true, |e| &session_index < e) {
if env.session_window.earliest_session().map_or(true, |e| session_index < e) {
tracing::debug!(target: LOG_TARGET, "Block {} is from ancient session {}. Skipping",
block_hash, session_index);
......@@ -591,21 +422,25 @@ pub(crate) async fn handle_new_head(
}
};
if let Err(SessionsUnavailable)
= cache_session_info_for_head(
ctx,
&mut state.session_window,
head,
&header,
).await
{
tracing::debug!(
target: LOG_TARGET,
"Could not cache session info when processing head {:?}",
head,
);
match state.session_window.cache_session_info_for_head(ctx, head, &header).await {
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
?head,
?e,
"Could not cache session info when processing head.",
);
return Ok(Vec::new())
return Ok(Vec::new())
}
Ok(a @ SessionWindowUpdate::Advanced { .. }) => {
tracing::info!(
target: LOG_TARGET,
update = ?a,
"Advanced session window for approvals",
);
}
Ok(_) => {}
}
// If we've just started the node and haven't yet received any finality notifications,
......@@ -815,7 +650,7 @@ mod tests {
use super::*;
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_node_primitives::approval::{VRFOutput, VRFProof};
use polkadot_primitives::v1::ValidatorIndex;
use polkadot_primitives::v1::{SessionInfo, ValidatorIndex};
use polkadot_node_subsystem::messages::AllMessages;
use sp_core::testing::TaskExecutor;
use sp_runtime::{Digest, DigestItem};
......@@ -828,7 +663,7 @@ mod tests {
use merlin::Transcript;
use std::{pin::Pin, sync::Arc};
use crate::{criteria, BlockEntry};
use crate::{APPROVAL_SESSIONS, criteria, BlockEntry};
const DATA_COL: u32 = 0;
const NUM_COLUMNS: u32 = 1;
......@@ -884,7 +719,7 @@ mod tests {
fn blank_state() -> State<TestDB> {
State {
session_window: RollingSessionWindow::default(),
session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
keystore: Arc::new(LocalKeystore::in_memory()),
slot_duration_millis: 6_000,
db: TestDB::default(),
......@@ -897,10 +732,11 @@ mod tests {
-> State<TestDB>
{
State {
session_window: RollingSessionWindow {
earliest_session: Some(index),
session_info: vec![info],
},
session_window: RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
index,
vec![info],
),
..blank_state()
}
}
......@@ -1423,14 +1259,11 @@ mod tests {
.map(|(r, c, g)| (r.hash(), r.clone(), *c, *g))
.collect::<Vec<_>>();
let session_window = {
let mut window = RollingSessionWindow::default();
window.earliest_session = Some(session);
window.session_info.push(session_info);
window
};
let session_window = RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
session,
vec![session_info],
);
let header = header.clone();
Box::pin(async move {
......@@ -1537,14 +1370,11 @@ mod tests {
.collect::<Vec<_>>();
let test_fut = {
let session_window = {
let mut window = RollingSessionWindow::default();
window.earliest_session = Some(session);
window.session_info.push(session_info);
window
};
let session_window = RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
session,
vec![session_info],
);
let header = header.clone();
Box::pin(async move {
......@@ -1645,7 +1475,7 @@ mod tests {
.collect::<Vec<_>>();
let test_fut = {
let session_window = RollingSessionWindow::default();
let session_window = RollingSessionWindow::new(APPROVAL_SESSIONS);
let header = header.clone();
Box::pin(async move {
......@@ -1748,14 +1578,11 @@ mod tests {
.map(|(r, c, g)| (r.hash(), r.clone(), *c, *g))
.collect::<Vec<_>>();
let session_window = {
let mut window = RollingSessionWindow::default();
window.earliest_session = Some(session);
window.session_info.push(session_info);
window
};
let session_window = RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
session,
vec![session_info],
);
let header = header.clone();
Box::pin(async move {
......@@ -2019,318 +1846,4 @@ mod tests {
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
fn cache_session_info_test(
expected_start_session: SessionIndex,
session: SessionIndex,
mut window: RollingSessionWindow,
expect_requests_from: SessionIndex,
) {
let header = Header {
digest: Digest::default(),
extrinsics_root: Default::default(),
number: 5,
state_root: Default::default(),
parent_hash: Default::default(),
};
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let hash = header.hash();
let test_fut = {
let header = header.clone();
Box::pin(async move {
cache_session_info_for_head(
&mut ctx,
&mut window,
hash,
&header,
).await.unwrap();
assert_eq!(window.earliest_session, Some(expected_start_session));
assert_eq!(
window.session_info,
(expected_start_session..=session).map(dummy_session_info).collect::<Vec<_>>(),
);
})
};
let aux_fut = Box::pin(async move {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, header.parent_hash);
let _ = s_tx.send(Ok(session));
}
);
for i in expect_requests_from..=session {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(j, s_tx),
)) => {
assert_eq!(h, hash);
assert_eq!(i, j);
let _ = s_tx.send(Ok(Some(dummy_session_info(i))));
}
);
}
});
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
#[test]
fn cache_session_info_first_early() {
cache_session_info_test(
0,
1,
RollingSessionWindow::default(),
0,
);
}
#[test]
fn cache_session_info_does_not_underflow() {
let window = RollingSessionWindow {
earliest_session: Some(1),
session_info: vec![dummy_session_info(1)],
};
cache_session_info_test(
1,
2,
window,
2,
);
}
#[test]
fn cache_session_info_first_late() {
cache_session_info_test(
(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
100,
RollingSessionWindow::default(),
(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
);
}
#[test]
fn cache_session_info_jump() {
let window = RollingSessionWindow {
earliest_session: Some(50),
session_info: vec![dummy_session_info(50), dummy_session_info(51), dummy_session_info(52)],
};
cache_session_info_test(
(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
100,
window,
(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
);
}
#[test]
fn cache_session_info_roll_full() {
let start = 99 - (APPROVAL_SESSIONS - 1);
let window = RollingSessionWindow {
earliest_session: Some(start),
session_info: (start..=99).map(dummy_session_info).collect(),
};
cache_session_info_test(
(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
100,
window,
100, // should only make one request.
);
}
#[test]
fn cache_session_info_roll_many_full() {
let start = 97 - (APPROVAL_SESSIONS - 1);
let window = RollingSessionWindow {
earliest_session: Some(start),
session_info: (start..=97).map(dummy_session_info).collect(),
};
cache_session_info_test(
(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
100,
window,
98,
);
}
#[test]
fn cache_session_info_roll_early() {
let start = 0;
let window = RollingSessionWindow {
earliest_session: Some(start),
session_info: (0..=1).map(dummy_session_info).collect(),
};
cache_session_info_test(
0,
2,
window,
2, // should only make one request.
);
}
#[test]
fn cache_session_info_roll_many_early() {
let start = 0;
let window = RollingSessionWindow {
earliest_session: Some(start),
session_info: (0..=1).map(dummy_session_info).collect(),
};
cache_session_info_test(
0,
3,
window,
2,
);
}
#[test]
fn any_session_unavailable_for_caching_means_no_change() {
let session: SessionIndex = 6;
let start_session = session.saturating_sub(APPROVAL_SESSIONS - 1);