Unverified Commit 045a930a authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

participate in disputes only if haven't voted already (#3796)



* participate in disputes only if haven't voted already

* Add tests for dispute participation on local import.

* Fixes.

* cargo fmt

* Get rid of redundant dependency.

Co-authored-by: default avatarRobert Klotzner <robert.klotzner@gmx.at>
parent 49128c88
Pipeline #156115 failed with stages
in 20 minutes and 9 seconds
......@@ -48,7 +48,7 @@ use polkadot_node_subsystem_util::rolling_session_window::{
};
use polkadot_primitives::v1::{
BlockNumber, CandidateHash, CandidateReceipt, DisputeStatement, Hash, SessionIndex,
SessionInfo, ValidatorIndex, ValidatorPair, ValidatorSignature,
SessionInfo, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
};
use futures::{channel::oneshot, prelude::*};
......@@ -745,8 +745,15 @@ async fn handle_import_statements(
if status != prev_status {
// This branch is only hit when the candidate is freshly disputed -
// status was previously `None`, and now is not.
if prev_status.is_none() {
// No matter what, if the dispute is new, we participate.
if prev_status.is_none() && {
let controlled_indices =
find_controlled_validator_indices(&state.keystore, &validators);
let voted_indices = votes.voted_indices();
!controlled_indices.iter().all(|val_index| voted_indices.contains(&val_index))
} {
// If the dispute is new, we participate UNLESS all our controlled
// keys have already participated.
//
// We also block the coordinator while awaiting our determination
// of whether the vote is available.
......@@ -792,6 +799,22 @@ async fn handle_import_statements(
Ok(())
}
fn find_controlled_validator_indices(
keystore: &LocalKeystore,
validators: &[ValidatorId],
) -> HashSet<ValidatorIndex> {
let mut controlled = HashSet::new();
for (index, validator) in validators.iter().enumerate() {
if keystore.key_pair::<ValidatorPair>(validator).ok().flatten().is_none() {
continue
}
controlled.insert(ValidatorIndex(index as _));
}
controlled
}
async fn issue_local_statement(
ctx: &mut impl SubsystemContext,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
......@@ -833,14 +856,12 @@ async fn issue_local_statement(
let mut statements = Vec::new();
let voted_indices: HashSet<_> = voted_indices.into_iter().collect();
for (index, validator) in validators.iter().enumerate() {
let index = ValidatorIndex(index as _);
let controlled_indices = find_controlled_validator_indices(&state.keystore, &validators[..]);
for index in controlled_indices {
if voted_indices.contains(&index) {
continue
}
if state.keystore.key_pair::<ValidatorPair>(validator).ok().flatten().is_none() {
continue
}
let keystore = state.keystore.clone() as Arc<_>;
let res = SignedDisputeStatement::sign_explicit(
......@@ -848,7 +869,7 @@ async fn issue_local_statement(
valid,
candidate_hash,
session,
validator.clone(),
validators[index.0 as usize].clone(),
)
.await;
......
......@@ -107,6 +107,7 @@ impl Default for TestState {
Sr25519Keyring::Dave,
Sr25519Keyring::Eve,
Sr25519Keyring::One,
Sr25519Keyring::Ferdie,
];
let validator_public = validators.iter().map(|k| ValidatorId::from(k.public())).collect();
......@@ -114,7 +115,7 @@ impl Default for TestState {
let validator_groups = vec![
vec![ValidatorIndex(0), ValidatorIndex(1)],
vec![ValidatorIndex(2), ValidatorIndex(3)],
vec![ValidatorIndex(4), ValidatorIndex(5)],
vec![ValidatorIndex(4), ValidatorIndex(5), ValidatorIndex(6)],
];
let master_keystore = make_keystore(&validators).into();
......@@ -316,7 +317,7 @@ fn conflicting_votes_lead_to_dispute_participation() {
test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await;
let valid_vote =
test_state.issue_statement_with_index(0, candidate_hash, session, true).await;
test_state.issue_statement_with_index(3, candidate_hash, session, true).await;
let invalid_vote =
test_state.issue_statement_with_index(1, candidate_hash, session, false).await;
......@@ -332,7 +333,7 @@ fn conflicting_votes_lead_to_dispute_participation() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, ValidatorIndex(0)),
(valid_vote, ValidatorIndex(3)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
......@@ -434,7 +435,7 @@ fn positive_votes_dont_trigger_participation() {
test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await;
let valid_vote =
test_state.issue_statement_with_index(0, candidate_hash, session, true).await;
test_state.issue_statement_with_index(2, candidate_hash, session, true).await;
let valid_vote_2 =
test_state.issue_statement_with_index(1, candidate_hash, session, true).await;
......@@ -446,7 +447,7 @@ fn positive_votes_dont_trigger_participation() {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![(valid_vote, ValidatorIndex(0))],
statements: vec![(valid_vote, ValidatorIndex(2))],
pending_confirmation,
},
})
......@@ -539,7 +540,7 @@ fn wrong_validator_index_is_ignored() {
test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await;
let valid_vote =
test_state.issue_statement_with_index(0, candidate_hash, session, true).await;
test_state.issue_statement_with_index(2, candidate_hash, session, true).await;
let invalid_vote =
test_state.issue_statement_with_index(1, candidate_hash, session, false).await;
......@@ -553,7 +554,7 @@ fn wrong_validator_index_is_ignored() {
session,
statements: vec![
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(0)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
},
......@@ -609,7 +610,7 @@ fn finality_votes_ignore_disputed_candidates() {
test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await;
let valid_vote =
test_state.issue_statement_with_index(0, candidate_hash, session, true).await;
test_state.issue_statement_with_index(2, candidate_hash, session, true).await;
let invalid_vote =
test_state.issue_statement_with_index(1, candidate_hash, session, false).await;
......@@ -622,7 +623,7 @@ fn finality_votes_ignore_disputed_candidates() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, ValidatorIndex(0)),
(valid_vote, ValidatorIndex(2)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
......@@ -715,7 +716,7 @@ fn supermajority_valid_dispute_may_be_finalized() {
polkadot_primitives::v1::supermajority_threshold(test_state.validators.len());
let valid_vote =
test_state.issue_statement_with_index(0, candidate_hash, session, true).await;
test_state.issue_statement_with_index(2, candidate_hash, session, true).await;
let invalid_vote =
test_state.issue_statement_with_index(1, candidate_hash, session, false).await;
......@@ -728,7 +729,7 @@ fn supermajority_valid_dispute_may_be_finalized() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, ValidatorIndex(0)),
(valid_vote, ValidatorIndex(2)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
......@@ -755,7 +756,7 @@ fn supermajority_valid_dispute_may_be_finalized() {
);
let mut statements = Vec::new();
for i in (0..supermajority_threshold - 1).map(|i| i + 2) {
for i in (0..supermajority_threshold - 1).map(|i| i + 3) {
let vote =
test_state.issue_statement_with_index(i, candidate_hash, session, true).await;
......@@ -848,7 +849,7 @@ fn concluded_supermajority_for_non_active_after_time() {
polkadot_primitives::v1::supermajority_threshold(test_state.validators.len());
let valid_vote =
test_state.issue_statement_with_index(0, candidate_hash, session, true).await;
test_state.issue_statement_with_index(2, candidate_hash, session, true).await;
let invalid_vote =
test_state.issue_statement_with_index(1, candidate_hash, session, false).await;
......@@ -861,7 +862,7 @@ fn concluded_supermajority_for_non_active_after_time() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, ValidatorIndex(0)),
(valid_vote, ValidatorIndex(2)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
......@@ -882,7 +883,7 @@ fn concluded_supermajority_for_non_active_after_time() {
);
let mut statements = Vec::new();
for i in (0..supermajority_threshold - 1).map(|i| i + 2) {
for i in (0..supermajority_threshold - 1).map(|i| i + 3) {
let vote =
test_state.issue_statement_with_index(i, candidate_hash, session, true).await;
......@@ -951,7 +952,7 @@ fn concluded_supermajority_against_non_active_after_time() {
polkadot_primitives::v1::supermajority_threshold(test_state.validators.len());
let valid_vote =
test_state.issue_statement_with_index(0, candidate_hash, session, true).await;
test_state.issue_statement_with_index(2, candidate_hash, session, true).await;
let invalid_vote =
test_state.issue_statement_with_index(1, candidate_hash, session, false).await;
......@@ -964,7 +965,7 @@ fn concluded_supermajority_against_non_active_after_time() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, ValidatorIndex(0)),
(valid_vote, ValidatorIndex(2)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
......@@ -985,7 +986,7 @@ fn concluded_supermajority_against_non_active_after_time() {
);
let mut statements = Vec::new();
for i in (0..supermajority_threshold - 1).map(|i| i + 2) {
for i in (0..supermajority_threshold - 1).map(|i| i + 3) {
let vote =
test_state.issue_statement_with_index(i, candidate_hash, session, false).await;
......@@ -1051,7 +1052,7 @@ fn fresh_dispute_ignored_if_unavailable() {
test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await;
let valid_vote =
test_state.issue_statement_with_index(0, candidate_hash, session, true).await;
test_state.issue_statement_with_index(2, candidate_hash, session, true).await;
let invalid_vote =
test_state.issue_statement_with_index(1, candidate_hash, session, false).await;
......@@ -1064,7 +1065,7 @@ fn fresh_dispute_ignored_if_unavailable() {
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, ValidatorIndex(0)),
(valid_vote, ValidatorIndex(2)),
(invalid_vote, ValidatorIndex(1)),
],
pending_confirmation,
......@@ -1298,18 +1299,6 @@ fn resume_dispute_with_local_statement() {
})
.await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(
DisputeParticipationMessage::Participate {
report_availability,
..
}
) => {
report_availability.send(true).unwrap();
}
);
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
{
......@@ -1330,7 +1319,7 @@ fn resume_dispute_with_local_statement() {
test_state
})
})
// Alice should send a DisputeParticiationMessage::Participate on restart since she has no
// Alice should not send a DisputeParticiationMessage::Participate on restart since she has a
// local statement for the active dispute.
.resume(|test_state, mut virtual_overseer| {
Box::pin(async move {
......@@ -1384,18 +1373,6 @@ fn resume_dispute_without_local_statement_or_local_key() {
})
.await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(
DisputeParticipationMessage::Participate {
report_availability,
..
}
) => {
report_availability.send(true).unwrap();
}
);
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
{
......@@ -1416,8 +1393,8 @@ fn resume_dispute_without_local_statement_or_local_key() {
test_state
})
})
// Alice should send a DisputeParticiationMessage::Participate on restart since she has no
// local statement for the active dispute.
// Two should not send a DisputeParticiationMessage::Participate on restart since she is no
// validator in that dispute.
.resume(|test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
......@@ -1437,89 +1414,197 @@ fn resume_dispute_without_local_statement_or_local_key() {
fn resume_dispute_with_local_statement_without_local_key() {
let session = 1;
let mut test_state = TestState::default();
test_state.subsystem_keystore = make_keystore(&[Sr25519Keyring::Two]).into();
test_state
.resume(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let test_state = TestState::default();
let mut test_state = test_state.resume(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash();
let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash();
test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await;
test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await;
let local_valid_vote =
test_state.issue_statement_with_index(0, candidate_hash, session, true).await;
let local_valid_vote =
test_state.issue_statement_with_index(0, candidate_hash, session, true).await;
let valid_vote =
test_state.issue_statement_with_index(1, candidate_hash, session, true).await;
let valid_vote =
test_state.issue_statement_with_index(1, candidate_hash, session, true).await;
let invalid_vote =
test_state.issue_statement_with_index(2, candidate_hash, session, false).await;
let invalid_vote =
test_state.issue_statement_with_index(2, candidate_hash, session, false).await;
let (pending_confirmation, confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(local_valid_vote, ValidatorIndex(0)),
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
},
})
.await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
{
let (tx, rx) = oneshot::channel();
let (pending_confirmation, confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(local_valid_vote, ValidatorIndex(0)),
(valid_vote, ValidatorIndex(1)),
(invalid_vote, ValidatorIndex(2)),
],
pending_confirmation,
},
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
})
.await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(
DisputeParticipationMessage::Participate {
report_availability,
..
}
) => {
report_availability.send(true).unwrap();
}
);
assert_eq!(rx.await.unwrap().len(), 1);
}
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
{
let (tx, rx) = oneshot::channel();
test_state
})
});
// No keys:
test_state.subsystem_keystore = make_keystore(&[Sr25519Keyring::Two]).into();
// Two should not send a DisputeParticiationMessage::Participate on restart since we gave
// her a non existing key.
test_state.resume(|test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
})
.await;
// Assert that subsystem is not sending Participation messages because we don't
// have a key.
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
assert_eq!(rx.await.unwrap().len(), 1);
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
})
});
}
#[test]
fn issue_valid_local_statement_does_cause_distribution_but_not_duplicate_participation() {
issue_local_statement_does_cause_distribution_but_not_duplicate_participation(true);
}
#[test]
fn issue_invalid_local_statement_does_cause_distribution_but_not_duplicate_participation() {
issue_local_statement_does_cause_distribution_but_not_duplicate_participation(false);
}
fn issue_local_statement_does_cause_distribution_but_not_duplicate_participation(validity: bool) {
test_harness(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
let session = 1;
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash();
test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await;
let other_vote = test_state
.issue_statement_with_index(1, candidate_hash, session, !validity)
.await;
let (pending_confirmation, confirmation_rx) = oneshot::channel();
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![(other_vote, ValidatorIndex(1))],
pending_confirmation,
},
})
.await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));
// Initiate dispute locally:
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::IssueLocalStatement(
session,
candidate_hash,
candidate_receipt.clone(),
validity,
),
})
.await;
// Dispute distribution should get notified now:
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::SendDispute(msg)
) => {
assert_eq!(msg.session_index(), session);
assert_eq!(msg.candidate_receipt(), &candidate_receipt);
}
);
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
// Make sure we don't get a `DisputeParticiationMessage`.
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
test_state
})
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
})
// Alice should send a DisputeParticiationMessage::Participate on restart since she has no
// local statement for the active dispute.
.resume(|test_state, mut virtual_overseer| {
Box::pin(async move {
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
});
}
// Assert that subsystem is not sending Participation messages because we issued a local statement
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
#[test]
fn negative_issue_local_statement_only_triggers_import() {
test_harness(|mut test_state, mut virtual_overseer| {
Box::pin(async move {
let session = 1;
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state.handle_resume_sync(&mut virtual_overseer, session).await;
test_state
})
});
let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash();
test_state.activate_leaf_at_session(&mut virtual_overseer, session, 1).await;
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::IssueLocalStatement(
session,
candidate_hash,
candidate_receipt.clone(),
false,
),
})
.await;
let backend = DbBackend::new(test_state.db.clone(), test_state.config.column_config());
let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap();
assert_eq!(votes.invalid.len(), 1);
assert_eq!(votes.valid.len(), 0);
let disputes = backend.load_recent_disputes().unwrap();
assert_eq!(disputes, None);
// Assert that subsystem is not sending Participation messages:
assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
test_state
})
});
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment