Unverified Commit a08a663e authored by Lldenaurois's avatar Lldenaurois Committed by GitHub
Browse files

Dispute coordinator overlay (#3462)

* node/dispute-coordinator: Modify db to return SubsystemResult.

In preparation of moving to the overlayed backend pattern, this commit
moves the db to return SubsystemResult values.

* node/dispute-coordinator: Add the Backend and OverlayedBackend.

This commit adds the backend and overlayed backend structs to the
dispute-coordinator subsystem.

* node/dispute-coordinator: Implement backend and overlayed-backend.

This commit finalizes the move from the previous transactional model
to the common overlay pattern in subsystem persistency. This can be
observed in the ApprovalVoting and ChainSelection subsystems.

* Add module docs + license

* Touchup merge
parent e8184e59
Pipeline #147026 passed with stages
in 43 minutes and 47 seconds
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! An abstraction over storage used by the chain selection subsystem.
//!
//! This provides both a [`Backend`] trait and an [`OverlayedBackend`]
//! struct which allows in-memory changes to be applied on top of a
//! [`Backend`], maintaining consistency between queries and temporary writes,
//! before any commit to the underlying storage is made.
use polkadot_primitives::v1::{CandidateHash, SessionIndex};
use polkadot_node_subsystem::SubsystemResult;
use std::collections::HashMap;
use super::db::v1::{RecentDisputes, CandidateVotes};
#[derive(Debug)]
pub enum BackendWriteOp {
WriteEarliestSession(SessionIndex),
WriteRecentDisputes(RecentDisputes),
WriteCandidateVotes(SessionIndex, CandidateHash, CandidateVotes),
DeleteCandidateVotes(SessionIndex, CandidateHash),
}
/// An abstraction over backend storage for the logic of this subsystem.
pub trait Backend {
/// Load the earliest session, if any.
fn load_earliest_session(&self) -> SubsystemResult<Option<SessionIndex>>;
/// Load the recent disputes, if any.
fn load_recent_disputes(&self) -> SubsystemResult<Option<RecentDisputes>>;
/// Load the candidate votes for the specific session-candidate pair, if any.
fn load_candidate_votes(
&self,
session: SessionIndex,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateVotes>>;
/// Atomically writes the list of operations, with later operations taking precedence over
/// prior.
fn write<I>(&mut self, ops: I) -> SubsystemResult<()>
where I: IntoIterator<Item = BackendWriteOp>;
}
/// An in-memory overllay for the backend.
///
/// This maintains read-only access to the underlying backend, but can be converted into a set of
/// write operations which will, when written to the underlying backend, give the same view as the
/// state of the overlay.
pub struct OverlayedBackend<'a, B: 'a> {
inner: &'a B,
// `None` means unchanged.
earliest_session: Option<SessionIndex>,
// `None` means unchanged.
recent_disputes: Option<RecentDisputes>,
// `None` means deleted, missing means query inner.
candidate_votes: HashMap<(SessionIndex, CandidateHash), Option<CandidateVotes>>,
}
impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> {
pub fn new(backend: &'a B) -> Self {
Self {
inner: backend,
earliest_session: None,
recent_disputes: None,
candidate_votes: HashMap::new(),
}
}
/// Returns true if the are no write operations to perform.
pub fn is_empty(&self) -> bool {
self.earliest_session.is_none() &&
self.recent_disputes.is_none() &&
self.candidate_votes.is_empty()
}
/// Load the earliest session, if any.
pub fn load_earliest_session(&self) -> SubsystemResult<Option<SessionIndex>> {
if let Some(val) = self.earliest_session {
return Ok(Some(val))
}
self.inner.load_earliest_session()
}
/// Load the recent disputes, if any.
pub fn load_recent_disputes(&self) -> SubsystemResult<Option<RecentDisputes>> {
if let Some(val) = &self.recent_disputes {
return Ok(Some(val.clone()))
}
self.inner.load_recent_disputes()
}
/// Load the candidate votes for the specific session-candidate pair, if any.
pub fn load_candidate_votes(
&self,
session: SessionIndex,
candidate_hash: &CandidateHash
) -> SubsystemResult<Option<CandidateVotes>> {
if let Some(val) = self.candidate_votes.get(&(session, *candidate_hash)) {
return Ok(val.clone())
}
self.inner.load_candidate_votes(session, candidate_hash)
}
/// Prepare a write to the 'earliest session' field of the DB.
///
/// Later calls to this function will override earlier ones.
pub fn write_earliest_session(&mut self, session: SessionIndex) {
self.earliest_session = Some(session);
}
/// Prepare a write to the recent disputes stored in the DB.
///
/// Later calls to this function will override earlier ones.
pub fn write_recent_disputes(&mut self, recent_disputes: RecentDisputes) {
self.recent_disputes = Some(recent_disputes)
}
/// Prepare a write of the candidate votes under the indicated candidate.
///
/// Later calls to this function for the same candidate will override earlier ones.
pub fn write_candidate_votes(
&mut self,
session: SessionIndex,
candidate_hash: CandidateHash,
votes: CandidateVotes
) {
self.candidate_votes.insert((session, candidate_hash), Some(votes));
}
/// Prepare a deletion of the candidate votes under the indicated candidate.
///
/// Later calls to this function for the same candidate will override earlier ones.
pub fn delete_candidate_votes(
&mut self,
session: SessionIndex,
candidate_hash: CandidateHash,
) {
self.candidate_votes.insert((session, candidate_hash), None);
}
/// Transform this backend into a set of write-ops to be written to the inner backend.
pub fn into_write_ops(self) -> impl Iterator<Item = BackendWriteOp> {
let earliest_session_ops = self.earliest_session
.map(|s| BackendWriteOp::WriteEarliestSession(s))
.into_iter();
let recent_dispute_ops = self.recent_disputes
.map(|d| BackendWriteOp::WriteRecentDisputes(d))
.into_iter();
let candidate_vote_ops = self.candidate_votes
.into_iter()
.map(|((session, candidate), votes)| match votes {
Some(votes) => BackendWriteOp::WriteCandidateVotes(session, candidate, votes),
None => BackendWriteOp::DeleteCandidateVotes(session, candidate),
});
earliest_session_ops
.chain(recent_dispute_ops)
.chain(candidate_vote_ops)
}
}
This diff is collapsed.
......@@ -52,9 +52,11 @@ use kvdb::KeyValueDB;
use parity_scale_codec::{Encode, Decode, Error as CodecError};
use sc_keystore::LocalKeystore;
use db::v1::RecentDisputes;
use db::v1::{RecentDisputes, DbBackend};
use backend::{Backend, OverlayedBackend};
mod db;
mod backend;
#[cfg(test)]
mod tests;
......@@ -112,7 +114,8 @@ where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run(self, ctx, Box::new(SystemClock))
let backend = DbBackend::new(self.store.clone(), self.config.column_config());
let future = run(self, ctx, backend, Box::new(SystemClock))
.map(|_| Ok(()))
.boxed();
......@@ -262,17 +265,19 @@ impl DisputeStatus {
}
}
async fn run<Context>(
async fn run<B, Context>(
subsystem: DisputeCoordinatorSubsystem,
mut ctx: Context,
mut backend: B,
clock: Box<dyn Clock>,
)
where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
Context: SubsystemContext<Message = DisputeCoordinatorMessage>
Context: SubsystemContext<Message = DisputeCoordinatorMessage>,
B: Backend,
{
loop {
let res = run_iteration(&mut ctx, &subsystem, &*clock).await;
let res = run_iteration(&mut ctx, &subsystem, &mut backend, &*clock).await;
match res {
Err(e) => {
e.trace();
......@@ -294,24 +299,25 @@ where
//
// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
// lead to another call to this function.
async fn run_iteration<Context>(
async fn run_iteration<B, Context>(
ctx: &mut Context,
subsystem: &DisputeCoordinatorSubsystem,
backend: &mut B,
clock: &dyn Clock,
)
-> Result<(), Error>
) -> Result<(), Error>
where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
Context: SubsystemContext<Message = DisputeCoordinatorMessage>
Context: SubsystemContext<Message = DisputeCoordinatorMessage>,
B: Backend,
{
let DisputeCoordinatorSubsystem { ref store, ref keystore, ref config } = *subsystem;
let mut state = State {
keystore: keystore.clone(),
keystore: subsystem.keystore.clone(),
highest_session: None,
rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
};
loop {
let mut overlay_db = OverlayedBackend::new(backend);
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::Conclude) => {
return Ok(())
......@@ -319,9 +325,8 @@ where
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
handle_new_activations(
ctx,
&**store,
&mut overlay_db,
&mut state,
config,
update.activated.into_iter().map(|a| a.hash),
).await?
}
......@@ -329,22 +334,25 @@ where
FromOverseer::Communication { msg } => {
handle_incoming(
ctx,
&**store,
&mut overlay_db,
&mut state,
config,
msg,
clock.now(),
).await?
}
}
if !overlay_db.is_empty() {
let ops = overlay_db.into_write_ops();
backend.write(ops)?;
}
}
}
async fn handle_new_activations(
ctx: &mut (impl SubsystemContext<Message = DisputeCoordinatorMessage> + overseer::SubsystemContext<Message = DisputeCoordinatorMessage>),
store: &dyn KeyValueDB,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
config: &Config,
new_activations: impl IntoIterator<Item = Hash>,
) -> Result<(), Error> {
for new_leaf in new_activations {
......@@ -388,11 +396,7 @@ async fn handle_new_activations(
state.highest_session = Some(session);
db::v1::note_current_session(
store,
&config.column_config(),
session,
)?;
db::v1::note_current_session(overlay_db, session)?;
}
}
_ => {}
......@@ -404,9 +408,8 @@ async fn handle_new_activations(
async fn handle_incoming(
ctx: &mut impl SubsystemContext,
store: &dyn KeyValueDB,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
config: &Config,
message: DisputeCoordinatorMessage,
now: Timestamp,
) -> Result<(), Error> {
......@@ -420,9 +423,8 @@ async fn handle_incoming(
} => {
handle_import_statements(
ctx,
store,
overlay_db,
state,
config,
candidate_hash,
candidate_receipt,
session,
......@@ -432,15 +434,11 @@ async fn handle_incoming(
).await?;
}
DisputeCoordinatorMessage::RecentDisputes(rx) => {
let recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())?
.unwrap_or_default();
let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
let _ = rx.send(recent_disputes.keys().cloned().collect());
}
DisputeCoordinatorMessage::ActiveDisputes(rx) => {
let recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())?
.unwrap_or_default();
let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
let _ = rx.send(collect_active(recent_disputes, now));
}
DisputeCoordinatorMessage::QueryCandidateVotes(
......@@ -449,9 +447,7 @@ async fn handle_incoming(
) => {
let mut query_output = Vec::new();
for (session_index, candidate_hash) in query.into_iter() {
if let Some(v) = db::v1::load_candidate_votes(
store,
&config.column_config(),
if let Some(v) = overlay_db.load_candidate_votes(
session_index,
&candidate_hash,
)? {
......@@ -475,8 +471,7 @@ async fn handle_incoming(
issue_local_statement(
ctx,
state,
store,
config,
overlay_db,
candidate_hash,
candidate_receipt,
session,
......@@ -490,8 +485,7 @@ async fn handle_incoming(
tx,
} => {
let undisputed_chain = determine_undisputed_chain(
store,
&config,
overlay_db,
base_number,
block_descriptions
)?;
......@@ -528,9 +522,8 @@ fn insert_into_statement_vec<T>(
async fn handle_import_statements(
ctx: &mut impl SubsystemContext,
store: &dyn KeyValueDB,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
config: &Config,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
......@@ -563,12 +556,7 @@ async fn handle_import_statements(
let supermajority_threshold = polkadot_primitives::v1::supermajority_threshold(n_validators);
let mut votes = db::v1::load_candidate_votes(
store,
&config.column_config(),
session,
&candidate_hash
)?
let mut votes = overlay_db.load_candidate_votes(session, &candidate_hash)?
.map(CandidateVotes::from)
.unwrap_or_else(|| CandidateVotes {
candidate_receipt: candidate_receipt.clone(),
......@@ -617,86 +605,79 @@ async fn handle_import_statements(
let concluded_valid = votes.valid.len() >= supermajority_threshold;
let concluded_invalid = votes.invalid.len() >= supermajority_threshold;
let mut recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())?
.unwrap_or_default();
let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
{ // Scope so we will only confirm valid import after the import got actually persisted.
let mut tx = db::v1::Transaction::default();
let prev_status = recent_disputes.get(&(session, candidate_hash)).map(|x| x.clone());
let prev_status = recent_disputes.get(&(session, candidate_hash)).map(|x| x.clone());
let status = if is_disputed {
let status = recent_disputes
.entry((session, candidate_hash))
.or_insert(DisputeStatus::active());
let status = if is_disputed {
let status = recent_disputes
.entry((session, candidate_hash))
.or_insert(DisputeStatus::active());
// Note: concluded-invalid overwrites concluded-valid,
// so we do this check first. Dispute state machine is
// non-commutative.
if concluded_valid {
*status = status.concluded_for(now);
}
if concluded_invalid {
*status = status.concluded_against(now);
}
// Note: concluded-invalid overwrites concluded-valid,
// so we do this check first. Dispute state machine is
// non-commutative.
if concluded_valid {
*status = status.concluded_for(now);
}
Some(*status)
} else {
None
};
if concluded_invalid {
*status = status.concluded_against(now);
}
if status != prev_status {
// Only write when updated.
tx.put_recent_disputes(recent_disputes);
Some(*status)
} else {
None
};
// This branch is only hit when the candidate is freshly disputed -
// status was previously `None`, and now is not.
if prev_status.is_none() {
// No matter what, if the dispute is new, we participate.
if status != prev_status {
// This branch is only hit when the candidate is freshly disputed -
// status was previously `None`, and now is not.
if prev_status.is_none() {
// No matter what, if the dispute is new, we participate.
//
// We also block the coordinator while awaiting our determination
// of whether the vote is available.
let (report_availability, receive_availability) = oneshot::channel();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash,
candidate_receipt,
session,
n_validators: n_validators as u32,
report_availability,
}).await;
if !receive_availability.await.map_err(Error::Oneshot)? {
// If the data is not available, we disregard the dispute votes.
// This is an indication that the dispute does not correspond to any included
// candidate and that it should be ignored.
//
// We also block the coordinator while awaiting our determination
// of whether the vote is available.
let (report_availability, receive_availability) = oneshot::channel();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash,
candidate_receipt,
session,
n_validators: n_validators as u32,
report_availability,
}).await;
if !receive_availability.await.map_err(Error::Oneshot)? {
// If the data is not available, we disregard the dispute votes.
// This is an indication that the dispute does not correspond to any included
// candidate and that it should be ignored.
//
// We expect that if the candidate is truly disputed that the higher-level network
// code will retry.
pending_confirmation.send(ImportStatementsResult::InvalidImport)
.map_err(|_| Error::OneshotSend)?;
// We expect that if the candidate is truly disputed that the higher-level network
// code will retry.
pending_confirmation.send(ImportStatementsResult::InvalidImport)
.map_err(|_| Error::OneshotSend)?;
tracing::debug!(
target: LOG_TARGET,
"Recovering availability failed - invalid import."
);
return Ok(())
}
tracing::debug!(
target: LOG_TARGET,
"Recovering availability failed - invalid import."
);
return Ok(())
}
}
tx.put_candidate_votes(session, candidate_hash, votes.into());
tx.write(store, &config.column_config())?;
// Only write when updated and vote is available.
overlay_db.write_recent_disputes(recent_disputes);
}
overlay_db.write_candidate_votes(session, candidate_hash, votes.into());
Ok(())
}
async fn issue_local_statement(
ctx: &mut impl SubsystemContext,
state: &mut State,
store: &dyn KeyValueDB,
config: &Config,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
......@@ -719,12 +700,7 @@ async fn issue_local_statement(
let validators = info.validators.clone();
let votes = db::v1::load_candidate_votes(
store,
&config.column_config(),
session,
&candidate_hash
)?
let votes = overlay_db.load_candidate_votes(session, &candidate_hash)?
.map(CandidateVotes::from)
.unwrap_or_else(|| CandidateVotes {
candidate_receipt: candidate_receipt.clone(),
......@@ -792,9 +768,8 @@ async fn issue_local_statement(
let (pending_confirmation, _rx) = oneshot::channel();
handle_import_statements(
ctx,
store,
overlay_db,
state,
config,
candidate_hash,
candidate_receipt,
session,
......@@ -862,16 +837,16 @@ fn make_dispute_message(
}
fn determine_undisputed_chain(
store: &dyn KeyValueDB,
config: &Config,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
base_number: BlockNumber,
block_descriptions: Vec<(Hash, SessionIndex, Vec<CandidateHash>)>,
) -> Result<Option<(BlockNumber, Hash)>, Error> {
let last = block_descriptions.last()
.map(|e| (base_number + block_descriptions.len() as BlockNumber, e.0));
// Fast path for no disputes.
let recent_disputes = match db::v1::load_recent_disputes(store, &config.column_config())? {
let recent_disputes = match overlay_db.load_recent_disputes()? {
None => return Ok(last),
Some(a) if a.is_empty() => return Ok(last),
Some(a) => a,
......
......@@ -250,7 +250,8 @@ fn test_harness<F>(test: F)
state.subsystem_keystore.clone(),
);
let subsystem_task = run(subsystem, ctx, Box::new(state.clock.clone()));
let backend = DbBackend::new(state.db.clone(), state.config.column_config());
let subsystem_task = run(subsystem, ctx, backend, Box::new(state.clock.clone()));
let test_task = test(state, ctx_handle);
futures::executor::block_on(future::join(subsystem_task, test_task));
......