Unverified Commit f91c8ee4 authored by André Silva's avatar André Silva Committed by GitHub
Browse files

implement dispute participation subsystem (#3234)



* implement dispute participation subsystem

* guide: minor fix in dispute participation

* Update node/core/dispute-participation/src/lib.rs
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* dispute: add comments to participation logic

* dispute-coordinator: fix test compilation

* implementers-guide: update dispute participation

* dispute-participation: add error for missing validation code

* dispute-participation: add tests

* Update node/core/dispute-participation/src/lib.rs
Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>

* guide: update overseer protocol dispute participation message

* dispute-participation: remove duplication in tests
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>
parent ec07677c
Pipeline #142283 passed with stages
in 29 minutes and 38 seconds
......@@ -6117,6 +6117,22 @@ dependencies = [
"tracing",
]
[[package]]
name = "polkadot-node-core-dispute-participation"
version = "0.1.0"
dependencies = [
"assert_matches",
"futures 0.3.15",
"parity-scale-codec",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-primitives",
"sp-core",
"thiserror",
"tracing",
]
[[package]]
name = "polkadot-node-core-parachains-inherent"
version = "0.1.0"
......
......@@ -49,6 +49,7 @@ members = [
"node/core/candidate-validation",
"node/core/chain-api",
"node/core/dispute-coordinator",
"node/core/dispute-participation",
"node/core/parachains-inherent",
"node/core/provisioner",
"node/core/pvf",
......
......@@ -483,13 +483,11 @@ async fn handle_import_statements(
|active| active.insert(session, candidate_hash),
)?;
let voted_indices = votes.voted_indices();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash,
candidate_receipt,
session,
voted_indices,
n_validators: n_validators as u32,
}.into()).await;
}
......
......@@ -279,12 +279,12 @@ fn conflicting_votes_lead_to_dispute_participation() {
candidate_hash: c_hash,
candidate_receipt: c_receipt,
session: s,
voted_indices,
n_validators,
}) => {
assert_eq!(c_hash, candidate_hash);
assert_eq!(c_receipt, candidate_receipt);
assert_eq!(s, session);
assert_eq!(voted_indices, vec![ValidatorIndex(0), ValidatorIndex(1)]);
assert_eq!(n_validators, test_state.validators.len() as u32);
}
);
......
[package]
name = "polkadot-node-core-dispute-participation"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.3.12"
thiserror = "1.0.23"
tracing = "0.1.26"
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-primitives = { path = "../../../primitives" }
[dev-dependencies]
assert_matches = "1.5.0"
parity-scale-codec = "2.0.0"
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers"}
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Implements the dispute participation subsystem.
//!
//! This subsystem is responsible for actually participating in disputes: when
//! notified of a dispute, we recover the candidate data, validate the
//! candidate, and cast our vote in the dispute.
use futures::channel::oneshot;
use futures::prelude::*;
use polkadot_node_primitives::ValidationResult;
use polkadot_node_subsystem::{
errors::{RecoveryError, RuntimeApiError},
messages::{
AllMessages, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
CandidateValidationMessage, DisputeCoordinatorMessage, DisputeParticipationMessage,
RuntimeApiMessage, RuntimeApiRequest,
},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
SubsystemContext, SubsystemError,
};
use polkadot_primitives::v1::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex};
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "parachain::dispute-participation";
struct State {
recent_block: Option<(BlockNumber, Hash)>,
}
/// An implementation of the dispute participation subsystem.
pub struct DisputeParticipationSubsystem;
impl DisputeParticipationSubsystem {
/// Create a new instance of the subsystem.
pub fn new() -> Self {
DisputeParticipationSubsystem
}
}
impl<Context> Subsystem<Context> for DisputeParticipationSubsystem
where
Context: SubsystemContext<Message = DisputeParticipationMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem {
name: "dispute-participation-subsystem",
future,
}
}
}
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
#[error(transparent)]
Subsystem(#[from] SubsystemError),
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
#[error(transparent)]
Participation(#[from] ParticipationError),
}
#[derive(Debug, thiserror::Error)]
pub enum ParticipationError {
#[error("Missing recent block state to participate in dispute")]
MissingRecentBlockState,
#[error("Failed to recover available data for candidate {0}")]
MissingAvailableData(CandidateHash),
#[error("Failed to recover validation code for candidate {0}")]
MissingValidationCode(CandidateHash),
}
impl Error {
fn trace(&self) {
match self {
// don't spam the log with spurious errors
Self::RuntimeApi(_) | Self::Oneshot(_) => {
tracing::debug!(target: LOG_TARGET, err = ?self)
}
// it's worth reporting otherwise
_ => tracing::warn!(target: LOG_TARGET, err = ?self),
}
}
}
async fn run<Context>(mut ctx: Context)
where
Context: SubsystemContext<Message = DisputeParticipationMessage>,
{
let mut state = State { recent_block: None };
loop {
match ctx.recv().await {
Err(_) => return,
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => {
tracing::info!(target: LOG_TARGET, "Received `Conclude` signal, exiting");
return;
}
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _))) => {}
Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(update))) => {
update_state(&mut state, update);
}
Ok(FromOverseer::Communication { msg }) => {
if let Err(err) = handle_incoming(&mut ctx, &mut state, msg).await {
err.trace();
if let Error::Subsystem(SubsystemError::Context(_)) = err {
return;
}
}
}
}
}
}
fn update_state(state: &mut State, update: ActiveLeavesUpdate) {
for active in update.activated {
if state.recent_block.map_or(true, |s| active.number > s.0) {
state.recent_block = Some((active.number, active.hash));
}
}
}
async fn handle_incoming(
ctx: &mut impl SubsystemContext,
state: &mut State,
message: DisputeParticipationMessage,
) -> Result<(), Error> {
match message {
DisputeParticipationMessage::Participate {
candidate_hash,
candidate_receipt,
session,
n_validators,
} => {
if let Some((_, block_hash)) = state.recent_block {
participate(
ctx,
block_hash,
candidate_hash,
candidate_receipt,
session,
n_validators,
)
.await
} else {
return Err(ParticipationError::MissingRecentBlockState.into());
}
}
}
}
async fn participate(
ctx: &mut impl SubsystemContext,
block_hash: Hash,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
n_validators: u32,
) -> Result<(), Error> {
let (recover_available_data_tx, recover_available_data_rx) = oneshot::channel();
let (code_tx, code_rx) = oneshot::channel();
let (store_available_data_tx, store_available_data_rx) = oneshot::channel();
let (validation_tx, validation_rx) = oneshot::channel();
// in order to validate a candidate we need to start by recovering the
// available data
ctx.send_message(
AvailabilityRecoveryMessage::RecoverAvailableData(
candidate_receipt.clone(),
session,
None,
recover_available_data_tx,
)
.into(),
)
.await;
let available_data = match recover_available_data_rx.await? {
Ok(data) => data,
Err(RecoveryError::Invalid) => {
// the available data was recovered but it is invalid, therefore we'll
// vote negatively for the candidate dispute
cast_invalid_vote(ctx, candidate_hash, candidate_receipt, session).await;
return Ok(());
}
Err(RecoveryError::Unavailable) => {
return Err(ParticipationError::MissingAvailableData(candidate_hash).into());
}
};
// we also need to fetch the validation code which we can reference by its
// hash as taken from the candidate descriptor
ctx.send_message(
RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::ValidationCodeByHash(
candidate_receipt.descriptor.validation_code_hash,
code_tx,
),
)
.into(),
)
.await;
let validation_code = match code_rx.await?? {
Some(code) => code,
None => {
tracing::warn!(
target: LOG_TARGET,
"Validation code unavailable for code hash {:?} in the state of block {:?}",
candidate_receipt.descriptor.validation_code_hash,
block_hash,
);
return Err(ParticipationError::MissingValidationCode(candidate_hash).into());
}
};
// we dispatch a request to store the available data for the candidate. we
// want to maximize data availability for other potential checkers involved
// in the dispute
ctx.send_message(
AvailabilityStoreMessage::StoreAvailableData(
candidate_hash,
None,
n_validators,
available_data.clone(),
store_available_data_tx,
)
.into(),
)
.await;
match store_available_data_rx.await? {
Err(_) => {
tracing::warn!(
target: LOG_TARGET,
"Failed to store available data for candidate {:?}",
candidate_hash,
);
}
Ok(()) => {}
}
// we issue a request to validate the candidate with the provided exhaustive
// parameters
ctx.send_message(
CandidateValidationMessage::ValidateFromExhaustive(
available_data.validation_data,
validation_code,
candidate_receipt.descriptor.clone(),
available_data.pov,
validation_tx,
)
.into(),
)
.await;
// we cast votes (either positive or negative) depending on the outcome of
// the validation and if valid, whether the commitments hash matches
match validation_rx.await? {
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
"Candidate {:?} validation failed with: {:?}",
candidate_receipt.hash(),
err,
);
cast_invalid_vote(ctx, candidate_hash, candidate_receipt, session).await;
}
Ok(ValidationResult::Invalid(invalid)) => {
tracing::warn!(
target: LOG_TARGET,
"Candidate {:?} considered invalid: {:?}",
candidate_hash,
invalid,
);
cast_invalid_vote(ctx, candidate_hash, candidate_receipt, session).await;
}
Ok(ValidationResult::Valid(commitments, _)) => {
if commitments.hash() != candidate_receipt.commitments_hash {
tracing::warn!(
target: LOG_TARGET,
expected = ?candidate_receipt.commitments_hash,
got = ?commitments.hash(),
"Candidate is valid but commitments hash doesn't match",
);
cast_invalid_vote(ctx, candidate_hash, candidate_receipt, session).await;
} else {
cast_valid_vote(ctx, candidate_hash, candidate_receipt, session).await;
}
}
}
Ok(())
}
async fn cast_valid_vote(
ctx: &mut impl SubsystemContext,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
) {
tracing::info!(
target: LOG_TARGET,
"Casting valid vote in dispute for candidate {:?}",
candidate_hash,
);
issue_local_statement(ctx, candidate_hash, candidate_receipt, session, true).await;
}
async fn cast_invalid_vote(
ctx: &mut impl SubsystemContext,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
) {
tracing::info!(
target: LOG_TARGET,
"Casting invalid vote in dispute for candidate {:?}",
candidate_hash,
);
issue_local_statement(ctx, candidate_hash, candidate_receipt, session, false).await;
}
async fn issue_local_statement(
ctx: &mut impl SubsystemContext,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
valid: bool,
) {
ctx.send_message(AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::IssueLocalStatement(
session,
candidate_hash,
candidate_receipt,
valid,
),
))
.await
}
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use assert_matches::assert_matches;
use futures::future::{self, BoxFuture};
use std::sync::Arc;
use sp_core::testing::TaskExecutor;
use super::*;
use parity_scale_codec::Encode;
use polkadot_node_primitives::{AvailableData, BlockData, InvalidCandidate, PoV};
use polkadot_node_subsystem::{
jaeger, messages::ValidationFailed, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
};
use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TestSubsystemContextHandle};
use polkadot_primitives::v1::{BlakeTwo256, CandidateCommitments, HashT, Header, ValidationCode};
type VirtualOverseer = TestSubsystemContextHandle<DisputeParticipationMessage>;
fn test_harness<F>(test: F)
where
F: FnOnce(VirtualOverseer) -> BoxFuture<'static, VirtualOverseer>,
{
let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new());
let subsystem = DisputeParticipationSubsystem::new();
let spawned_subsystem = subsystem.start(ctx);
let test_future = test(ctx_handle);
let (subsystem_result, _) =
futures::executor::block_on(future::join(spawned_subsystem.future, async move {
let mut ctx_handle = test_future.await;
ctx_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
// no further request is received by the overseer which means that
// no further attempt to participate was made
assert!(ctx_handle.try_recv().await.is_none());
}));
subsystem_result.unwrap();
}
async fn activate_leaf(virtual_overseer: &mut VirtualOverseer, block_number: BlockNumber) {
let block_header = Header {
parent_hash: BlakeTwo256::hash(&block_number.encode()),
number: block_number,
digest: Default::default(),
state_root: Default::default(),
extrinsics_root: Default::default(),
};
let block_hash = block_header.hash();
virtual_overseer
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: block_hash,
span: Arc::new(jaeger::Span::Disabled),
number: block_number,
status: LeafStatus::Fresh,
}),
)))
.await;
}
async fn participate(virtual_overseer: &mut VirtualOverseer) {
let commitments = CandidateCommitments::default();
let candidate_receipt = {
let mut receipt = CandidateReceipt::default();
receipt.commitments_hash = commitments.hash();
receipt
};
let candidate_hash = candidate_receipt.hash();
let session = 1;
let n_validators = 10;
virtual_overseer
.send(FromOverseer::Communication {
msg: DisputeParticipationMessage::Participate {
candidate_hash,
candidate_receipt: candidate_receipt.clone(),
session,
n_validators,
},
})
.await;
}
async fn recover_available_data(virtual_overseer: &mut VirtualOverseer) {
let pov_block = PoV {
block_data: BlockData(Vec::new()),
};
let available_data = AvailableData {
pov: Arc::new(pov_block),
validation_data: Default::default(),
};
assert_matches!(
virtual_overseer.recv().await,
AllMessages::AvailabilityRecovery(
AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, tx)
) => {
tx.send(Ok(available_data)).unwrap();
},
"overseer did not receive recover available data message",
);
}
async fn fetch_validation_code(virtual_overseer: &mut VirtualOverseer) {
let validation_code = ValidationCode(Vec::new());
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_,
RuntimeApiRequest::ValidationCodeByHash(
_,
tx,
)
)) => {
tx.send(Ok(Some(validation_code))).unwrap();
},
"overseer did not receive runtime api request for validation code",
);
}
async fn store_available_data(virtual_overseer: &mut VirtualOverseer, success: bool) {
assert_matches!(
virtual_overseer.recv().await,
AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreAvailableData(