Unverified Commit f9d71f8c authored by Robert Klotzner's avatar Robert Klotzner Committed by GitHub
Browse files

Dispute distribution implementation (#3282)

* Dispute protocol.

* Dispute distribution protocol.

* Get network requests routed.

* WIP: Basic dispute sender logic.

* Basic validator determination logic.

* WIP: Getting things to typecheck.

* Slightly larger timeout.

* More typechecking stuff.

* Cleanup.

* Finished most of the sending logic.

* Handle active leaves updates

- Cleanup dead disputes
- Update sends for new sessions
- Retry on errors

* Pass sessions in already.

* Startup dispute sending.

* Provide incoming decoding facilities

and use them in statement-distribution.

* Relaxed runtime util requirements.

We only need a `SubsystemSender` not a full `SubsystemContext`.

* Better usability of incoming requests.

Make it possible to consume stuff without clones.

* Add basic receiver functionality.

* Cleanup + fixes for sender.

* One more sender fix.

* Start receiver.

* Make sure to send responses back.

* WIP: Exposed authority discovery

* Make tests pass.

* Fully featured receiver.

* Decrease cost of `NotAValidator`.

* Make `RuntimeInfo` LRU cache size configurable.

* Cache more sessions.

* Fix collator protocol.

* Disable metrics for now.

* Make dispute-distribution a proper subsystem.

* Fix naming.

* Code style fixes.

* Factored out 4x copied mock function.

* WIP: Tests.

* Whitespace cleanup.

* Accessor functions.

* More testing.

* More Debug instances.

* Fix busy loop.

* Working tests.

* More tests.

* Cleanup.

* Fix build.

* Basic receiving test.

* Non validator message gets dropped.

* More receiving tests.

* Test nested and subsequent imports.

* Fix spaces.

* Better formatted imports.

* Import cleanup.

* Metrics.

* Message -> MuxedMessage

* Message -> MuxedMessage

* More review remarks.

* Add missing metrics.rs.

* Fix flaky test.

* Dispute coordinator - deliver confirmations.

* Send out `DisputeMessage` on issue local statement.

* Unwire dispute distribution.

* Review remarks.

* Review remarks.

* Better docs.
parent 519df6ca
Pipeline #146453 passed with stages
in 37 minutes and 48 seconds
......@@ -5816,7 +5816,6 @@ dependencies = [
"polkadot-node-subsystem-util",
"polkadot-primitives",
"rand 0.8.4",
"sc-keystore",
"sc-network",
"smallvec 1.6.1",
"sp-application-crypto",
......@@ -5944,6 +5943,39 @@ dependencies = [
"sp-std",
]
[[package]]
name = "polkadot-dispute-distribution"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"futures 0.3.15",
"futures-timer 3.0.2",
"lazy_static",
"lru",
"maplit",
"parity-scale-codec",
"polkadot-erasure-coding",
"polkadot-node-core-runtime-api",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"rand 0.8.4",
"sc-keystore",
"sc-network",
"smallvec 1.6.1",
"sp-application-crypto",
"sp-core",
"sp-keyring",
"sp-keystore",
"sp-tracing",
"thiserror",
"tracing",
]
[[package]]
name = "polkadot-erasure-coding"
version = "0.9.8"
......@@ -5970,7 +6002,6 @@ dependencies = [
"polkadot-primitives",
"rand 0.8.4",
"rand_chacha 0.3.1",
"sc-keystore",
"sp-application-crypto",
"sp-consensus-babe",
"sp-core",
......@@ -5995,7 +6026,6 @@ dependencies = [
"polkadot-node-subsystem-util",
"polkadot-overseer",
"polkadot-primitives",
"sc-authority-discovery",
"sc-network",
"sp-consensus",
"sp-core",
......@@ -6350,11 +6380,13 @@ dependencies = [
name = "polkadot-node-network-protocol"
version = "0.1.0"
dependencies = [
"async-trait",
"futures 0.3.15",
"parity-scale-codec",
"polkadot-node-jaeger",
"polkadot-node-primitives",
"polkadot-primitives",
"sc-authority-discovery",
"sc-network",
"strum",
"thiserror",
......@@ -6379,6 +6411,7 @@ dependencies = [
"sp-maybe-compressed-blob",
"sp-runtime",
"thiserror",
"tracing",
"zstd",
]
......@@ -6407,9 +6440,13 @@ dependencies = [
"polkadot-overseer",
"polkadot-primitives",
"polkadot-statement-table",
"sc-keystore",
"sc-network",
"smallvec 1.6.1",
"sp-application-crypto",
"sp-core",
"sp-keyring",
"sp-keystore",
"tracing",
]
......@@ -6828,6 +6865,7 @@ dependencies = [
"polkadot-availability-recovery",
"polkadot-client",
"polkadot-collator-protocol",
"polkadot-dispute-distribution",
"polkadot-gossip-support",
"polkadot-network-bridge",
"polkadot-node-collation-generation",
......
......@@ -64,6 +64,7 @@ members = [
"node/network/availability-recovery",
"node/network/collator-protocol",
"node/network/gossip-support",
"node/network/dispute-distribution",
"node/overseer",
"node/overseer/overseer-gen",
"node/overseer/overseer-gen/proc-macro",
......
......@@ -28,22 +28,21 @@
use std::collections::HashSet;
use std::sync::Arc;
use polkadot_node_primitives::{CandidateVotes, SignedDisputeStatement};
use polkadot_node_primitives::{CandidateVotes, DISPUTE_WINDOW, DisputeMessage, SignedDisputeStatement, DisputeMessageCheckError};
use polkadot_node_subsystem::{
overseer,
messages::{
DisputeCoordinatorMessage, ChainApiMessage, DisputeParticipationMessage,
},
SubsystemContext, FromOverseer, OverseerSignal, SpawnedSubsystem,
SubsystemError,
overseer, SubsystemContext, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemError,
errors::{ChainApiError, RuntimeApiError},
messages::{
ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage,
DisputeParticipationMessage, ImportStatementsResult
}
};
use polkadot_node_subsystem_util::rolling_session_window::{
RollingSessionWindow, SessionWindowUpdate,
};
use polkadot_primitives::v1::{
SessionIndex, CandidateHash, Hash, CandidateReceipt, DisputeStatement, ValidatorIndex,
ValidatorSignature, BlockNumber, ValidatorPair,
BlockNumber, CandidateHash, CandidateReceipt, DisputeStatement, Hash,
SessionIndex, SessionInfo, ValidatorIndex, ValidatorPair, ValidatorSignature
};
use futures::prelude::*;
......@@ -61,10 +60,6 @@ mod tests;
const LOG_TARGET: &str = "parachain::dispute-coordinator";
// It would be nice to draw this from the chain state, but we have no tools for it right now.
// On Polkadot this is 1 day, and on Kusama it's 6 hours.
const DISPUTE_WINDOW: SessionIndex = 6;
struct State {
keystore: Arc<LocalKeystore>,
highest_session: Option<SessionIndex>,
......@@ -134,6 +129,9 @@ pub enum Error {
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
#[error("Oneshot send failed")]
OneshotSend,
#[error(transparent)]
Subsystem(#[from] SubsystemError),
......@@ -308,6 +306,7 @@ async fn handle_incoming(
candidate_receipt,
session,
statements,
pending_confirmation,
} => {
handle_import_statements(
ctx,
......@@ -318,6 +317,7 @@ async fn handle_incoming(
candidate_receipt,
session,
statements,
pending_confirmation,
).await?;
}
DisputeCoordinatorMessage::ActiveDisputes(rx) => {
......@@ -400,8 +400,13 @@ async fn handle_import_statements(
candidate_receipt: CandidateReceipt,
session: SessionIndex,
statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
pending_confirmation: oneshot::Sender<ImportStatementsResult>,
) -> Result<(), Error> {
if state.highest_session.map_or(true, |h| session + DISPUTE_WINDOW < h) {
// It is not valid to participate in an ancient dispute (spam?).
pending_confirmation.send(ImportStatementsResult::InvalidImport).map_err(|_| Error::OneshotSend)?;
return Ok(());
}
......@@ -479,37 +484,54 @@ async fn handle_import_statements(
let already_disputed = is_disputed && !was_undisputed;
let concluded_valid = votes.valid.len() >= supermajority_threshold;
let mut tx = db::v1::Transaction::default();
{ // Scope so we will only confirm valid import after the import got actually persisted.
let mut tx = db::v1::Transaction::default();
if freshly_disputed && !concluded_valid {
// add to active disputes and begin local participation.
update_active_disputes(
store,
config,
&mut tx,
|active| active.insert(session, candidate_hash),
)?;
if freshly_disputed && !concluded_valid {
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash,
candidate_receipt,
session,
n_validators: n_validators as u32,
}).await;
}
let (report_availability, receive_availability) = oneshot::channel();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash,
candidate_receipt,
session,
n_validators: n_validators as u32,
report_availability,
}).await;
if concluded_valid && already_disputed {
// remove from active disputes.
update_active_disputes(
store,
config,
&mut tx,
|active| active.delete(session, candidate_hash),
)?;
if !receive_availability.await.map_err(Error::Oneshot)? {
pending_confirmation.send(ImportStatementsResult::InvalidImport).map_err(|_| Error::OneshotSend)?;
tracing::debug!(
target: LOG_TARGET,
"Recovering availability failed - invalid import."
);
return Ok(())
}
// add to active disputes and begin local participation.
update_active_disputes(
store,
config,
&mut tx,
|active| active.insert(session, candidate_hash),
)?;
}
if concluded_valid && already_disputed {
// remove from active disputes.
update_active_disputes(
store,
config,
&mut tx,
|active| active.delete(session, candidate_hash),
)?;
}
tx.put_candidate_votes(session, candidate_hash, votes.into());
tx.write(store, &config.column_config())?;
}
tx.put_candidate_votes(session, candidate_hash, votes.into());
tx.write(store, &config.column_config())?;
pending_confirmation.send(ImportStatementsResult::ValidImport).map_err(|_| Error::OneshotSend)?;
Ok(())
}
......@@ -541,7 +563,7 @@ async fn issue_local_statement(
valid: bool,
) -> Result<(), Error> {
// Load session info.
let validators = match state.rolling_session_window.session_info(session) {
let info = match state.rolling_session_window.session_info(session) {
None => {
tracing::warn!(
target: LOG_TARGET,
......@@ -551,9 +573,11 @@ async fn issue_local_statement(
return Ok(())
}
Some(info) => info.validators.clone(),
Some(info) => info,
};
let validators = info.validators.clone();
let votes = db::v1::load_candidate_votes(
store,
&config.column_config(),
......@@ -604,8 +628,27 @@ async fn issue_local_statement(
}
}
// Get our message out:
for (statement, index) in &statements {
let dispute_message = match make_dispute_message(info, &votes, statement.clone(), *index) {
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
?err,
"Creating dispute message failed."
);
continue
}
Ok(dispute_message) => dispute_message,
};
ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message)).await;
}
// Do import
if !statements.is_empty() {
let (pending_confirmation, _rx) = oneshot::channel();
handle_import_statements(
ctx,
store,
......@@ -615,12 +658,67 @@ async fn issue_local_statement(
candidate_receipt,
session,
statements,
pending_confirmation,
).await?;
}
Ok(())
}
#[derive(Debug, thiserror::Error)]
enum MakeDisputeMessageError {
#[error("There was no opposite vote available")]
NoOppositeVote,
#[error("Found vote had an invalid validator index that could not be found")]
InvalidValidatorIndex,
#[error("Statement found in votes had invalid signature.")]
InvalidStoredStatement,
#[error(transparent)]
InvalidStatementCombination(DisputeMessageCheckError),
}
fn make_dispute_message(
info: &SessionInfo,
votes: &CandidateVotes,
our_vote: SignedDisputeStatement,
our_index: ValidatorIndex
) -> Result<DisputeMessage, MakeDisputeMessageError> {
let validators = &info.validators;
let (valid_statement, valid_index, invalid_statement, invalid_index) =
if let DisputeStatement::Valid(_) = our_vote.statement() {
let (statement_kind, validator_index, validator_signature)
= votes.invalid.get(0).ok_or(MakeDisputeMessageError::NoOppositeVote)?.clone();
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Invalid(statement_kind),
our_vote.candidate_hash().clone(),
our_vote.session_index(),
validators.get(validator_index.0 as usize).ok_or(MakeDisputeMessageError::InvalidValidatorIndex)?.clone(),
validator_signature,
).map_err(|()| MakeDisputeMessageError::InvalidStoredStatement)?;
(our_vote, our_index, other_vote, validator_index)
} else {
let (statement_kind, validator_index, validator_signature)
= votes.valid.get(0).ok_or(MakeDisputeMessageError::NoOppositeVote)?.clone();
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Valid(statement_kind),
our_vote.candidate_hash().clone(),
our_vote.session_index(),
validators.get(validator_index.0 as usize).ok_or(MakeDisputeMessageError::InvalidValidatorIndex)?.clone(),
validator_signature,
).map_err(|()| MakeDisputeMessageError::InvalidStoredStatement)?;
(other_vote, validator_index, our_vote, our_index)
};
DisputeMessage::from_signed_statements(
valid_statement, valid_index,
invalid_statement, invalid_index,
votes.candidate_receipt.clone(),
info,
).map_err(MakeDisputeMessageError::InvalidStatementCombination)
}
fn determine_undisputed_chain(
store: &dyn KeyValueDB,
config: &Config,
......
......@@ -25,7 +25,10 @@ use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TestSubsystem
use sp_core::testing::TaskExecutor;
use sp_keyring::Sr25519Keyring;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use futures::future::{self, BoxFuture};
use futures::{
channel::oneshot,
future::{self, BoxFuture},
};
use parity_scale_codec::Encode;
use assert_matches::assert_matches;
......@@ -261,6 +264,7 @@ fn conflicting_votes_lead_to_dispute_participation() {
false,
).await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
......@@ -270,9 +274,9 @@ fn conflicting_votes_lead_to_dispute_participation() {
(valid_vote, ValidatorIndex(0)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
},
}).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(DisputeParticipationMessage::Participate {
......@@ -280,11 +284,13 @@ fn conflicting_votes_lead_to_dispute_participation() {
candidate_receipt: c_receipt,
session: s,
n_validators,
report_availability,
}) => {
assert_eq!(c_hash, candidate_hash);
assert_eq!(c_receipt, candidate_receipt);
assert_eq!(s, session);
assert_eq!(n_validators, test_state.validators.len() as u32);
report_availability.send(true).unwrap();
}
);
......@@ -310,6 +316,7 @@ fn conflicting_votes_lead_to_dispute_participation() {
assert_eq!(votes.invalid.len(), 1);
}
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
......@@ -318,6 +325,7 @@ fn conflicting_votes_lead_to_dispute_participation() {
statements: vec![
(invalid_vote_2, ValidatorIndex(2)),
],
pending_confirmation,
},
}).await;
......@@ -371,6 +379,7 @@ fn positive_votes_dont_trigger_participation() {
true,
).await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
......@@ -379,6 +388,7 @@ fn positive_votes_dont_trigger_participation() {
statements: vec![
(valid_vote, ValidatorIndex(0)),
],
pending_confirmation,
},
}).await;
......@@ -404,6 +414,7 @@ fn positive_votes_dont_trigger_participation() {
assert!(votes.invalid.is_empty());
}
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
......@@ -412,6 +423,7 @@ fn positive_votes_dont_trigger_participation() {
statements: vec![
(valid_vote_2, ValidatorIndex(1)),
],
pending_confirmation,
},
}).await;
......@@ -472,6 +484,7 @@ fn wrong_validator_index_is_ignored() {
false,
).await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
......@@ -481,6 +494,7 @@ fn wrong_validator_index_is_ignored() {
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(0)),
],
pending_confirmation,
},
}).await;
......@@ -541,6 +555,7 @@ fn finality_votes_ignore_disputed_candidates() {
false,
).await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
......@@ -550,9 +565,21 @@ fn finality_votes_ignore_disputed_candidates() {
(valid_vote, ValidatorIndex(0)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
},
}).await;
let _ = virtual_overseer.recv().await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(
DisputeParticipationMessage::Participate {
report_availability,
..
}
) => {
report_availability.send(true).unwrap();
}
);
{
let (tx, rx) = oneshot::channel();
......@@ -624,6 +651,7 @@ fn supermajority_valid_dispute_may_be_finalized() {
false,
).await;
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
......@@ -633,6 +661,7 @@ fn supermajority_valid_dispute_may_be_finalized() {
(valid_vote, ValidatorIndex(0)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
},
}).await;
......@@ -650,12 +679,14 @@ fn supermajority_valid_dispute_may_be_finalized() {
statements.push((vote, ValidatorIndex(i as _)));
};
let (pending_confirmation, _confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements,
pending_confirmation,
},
}).await;
......
......@@ -83,6 +83,9 @@ pub enum Error {
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
#[error("Oneshot receiver died")]
OneshotSendFailed,
#[error(transparent)]
Participation(#[from] ParticipationError),
}
......@@ -159,6 +162,7 @@ async fn handle_incoming(
candidate_receipt,
session,
n_validators,
report_availability,
} => {
if let Some((_, block_hash)) = state.recent_block {
participate(
......@@ -168,6 +172,7 @@ async fn handle_incoming(
candidate_receipt,
session,
n_validators,
report_availability,
)
.await
} else {
......@@ -184,6 +189,7 @@ async fn participate(
candidate_receipt: CandidateReceipt,
session: SessionIndex,
n_validators: u32,
report_availability: oneshot::Sender<bool>,
) -> Result<(), Error> {
let (recover_available_data_tx, recover_available_data_rx) = oneshot::channel();
let (code_tx, code_rx) = oneshot::channel();
......@@ -203,14 +209,21 @@ async fn participate(
.await;
let available_data = match recover_available_data_rx.await? {
Ok(data) => data,
Ok(data) => {
report_availability.send(true).map_err(|_| Error::OneshotSendFailed)?;
data
}
Err(RecoveryError::Invalid) => {
report_availability.send(true).map_err(|_| Error::OneshotSendFailed)?;
// the available data was recovered but it is invalid, therefore we'll
// vote negatively for the candidate dispute
cast_invalid_vote(ctx, candidate_hash, candidate_receipt, session).await;
return Ok(());
}
Err(RecoveryError::Unavailable) => {
report_availability.send(false).map_err(|_| Error::OneshotSendFailed)?;
return Err(ParticipationError::MissingAvailableData(candidate_hash).into());
}
};
......
......@@ -80,7 +80,7 @@ async fn activate_leaf(virtual_overseer: &mut VirtualOverseer, block_number: Blo
.await;
}
async fn participate(virtual_overseer: &mut VirtualOverseer) {
async fn participate(virtual_overseer: &mut VirtualOverseer) -> oneshot::Receiver<bool> {
let commitments = CandidateCommitments::default();
let candidate_receipt = {
let mut receipt = CandidateReceipt::default();
......@@ -91,6 +91,8 @@ async fn participate(virtual_overseer: &mut VirtualOverseer) {
let session = 1;
let n_validators = 10;