Unverified Commit a3902fbf authored by Bernhard Schuster's avatar Bernhard Schuster Committed by GitHub
Browse files

add dispute metrics, some chores (#3842)



* rename: MsgFilter -> MessageInterceptor

* feat: add dispute metrics

* fixup

* test fixins

* fix metrics

* dummysubsystem export and trait fn fix

* chore: fmt

* undo unwanted changes

* foo

* pfmt

* fixup

* fixup

* revert

* some more

* Update node/malus/Cargo.toml
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* Update node/core/dispute-coordinator/src/metrics.rs
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* Update node/core/dispute-coordinator/src/metrics.rs
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* Update node/core/dispute-coordinator/src/metrics.rs
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* add license header

* fix lockfile

* new with opts

* fmt

* Update node/core/dispute-coordinator/src/metrics.rs

* feature gate
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
parent 9dc2d42b
Pipeline #157475 passed with stages
in 38 minutes and 36 seconds
......@@ -6977,12 +6977,15 @@ dependencies = [
"assert_matches",
"async-trait",
"color-eyre",
"futures 0.3.17",
"parity-util-mem",
"polkadot-cli",
"polkadot-node-core-candidate-validation",
"polkadot-node-core-pvf",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"sp-core",
"structopt",
]
......
......@@ -30,6 +30,8 @@ use kvdb::KeyValueDB;
use parity_scale_codec::{Decode, Encode, Error as CodecError};
use sc_keystore::LocalKeystore;
use crate::metrics::Metrics;
const LOG_TARGET: &str = "parachain::dispute-coordinator";
/// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots.
......@@ -52,7 +54,7 @@ pub struct DisputeCoordinatorSubsystem {}
impl DisputeCoordinatorSubsystem {
/// Create a new instance of the subsystem.
pub fn new(_: Arc<dyn KeyValueDB>, _: Config, _: Arc<LocalKeystore>) -> Self {
pub fn new(_: Arc<dyn KeyValueDB>, _: Config, _: Arc<LocalKeystore>, _: Metrics) -> Self {
DisputeCoordinatorSubsystem {}
}
}
......
......@@ -25,6 +25,8 @@
//! another node, this will trigger the dispute participation subsystem to recover and validate the block and call
//! back to this subsystem.
mod metrics;
#[cfg(feature = "disputes")]
mod real;
#[cfg(feature = "disputes")]
......
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use polkadot_node_subsystem_util::metrics::{self, prometheus};
#[derive(Clone)]
struct MetricsInner {
/// Number of opened disputes.
open: prometheus::Counter<prometheus::U64>,
/// Votes of all disputes.
votes: prometheus::CounterVec<prometheus::U64>,
/// Conclusion across all disputes.
concluded: prometheus::CounterVec<prometheus::U64>,
}
/// Candidate validation metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
#[cfg(feature = "disputes")]
impl Metrics {
pub(crate) fn on_open(&self) {
if let Some(metrics) = &self.0 {
metrics.open.inc();
}
}
pub(crate) fn on_valid_vote(&self) {
if let Some(metrics) = &self.0 {
metrics.votes.with_label_values(&["valid"]).inc();
}
}
pub(crate) fn on_invalid_vote(&self) {
if let Some(metrics) = &self.0 {
metrics.votes.with_label_values(&["invalid"]).inc();
}
}
pub(crate) fn on_concluded_valid(&self) {
if let Some(metrics) = &self.0 {
metrics.concluded.with_label_values(&["valid"]).inc();
}
}
pub(crate) fn on_concluded_invalid(&self) {
if let Some(metrics) = &self.0 {
metrics.concluded.with_label_values(&["invalid"]).inc();
}
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
open: prometheus::register(
prometheus::Counter::with_opts(prometheus::Opts::new(
"parachain_candidate_disputes_total",
"Total number of raised disputes.",
))?,
registry,
)?,
concluded: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_candidate_dispute_concluded",
"Concluded dispute votes, sorted by candidate is `valid` and `invalid`.",
),
&["validity"],
)?,
registry,
)?,
votes: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_candidate_dispute_votes",
"Accumulated dispute votes, sorted by candidate is `valid` and `invalid`.",
),
&["validity"],
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
......@@ -56,6 +56,7 @@ use kvdb::KeyValueDB;
use parity_scale_codec::{Decode, Encode, Error as CodecError};
use sc_keystore::LocalKeystore;
use crate::metrics::Metrics;
use backend::{Backend, OverlayedBackend};
use db::v1::{DbBackend, RecentDisputes};
......@@ -116,12 +117,18 @@ pub struct DisputeCoordinatorSubsystem {
config: Config,
store: Arc<dyn KeyValueDB>,
keystore: Arc<LocalKeystore>,
metrics: Metrics,
}
impl DisputeCoordinatorSubsystem {
/// Create a new instance of the subsystem.
pub fn new(store: Arc<dyn KeyValueDB>, config: Config, keystore: Arc<LocalKeystore>) -> Self {
DisputeCoordinatorSubsystem { store, config, keystore }
pub fn new(
store: Arc<dyn KeyValueDB>,
config: Config,
keystore: Arc<LocalKeystore>,
metrics: Metrics,
) -> Self {
DisputeCoordinatorSubsystem { store, config, keystore, metrics }
}
}
......@@ -329,6 +336,7 @@ where
rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
recovery_state: Participation::Pending,
};
let metrics = &subsystem.metrics;
loop {
let mut overlay_db = OverlayedBackend::new(backend);
......@@ -348,7 +356,8 @@ where
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
FromOverseer::Communication { msg } =>
handle_incoming(ctx, &mut overlay_db, &mut state, msg, clock.now()).await?,
handle_incoming(ctx, &mut overlay_db, &mut state, msg, clock.now(), &metrics)
.await?,
}
if !overlay_db.is_empty() {
......@@ -518,6 +527,7 @@ async fn handle_incoming(
state: &mut State,
message: DisputeCoordinatorMessage,
now: Timestamp,
metrics: &Metrics,
) -> Result<(), Error> {
match message {
DisputeCoordinatorMessage::ImportStatements {
......@@ -537,6 +547,7 @@ async fn handle_incoming(
statements,
now,
pending_confirmation,
metrics,
)
.await?;
},
......@@ -578,6 +589,7 @@ async fn handle_incoming(
session,
valid,
now,
metrics,
)
.await?;
},
......@@ -635,6 +647,7 @@ async fn handle_import_statements(
statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
now: Timestamp,
pending_confirmation: oneshot::Sender<ImportStatementsResult>,
metrics: &Metrics,
) -> Result<(), Error> {
if state.highest_session.map_or(true, |h| session + DISPUTE_WINDOW < h) {
// It is not valid to participate in an ancient dispute (spam?).
......@@ -694,6 +707,7 @@ async fn handle_import_statements(
match statement.statement().clone() {
DisputeStatement::Valid(valid_kind) => {
metrics.on_valid_vote();
insert_into_statement_vec(
&mut votes.valid,
valid_kind,
......@@ -702,6 +716,7 @@ async fn handle_import_statements(
);
},
DisputeStatement::Invalid(invalid_kind) => {
metrics.on_invalid_vote();
insert_into_statement_vec(
&mut votes.invalid,
invalid_kind,
......@@ -784,6 +799,14 @@ async fn handle_import_statements(
);
return Ok(())
}
metrics.on_open();
if concluded_valid {
metrics.on_concluded_valid();
}
if concluded_invalid {
metrics.on_concluded_invalid();
}
}
// Only write when updated and vote is available.
......@@ -824,6 +847,7 @@ async fn issue_local_statement(
session: SessionIndex,
valid: bool,
now: Timestamp,
metrics: &Metrics,
) -> Result<(), Error> {
// Load session info.
let info = match state.rolling_session_window.session_info(session) {
......@@ -857,7 +881,6 @@ async fn issue_local_statement(
let voted_indices: HashSet<_> = voted_indices.into_iter().collect();
let controlled_indices = find_controlled_validator_indices(&state.keystore, &validators[..]);
for index in controlled_indices {
if voted_indices.contains(&index) {
continue
......@@ -914,6 +937,7 @@ async fn issue_local_statement(
statements,
now,
pending_confirmation,
metrics,
)
.await?;
match rx.await {
......
......@@ -286,6 +286,7 @@ impl TestState {
self.db.clone(),
self.config.clone(),
self.subsystem_keystore.clone(),
Metrics::default(),
);
let backend = DbBackend::new(self.db.clone(), self.config.column_config());
let subsystem_task = run(subsystem, ctx, backend, Box::new(self.clock.clone()));
......
......@@ -27,3 +27,8 @@ color-eyre = { version = "0.5.11", default-features = false }
assert_matches = "1.5"
structopt = "0.3.23"
async-trait = "0.1.51"
[dev-dependencies]
polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.17", features = ["thread-pool"] }
......@@ -24,38 +24,52 @@ use polkadot_node_subsystem::*;
pub use polkadot_node_subsystem::{messages::AllMessages, overseer, FromOverseer};
use std::{future::Future, pin::Pin};
#[cfg(test)]
mod tests;
/// Filter incoming and outgoing messages.
pub trait MsgFilter: Send + Sync + Clone + 'static {
pub trait MessageInterceptor<Sender>: Send + Sync + Clone + 'static
where
Sender: overseer::SubsystemSender<Self::Message> + Clone + 'static,
{
/// The message type the original subsystem handles incoming.
type Message: Send + 'static;
/// Filter messages that are to be received by
/// the subsystem.
fn filter_in(&self, msg: FromOverseer<Self::Message>) -> Option<FromOverseer<Self::Message>> {
///
/// For non-trivial cases, the `sender` can be used to send
/// multiple messages after doing some additional processing.
fn intercept_incoming(
&self,
_sender: &mut Sender,
msg: FromOverseer<Self::Message>,
) -> Option<FromOverseer<Self::Message>> {
Some(msg)
}
/// Modify outgoing messages.
fn filter_out(&self, msg: AllMessages) -> Option<AllMessages> {
fn intercept_outgoing(&self, msg: AllMessages) -> Option<AllMessages> {
Some(msg)
}
}
/// A sender with the outgoing messages filtered.
#[derive(Clone)]
pub struct FilteredSender<Sender, Fil> {
pub struct InterceptedSender<Sender, Fil> {
inner: Sender,
message_filter: Fil,
}
#[async_trait::async_trait]
impl<Sender, Fil> overseer::SubsystemSender<AllMessages> for FilteredSender<Sender, Fil>
impl<Sender, Fil> overseer::SubsystemSender<AllMessages> for InterceptedSender<Sender, Fil>
where
Sender: overseer::SubsystemSender<AllMessages>,
Fil: MsgFilter,
Sender: overseer::SubsystemSender<AllMessages>
+ overseer::SubsystemSender<<Fil as MessageInterceptor<Sender>>::Message>,
Fil: MessageInterceptor<Sender>,
{
async fn send_message(&mut self, msg: AllMessages) {
if let Some(msg) = self.message_filter.filter_out(msg) {
if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
self.inner.send_message(msg).await;
}
}
......@@ -71,26 +85,39 @@ where
}
fn send_unbounded_message(&mut self, msg: AllMessages) {
if let Some(msg) = self.message_filter.filter_out(msg) {
if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
self.inner.send_unbounded_message(msg);
}
}
}
/// A subsystem context, that filters the outgoing messages.
pub struct FilteredContext<Context: overseer::SubsystemContext + SubsystemContext, Fil: MsgFilter> {
pub struct InterceptedContext<Context, Fil>
where
Context: overseer::SubsystemContext + SubsystemContext,
Fil: MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>,
<Context as overseer::SubsystemContext>::Sender: overseer::SubsystemSender<
<Fil as MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>>::Message,
>,
{
inner: Context,
message_filter: Fil,
sender: FilteredSender<<Context as overseer::SubsystemContext>::Sender, Fil>,
sender: InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>,
}
impl<Context, Fil> FilteredContext<Context, Fil>
impl<Context, Fil> InterceptedContext<Context, Fil>
where
Context: overseer::SubsystemContext + SubsystemContext,
Fil: MsgFilter<Message = <Context as overseer::SubsystemContext>::Message>,
Fil: MessageInterceptor<
<Context as overseer::SubsystemContext>::Sender,
Message = <Context as overseer::SubsystemContext>::Message,
>,
<Context as overseer::SubsystemContext>::Sender: overseer::SubsystemSender<
<Fil as MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>>::Message,
>,
{
pub fn new(mut inner: Context, message_filter: Fil) -> Self {
let sender = FilteredSender::<<Context as overseer::SubsystemContext>::Sender, Fil> {
let sender = InterceptedSender::<<Context as overseer::SubsystemContext>::Sender, Fil> {
inner: inner.sender().clone(),
message_filter: message_filter.clone(),
};
......@@ -99,15 +126,21 @@ where
}
#[async_trait::async_trait]
impl<Context, Fil> overseer::SubsystemContext for FilteredContext<Context, Fil>
impl<Context, Fil> overseer::SubsystemContext for InterceptedContext<Context, Fil>
where
Context: overseer::SubsystemContext + SubsystemContext,
Fil: MsgFilter<Message = <Context as overseer::SubsystemContext>::Message>,
Fil: MessageInterceptor<
<Context as overseer::SubsystemContext>::Sender,
Message = <Context as overseer::SubsystemContext>::Message,
>,
<Context as overseer::SubsystemContext>::AllMessages:
From<<Context as overseer::SubsystemContext>::Message>,
<Context as overseer::SubsystemContext>::Sender: overseer::SubsystemSender<
<Fil as MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>>::Message,
>,
{
type Message = <Context as overseer::SubsystemContext>::Message;
type Sender = FilteredSender<<Context as overseer::SubsystemContext>::Sender, Fil>;
type Sender = InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>;
type Error = <Context as overseer::SubsystemContext>::Error;
type AllMessages = <Context as overseer::SubsystemContext>::AllMessages;
type Signal = <Context as overseer::SubsystemContext>::Signal;
......@@ -117,7 +150,9 @@ where
match self.inner.try_recv().await? {
None => return Ok(None),
Some(msg) =>
if let Some(msg) = self.message_filter.filter_in(msg) {
if let Some(msg) =
self.message_filter.intercept_incoming(self.inner.sender(), msg)
{
return Ok(Some(msg))
},
}
......@@ -127,7 +162,7 @@ where
async fn recv(&mut self) -> SubsystemResult<FromOverseer<Self::Message>> {
loop {
let msg = self.inner.recv().await?;
if let Some(msg) = self.message_filter.filter_in(msg) {
if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) {
return Ok(msg)
}
}
......@@ -155,27 +190,33 @@ where
}
/// A subsystem to which incoming and outgoing filters are applied.
pub struct FilteredSubsystem<Sub, Fil> {
subsystem: Sub,
message_filter: Fil,
pub struct InterceptedSubsystem<Sub, Interceptor> {
pub subsystem: Sub,
pub message_interceptor: Interceptor,
}
impl<Sub, Fil> FilteredSubsystem<Sub, Fil> {
pub fn new(subsystem: Sub, message_filter: Fil) -> Self {
Self { subsystem, message_filter }
impl<Sub, Interceptor> InterceptedSubsystem<Sub, Interceptor> {
pub fn new(subsystem: Sub, message_interceptor: Interceptor) -> Self {
Self { subsystem, message_interceptor }
}
}
impl<Context, Sub, Fil> overseer::Subsystem<Context, SubsystemError> for FilteredSubsystem<Sub, Fil>
impl<Context, Sub, Interceptor> overseer::Subsystem<Context, SubsystemError> for InterceptedSubsystem<Sub, Interceptor>
where
Context: overseer::SubsystemContext + SubsystemContext + Sync + Send,
Sub: overseer::Subsystem<FilteredContext<Context, Fil>, SubsystemError>,
FilteredContext<Context, Fil>: overseer::SubsystemContext + SubsystemContext,
Fil: MsgFilter<Message = <Context as overseer::SubsystemContext>::Message>,
Sub: overseer::Subsystem<InterceptedContext<Context, Interceptor>, SubsystemError>,
InterceptedContext<Context, Interceptor>: overseer::SubsystemContext + SubsystemContext,
Interceptor: MessageInterceptor<
<Context as overseer::SubsystemContext>::Sender,
Message = <Context as overseer::SubsystemContext>::Message,
>,
<Context as overseer::SubsystemContext>::Sender: overseer::SubsystemSender<
<Interceptor as MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>>::Message,
>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let ctx = FilteredContext::new(ctx, self.message_filter);
overseer::Subsystem::<FilteredContext<Context, Fil>, SubsystemError>::start(
let ctx = InterceptedContext::new(ctx, self.message_interceptor);
overseer::Subsystem::<InterceptedContext<Context, Interceptor>, SubsystemError>::start(
self.subsystem,
ctx,
)
......
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use polkadot_node_subsystem_test_helpers::*;
use polkadot_node_subsystem::{
messages::{AllMessages, AvailabilityStoreMessage},
overseer::{gen::TimeoutExt, Subsystem},
DummySubsystem,
};
#[derive(Clone, Debug)]
struct BlackHoleInterceptor;
impl<Sender> MessageInterceptor<Sender> for BlackHoleInterceptor
where
Sender: overseer::SubsystemSender<AllMessages>
+ overseer::SubsystemSender<AvailabilityStoreMessage>
+ Clone
+ 'static,
{
type Message = AvailabilityStoreMessage;
fn intercept_incoming(
&self,
_sender: &mut Sender,
msg: FromOverseer<Self::Message>,
) -> Option<FromOverseer<Self::Message>> {
match msg {
FromOverseer::Communication { msg: _msg } => None,
// to conclude the test cleanly
sig => Some(sig),
}
}
}
async fn overseer_send<T: Into<AllMessages>>(overseer: &mut TestSubsystemContextHandle<T>, msg: T) {
overseer.send(FromOverseer::Communication { msg }).await;
}
#[test]
fn integrity_test() {
let pool = sp_core::testing::TaskExecutor::new();
let (context, mut overseer) = make_subsystem_context(pool);
let sub = DummySubsystem;
let sub_intercepted = InterceptedSubsystem::new(sub, BlackHoleInterceptor);
// Try to send a message we know is going to be filtered.
let test_fut = async move {
let (tx, rx) = futures::channel::oneshot::channel();
overseer_send(
&mut overseer,
AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx),
)
.await;
let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap();
overseer
};
let subsystem = async move {
sub_intercepted.start(context).future.await.unwrap();
};
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
futures::executor::block_on(futures::future::join(
async move {
let mut overseer = test_fut.await;
overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
},
subsystem,
))
.1;
}
......@@ -27,7 +27,7 @@ use polkadot_cli::{
create_default_subsystems,
service::{
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer,
OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost, ProvideRuntimeApi, SpawnNamed,
OverseerGen, OverseerGenArgs, ParachainHost, ProvideRuntimeApi, SpawnNamed,
},
Cli,
};
......@@ -35,11 +35,15 @@ use polkadot_cli::{
// Import extra types relevant to the particular
// subsystem.
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
use polkadot_node_subsystem::messages::CandidateValidationMessage;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateValidationMessage},
overseer::{self, OverseerHandle},
FromOverseer,
};
// Filter wrapping related types.
use malus::*;