Commit 9cd615e8 authored by asynchronous rob's avatar asynchronous rob
Browse files

remove most dispute coordinator functionality

as of #3222 we can do most of the work within the approval voting subsystem
parent eac3bb4a
Pipeline #141924 canceled with stages
in 5 minutes and 43 seconds
......@@ -110,15 +110,6 @@ pub struct ActiveDisputes {
}
impl ActiveDisputes {
/// Whether the set of active disputes contains the given candidate.
pub(crate) fn contains(
&self,
session: SessionIndex,
candidate_hash: CandidateHash,
) -> bool {
self.disputed.contains(&(session, candidate_hash))
}
/// Insert the session and candidate hash from the set of active disputes.
/// Returns 'true' if the entry was not already in the set.
pub(crate) fn insert(
......
......@@ -25,13 +25,12 @@
//! another node, this will trigger the dispute participation subsystem to recover and validate the block and call
//! back to this subsystem.
use std::collections::HashSet;
use std::sync::Arc;
use polkadot_node_primitives::{CandidateVotes, SignedDisputeStatement};
use polkadot_node_subsystem::{
messages::{
DisputeCoordinatorMessage, ChainApiMessage, DisputeParticipationMessage,
DisputeCoordinatorMessage, ChainApiMessage,
},
Subsystem, SubsystemContext, FromOverseer, OverseerSignal, SpawnedSubsystem,
SubsystemError,
......@@ -42,14 +41,13 @@ use polkadot_node_subsystem_util::rolling_session_window::{
};
use polkadot_primitives::v1::{
SessionIndex, CandidateHash, Hash, CandidateReceipt, DisputeStatement, ValidatorIndex,
ValidatorSignature, BlockNumber, ValidatorPair,
ValidatorSignature,
};
use futures::prelude::*;
use futures::channel::oneshot;
use kvdb::KeyValueDB;
use parity_scale_codec::Error as CodecError;
use sc_keystore::LocalKeystore;
use db::v1::ActiveDisputes;
......@@ -62,7 +60,6 @@ const LOG_TARGET: &str = "parachain::dispute-coordinator";
const DISPUTE_WINDOW: SessionIndex = 6;
struct State {
keystore: Arc<LocalKeystore>,
highest_session: Option<SessionIndex>,
rolling_session_window: RollingSessionWindow,
}
......@@ -84,7 +81,6 @@ impl Config {
pub struct DisputeCoordinatorSubsystem {
config: Config,
store: Arc<dyn KeyValueDB>,
keystore: Arc<LocalKeystore>,
}
impl DisputeCoordinatorSubsystem {
......@@ -92,9 +88,8 @@ impl DisputeCoordinatorSubsystem {
pub fn new(
store: Arc<dyn KeyValueDB>,
config: Config,
keystore: Arc<LocalKeystore>,
) -> Self {
DisputeCoordinatorSubsystem { store, config, keystore }
DisputeCoordinatorSubsystem { store, config }
}
}
......@@ -187,9 +182,8 @@ async fn run_iteration<Context>(ctx: &mut Context, subsystem: &DisputeCoordinato
-> Result<bool, Error>
where Context: SubsystemContext<Message = DisputeCoordinatorMessage>
{
let DisputeCoordinatorSubsystem { ref store, ref keystore, ref config } = *subsystem;
let DisputeCoordinatorSubsystem { ref store, ref config } = *subsystem;
let mut state = State {
keystore: keystore.clone(),
highest_session: None,
rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
};
......@@ -211,12 +205,11 @@ async fn run_iteration<Context>(ctx: &mut Context, subsystem: &DisputeCoordinato
FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
FromOverseer::Communication { msg } => {
handle_incoming(
ctx,
&**store,
&mut state,
config,
msg,
).await?
)?
}
}
}
......@@ -286,8 +279,7 @@ async fn handle_new_activations(
Ok(())
}
async fn handle_incoming(
ctx: &mut impl SubsystemContext,
fn handle_incoming(
store: &dyn KeyValueDB,
state: &mut State,
config: &Config,
......@@ -301,7 +293,6 @@ async fn handle_incoming(
statements,
} => {
handle_import_statements(
ctx,
store,
state,
config,
......@@ -309,7 +300,7 @@ async fn handle_incoming(
candidate_receipt,
session,
statements,
).await?;
)?;
}
DisputeCoordinatorMessage::ActiveDisputes(rx) => {
let active_disputes = db::v1::load_active_disputes(store, &config.column_config())?
......@@ -332,37 +323,6 @@ async fn handle_incoming(
let _ = rx.send(candidate_votes.map(Into::into));
}
DisputeCoordinatorMessage::IssueLocalStatement(
session,
candidate_hash,
candidate_receipt,
valid,
) => {
issue_local_statement(
ctx,
state,
store,
config,
candidate_hash,
candidate_receipt,
session,
valid,
).await?;
}
DisputeCoordinatorMessage::DetermineUndisputedChain {
base_number,
block_descriptions,
tx,
} => {
let undisputed_chain = determine_undisputed_chain(
store,
&config,
base_number,
block_descriptions
)?;
let _ = tx.send(undisputed_chain);
}
}
Ok(())
......@@ -382,8 +342,7 @@ fn insert_into_statement_vec<T>(
vec.insert(pos, (tag, val_index, val_signature));
}
async fn handle_import_statements(
ctx: &mut impl SubsystemContext,
fn handle_import_statements(
store: &dyn KeyValueDB,
state: &mut State,
config: &Config,
......@@ -480,15 +439,6 @@ async fn handle_import_statements(
&mut tx,
|active| active.insert(session, candidate_hash),
)?;
let voted_indices = votes.voted_indices();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash,
candidate_receipt,
session,
voted_indices,
}.into()).await;
}
if concluded_valid && already_disputed {
......@@ -523,133 +473,12 @@ fn update_active_disputes(
Ok(())
}
async fn issue_local_statement(
ctx: &mut impl SubsystemContext,
state: &mut State,
store: &dyn KeyValueDB,
config: &Config,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
valid: bool,
) -> Result<(), Error> {
// Load session info.
let validators = match state.rolling_session_window.session_info(session) {
None => {
tracing::warn!(
target: LOG_TARGET,
session,
"Missing info for session which has an active dispute",
);
return Ok(())
}
Some(info) => info.validators.clone(),
};
let votes = db::v1::load_candidate_votes(
store,
&config.column_config(),
session,
&candidate_hash
)?
.map(CandidateVotes::from)
.unwrap_or_else(|| CandidateVotes {
candidate_receipt: candidate_receipt.clone(),
valid: Vec::new(),
invalid: Vec::new(),
});
// Sign a statement for each validator index we control which has
// not already voted. This should generally be maximum 1 statement.
let voted_indices = votes.voted_indices();
let mut statements = Vec::new();
let voted_indices: HashSet<_> = voted_indices.into_iter().collect();
for (index, validator) in validators.iter().enumerate() {
let index = ValidatorIndex(index as _);
if voted_indices.contains(&index) { continue }
if state.keystore.key_pair::<ValidatorPair>(validator).ok().flatten().is_none() {
continue
}
let keystore = state.keystore.clone() as Arc<_>;
let res = SignedDisputeStatement::sign_explicit(
&keystore,
valid,
candidate_hash,
session,
validator.clone(),
).await;
match res {
Ok(Some(signed_dispute_statement)) => {
statements.push((signed_dispute_statement, index));
}
Ok(None) => {}
Err(e) => {
tracing::error!(
target: LOG_TARGET,
err = ?e,
"Encountered keystore error while signing dispute statement",
);
}
}
}
// Do import
if !statements.is_empty() {
handle_import_statements(
ctx,
store,
state,
config,
candidate_hash,
candidate_receipt,
session,
statements,
).await?;
}
Ok(())
}
fn determine_undisputed_chain(
store: &dyn KeyValueDB,
config: &Config,
base_number: BlockNumber,
block_descriptions: Vec<(Hash, SessionIndex, Vec<CandidateHash>)>,
) -> Result<Option<(BlockNumber, Hash)>, Error> {
let last = block_descriptions.last()
.map(|e| (base_number + block_descriptions.len() as BlockNumber, e.0));
// Fast path for no disputes.
let active_disputes = match db::v1::load_active_disputes(store, &config.column_config())? {
None => return Ok(last),
Some(a) if a.disputed.is_empty() => return Ok(last),
Some(a) => a,
};
for (i, (_, session, candidates)) in block_descriptions.iter().enumerate() {
if candidates.iter().any(|c| active_disputes.contains(*session, *c)) {
if i == 0 {
return Ok(None);
} else {
return Ok(Some((
base_number + i as BlockNumber,
block_descriptions[i - 1].0,
)));
}
}
}
Ok(last)
}
#[cfg(test)]
mod tests {
use super::*;
use polkadot_primitives::v1::{BlakeTwo256, HashT, ValidatorId, Header, SessionInfo};
use polkadot_primitives::v1::{
BlakeTwo256, HashT, ValidatorId, Header, SessionInfo, BlockNumber,
};
use polkadot_node_subsystem::{jaeger, ActiveLeavesUpdate, ActivatedLeaf, LeafStatus};
use polkadot_node_subsystem::messages::{
AllMessages, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest,
......@@ -658,6 +487,7 @@ mod tests {
use sp_core::testing::TaskExecutor;
use sp_keyring::Sr25519Keyring;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sc_keystore::LocalKeystore;
use futures::future::{self, BoxFuture};
use parity_scale_codec::Encode;
use assert_matches::assert_matches;
......@@ -687,7 +517,6 @@ mod tests {
validator_public: Vec<ValidatorId>,
validator_groups: Vec<Vec<ValidatorIndex>>,
master_keystore: Arc<sc_keystore::LocalKeystore>,
subsystem_keystore: Arc<sc_keystore::LocalKeystore>,
db: Arc<dyn KeyValueDB>,
config: Config,
}
......@@ -714,7 +543,6 @@ mod tests {
];
let master_keystore = make_keystore(&validators).into();
let subsystem_keystore = make_keystore(&[Sr25519Keyring::Alice]).into();
let db = Arc::new(kvdb_memorydb::create(1));
let config = Config {
......@@ -726,7 +554,6 @@ mod tests {
validator_public,
validator_groups,
master_keystore,
subsystem_keystore,
db,
config,
}
......@@ -850,7 +677,6 @@ mod tests {
let subsystem = DisputeCoordinatorSubsystem::new(
state.db.clone(),
state.config.clone(),
state.subsystem_keystore.clone(),
);
let subsystem_task = run(subsystem, ctx);
......@@ -906,21 +732,6 @@ mod tests {
},
}).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeParticipation(DisputeParticipationMessage::Participate {
candidate_hash: c_hash,
candidate_receipt: c_receipt,
session: s,
voted_indices,
}) => {
assert_eq!(c_hash, candidate_hash);
assert_eq!(c_receipt, candidate_receipt);
assert_eq!(s, session);
assert_eq!(voted_indices, vec![ValidatorIndex(0), ValidatorIndex(1)]);
}
);
{
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
......@@ -977,7 +788,7 @@ mod tests {
}
#[test]
fn positive_votes_dont_trigger_participation() {
fn positive_votes_dont_create_dispute() {
test_harness(|test_state, mut virtual_overseer| Box::pin(async move {
let session = 1;
......@@ -1145,196 +956,4 @@ mod tests {
assert!(virtual_overseer.try_recv().await.is_none());
}));
}
#[test]
fn finality_votes_ignore_disputed_candidates() {
test_harness(|test_state, mut virtual_overseer| Box::pin(async move {
let session = 1;
let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash();
test_state.activate_leaf_at_session(
&mut virtual_overseer,
session,
1,
).await;
let valid_vote = test_state.issue_statement_with_index(
0,
candidate_hash,
session,
true,
).await;
let invalid_vote = test_state.issue_statement_with_index(
1,
candidate_hash,
session,
false,
).await;
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, ValidatorIndex(0)),
(invalid_vote, ValidatorIndex(1)),
],
},
}).await;
let _ = virtual_overseer.recv().await;
{
let (tx, rx) = oneshot::channel();
let block_hash_a = Hash::repeat_byte(0x0a);
let block_hash_b = Hash::repeat_byte(0x0b);
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
base_number: 10,
block_descriptions: vec![
(block_hash_a, session, vec![candidate_hash]),
],
tx,
},
}).await;
assert!(rx.await.unwrap().is_none());
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
base_number: 10,
block_descriptions: vec![
(block_hash_a, session, vec![]),
(block_hash_b, session, vec![candidate_hash]),
],
tx,
},
}).await;
assert_eq!(rx.await.unwrap(), Some((11, block_hash_a)));
}
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
}));
}
#[test]
fn supermajority_valid_dispute_may_be_finalized() {
test_harness(|test_state, mut virtual_overseer| Box::pin(async move {
let session = 1;
let candidate_receipt = CandidateReceipt::default();
let candidate_hash = candidate_receipt.hash();
test_state.activate_leaf_at_session(
&mut virtual_overseer,
session,
1,
).await;
let supermajority_threshold = polkadot_primitives::v1::supermajority_threshold(
test_state.validators.len()
);
let valid_vote = test_state.issue_statement_with_index(
0,
candidate_hash,
session,
true,
).await;
let invalid_vote = test_state.issue_statement_with_index(
1,
candidate_hash,
session,
false,
).await;
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements: vec![
(valid_vote, ValidatorIndex(0)),
(invalid_vote, ValidatorIndex(1)),
],
},
}).await;
let _ = virtual_overseer.recv().await;
let mut statements = Vec::new();
for i in (0..supermajority_threshold - 1).map(|i| i + 2) {
let vote = test_state.issue_statement_with_index(
i,
candidate_hash,
session,
true,
).await;
statements.push((vote, ValidatorIndex(i as _)));
};
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
statements,
},
}).await;
{
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
}).await;
assert!(rx.await.unwrap().is_empty());
let (tx, rx) = oneshot::channel();
let block_hash_a = Hash::repeat_byte(0x0a);
let block_hash_b = Hash::repeat_byte(0x0b);
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
base_number: 10,
block_descriptions: vec![
(block_hash_a, session, vec![candidate_hash]),
],
tx,
},
}).await;
assert_eq!(rx.await.unwrap(), Some((11, block_hash_a)));
let (tx, rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
base_number: 10,
block_descriptions: vec![
(block_hash_a, session, vec![]),
(block_hash_b, session, vec![candidate_hash]),
],
tx,
},
}).await;
assert_eq!(rx.await.unwrap(), Some((12, block_hash_b)));
}
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
assert!(virtual_overseer.try_recv().await.is_none());
}));
}
}
......@@ -1251,7 +1251,6 @@ fn spread_event_to_subsystems_is_up_to_date() {
AllMessages::ApprovalDistribution(_) => { cnt += 1; }
AllMessages::GossipSupport(_) => unreachable!("Not interested in network events"),
AllMessages::DisputeCoordinator(_) => unreachable!("Not interested in network events"),
AllMessages::DisputeParticipation(_) => unreachable!("Not interetsed in network events"),
// Add variants here as needed, `{ cnt += 1; }` for those that need to be
// notified, `unreachable!()` for those that should not.
}
......
......@@ -627,7 +627,6 @@ impl ChannelsOut {
self.gossip_support.send(make_packet(signals_received, msg)).await
},
AllMessages::DisputeCoordinator(_) => Ok(()),
AllMessages::DisputeParticipation(_) => Ok(()),
};
if res.is_err() {
......@@ -731,7 +730,6 @@ impl ChannelsOut {
.map_err(|e| e.into_send_error())
},
AllMessages::DisputeCoordinator(_) => Ok(()),
AllMessages::DisputeParticipation(_) => Ok(()),
};
if res.is_err() {
......@@ -2064,7 +2062,6 @@ where
self.subsystems.gossip_support.send_message(msg).await?;
},