// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
//! Implements the dispute coordinator subsystem.
//!
//! This is the central subsystem of the node-side components which participate in disputes.
//! This subsystem wraps a database which tracks all statements observed by all validators over some window of sessions.
//! Votes older than this session window are pruned.
//!
//! This subsystem will be the point which produce dispute votes, either positive or negative, based on locally-observed
//! validation results as well as a sink for votes received by other subsystems. When importing a dispute vote from
//! another node, this will trigger the dispute participation subsystem to recover and validate the block and call
//! back to this subsystem.
use std::{
collections::HashSet,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use futures::{channel::oneshot, prelude::*};
use kvdb::KeyValueDB;
use parity_scale_codec::{Decode, Encode, Error as CodecError};
use polkadot_node_primitives::{
CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement,
DISPUTE_WINDOW,
};
use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
messages::{
BlockDescription, DisputeCoordinatorMessage, DisputeDistributionMessage,
DisputeParticipationMessage, ImportStatementsResult, RuntimeApiMessage, RuntimeApiRequest,
},
overseer, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError,
};
use polkadot_node_subsystem_util::rolling_session_window::{
RollingSessionWindow, SessionWindowUpdate,
};
use polkadot_primitives::v1::{
BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement,
DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, SessionInfo,
ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
};
use sc_keystore::LocalKeystore;
use crate::metrics::Metrics;
use backend::{Backend, OverlayedBackend};
use db::v1::{DbBackend, RecentDisputes};
mod backend;
mod db;
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "parachain::dispute-coordinator";
// The choice here is fairly arbitrary. But any dispute that concluded more than a few minutes ago
// is not worth considering anymore. Changing this value has little to no bearing on consensus,
// and really only affects the work that the node might do on startup during periods of many disputes.
const ACTIVE_DURATION_SECS: Timestamp = 180;
/// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots.
type Timestamp = u64;
#[derive(Eq, PartialEq)]
enum Participation {
Pending,
Complete,
}
impl Participation {
fn complete(&mut self) -> bool {
let complete = *self == Participation::Complete;
if !complete {
*self = Participation::Complete
}
complete
}
}
struct State {
keystore: Arc,
highest_session: Option,
rolling_session_window: RollingSessionWindow,
recovery_state: Participation,
}
/// Configuration for the dispute coordinator subsystem.
#[derive(Debug, Clone, Copy)]
pub struct Config {
/// The data column in the store to use for dispute data.
pub col_data: u32,
}
impl Config {
fn column_config(&self) -> db::v1::ColumnConfiguration {
db::v1::ColumnConfiguration { col_data: self.col_data }
}
}
/// An implementation of the dispute coordinator subsystem.
pub struct DisputeCoordinatorSubsystem {
config: Config,
store: Arc,
keystore: Arc,
metrics: Metrics,
}
impl DisputeCoordinatorSubsystem {
/// Create a new instance of the subsystem.
pub fn new(
store: Arc,
config: Config,
keystore: Arc,
metrics: Metrics,
) -> Self {
DisputeCoordinatorSubsystem { store, config, keystore, metrics }
}
}
impl overseer::Subsystem for DisputeCoordinatorSubsystem
where
Context: SubsystemContext,
Context: overseer::SubsystemContext,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let backend = DbBackend::new(self.store.clone(), self.config.column_config());
let future = run(self, ctx, backend, Box::new(SystemClock)).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "dispute-coordinator-subsystem", future }
}
}
trait Clock: Send + Sync {
fn now(&self) -> Timestamp;
}
struct SystemClock;
impl Clock for SystemClock {
fn now(&self) -> Timestamp {
// `SystemTime` is notoriously non-monotonic, so our timers might not work
// exactly as expected.
//
// Regardless, disputes are considered active based on an order of minutes,
// so a few seconds of slippage in either direction shouldn't affect the
// amount of work the node is doing significantly.
match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(d) => d.as_secs(),
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Current time is before unix epoch. Validation will not work correctly."
);
0
},
}
}
}
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
#[error(transparent)]
ChainApi(#[from] ChainApiError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
#[error("Oneshot send failed")]
OneshotSend,
#[error(transparent)]
Subsystem(#[from] SubsystemError),
#[error(transparent)]
Codec(#[from] CodecError),
}
impl From for Error {
fn from(err: db::v1::Error) -> Self {
match err {
db::v1::Error::Io(io) => Self::Io(io),
db::v1::Error::Codec(e) => Self::Codec(e),
}
}
}
impl Error {
fn trace(&self) {
match self {
// don't spam the log with spurious errors
Self::RuntimeApi(_) | Self::Oneshot(_) =>
tracing::debug!(target: LOG_TARGET, err = ?self),
// it's worth reporting otherwise
_ => tracing::warn!(target: LOG_TARGET, err = ?self),
}
}
}
/// The status of dispute. This is a state machine which can be altered by the
/// helper methods.
#[derive(Debug, Clone, Copy, Encode, Decode, PartialEq)]
pub enum DisputeStatus {
/// The dispute is active and unconcluded.
#[codec(index = 0)]
Active,
/// The dispute has been concluded in favor of the candidate
/// since the given timestamp.
#[codec(index = 1)]
ConcludedFor(Timestamp),
/// The dispute has been concluded against the candidate
/// since the given timestamp.
///
/// This takes precedence over `ConcludedFor` in the case that
/// both are true, which is impossible unless a large amount of
/// validators are participating on both sides.
#[codec(index = 2)]
ConcludedAgainst(Timestamp),
}
impl DisputeStatus {
/// Initialize the status to the active state.
pub fn active() -> DisputeStatus {
DisputeStatus::Active
}
/// Transition the status to a new status after observing the dispute has concluded for the candidate.
/// This may be a no-op if the status was already concluded.
pub fn concluded_for(self, now: Timestamp) -> DisputeStatus {
match self {
DisputeStatus::Active => DisputeStatus::ConcludedFor(now),
DisputeStatus::ConcludedFor(at) => DisputeStatus::ConcludedFor(std::cmp::min(at, now)),
against => against,
}
}
/// Transition the status to a new status after observing the dispute has concluded against the candidate.
/// This may be a no-op if the status was already concluded.
pub fn concluded_against(self, now: Timestamp) -> DisputeStatus {
match self {
DisputeStatus::Active => DisputeStatus::ConcludedAgainst(now),
DisputeStatus::ConcludedFor(at) =>
DisputeStatus::ConcludedAgainst(std::cmp::min(at, now)),
DisputeStatus::ConcludedAgainst(at) =>
DisputeStatus::ConcludedAgainst(std::cmp::min(at, now)),
}
}
/// Whether the disputed candidate is possibly invalid.
pub fn is_possibly_invalid(&self) -> bool {
match self {
DisputeStatus::Active | DisputeStatus::ConcludedAgainst(_) => true,
DisputeStatus::ConcludedFor(_) => false,
}
}
/// Yields the timestamp this dispute concluded at, if any.
pub fn concluded_at(&self) -> Option {
match self {
DisputeStatus::Active => None,
DisputeStatus::ConcludedFor(at) | DisputeStatus::ConcludedAgainst(at) => Some(*at),
}
}
}
async fn run(
subsystem: DisputeCoordinatorSubsystem,
mut ctx: Context,
mut backend: B,
clock: Box,
) where
Context: overseer::SubsystemContext,
Context: SubsystemContext,
B: Backend,
{
loop {
let res = run_until_error(&mut ctx, &subsystem, &mut backend, &*clock).await;
match res {
Err(e) => {
e.trace();
if let Error::Subsystem(SubsystemError::Context(_)) = e {
break
}
},
Ok(()) => {
tracing::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
break
},
}
}
}
// Run the subsystem until an error is encountered or a `conclude` signal is received.
// Most errors are non-fatal and should lead to another call to this function.
//
// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
// lead to another call to this function.
async fn run_until_error(
ctx: &mut Context,
subsystem: &DisputeCoordinatorSubsystem,
backend: &mut B,
clock: &dyn Clock,
) -> Result<(), Error>
where
Context: overseer::SubsystemContext,
Context: SubsystemContext,
B: Backend,
{
let mut state = State {
keystore: subsystem.keystore.clone(),
highest_session: None,
rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
recovery_state: Participation::Pending,
};
let metrics = &subsystem.metrics;
loop {
let mut overlay_db = OverlayedBackend::new(backend);
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
handle_new_activations(
ctx,
&mut overlay_db,
&mut state,
update.activated.into_iter().map(|a| a.hash),
clock.now(),
&metrics,
)
.await?;
if !state.recovery_state.complete() {
handle_startup(ctx, &mut overlay_db, &mut state).await?;
}
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
FromOverseer::Communication { msg } =>
handle_incoming(ctx, &mut overlay_db, &mut state, msg, clock.now(), &metrics)
.await?,
}
if !overlay_db.is_empty() {
let ops = overlay_db.into_write_ops();
backend.write(ops)?;
}
}
}
// Restores the subsystem's state before proceeding with the main event loop. Primarily, this
// repopulates the rolling session window the relevant session information to handle incoming
// import statement requests.
//
// This method also retransmits a `DisputeParticiationMessage::Participate` for any non-concluded
// disputes for which the subsystem doesn't have a local statement, ensuring it eventually makes an
// arbitration on the dispute.
async fn handle_startup(
ctx: &mut Context,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
) -> Result<(), Error>
where
Context: overseer::SubsystemContext,
Context: SubsystemContext,
{
let recent_disputes = match overlay_db.load_recent_disputes() {
Ok(Some(disputes)) => disputes,
Ok(None) => return Ok(()),
Err(e) => {
tracing::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e);
return Err(e.into())
},
};
// Filter out disputes that have already concluded.
let active_disputes = recent_disputes
.into_iter()
.filter(|(_, status)| *status == DisputeStatus::Active)
.collect::();
for ((session, ref candidate_hash), _) in active_disputes.into_iter() {
let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) {
Ok(Some(votes)) => votes.into(),
Ok(None) => continue,
Err(e) => {
tracing::error!(
target: LOG_TARGET,
"Failed initial load of candidate votes: {:?}",
e
);
continue
},
};
let validators = match state.rolling_session_window.session_info(session) {
None => {
tracing::warn!(
target: LOG_TARGET,
session,
"Missing info for session which has an active dispute",
);
continue
},
Some(info) => info.validators.clone(),
};
let n_validators = validators.len();
let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect();
// Determine if there are any missing local statements for this dispute. Validators are
// filtered if:
// 1) their statement already exists, or
// 2) the validator key is not in the local keystore (i.e. the validator is remote).
// The remaining set only contains local validators that are also missing statements.
let missing_local_statement = validators
.iter()
.enumerate()
.map(|(index, validator)| (ValidatorIndex(index as _), validator))
.any(|(index, validator)| {
!voted_indices.contains(&index) &&
state
.keystore
.key_pair::(validator)
.ok()
.map_or(false, |v| v.is_some())
});
// Send a `DisputeParticipationMessage` for all non-concluded disputes which do not have a
// recorded local statement.
if missing_local_statement {
let (report_availability, receive_availability) = oneshot::channel();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash: *candidate_hash,
candidate_receipt: votes.candidate_receipt.clone(),
session,
n_validators: n_validators as u32,
report_availability,
})
.await;
if !receive_availability.await? {
tracing::debug!(
target: LOG_TARGET,
"Participation failed. Candidate not available"
);
}
}
}
Ok(())
}
async fn handle_new_activations(
ctx: &mut (impl SubsystemContext
+ overseer::SubsystemContext),
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
new_activations: impl IntoIterator- ,
now: u64,
metrics: &Metrics,
) -> Result<(), Error> {
for new_leaf in new_activations {
match state.rolling_session_window.cache_session_info_for_head(ctx, new_leaf).await {
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to update session cache for disputes",
);
continue
},
Ok(SessionWindowUpdate::Initialized { window_end, .. }) |
Ok(SessionWindowUpdate::Advanced { new_window_end: window_end, .. }) => {
let session = window_end;
if state.highest_session.map_or(true, |s| s < session) {
tracing::trace!(target: LOG_TARGET, session, "Observed new session. Pruning");
state.highest_session = Some(session);
db::v1::note_current_session(overlay_db, session)?;
}
},
Ok(SessionWindowUpdate::Unchanged) => {},
};
scrape_on_chain_votes(ctx, overlay_db, state, new_leaf, now, metrics).await?;
}
Ok(())
}
/// Scrapes on-chain votes (backing votes and concluded disputes) for a active leaf of the relay chain.
async fn scrape_on_chain_votes(
ctx: &mut (impl SubsystemContext
+ overseer::SubsystemContext),
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
new_leaf: Hash,
now: u64,
metrics: &Metrics,
) -> Result<(), Error> {
// obtain the concluded disputes as well as the candidate backing votes
// from the new leaf
let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = {
let (tx, rx) = oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
new_leaf,
RuntimeApiRequest::FetchOnChainVotes(tx),
))
.await;
match rx.await {
Ok(Ok(Some(val))) => val,
Ok(Ok(None)) => {
tracing::trace!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
"No on chain votes stored for relay chain leaf");
return Ok(())
},
Ok(Err(e)) => {
tracing::debug!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
error = ?e,
"Could not retrieve on chain votes due to an API error");
return Ok(())
},
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
error = ?e,
"Could not retrieve onchain votes due to oneshot cancellation");
return Ok(())
},
}
};
if backing_validators_per_candidate.is_empty() && disputes.is_empty() {
return Ok(())
}
// Obtain the session info, for sake of `ValidatorId`s
// either from the rolling session window.
// Must be called _after_ `fn cache_session_info_for_head`
// which guarantees that the session info is available
// for the current session.
let session_info: SessionInfo =
if let Some(session_info) = state.rolling_session_window.session_info(session) {
session_info.clone()
} else {
tracing::warn!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
"Could not retrieve session info from rolling session window");
return Ok(())
};
// Scraped on-chain backing votes for the candidates with
// the new active leaf as if we received them via gossip.
for (candidate_receipt, backers) in backing_validators_per_candidate {
let candidate_hash = candidate_receipt.hash();
let statements = backers.into_iter().filter_map(|(validator_index, attestation)| {
let validator_public: ValidatorId = session_info
.validators
.get(validator_index.0 as usize)
.or_else(|| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
"Missing public key for validator {:?}",
&validator_index);
None
})
.cloned()?;
let validator_signature = attestation.signature().clone();
let valid_statement_kind = match attestation.to_compact_statement(candidate_hash) {
CompactStatement::Seconded(_) =>
ValidDisputeStatementKind::BackingSeconded(new_leaf),
CompactStatement::Valid(_) => ValidDisputeStatementKind::BackingValid(new_leaf),
};
let signed_dispute_statement =
SignedDisputeStatement::new_unchecked_from_trusted_source(
DisputeStatement::Valid(valid_statement_kind),
candidate_hash,
session,
validator_public,
validator_signature,
);
Some((signed_dispute_statement, validator_index))
});
let import_result = handle_import_statements(
ctx,
overlay_db,
state,
candidate_hash,
MaybeCandidateReceipt::Provides(candidate_receipt),
session,
statements,
now,
metrics,
)
.await?;
match import_result {
ImportStatementsResult::ValidImport => tracing::trace!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?session,
"Imported backing vote from on-chain"),
ImportStatementsResult::InvalidImport => tracing::warn!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?session,
"Attempted import of on-chain backing votes failed"),
}
}
if disputes.is_empty() {
return Ok(())
}
// Import concluded disputes from on-chain, this already went through a vote so it's assumed
// as verified. This will only be stored, gossiping it is not necessary.
// First try to obtain all the backings which ultimately contain the candidate
// receipt which we need.
for DisputeStatementSet { candidate_hash, session, statements } in disputes {
let statements = statements
.into_iter()
.filter_map(|(dispute_statement, validator_index, validator_signature)| {
let session_info: SessionInfo = if let Some(session_info) =
state.rolling_session_window.session_info(session)
{
session_info.clone()
} else {
tracing::warn!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
?session,
"Could not retrieve session info from rolling session window for recently concluded dispute");
return None
};
let validator_public: ValidatorId = session_info
.validators
.get(validator_index.0 as usize)
.or_else(|| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?new_leaf,
?session,
"Missing public key for validator {:?} that participated in concluded dispute",
&validator_index);
None
})
.cloned()?;
Some((
SignedDisputeStatement::new_unchecked_from_trusted_source(
dispute_statement,
candidate_hash,
session,
validator_public,
validator_signature,
),
validator_index,
))
})
.collect::>();
let import_result = handle_import_statements(
ctx,
overlay_db,
state,
candidate_hash,
// TODO
MaybeCandidateReceipt::AssumeBackingVotePresent,
session,
statements,
now,
metrics,
)
.await?;
match import_result {
ImportStatementsResult::ValidImport => tracing::trace!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?candidate_hash,
?session,
"Imported statement of conlcuded dispute from on-chain"),
ImportStatementsResult::InvalidImport => tracing::warn!(target: LOG_TARGET,
relay_parent = ?new_leaf,
?candidate_hash,
?session,
"Attempted import of on-chain statement of concluded dispute failed"),
}
}
Ok(())
}
async fn handle_incoming(
ctx: &mut impl SubsystemContext,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
message: DisputeCoordinatorMessage,
now: Timestamp,
metrics: &Metrics,
) -> Result<(), Error> {
match message {
DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt,
session,
statements,
pending_confirmation,
} => {
let outcome = handle_import_statements(
ctx,
overlay_db,
state,
candidate_hash,
MaybeCandidateReceipt::Provides(candidate_receipt),
session,
statements,
now,
metrics,
)
.await?;
pending_confirmation.send(outcome).map_err(|_| Error::OneshotSend)?;
},
DisputeCoordinatorMessage::RecentDisputes(rx) => {
let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
let _ = rx.send(recent_disputes.keys().cloned().collect());
},
DisputeCoordinatorMessage::ActiveDisputes(rx) => {
let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
let _ = rx.send(collect_active(recent_disputes, now));
},
DisputeCoordinatorMessage::QueryCandidateVotes(query, rx) => {
let mut query_output = Vec::new();
for (session_index, candidate_hash) in query.into_iter() {
if let Some(v) = overlay_db.load_candidate_votes(session_index, &candidate_hash)? {
query_output.push((session_index, candidate_hash, v.into()));
} else {
tracing::debug!(
target: LOG_TARGET,
session_index,
"No votes found for candidate",
);
}
}
let _ = rx.send(query_output);
},
DisputeCoordinatorMessage::IssueLocalStatement(
session,
candidate_hash,
candidate_receipt,
valid,
) => {
issue_local_statement(
ctx,
overlay_db,
state,
candidate_hash,
candidate_receipt,
session,
valid,
now,
metrics,
)
.await?;
},
DisputeCoordinatorMessage::DetermineUndisputedChain {
base: (base_number, base_hash),
block_descriptions,
tx,
} => {
let undisputed_chain =
determine_undisputed_chain(overlay_db, base_number, base_hash, block_descriptions)?;
let _ = tx.send(undisputed_chain);
},
}
Ok(())
}
fn collect_active(
recent_disputes: RecentDisputes,
now: Timestamp,
) -> Vec<(SessionIndex, CandidateHash)> {
recent_disputes
.iter()
.filter_map(|(disputed, status)| {
status
.concluded_at()
.filter(|at| at + ACTIVE_DURATION_SECS < now)
.map_or(Some(*disputed), |_| None)
})
.collect()
}
fn insert_into_statement_vec(
vec: &mut Vec<(T, ValidatorIndex, ValidatorSignature)>,
tag: T,
val_index: ValidatorIndex,
val_signature: ValidatorSignature,
) {
let pos = match vec.binary_search_by_key(&val_index, |x| x.1) {
Ok(_) => return, // no duplicates needed.
Err(p) => p,
};
vec.insert(pos, (tag, val_index, val_signature));
}
#[derive(Debug, Clone)]
enum MaybeCandidateReceipt {
/// Directly provides the candiate receipt.
Provides(CandidateReceipt),
/// Assumes it was seen before by means of seconded message.
AssumeBackingVotePresent,
}
async fn handle_import_statements(
ctx: &mut impl SubsystemContext,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
candidate_hash: CandidateHash,
candidate_receipt: MaybeCandidateReceipt,
session: SessionIndex,
statements: impl IntoIterator
- ,
now: Timestamp,
metrics: &Metrics,
) -> Result {
if state.highest_session.map_or(true, |h| session + DISPUTE_WINDOW < h) {
// It is not valid to participate in an ancient dispute (spam?).
return Ok(ImportStatementsResult::InvalidImport)
}
let session_info = match state.rolling_session_window.session_info(session) {
None => {
tracing::warn!(
target: LOG_TARGET,
session,
"Missing info for session which has an active dispute",
);
return Ok(ImportStatementsResult::InvalidImport)
},
Some(info) => info,
};
let validators = session_info.validators.clone();
let n_validators = validators.len();
let supermajority_threshold = polkadot_primitives::v1::supermajority_threshold(n_validators);
// In case we are not provided with a candidate receipt
// we operate under the assumption, that a previous vote
// which included a `CandidateReceipt` was seen.
// This holds since every block is preceeded by the `Backing`-phase.
//
// There is one exception: A sufficiently sophisticated attacker could prevent
// us from seeing the backing votes by witholding arbitrary blocks, and hence we do
// not have a `CandidateReceipt` available.
let mut votes = match overlay_db
.load_candidate_votes(session, &candidate_hash)?
.map(CandidateVotes::from)
{
Some(votes) => votes,
None =>
if let MaybeCandidateReceipt::Provides(candidate_receipt) = candidate_receipt {
CandidateVotes { candidate_receipt, valid: Vec::new(), invalid: Vec::new() }
} else {
tracing::warn!(
target: LOG_TARGET,
session,
"Missing info for session which has an active dispute",
);
return Ok(ImportStatementsResult::InvalidImport)
},
};
let candidate_receipt = votes.candidate_receipt.clone();
// Update candidate votes.
for (statement, val_index) in statements {
if validators
.get(val_index.0 as usize)
.map_or(true, |v| v != statement.validator_public())
{
tracing::debug!(
target: LOG_TARGET,
?val_index,
session,
claimed_key = ?statement.validator_public(),
"Validator index doesn't match claimed key",
);
continue
}
match statement.statement().clone() {
DisputeStatement::Valid(valid_kind) => {
metrics.on_valid_vote();
insert_into_statement_vec(
&mut votes.valid,
valid_kind,
val_index,
statement.validator_signature().clone(),
);
},
DisputeStatement::Invalid(invalid_kind) => {
metrics.on_invalid_vote();
insert_into_statement_vec(
&mut votes.invalid,
invalid_kind,
val_index,
statement.validator_signature().clone(),
);
},
}
}
// Check if newly disputed.
let is_disputed = !votes.valid.is_empty() && !votes.invalid.is_empty();
let concluded_valid = votes.valid.len() >= supermajority_threshold;
let concluded_invalid = votes.invalid.len() >= supermajority_threshold;
let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
let prev_status = recent_disputes.get(&(session, candidate_hash)).map(|x| x.clone());
let status = if is_disputed {
let status = recent_disputes.entry((session, candidate_hash)).or_insert_with(|| {
tracing::info!(
target: LOG_TARGET,
?candidate_hash,
session,
"New dispute initiated for candidate.",
);
DisputeStatus::active()
});
// Note: concluded-invalid overwrites concluded-valid,
// so we do this check first. Dispute state machine is
// non-commutative.
if concluded_valid {
*status = status.concluded_for(now);
}
if concluded_invalid {
*status = status.concluded_against(now);
}
Some(*status)
} else {
None
};
if status != prev_status {
// This branch is only hit when the candidate is freshly disputed -
// status was previously `None`, and now is not.
if prev_status.is_none() && {
let controlled_indices =
find_controlled_validator_indices(&state.keystore, &validators);
let voted_indices = votes.voted_indices();
!controlled_indices.iter().all(|val_index| voted_indices.contains(&val_index))
} {
// If the dispute is new, we participate UNLESS all our controlled
// keys have already participated.
//
// We also block the coordinator while awaiting our determination
// of whether the vote is available.
let (report_availability, receive_availability) = oneshot::channel();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash,
candidate_receipt,
session,
n_validators: n_validators as u32,
report_availability,
})
.await;
if !receive_availability.await.map_err(Error::Oneshot)? {
// If the data is not available, we disregard the dispute votes.
// This is an indication that the dispute does not correspond to any included
// candidate and that it should be ignored.
//
// We expect that if the candidate is truly disputed that the higher-level network
// code will retry.
tracing::debug!(
target: LOG_TARGET,
"Recovering availability failed - invalid import."
);
return Ok(ImportStatementsResult::InvalidImport)
}
metrics.on_open();
if concluded_valid {
metrics.on_concluded_valid();
}
if concluded_invalid {
metrics.on_concluded_invalid();
}
}
// Only write when updated and vote is available.
overlay_db.write_recent_disputes(recent_disputes);
}
overlay_db.write_candidate_votes(session, candidate_hash, votes.into());
Ok(ImportStatementsResult::ValidImport)
}
fn find_controlled_validator_indices(
keystore: &LocalKeystore,
validators: &[ValidatorId],
) -> HashSet {
let mut controlled = HashSet::new();
for (index, validator) in validators.iter().enumerate() {
if keystore.key_pair::(validator).ok().flatten().is_none() {
continue
}
controlled.insert(ValidatorIndex(index as _));
}
controlled
}
async fn issue_local_statement(
ctx: &mut impl SubsystemContext,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
valid: bool,
now: Timestamp,
metrics: &Metrics,
) -> Result<(), Error> {
// Load session info.
let info = match state.rolling_session_window.session_info(session) {
None => {
tracing::warn!(
target: LOG_TARGET,
session,
"Missing info for session which has an active dispute",
);
return Ok(())
},
Some(info) => info,
};
let validators = info.validators.clone();
let votes = overlay_db
.load_candidate_votes(session, &candidate_hash)?
.map(CandidateVotes::from)
.unwrap_or_else(|| CandidateVotes {
candidate_receipt: candidate_receipt.clone(),
valid: Vec::new(),
invalid: Vec::new(),
});
// Sign a statement for each validator index we control which has
// not already voted. This should generally be maximum 1 statement.
let voted_indices = votes.voted_indices();
let mut statements = Vec::new();
let voted_indices: HashSet<_> = voted_indices.into_iter().collect();
let controlled_indices = find_controlled_validator_indices(&state.keystore, &validators[..]);
for index in controlled_indices {
if voted_indices.contains(&index) {
continue
}
let keystore = state.keystore.clone() as Arc<_>;
let res = SignedDisputeStatement::sign_explicit(
&keystore,
valid,
candidate_hash,
session,
validators[index.0 as usize].clone(),
)
.await;
match res {
Ok(Some(signed_dispute_statement)) => {
statements.push((signed_dispute_statement, index));
},
Ok(None) => {},
Err(e) => {
tracing::error!(
target: LOG_TARGET,
err = ?e,
"Encountered keystore error while signing dispute statement",
);
},
}
}
// Get our message out:
for (statement, index) in &statements {
let dispute_message = match make_dispute_message(info, &votes, statement.clone(), *index) {
Err(err) => {
tracing::debug!(target: LOG_TARGET, ?err, "Creating dispute message failed.");
continue
},
Ok(dispute_message) => dispute_message,
};
ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message)).await;
}
// Do import
if !statements.is_empty() {
match handle_import_statements(
ctx,
overlay_db,
state,
candidate_hash,
MaybeCandidateReceipt::Provides(candidate_receipt),
session,
statements,
now,
metrics,
)
.await
{
Err(_) => {
tracing::error!(
target: LOG_TARGET,
?candidate_hash,
?session,
"pending confirmation receiver got dropped by `handle_import_statements` for our own votes!"
);
},
Ok(ImportStatementsResult::InvalidImport) => {
tracing::error!(
target: LOG_TARGET,
?candidate_hash,
?session,
"`handle_import_statements` considers our own votes invalid!"
);
},
Ok(ImportStatementsResult::ValidImport) => {
tracing::trace!(
target: LOG_TARGET,
?candidate_hash,
?session,
"`handle_import_statements` successfully imported our vote!"
);
},
}
}
Ok(())
}
#[derive(Debug, thiserror::Error)]
enum DisputeMessageCreationError {
#[error("There was no opposite vote available")]
NoOppositeVote,
#[error("Found vote had an invalid validator index that could not be found")]
InvalidValidatorIndex,
#[error("Statement found in votes had invalid signature.")]
InvalidStoredStatement,
#[error(transparent)]
InvalidStatementCombination(DisputeMessageCheckError),
}
fn make_dispute_message(
info: &SessionInfo,
votes: &CandidateVotes,
our_vote: SignedDisputeStatement,
our_index: ValidatorIndex,
) -> Result {
let validators = &info.validators;
let (valid_statement, valid_index, invalid_statement, invalid_index) =
if let DisputeStatement::Valid(_) = our_vote.statement() {
let (statement_kind, validator_index, validator_signature) =
votes.invalid.get(0).ok_or(DisputeMessageCreationError::NoOppositeVote)?.clone();
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Invalid(statement_kind),
our_vote.candidate_hash().clone(),
our_vote.session_index(),
validators
.get(validator_index.0 as usize)
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
.clone(),
validator_signature,
)
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
(our_vote, our_index, other_vote, validator_index)
} else {
let (statement_kind, validator_index, validator_signature) =
votes.valid.get(0).ok_or(DisputeMessageCreationError::NoOppositeVote)?.clone();
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Valid(statement_kind),
our_vote.candidate_hash().clone(),
our_vote.session_index(),
validators
.get(validator_index.0 as usize)
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
.clone(),
validator_signature,
)
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
(other_vote, validator_index, our_vote, our_index)
};
DisputeMessage::from_signed_statements(
valid_statement,
valid_index,
invalid_statement,
invalid_index,
votes.candidate_receipt.clone(),
info,
)
.map_err(DisputeMessageCreationError::InvalidStatementCombination)
}
/// Determine the the best block and its block number.
/// Assumes `block_descriptions` are sorted from the one
/// with the lowest `BlockNumber` to the highest.
fn determine_undisputed_chain(
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
base_number: BlockNumber,
base_hash: Hash,
block_descriptions: Vec,
) -> Result<(BlockNumber, Hash), Error> {
let last = block_descriptions
.last()
.map(|e| (base_number + block_descriptions.len() as BlockNumber, e.block_hash))
.unwrap_or((base_number, base_hash));
// Fast path for no disputes.
let recent_disputes = match overlay_db.load_recent_disputes()? {
None => return Ok(last),
Some(a) if a.is_empty() => return Ok(last),
Some(a) => a,
};
let is_possibly_invalid = |session, candidate_hash| {
recent_disputes
.get(&(session, candidate_hash))
.map_or(false, |status| status.is_possibly_invalid())
};
for (i, BlockDescription { session, candidates, .. }) in block_descriptions.iter().enumerate() {
if candidates.iter().any(|c| is_possibly_invalid(*session, *c)) {
if i == 0 {
return Ok((base_number, base_hash))
} else {
return Ok((base_number + i as BlockNumber, block_descriptions[i - 1].block_hash))
}
}
}
Ok(last)
}