Unverified Commit b74335ac authored by Bernhard Schuster's avatar Bernhard Schuster Committed by GitHub
Browse files

collect included disputes from on-chain (#3924)



* dummy: impl another runtime API

* query the on chain disputes, and inform self

* make use of the refactor

* minro

* SPLIT ME

* write dispute values

* wip

* impl for all runtimes

* chore: fmt

* [] -> get

* fixup mock runtime

* fixup

* fixup discovery for overseer init

* chore: fmt

* spellcheck

* rename imported_on_chain_disputes -> on_chain_votes

* reduction

* make it mockable

* rename and refactor

* don't query on chain info if it's not needed

* yikes

* fmt

* fix test

* minimal fix for existing tests

* attempt to fetch the session info from the rolling window before falling back

* moved

* comments

* comments

* test for backing votes

* rename

* Update runtime/polkadot/src/lib.rs

* chore: spellcheck + dict

* chore: fmt

* fixup cache size

* add warning

* logging, rationale, less defense

* introduce new unchecked, that still checks in debug builds

* fix

* draft alt approach

* fix unused imports

* include the session

* Update node/core/dispute-coordinator/src/real/mod.rs

Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>

* provide where possible

* expand comment

* fixin

* fixup

* ValidityVote <-> ValidityAttestation <-> CompactStatement has a 1:1 representation

* mark TODO

* Update primitives/src/v1/mod.rs

Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>

* address review comments

* update docs

Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>
parent 0559d2d7
Pipeline #160945 passed with stages
in 46 minutes and 1 second
......@@ -10,6 +10,7 @@ source target/release/completion-scripts/polkadot.bash
```
You can find completion scripts for:
- bash
- fish
- zsh
......
......@@ -31,6 +31,9 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};
use futures::{channel::oneshot, prelude::*};
use kvdb::KeyValueDB;
use parity_scale_codec::{Decode, Encode, Error as CodecError};
use polkadot_node_primitives::{
CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement,
DISPUTE_WINDOW,
......@@ -39,7 +42,7 @@ use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
messages::{
BlockDescription, DisputeCoordinatorMessage, DisputeDistributionMessage,
DisputeParticipationMessage, ImportStatementsResult,
DisputeParticipationMessage, ImportStatementsResult, RuntimeApiMessage, RuntimeApiRequest,
},
overseer, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError,
};
......@@ -47,13 +50,10 @@ use polkadot_node_subsystem_util::rolling_session_window::{
RollingSessionWindow, SessionWindowUpdate,
};
use polkadot_primitives::v1::{
BlockNumber, CandidateHash, CandidateReceipt, DisputeStatement, Hash, SessionIndex,
SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement,
DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, SessionInfo,
ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
};
use futures::{channel::oneshot, prelude::*};
use kvdb::KeyValueDB;
use parity_scale_codec::{Decode, Encode, Error as CodecError};
use sc_keystore::LocalKeystore;
use crate::metrics::Metrics;
......@@ -348,6 +348,8 @@ where
&mut overlay_db,
&mut state,
update.activated.into_iter().map(|a| a.hash),
clock.now(),
&metrics,
)
.await?;
if !state.recovery_state.complete() {
......@@ -476,6 +478,8 @@ async fn handle_new_activations(
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
new_activations: impl IntoIterator<Item = Hash>,
now: u64,
metrics: &Metrics,
) -> Result<(), Error> {
for new_leaf in new_activations {
match state.rolling_session_window.cache_session_info_for_head(ctx, new_leaf).await {
......@@ -485,7 +489,6 @@ async fn handle_new_activations(
err = ?e,
"Failed to update session cache for disputes",
);
continue
},
Ok(SessionWindowUpdate::Initialized { window_end, .. }) |
......@@ -499,10 +502,217 @@ async fn handle_new_activations(
db::v1::note_current_session(overlay_db, session)?;
}
},
_ => {},
Ok(SessionWindowUpdate::Unchanged) => {},
};
scrape_on_chain_votes(ctx, overlay_db, state, new_leaf, now, metrics).await?;
}
Ok(())
}
/// Scrapes on-chain votes (backing votes and concluded disputes) for a active leaf of the relay chain.
async fn scrape_on_chain_votes(
ctx: &mut (impl SubsystemContext<Message = DisputeCoordinatorMessage>
+ overseer::SubsystemContext<Message = DisputeCoordinatorMessage>),
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
new_leaf: Hash,
now: u64,
metrics: &Metrics,
) -> Result<(), Error> {
// obtain the concluded disputes as well as the candidate backing votes
// from the new leaf
let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = {
let (tx, rx) = oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
new_leaf,
RuntimeApiRequest::FetchOnChainVotes(tx),
))
.await;
match rx.await {
Ok(Ok(Some(val))) => val,
Ok(Ok(None)) => {
tracing::trace!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
"No on chain votes stored for relay chain leaf");
return Ok(())
},
Ok(Err(e)) => {
tracing::debug!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
error = ?e,
"Could not retrieve on chain votes due to an API error");
return Ok(())
},
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
error = ?e,
"Could not retrieve onchain votes due to oneshot cancellation");
return Ok(())
},
}
};
if backing_validators_per_candidate.is_empty() && disputes.is_empty() {
return Ok(())
}
// Obtain the session info, for sake of `ValidatorId`s
// either from the rolling session window.
// Must be called _after_ `fn cache_session_info_for_head`
// which guarantees that the session info is available
// for the current session.
let session_info: SessionInfo =
if let Some(session_info) = state.rolling_session_window.session_info(session) {
session_info.clone()
} else {
tracing::warn!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
"Could not retrieve session info from rolling session window");
return Ok(())
};
// Scraped on-chain backing votes for the candidates with
// the new active leaf as if we received them via gossip.
for (candidate_receipt, backers) in backing_validators_per_candidate {
let candidate_hash = candidate_receipt.hash();
let statements = backers.into_iter().filter_map(|(validator_index, attestation)| {
let validator_public: ValidatorId = session_info
.validators
.get(validator_index.0 as usize)
.or_else(|| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
"Missing public key for validator {:?}",
&validator_index);
None
})
.cloned()?;
let validator_signature = attestation.signature().clone();
let valid_statement_kind = match attestation.to_compact_statement(candidate_hash) {
CompactStatement::Seconded(_) =>
ValidDisputeStatementKind::BackingSeconded(new_leaf),
CompactStatement::Valid(_) => ValidDisputeStatementKind::BackingValid(new_leaf),
};
let signed_dispute_statement =
SignedDisputeStatement::new_unchecked_from_trusted_source(
DisputeStatement::Valid(valid_statement_kind),
candidate_hash,
session,
validator_public,
validator_signature,
);
Some((signed_dispute_statement, validator_index))
});
let import_result = handle_import_statements(
ctx,
overlay_db,
state,
candidate_hash,
MaybeCandidateReceipt::Provides(candidate_receipt),
session,
statements,
now,
metrics,
)
.await?;
match import_result {
ImportStatementsResult::ValidImport => tracing::trace!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?session,
"Imported backing vote from on-chain"),
ImportStatementsResult::InvalidImport => tracing::warn!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?session,
"Attempted import of on-chain backing votes failed"),
}
}
if disputes.is_empty() {
return Ok(())
}
// Import concluded disputes from on-chain, this already went through a vote so it's assumed
// as verified. This will only be stored, gossiping it is not necessary.
// First try to obtain all the backings which ultimately contain the candidate
// receipt which we need.
for DisputeStatementSet { candidate_hash, session, statements } in disputes {
let statements = statements
.into_iter()
.filter_map(|(dispute_statement, validator_index, validator_signature)| {
let session_info: SessionInfo = if let Some(session_info) =
state.rolling_session_window.session_info(session)
{
session_info.clone()
} else {
tracing::warn!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
?session,
"Could not retrieve session info from rolling session window for recently concluded dispute");
return None
};
let validator_public: ValidatorId = session_info
.validators
.get(validator_index.0 as usize)
.or_else(|| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
?session,
"Missing public key for validator {:?} that participated in concluded dispute",
&validator_index);
None
})
.cloned()?;
Some((
SignedDisputeStatement::new_unchecked_from_trusted_source(
dispute_statement,
candidate_hash,
session,
validator_public,
validator_signature,
),
validator_index,
))
})
.collect::<Vec<_>>();
let import_result = handle_import_statements(
ctx,
overlay_db,
state,
candidate_hash,
// TODO <https://github.com/paritytech/polkadot/issues/4011>
MaybeCandidateReceipt::AssumeBackingVotePresent,
session,
statements,
now,
metrics,
)
.await?;
match import_result {
ImportStatementsResult::ValidImport => tracing::trace!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?candidate_hash,
?session,
"Imported statement of conlcuded dispute from on-chain"),
ImportStatementsResult::InvalidImport => tracing::warn!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?candidate_hash,
?session,
"Attempted import of on-chain statement of concluded dispute failed"),
}
}
Ok(())
}
......@@ -527,7 +737,7 @@ async fn handle_incoming(
overlay_db,
state,
candidate_hash,
candidate_receipt,
MaybeCandidateReceipt::Provides(candidate_receipt),
session,
statements,
now,
......@@ -622,14 +832,22 @@ fn insert_into_statement_vec<T>(
vec.insert(pos, (tag, val_index, val_signature));
}
#[derive(Debug, Clone)]
enum MaybeCandidateReceipt {
/// Directly provides the candiate receipt.
Provides(CandidateReceipt),
/// Assumes it was seen before by means of seconded message.
AssumeBackingVotePresent,
}
async fn handle_import_statements(
ctx: &mut impl SubsystemContext,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
candidate_receipt: MaybeCandidateReceipt,
session: SessionIndex,
statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
statements: impl IntoIterator<Item = (SignedDisputeStatement, ValidatorIndex)>,
now: Timestamp,
metrics: &Metrics,
) -> Result<ImportStatementsResult, Error> {
......@@ -638,7 +856,7 @@ async fn handle_import_statements(
return Ok(ImportStatementsResult::InvalidImport)
}
let validators = match state.rolling_session_window.session_info(session) {
let session_info = match state.rolling_session_window.session_info(session) {
None => {
tracing::warn!(
target: LOG_TARGET,
......@@ -648,21 +866,40 @@ async fn handle_import_statements(
return Ok(ImportStatementsResult::InvalidImport)
},
Some(info) => info.validators.clone(),
Some(info) => info,
};
let validators = session_info.validators.clone();
let n_validators = validators.len();
let supermajority_threshold = polkadot_primitives::v1::supermajority_threshold(n_validators);
let mut votes = overlay_db
// In case we are not provided with a candidate receipt
// we operate under the assumption, that a previous vote
// which included a `CandidateReceipt` was seen.
// This holds since every block is preceeded by the `Backing`-phase.
//
// There is one exception: A sufficiently sophisticated attacker could prevent
// us from seeing the backing votes by witholding arbitrary blocks, and hence we do
// not have a `CandidateReceipt` available.
let mut votes = match overlay_db
.load_candidate_votes(session, &candidate_hash)?
.map(CandidateVotes::from)
.unwrap_or_else(|| CandidateVotes {
candidate_receipt: candidate_receipt.clone(),
valid: Vec::new(),
invalid: Vec::new(),
});
{
Some(votes) => votes,
None =>
if let MaybeCandidateReceipt::Provides(candidate_receipt) = candidate_receipt {
CandidateVotes { candidate_receipt, valid: Vec::new(), invalid: Vec::new() }
} else {
tracing::warn!(
target: LOG_TARGET,
session,
"Missing info for session which has an active dispute",
);
return Ok(ImportStatementsResult::InvalidImport)
},
};
let candidate_receipt = votes.candidate_receipt.clone();
// Update candidate votes.
for (statement, val_index) in statements {
......@@ -906,7 +1143,7 @@ async fn issue_local_statement(
overlay_db,
state,
candidate_hash,
candidate_receipt,
MaybeCandidateReceipt::Provides(candidate_receipt),
session,
statements,
now,
......@@ -927,7 +1164,7 @@ async fn issue_local_statement(
target: LOG_TARGET,
?candidate_hash,
?session,
"handle_import_statements` considers our own votes invalid!"
"`handle_import_statements` considers our own votes invalid!"
);
},
Ok(ImportStatementsResult::ValidImport) => {
......@@ -935,7 +1172,7 @@ async fn issue_local_statement(
target: LOG_TARGET,
?candidate_hash,
?session,
"handle_import_statements` successfully imported our vote!"
"`handle_import_statements` successfully imported our vote!"
);
},
}
......
......@@ -203,6 +203,17 @@ impl TestState {
}
)
}
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_new_leaf,
RuntimeApiRequest::FetchOnChainVotes(tx),
)) => {
// add some `BackedCandidates` or resolved disputes here as needed
tx.send(Ok(Some(ScrapedOnChainVotes::default()))).unwrap();
}
)
}
async fn handle_resume_sync(
......
......@@ -24,7 +24,8 @@ use polkadot_primitives::v1::{
AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent,
CommittedCandidateReceipt, CoreState, GroupRotationInfo, Hash, Id as ParaId,
InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData,
SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash,
ValidatorId, ValidatorIndex,
};
const AUTHORITIES_CACHE_SIZE: usize = 128 * 1024;
......@@ -41,6 +42,7 @@ const SESSION_INFO_CACHE_SIZE: usize = 64 * 1024;
const DMQ_CONTENTS_CACHE_SIZE: usize = 64 * 1024;
const INBOUND_HRMP_CHANNELS_CACHE_SIZE: usize = 64 * 1024;
const CURRENT_BABE_EPOCH_CACHE_SIZE: usize = 64 * 1024;
const ON_CHAIN_VOTES_CACHE_SIZE: usize = 3 * 1024;
struct ResidentSizeOf<T>(T);
......@@ -98,6 +100,7 @@ pub(crate) struct RequestResultCache {
ResidentSizeOf<BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumber>>>>,
>,
current_babe_epoch: MemoryLruCache<Hash, DoesNotAllocate<Epoch>>,
on_chain_votes: MemoryLruCache<Hash, ResidentSizeOf<Option<ScrapedOnChainVotes>>>,
}
impl Default for RequestResultCache {
......@@ -120,6 +123,7 @@ impl Default for RequestResultCache {
dmq_contents: MemoryLruCache::new(DMQ_CONTENTS_CACHE_SIZE),
inbound_hrmp_channels_contents: MemoryLruCache::new(INBOUND_HRMP_CHANNELS_CACHE_SIZE),
current_babe_epoch: MemoryLruCache::new(CURRENT_BABE_EPOCH_CACHE_SIZE),
on_chain_votes: MemoryLruCache::new(ON_CHAIN_VOTES_CACHE_SIZE),
}
}
}
......@@ -320,6 +324,21 @@ impl RequestResultCache {
pub(crate) fn cache_current_babe_epoch(&mut self, relay_parent: Hash, epoch: Epoch) {
self.current_babe_epoch.insert(relay_parent, DoesNotAllocate(epoch));
}
pub(crate) fn on_chain_votes(
&mut self,
relay_parent: &Hash,
) -> Option<&Option<ScrapedOnChainVotes>> {
self.on_chain_votes.get(relay_parent).map(|v| &v.0)
}
pub(crate) fn cache_on_chain_votes(
&mut self,
relay_parent: Hash,
scraped: Option<ScrapedOnChainVotes>,
) {
self.on_chain_votes.insert(relay_parent, ResidentSizeOf(scraped));
}
}
pub(crate) enum RequestResult {
......@@ -342,4 +361,5 @@ pub(crate) enum RequestResult {
BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumber>>>,
),
CurrentBabeEpoch(Hash, Epoch),
FetchOnChainVotes(Hash, Option<ScrapedOnChainVotes>),
}
......@@ -143,6 +143,8 @@ where
.cache_inbound_hrmp_channel_contents((relay_parent, para_id), contents),
CurrentBabeEpoch(relay_parent, epoch) =>
self.requests_cache.cache_current_babe_epoch(relay_parent, epoch),
FetchOnChainVotes(relay_parent, scraped) =>
self.requests_cache.cache_on_chain_votes(relay_parent, scraped),
}
}
......@@ -209,6 +211,8 @@ where
.map(|sender| Request::InboundHrmpChannelsContents(id, sender)),
Request::CurrentBabeEpoch(sender) =>
query!(current_babe_epoch(), sender).map(|sender| Request::CurrentBabeEpoch(sender)),
Request::FetchOnChainVotes(sender) =>
query!(on_chain_votes(), sender).map(|sender| Request::FetchOnChainVotes(sender)),
}
}
......@@ -342,6 +346,7 @@ where
Request::InboundHrmpChannelsContents(id, sender) =>
query!(InboundHrmpChannelsContents, inbound_hrmp_channels_contents(id), sender),
Request::CurrentBabeEpoch(sender) => query!(CurrentBabeEpoch, current_epoch(), sender),
Request::FetchOnChainVotes(sender) => query!(FetchOnChainVotes, on_chain_votes(), sender),
}
}
......
......@@ -22,8 +22,8 @@ use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateEvent, CommittedCandidateReceipt, CoreState, GroupRotationInfo,
Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption,
PersistedValidationData, SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash,
ValidatorId, ValidatorIndex,
PersistedValidationData, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode,
ValidationCodeHash, ValidatorId, ValidatorIndex,
};
use sp_core::testing::TaskExecutor;
use std::{
......@@ -49,6 +49,7 @@ struct MockRuntimeApi {
dmq_contents: HashMap<ParaId, Vec<InboundDownwardMessage>>,
hrmp_channels: HashMap<ParaId, BTreeMap<ParaId, Vec<InboundHrmpMessage>>>,
babe_epoch: Option<BabeEpoch>,
on_chain_votes: Option<ScrapedOnChainVotes>,
}
impl ProvideRuntimeApi<Block> for MockRuntimeApi {
......@@ -149,6 +150,10 @@ sp_api::mock_impl_runtime_apis! {
) -> Option<ValidationCode> {
self.validation_code_by_hash.get(&hash).map(|c| c.clone())
}
fn on_chain_votes(&self) -> Option<ScrapedOnChainVotes> {
self.on_chain_votes.clone()
}
}
impl BabeApi<Block> for MockRuntimeApi {
......
......@@ -66,6 +66,26 @@ impl CandidateVotes {
}
impl SignedDisputeStatement {
/// Create a new `SignedDisputeStatement` from information
/// that is available on-chain, and hence already can be trusted.
///
/// Attention: Not to be used other than with guaranteed fetches.
pub fn new_unchecked_from_trusted_source(
dispute_statement: DisputeStatement,
candidate_hash: CandidateHash,
session_index: SessionIndex,
validator_public: ValidatorId,
validator_signature: ValidatorSignature,
) -> Self {
SignedDisputeStatement {
dispute_statement,
candidate_hash,
validator_public,
validator_signature,
session_index,
}
}
/// Create a new `SignedDisputeStatement`, which is only possible by checking the signature.
pub fn new_checked(
dispute_statement: DisputeStatement,
......
......@@ -643,6 +643,8 @@ pub enum RuntimeApiRequest {
),
/// Get information about the BABE epoch the block was included in.
CurrentBabeEpoch(RuntimeApiSender<BabeEpoch>),
/// Get all disputes in relation to a relay parent.
FetchOnChainVotes(RuntimeApiSender<Option<polkadot_primitives::v1::ScrapedOnChainVotes>>),
}
/// A message to the Runtime API subsystem.
......
......@@ -735,6 +735,7 @@ impl CompactStatement {
/// An either implicit or explicit attestation to the validity of a parachain
/// candidate.
#[derive(Clone, Eq, PartialEq, Decode, Encode, RuntimeDebug, TypeInfo)]
#[cfg_attr(feature = "std", derive(MallocSizeOf))]
pub enum ValidityAttestation {
/// Implicit validity attestation by issuing.
/// This corresponds to issuance of a `Candidate` statement.
......@@ -747,6 +748,18 @@ pub enum ValidityAttestation {
}
impl ValidityAttestation {
/// Produce the underlying signed payload of the attestation, given the hash of the candidate,
/// which should be known in context.
pub fn to_compact_statement(&self, candidate_hash: CandidateHash) -> CompactStatement {
// Explicit and implicit map directly from
// `ValidityVote::Valid` and `ValidityVote::Issued`, and hence there is a
// `1:1` relationshow which enables the conversion.
match *self {
ValidityAttestation::Implicit(_) => CompactStatement::Seconded(candidate_hash),
ValidityAttestation::Explicit(_) => CompactStatement::Valid(candidate_hash),
}
}
/// Get a reference to the signature.
pub fn signature(&self) -> &ValidatorSignature {
match *self {
......