Unverified Commit d488955d authored by Robert Klotzner, committed by GitHub

Dispute spam protection (#4134)



* Mostly notes.

* Better error messages.

* Introduce Fatal/NonFatal + drop back channel participation

- Fatal/NonFatal split, in order to make it easier to use utility functions.
- We drop the back channel in dispute participation as it won't be
needed anymore.

* Better error messages.

* Utility function for receiving `CandidateEvent`s.

* Ordering module typechecks.

* cargo fmt

* Prepare spam slots module.

* Implement SpamSlots mechanism.

* Implement queues.

* cargo fmt

* Participation.

* Participation taking shape.

* Finish participation.

* cargo fmt

* Cleanup.

* WIP: Cleanup + Integration.

* Make `RollingSessionWindow` initialized by default.

* Make approval voting typecheck.

* Get rid of lazy_static & fix approval voting tests

* Move `SessionWindowSize` to node primitives.

* Implement dispute coordinator initialization.

* cargo fmt

* Make queues return error instead of boolean.

* Initialized: WIP

* Introduce chain api for getting finalized block.

* Fix ordering to only prune candidates on finalized events.

* Pruning of old sessions in spam slots.

* New import logic.

* Make everything typecheck.

* Fix warnings.

* Get rid of obsolete dispute-participation.

* Fixes.

* Add back accidentally deleted Cargo.lock

* Deliver disputes in an ordered fashion.

* Add module docs for errors

* Use type synonym.

* hidden docs.

* Fix overseer tests.

* Ordering provider taking `CandidateReceipt`.

... to be built on in a follow-up commit.

* Fix ordering to use relay_parent

as the included block is not unique per candidate.

* Add comment in ordering.rs.

* Take care of duplicate entries in queues.

* Better spam slots.

* Review remarks + docs.

* Fix db tests.

* Participation tests.

* Also scrape votes on first leaf for good measure.

* Make tests typecheck.

* Spelling.

* Only participate in actual disputes, not on every import.

* Don't count backing votes towards spam slots.

* Fix more tests.

* Don't participate if we don't have keys.

* Fix tests, typos and warnings.

* Fix merge error.

* Spelling fixes.

* Add missing docs.

* Queue tests.

* More tests.

* Add metrics + don't short circuit import.

* Basic test for ordering provider.

* Import fix.

* Remove dead link.

* One more dead link.
Co-authored-by: Lldenaurois <Ljdenaurois@gmail.com>
parent 9f059fb1
Pipeline #167012 canceled with stages in 15 minutes and 9 seconds
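Among the commits above, "Implement SpamSlots mechanism" introduces the core of the spam protection, but that module does not appear in the diff fragments below. The idea can be sketched roughly as follows — all names, the cap value, and the exact bookkeeping here are illustrative assumptions, not the real implementation: each validator gets a bounded number of "slots" per session for disputes that are not yet confirmed; once the cap is reached, further unconfirmed dispute votes from that validator are rejected as potential spam, and slots for sessions older than the dispute window are pruned.

```rust
use std::collections::HashMap;

type SessionIndex = u32;
type ValidatorIndex = u32;

// Assumed illustrative cap; not the real constant.
const MAX_SPAM_VOTES: u32 = 50;

/// Hypothetical sketch of per-validator, per-session spam slots.
#[derive(Default)]
struct SpamSlots {
    slots: HashMap<(SessionIndex, ValidatorIndex), u32>,
}

impl SpamSlots {
    /// Try to occupy a slot for an unconfirmed dispute vote; returns
    /// `false` when the validator is already at the cap for this session.
    fn add_unconfirmed(&mut self, session: SessionIndex, validator: ValidatorIndex) -> bool {
        let count = self.slots.entry((session, validator)).or_insert(0);
        if *count >= MAX_SPAM_VOTES {
            return false;
        }
        *count += 1;
        true
    }

    /// Free all slots belonging to sessions older than `oldest_kept`.
    fn prune_old(&mut self, oldest_kept: SessionIndex) {
        self.slots.retain(|(session, _), _| *session >= oldest_kept);
    }
}
```

Rejecting over-cap votes (rather than silently dropping them) lines up with the "Make queues return error instead of boolean" commit, though the sketch above keeps the simpler boolean form.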
@@ -6368,22 +6368,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "polkadot-node-core-dispute-participation"
version = "0.9.13"
dependencies = [
"assert_matches",
"futures 0.3.17",
"parity-scale-codec",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-primitives",
"sp-core",
"thiserror",
"tracing",
]
[[package]]
name = "polkadot-node-core-parachains-inherent"
version = "0.9.13"
@@ -6591,6 +6575,7 @@ dependencies = [
"env_logger 0.9.0",
"futures 0.3.17",
"itertools",
"lazy_static",
"log",
"lru 0.7.0",
"metered-channel",
@@ -6599,6 +6584,7 @@ dependencies = [
"polkadot-node-jaeger",
"polkadot-node-metrics",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-overseer",
@@ -6955,7 +6941,6 @@ dependencies = [
"polkadot-node-core-chain-api",
"polkadot-node-core-chain-selection",
"polkadot-node-core-dispute-coordinator",
"polkadot-node-core-dispute-participation",
"polkadot-node-core-parachains-inherent",
"polkadot-node-core-provisioner",
"polkadot-node-core-runtime-api",
@@ -56,7 +56,6 @@ members = [
"node/core/chain-api",
"node/core/chain-selection",
"node/core/dispute-coordinator",
"node/core/dispute-participation",
"node/core/parachains-inherent",
"node/core/provisioner",
"node/core/pvf",
@@ -76,7 +76,7 @@ struct ImportedBlockInfo {
}
struct ImportedBlockInfoEnv<'a> {
session_window: &'a RollingSessionWindow,
session_window: &'a Option<RollingSessionWindow>,
assignment_criteria: &'a (dyn AssignmentCriteria + Send + Sync),
keystore: &'a LocalKeystore,
}
@@ -133,7 +133,11 @@ async fn imported_block_info(
Err(_) => return Ok(None),
};
if env.session_window.earliest_session().map_or(true, |e| session_index < e) {
if env
.session_window
.as_ref()
.map_or(true, |s| session_index < s.earliest_session())
{
tracing::debug!(
target: LOG_TARGET,
"Block {} is from ancient session {}. Skipping",
@@ -180,7 +184,8 @@ async fn imported_block_info(
}
};
let session_info = match env.session_window.session_info(session_index) {
let session_info = match env.session_window.as_ref().and_then(|s| s.session_info(session_index))
{
Some(s) => s,
None => {
tracing::debug!(
@@ -324,7 +329,7 @@ pub(crate) async fn handle_new_head(
}
};
match state.session_window.cache_session_info_for_head(ctx, head).await {
match state.cache_session_info_for_head(ctx, head).await {
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
@@ -335,7 +340,7 @@ pub(crate) async fn handle_new_head(
return Ok(Vec::new())
},
Ok(a @ SessionWindowUpdate::Advanced { .. }) => {
Ok(Some(a @ SessionWindowUpdate::Advanced { .. })) => {
tracing::info!(
target: LOG_TARGET,
update = ?a,
@@ -431,8 +436,9 @@ pub(crate) async fn handle_new_head(
let session_info = state
.session_window
.session_info(session_index)
.expect("imported_block_info requires session to be available; qed");
.as_ref()
.and_then(|s| s.session_info(session_index))
.expect("imported_block_info requires session info to be available; qed");
let (block_tick, no_show_duration) = {
let block_tick = slot_number_to_tick(state.slot_duration_millis, slot);
@@ -608,7 +614,7 @@ pub(crate) mod tests {
fn blank_state() -> State {
State {
session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
session_window: None,
keystore: Arc::new(LocalKeystore::in_memory()),
slot_duration_millis: 6_000,
clock: Box::new(MockClock::default()),
@@ -618,11 +624,11 @@ pub(crate) mod tests {
fn single_session_state(index: SessionIndex, info: SessionInfo) -> State {
State {
session_window: RollingSessionWindow::with_session_info(
session_window: Some(RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
index,
vec![info],
),
)),
..blank_state()
}
}
@@ -740,7 +746,7 @@ pub(crate) mod tests {
let header = header.clone();
Box::pin(async move {
let env = ImportedBlockInfoEnv {
session_window: &session_window,
session_window: &Some(session_window),
assignment_criteria: &MockAssignmentCriteria,
keystore: &LocalKeystore::in_memory(),
};
@@ -849,7 +855,7 @@ pub(crate) mod tests {
let header = header.clone();
Box::pin(async move {
let env = ImportedBlockInfoEnv {
session_window: &session_window,
session_window: &Some(session_window),
assignment_criteria: &MockAssignmentCriteria,
keystore: &LocalKeystore::in_memory(),
};
@@ -942,7 +948,7 @@ pub(crate) mod tests {
.collect::<Vec<_>>();
let test_fut = {
let session_window = RollingSessionWindow::new(APPROVAL_SESSIONS);
let session_window = None;
let header = header.clone();
Box::pin(async move {
@@ -1037,11 +1043,11 @@ pub(crate) mod tests {
.map(|(r, c, g)| (r.hash(), r.clone(), *c, *g))
.collect::<Vec<_>>();
let session_window = RollingSessionWindow::with_session_info(
let session_window = Some(RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
session,
vec![session_info],
);
));
let header = header.clone();
Box::pin(async move {
@@ -44,7 +44,10 @@ use polkadot_node_subsystem::{
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
rolling_session_window::RollingSessionWindow,
rolling_session_window::{
new_session_window_size, RollingSessionWindow, SessionWindowSize, SessionWindowUpdate,
SessionsUnavailable,
},
TimeoutExt,
};
use polkadot_primitives::v1::{
@@ -92,7 +95,8 @@ use crate::{
#[cfg(test)]
mod tests;
const APPROVAL_SESSIONS: SessionIndex = 6;
pub const APPROVAL_SESSIONS: SessionWindowSize = new_session_window_size!(6);
const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
const APPROVAL_CACHE_SIZE: usize = 1024;
const TICK_TOO_FAR_IN_FUTURE: Tick = 20; // 10 seconds.
@@ -568,7 +572,7 @@ impl CurrentlyCheckingSet {
}
struct State {
session_window: RollingSessionWindow,
session_window: Option<RollingSessionWindow>,
keystore: Arc<LocalKeystore>,
slot_duration_millis: u64,
clock: Box<dyn Clock + Send + Sync>,
@@ -577,9 +581,30 @@ struct State {
impl State {
fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> {
self.session_window.session_info(i)
self.session_window.as_ref().and_then(|w| w.session_info(i))
}
/// Bring `session_window` up to date.
pub async fn cache_session_info_for_head(
&mut self,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
head: Hash,
) -> Result<Option<SessionWindowUpdate>, SessionsUnavailable> {
let session_window = self.session_window.take();
match session_window {
None => {
self.session_window =
Some(RollingSessionWindow::new(ctx, APPROVAL_SESSIONS, head).await?);
Ok(None)
},
Some(mut session_window) => {
let r =
session_window.cache_session_info_for_head(ctx, head).await.map(Option::Some);
self.session_window = Some(session_window);
r
},
}
}
// Compute the required tranches for approval for this block and candidate combo.
// Fails if there is no approval entry for the block under the candidate or no candidate entry
// under the block, or if the session is out of bounds.
@@ -671,7 +696,7 @@ where
B: Backend,
{
let mut state = State {
session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
session_window: None,
keystore: subsystem.keystore,
slot_duration_millis: subsystem.slot_duration_millis,
clock,
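The `Option<RollingSessionWindow>` refactor above relies on a take-and-put-back pattern in `cache_session_info_for_head`: `Option::take` moves the window out of `self`, the `None` branch constructs it lazily on the first head, and both branches put a window back before returning. A minimal stand-alone sketch of that pattern, using simplified stand-in types rather than the real `RollingSessionWindow` API:

```rust
// Simplified stand-ins; this sketches only the Option handling, not the
// real session-window logic (which is async and fallible).
struct Window {
    earliest: u32,
}

#[derive(Debug, PartialEq)]
enum Update {
    Advanced { new_earliest: u32 },
}

struct State {
    window: Option<Window>,
}

impl State {
    fn cache_for_head(&mut self, head_session: u32) -> Option<Update> {
        match self.window.take() {
            None => {
                // First head seen: construct the window lazily.
                self.window = Some(Window { earliest: head_session });
                None // fresh initialization is not reported as an update
            },
            Some(mut w) => {
                let advanced = head_session > w.earliest;
                if advanced {
                    w.earliest = head_session;
                }
                let update = advanced.then(|| Update::Advanced { new_earliest: head_session });
                self.window = Some(w); // put the window back
                update
            },
        }
    }
}
```

The invariant worth noting is that `self.window` is `Some` again on every return path, so a caller never observes a half-initialized state.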
@@ -24,6 +24,8 @@ struct MetricsInner {
votes: prometheus::CounterVec<prometheus::U64>,
/// Conclusion across all disputes.
concluded: prometheus::CounterVec<prometheus::U64>,
/// Number of participations that have been queued.
queued_participations: prometheus::CounterVec<prometheus::U64>,
}
/// Candidate validation metrics.
@@ -61,6 +63,18 @@ impl Metrics {
metrics.concluded.with_label_values(&["invalid"]).inc();
}
}
pub(crate) fn on_queued_priority_participation(&self) {
if let Some(metrics) = &self.0 {
metrics.queued_participations.with_label_values(&["priority"]).inc();
}
}
pub(crate) fn on_queued_best_effort_participation(&self) {
if let Some(metrics) = &self.0 {
metrics.queued_participations.with_label_values(&["best-effort"]).inc();
}
}
}
impl metrics::Metrics for Metrics {
@@ -93,6 +107,16 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
queued_participations: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_dispute_participations",
"Total number of queued participations, grouped by priority and best-effort. (Not every queueing will necessarily lead to an actual participation because of duplicates.)",
),
&["priority"],
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
@@ -26,7 +26,10 @@ use polkadot_primitives::v1::{CandidateHash, SessionIndex};
use std::collections::HashMap;
use super::db::v1::{CandidateVotes, RecentDisputes};
use super::{
db::v1::{CandidateVotes, RecentDisputes},
error::FatalResult,
};
#[derive(Debug)]
pub enum BackendWriteOp {
@@ -53,7 +56,7 @@ pub trait Backend {
/// Atomically writes the list of operations, with later operations taking precedence over
/// prior.
fn write<I>(&mut self, ops: I) -> SubsystemResult<()>
fn write<I>(&mut self, ops: I) -> FatalResult<()>
where
I: IntoIterator<Item = BackendWriteOp>;
}
@@ -27,12 +27,11 @@ use std::sync::Arc;
use kvdb::{DBTransaction, KeyValueDB};
use parity_scale_codec::{Decode, Encode};
use crate::{
real::{
backend::{Backend, BackendWriteOp, OverlayedBackend},
DISPUTE_WINDOW,
},
DisputeStatus,
use crate::real::{
backend::{Backend, BackendWriteOp, OverlayedBackend},
error::{Fatal, FatalResult},
status::DisputeStatus,
DISPUTE_WINDOW,
};
const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes";
@@ -72,7 +71,7 @@ impl Backend for DbBackend {
/// Atomically writes the list of operations, with later operations taking precedence over
/// prior.
fn write<I>(&mut self, ops: I) -> SubsystemResult<()>
fn write<I>(&mut self, ops: I) -> FatalResult<()>
where
I: IntoIterator<Item = BackendWriteOp>,
{
@@ -98,7 +97,7 @@ impl Backend for DbBackend {
}
}
self.inner.write(tx).map_err(Into::into)
self.inner.write(tx).map_err(Fatal::DbWriteFailed)
}
}
@@ -214,7 +213,7 @@ pub(crate) fn note_current_session(
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
current_session: SessionIndex,
) -> SubsystemResult<()> {
let new_earliest = current_session.saturating_sub(DISPUTE_WINDOW);
let new_earliest = current_session.saturating_sub(DISPUTE_WINDOW.get());
match overlay_db.load_earliest_session()? {
None => {
// First launch - write new-earliest.
@@ -421,7 +420,7 @@ mod tests {
let prev_earliest_session = 0;
let new_earliest_session = 5;
let current_session = 5 + DISPUTE_WINDOW;
let current_session = 5 + DISPUTE_WINDOW.get();
let very_old = 3;
let slightly_old = 4;
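The pruning arithmetic in `note_current_session` above is a saturating subtraction: keeping `DISPUTE_WINDOW.get()` sessions means the new earliest session trails the current one by the window size, while `saturating_sub` avoids underflow during the first sessions after genesis. A tiny sketch, where the window size is an assumed illustrative value rather than the real constant:

```rust
type SessionIndex = u32;

// Assumed illustrative value; the real code uses `DISPUTE_WINDOW.get()`
// from a `SessionWindowSize`.
const DISPUTE_WINDOW: SessionIndex = 6;

/// Earliest session still kept once `current_session` is reached.
fn new_earliest_session(current_session: SessionIndex) -> SessionIndex {
    current_session.saturating_sub(DISPUTE_WINDOW)
}
```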
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use futures::channel::oneshot;
use thiserror::Error;
use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
SubsystemError,
};
use polkadot_node_subsystem_util::{rolling_session_window::SessionsUnavailable, runtime};
use super::{db, participation};
use crate::real::{CodecError, LOG_TARGET};
/// Errors for this subsystem.
#[derive(Debug, Error)]
#[error(transparent)]
pub enum Error {
/// All fatal errors.
Fatal(#[from] Fatal),
/// All nonfatal/potentially recoverable errors.
NonFatal(#[from] NonFatal),
}
/// General `Result` type for dispute coordinator.
pub type Result<R> = std::result::Result<R, Error>;
/// Result type with only fatal errors.
pub type FatalResult<R> = std::result::Result<R, Fatal>;
/// Result type with only non fatal errors.
pub type NonFatalResult<R> = std::result::Result<R, NonFatal>;
impl From<runtime::Error> for Error {
fn from(o: runtime::Error) -> Self {
match o {
runtime::Error::Fatal(f) => Self::Fatal(Fatal::Runtime(f)),
runtime::Error::NonFatal(f) => Self::NonFatal(NonFatal::Runtime(f)),
}
}
}
impl From<SubsystemError> for Error {
fn from(o: SubsystemError) -> Self {
match o {
SubsystemError::Context(msg) => Self::Fatal(Fatal::SubsystemContext(msg)),
_ => Self::NonFatal(NonFatal::Subsystem(o)),
}
}
}
/// Fatal errors of this subsystem.
#[derive(Debug, Error)]
pub enum Fatal {
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information {0}")]
Runtime(#[from] runtime::Fatal),
/// We received a legacy `SubsystemError::Context` error which is considered fatal.
#[error("SubsystemError::Context error: {0}")]
SubsystemContext(String),
/// `ctx.spawn` failed with an error.
#[error("Spawning a task failed: {0}")]
SpawnFailed(SubsystemError),
#[error("Participation worker receiver exhausted.")]
ParticipationWorkerReceiverExhausted,
/// Receiving subsystem message from overseer failed.
#[error("Receiving message from overseer failed: {0}")]
SubsystemReceive(#[source] SubsystemError),
#[error("Writing to database failed: {0}")]
DbWriteFailed(std::io::Error),
#[error("Oneshot for receiving the block number from the chain API got canceled")]
CanceledBlockNumber,
#[error("Retrieving block number from chain API failed with error: {0}")]
ChainApiBlockNumber(ChainApiError),
}
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum NonFatal {
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
#[error(transparent)]
ChainApi(#[from] ChainApiError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
#[error("Dispute import confirmation send failed (receiver canceled)")]
DisputeImportOneshotSend,
#[error(transparent)]
Subsystem(SubsystemError),
#[error(transparent)]
Codec(#[from] CodecError),
/// `RollingSessionWindow` was not able to retrieve `SessionInfo`s.
#[error("Sessions unavailable in `RollingSessionWindow`: {0}")]
RollingSessionWindow(#[from] SessionsUnavailable),
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information: {0}")]
Runtime(#[from] runtime::NonFatal),
#[error(transparent)]
QueueError(#[from] participation::QueueError),
}
impl From<db::v1::Error> for Error {
fn from(err: db::v1::Error) -> Self {
match err {
db::v1::Error::Io(io) => Self::NonFatal(NonFatal::Io(io)),
db::v1::Error::Codec(e) => Self::NonFatal(NonFatal::Codec(e)),
}
}
}
/// Utility for consuming top-level errors and logging them.
///
/// We basically always want to try to continue on error. This utility function is meant to
/// consume top-level errors by simply logging them.
pub fn log_error(result: Result<()>) -> std::result::Result<(), Fatal> {
match result {
Err(Error::Fatal(f)) => Err(f),
Err(Error::NonFatal(error)) => {
error.log();
Ok(())
},
Ok(()) => Ok(()),
}
}
impl NonFatal {
/// Log a `NonFatal`.
pub fn log(self) {
match self {
// don't spam the log with spurious errors
Self::RuntimeApi(_) | Self::Oneshot(_) =>
tracing::debug!(target: LOG_TARGET, error = ?self),
// it's worth reporting otherwise
_ => tracing::warn!(target: LOG_TARGET, error = ?self),
}
}
}
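The `log_error` pattern above can be exercised with simplified stand-in types (the real `Error`/`Fatal`/`NonFatal` enums are richer and use `thiserror`): non-fatal errors are logged and swallowed so the subsystem's main loop keeps running, while fatal ones propagate and terminate it.

```rust
// Simplified stand-ins for the subsystem's error types; a sketch of the
// pattern, not the real implementation.
#[derive(Debug)]
struct Fatal;
#[derive(Debug)]
struct NonFatal;
#[derive(Debug)]
enum Error {
    Fatal(Fatal),
    NonFatal(NonFatal),
}

/// Consume non-fatal errors by logging them; let fatal ones bubble up.
fn log_error(result: Result<(), Error>) -> Result<(), Fatal> {
    match result {
        Err(Error::Fatal(f)) => Err(f),
        Err(Error::NonFatal(e)) => {
            eprintln!("non-fatal, continuing: {:?}", e);
            Ok(())
        },
        Ok(()) => Ok(()),
    }
}
```

A caller typically writes `log_error(run_iteration(...))?` in its loop, so only `Fatal` short-circuits.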
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{
cmp::{Ord, Ordering, PartialOrd},
collections::{BTreeMap, HashSet},
};
use futures::channel::oneshot;
use polkadot_node_subsystem::{
messages::ChainApiMessage, ActivatedLeaf, ActiveLeavesUpdate, SubsystemSender,
};
use polkadot_node_subsystem_util::runtime::get_candidate_events;
use polkadot_primitives::v1::{BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash};
use super::{
error::{Fatal, FatalResult, Result},
LOG_TARGET,
};
#[cfg(test)]
mod tests;
/// Provider of `CandidateComparator` for candidates.
pub struct OrderingProvider {
/// All candidates we have seen included which have not yet been finalized.
included_candidates: HashSet<CandidateHash>,
/// Block number of the including block -> `CandidateHash`es included at that height.
///
/// We need this to clean up `included_candidates` on `ActiveLeavesUpdate`.
candidates_by_block_number: BTreeMap<BlockNumber, HashSet<CandidateHash>>,
}
/// `Comparator` for ordering of disputes for candidates.
///
/// This `comparator` makes it possible to order disputes based on age and to ensure some fairness
/// between chains in case of equally old disputes.
///
/// Objective ordering between nodes is important in case of many disputes, so nodes will pull in
/// the same direction and work on resolving the same disputes first. This ensures that we will
/// conclude some disputes, even if there are lots of them. While any objective ordering would
/// suffice for this goal, ordering by age ensures we are not only resolving disputes, but also
/// resolving the oldest ones first, which are the most urgent and important ones to resolve.
///
/// Note that by `oldest` we mean oldest in terms of relay chain block number, for any block
/// that has not yet been finalized. If a block has already been finalized, it should be treated
/// as low priority when it comes to disputes, as even in the case of a negative outcome we are
/// already too late. The ordering mechanism here serves to prevent this from happening in the
/// first place.
#[derive(Copy, Clone)]
pub struct CandidateComparator {
/// Block number of the relay parent.
///
/// Important, so we will be participating in oldest disputes first.