From 44a8aa23d5c4419608a8c685a5e2bcaa0a2acaf2 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster <bernhard@ahoi.io> Date: Wed, 16 Jun 2021 12:45:21 +0200 Subject: [PATCH] malus - mockable overseer mvp (#3224) --- polkadot/Cargo.lock | 19 +- polkadot/Cargo.toml | 1 + polkadot/cli/Cargo.toml | 2 + polkadot/cli/src/command.rs | 95 +++--- polkadot/cli/src/lib.rs | 5 +- .../node/core/candidate-validation/src/lib.rs | 1 + polkadot/node/malus/Cargo.toml | 28 ++ polkadot/node/malus/src/lib.rs | 184 ++++++++++++ polkadot/node/malus/src/variant-a.rs | 110 +++++++ polkadot/node/service/Cargo.toml | 1 + polkadot/node/service/src/grandpa_support.rs | 11 +- polkadot/node/service/src/lib.rs | 228 ++++---------- polkadot/node/service/src/overseer.rs | 280 ++++++++++++++++++ .../node/service/src/parachains_db/mod.rs | 3 + polkadot/node/test/service/src/lib.rs | 3 +- .../adder/collator/src/main.rs | 1 + 16 files changed, 753 insertions(+), 219 deletions(-) create mode 100644 polkadot/node/malus/Cargo.toml create mode 100644 polkadot/node/malus/src/lib.rs create mode 100644 polkadot/node/malus/src/variant-a.rs create mode 100644 polkadot/node/service/src/overseer.rs diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 7f6f2395c28..e8ed4e235e3 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -356,9 +356,9 @@ checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" [[package]] name = "async-trait" -version = "0.1.48" +version = "0.1.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36ea56748e10732c49404c153638a15ec3d6211ec5ff35d9bb20e13b93576adf" +checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722" dependencies = [ "proc-macro2", "quote", @@ -6819,6 +6819,21 @@ dependencies = [ "substrate-test-client", ] +[[package]] +name = "polkadot-test-malus" +version = "0.9.4" +dependencies = [ + "assert_matches", + "async-trait", + "color-eyre", + "parity-util-mem", + "polkadot-cli", + "polkadot-node-core-candidate-validation", + "polkadot-node-subsystem", + "polkadot-node-subsystem-util", + "structopt", +] + [[package]] name = "polkadot-test-runtime" version = "0.9.5" diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index f7c341a99aa..be022f7b877 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -64,6 +64,7 @@ members = [ "node/network/collator-protocol", "node/network/gossip-support", "node/overseer", + "node/malus", "node/primitives", "node/service", "node/subsystem", diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml index fd7125115b4..65a9267e511 100644 --- a/polkadot/cli/Cargo.toml +++ b/polkadot/cli/Cargo.toml @@ -67,3 +67,5 @@ try-runtime = [ "service/try-runtime" ] kusama-native = [ "service/kusama-native" ] westend-native = [ "service/westend-native" ] rococo-native = [ "service/rococo-native" ] + +malus = [ "full-node", "service/malus" ] diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index 6c3575b55e9..3a245580de7 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -181,53 +181,64 @@ fn ensure_dev(spec: &Box<dyn service::ChainSpec>) -> std::result::Result<(), Str } } -/// Parses polkadot specific CLI arguments and run the service. -pub fn run() -> Result<()> { - let cli = Cli::from_args(); +/// Launch a node, accepting arguments just like a regular node, +/// accepts an alternative overseer generator, to adjust behavior +/// for integration tests as needed. +#[cfg(feature = "malus")] +pub fn run_node(cli: Cli, overseer_gen: impl service::OverseerGen) -> Result<()> { + run_node_inner(cli, overseer_gen) +} - match &cli.subcommand { - None => { - let runner = cli.create_runner(&cli.run.base) - .map_err(Error::from)?; - let chain_spec = &runner.config().chain_spec; +fn run_node_inner(cli: Cli, overseer_gen: impl service::OverseerGen) -> Result<()> { + let runner = cli.create_runner(&cli.run.base) + .map_err(Error::from)?; + let chain_spec = &runner.config().chain_spec; - set_default_ss58_version(chain_spec); + set_default_ss58_version(chain_spec); - let grandpa_pause = if cli.run.grandpa_pause.is_empty() { - None - } else { - Some((cli.run.grandpa_pause[0], cli.run.grandpa_pause[1])) - }; + let grandpa_pause = if cli.run.grandpa_pause.is_empty() { + None + } else { + Some((cli.run.grandpa_pause[0], cli.run.grandpa_pause[1])) + }; - if chain_spec.is_kusama() { - info!("----------------------------"); - info!("This chain is not in any way"); - info!(" endorsed by the "); - info!(" KUSAMA FOUNDATION "); - info!("----------------------------"); - } + if chain_spec.is_kusama() { + info!("----------------------------"); + info!("This chain is not in any way"); + info!(" endorsed by the "); + info!(" KUSAMA FOUNDATION "); + info!("----------------------------"); + } - let jaeger_agent = cli.run.jaeger_agent; - - runner.run_node_until_exit(move |config| async move { - let role = config.role.clone(); - - match role { - #[cfg(feature = "browser")] - Role::Light => service::build_light(config).map(|(task_manager, _)| task_manager).map_err(Into::into), - #[cfg(not(feature = "browser"))] - Role::Light => Err(Error::Other("Light client not enabled".into())), - _ => service::build_full( - config, - service::IsCollator::No, - grandpa_pause, - cli.run.no_beefy, - jaeger_agent, - None, - ).map(|full| full.task_manager).map_err(Into::into) - } - }) - }, + let jaeger_agent = cli.run.jaeger_agent; + + runner.run_node_until_exit(move |config| async move { + let role = config.role.clone(); + + match role { + #[cfg(feature = "browser")] + Role::Light => service::build_light(config).map(|(task_manager, _)| task_manager).map_err(Into::into), + #[cfg(not(feature = "browser"))] + Role::Light => Err(Error::Other("Light client not enabled".into())), + _ => service::build_full( + config, + service::IsCollator::No, + grandpa_pause, + cli.run.no_beefy, + jaeger_agent, + None, + overseer_gen, + ).map(|full| full.task_manager).map_err(Into::into) + } + }) +} + +/// Parses polkadot specific CLI arguments and run the service. +pub fn run() -> Result<()> { + let cli = Cli::from_args(); + + match &cli.subcommand { + None => run_node_inner(cli, service::RealOverseerGen), Some(Subcommand::BuildSpec(cmd)) => { let runner = cli.create_runner(cmd)?; Ok(runner.sync_run(|config| { diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 43b8da81f96..02120081ee5 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -28,9 +28,12 @@ mod command; pub use service::{ self, ProvideRuntimeApi, CoreApi, IdentifyVariant, - Block, RuntimeApiCollection, TFullClient + Block, RuntimeApiCollection, TFullClient, }; +#[cfg(feature = "malus")] +pub use service::create_default_subsystems; + #[cfg(feature = "cli")] pub use cli::*; diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 4efec16db1c..b05ff9cabc3 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -56,6 +56,7 @@ use async_trait::async_trait; const LOG_TARGET: &'static str = "parachain::candidate-validation"; /// Configuration for the candidate validation subsystem +#[derive(Clone)] pub struct Config { /// The path where candidate validation can store compiled artifacts for PVFs. pub artifacts_cache_path: PathBuf, diff --git a/polkadot/node/malus/Cargo.toml b/polkadot/node/malus/Cargo.toml new file mode 100644 index 00000000000..30c1da1d405 --- /dev/null +++ b/polkadot/node/malus/Cargo.toml @@ -0,0 +1,28 @@ +[lib] +name = "malus" +path = "src/lib.rs" + +[[bin]] +name = "malus-variant-a" +path = "src/variant-a.rs" + +[package] +name = "polkadot-test-malus" +description = "Misbehaving nodes for local testnets, system and simnet tests." +license = "GPL-3.0-only" +version = "0.9.4" +authors = ["Parity Technologies <admin@parity.io>"] +edition = "2018" +readme = "README.md" +publish = false + +[dependencies] +polkadot-cli = { path = "../../cli", default-features = false, features = [ "cli", "malus" ] } +polkadot-node-subsystem = { path = "../subsystem" } +polkadot-node-subsystem-util = { path = "../subsystem-util" } +polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" } +parity-util-mem = { version = "*", default-features = false, features = ["jemalloc-global"] } +color-eyre = { version = "0.5.11", default-features = false } +assert_matches = "1.5" +structopt = "0.3.21" +async-trait = "0.1.50" diff --git a/polkadot/node/malus/src/lib.rs b/polkadot/node/malus/src/lib.rs new file mode 100644 index 00000000000..6cd45a3c4de --- /dev/null +++ b/polkadot/node/malus/src/lib.rs @@ -0,0 +1,184 @@ +// Copyright 2017-2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! A small set of wrapping types to cover most of our adversary test cases. +//! +//! This allows types with internal mutability to synchronize across +//! multiple subsystems and intercept or replace incoming and outgoing +//! messages on the overseer level. + +use polkadot_node_subsystem::*; +pub use polkadot_node_subsystem::{messages::AllMessages, FromOverseer}; +use std::future::Future; +use std::pin::Pin; + +/// Filter incoming and outgoing messages. +pub trait MsgFilter: Send + Sync + Clone + 'static { + /// The message type the original subsystm handles incoming. + type Message: Send + 'static; + + /// Filter messages that are to be received by + /// the subsystem. + fn filter_in(&self, msg: FromOverseer<Self::Message>) -> Option<FromOverseer<Self::Message>> { + Some(msg) + } + + /// Modify outgoing messages. + fn filter_out(&self, msg: AllMessages) -> Option<AllMessages> { + Some(msg) + } +} + +/// A sender with the outgoing messages filtered. +#[derive(Clone)] +pub struct FilteredSender<Sender, Fil> { + inner: Sender, + message_filter: Fil, +} + +#[async_trait::async_trait] +impl<Sender, Fil> SubsystemSender for FilteredSender<Sender, Fil> +where + Sender: SubsystemSender, + Fil: MsgFilter, +{ + async fn send_message(&mut self, msg: AllMessages) { + if let Some(msg) = self.message_filter.filter_out(msg) { + self.inner.send_message(msg).await; + } + } + + async fn send_messages<T>(&mut self, msgs: T) + where + T: IntoIterator<Item = AllMessages> + Send, + T::IntoIter: Send, + { + for msg in msgs { + self.send_message(msg).await; + } + } + + fn send_unbounded_message(&mut self, msg: AllMessages) { + if let Some(msg) = self.message_filter.filter_out(msg) { + self.inner.send_unbounded_message(msg); + } + } +} + +/// A subsystem context, that filters the outgoing messages. +pub struct FilteredContext<Context: SubsystemContext, Fil: MsgFilter> { + inner: Context, + message_filter: Fil, + sender: FilteredSender<<Context as SubsystemContext>::Sender, Fil>, +} + +impl<Context, Fil> FilteredContext<Context, Fil> +where + Context: SubsystemContext, + Fil: MsgFilter<Message = <Context as SubsystemContext>::Message>, +{ + pub fn new(mut inner: Context, message_filter: Fil) -> Self { + let sender = FilteredSender::<<Context as SubsystemContext>::Sender, Fil> { + inner: inner.sender().clone(), + message_filter: message_filter.clone(), + }; + Self { + inner, + message_filter, + sender, + } + } +} + +#[async_trait::async_trait] +impl<Context, Fil> SubsystemContext for FilteredContext<Context, Fil> +where + Context: SubsystemContext, + Fil: MsgFilter<Message = <Context as SubsystemContext>::Message>, +{ + type Message = <Context as SubsystemContext>::Message; + type Sender = FilteredSender<<Context as SubsystemContext>::Sender, Fil>; + + async fn try_recv(&mut self) -> Result<Option<FromOverseer<Self::Message>>, ()> { + loop { + match self.inner.try_recv().await? { + None => return Ok(None), + Some(msg) => { + if let Some(msg) = self.message_filter.filter_in(msg) { + return Ok(Some(msg)); + } + } + } + } + } + + async fn recv(&mut self) -> SubsystemResult<FromOverseer<Self::Message>> { + loop { + let msg = self.inner.recv().await?; + if let Some(msg) = self.message_filter.filter_in(msg) { + return Ok(msg); + } + } + } + + async fn spawn( + &mut self, + name: &'static str, + s: Pin<Box<dyn Future<Output = ()> + Send>>, + ) -> SubsystemResult<()> { + self.inner.spawn(name, s).await + } + + async fn spawn_blocking( + &mut self, + name: &'static str, + s: Pin<Box<dyn Future<Output = ()> + Send>>, + ) -> SubsystemResult<()> { + self.inner.spawn_blocking(name, s).await + } + + fn sender(&mut self) -> &mut Self::Sender { + &mut self.sender + } +} + +/// A subsystem to which incoming and outgoing filters are applied. +pub struct FilteredSubsystem<Sub, Fil> { + subsystem: Sub, + message_filter: Fil, +} + +impl<Sub, Fil> FilteredSubsystem<Sub, Fil> { + pub fn new(subsystem: Sub, message_filter: Fil) -> Self { + Self { + subsystem, + message_filter, + } + } +} + +impl<Context, Sub, Fil> Subsystem<Context> for FilteredSubsystem<Sub, Fil> +where + Context: SubsystemContext + Sync + Send, + Sub: Subsystem<FilteredContext<Context, Fil>>, + FilteredContext<Context, Fil>: SubsystemContext, + Fil: MsgFilter<Message = <Context as SubsystemContext>::Message>, +{ + fn start(self, ctx: Context) -> SpawnedSubsystem { + let ctx = FilteredContext::new(ctx, self.message_filter); + Subsystem::<FilteredContext<Context, Fil>>::start(self.subsystem, ctx) + } +} diff --git a/polkadot/node/malus/src/variant-a.rs b/polkadot/node/malus/src/variant-a.rs new file mode 100644 index 00000000000..1e9cb7928cb --- /dev/null +++ b/polkadot/node/malus/src/variant-a.rs @@ -0,0 +1,110 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! A malicious overseer. +//! +//! An example on how to use the `OverseerGen` pattern to +//! instantiate a modified subsystem implementation +//! for usage with simnet/gurke. + +#![allow(missing_docs)] + +use color_eyre::eyre; +use polkadot_cli::{ + create_default_subsystems, + service::{ + AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer, + OverseerGen, OverseerGenArgs, OverseerHandler, ParachainHost, ProvideRuntimeApi, + SpawnNamed, + }, + Cli, +}; + +// Import extra types relevant to the particular +// subsystem. +use polkadot_node_core_candidate_validation::{CandidateValidationSubsystem, Metrics}; +use polkadot_node_subsystem::messages::CandidateValidationMessage; +use polkadot_node_subsystem_util::metrics::Metrics as _; + +// Filter wrapping related types. +use malus::*; + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use structopt::StructOpt; + +/// Silly example, just drop every second outgoing message. +#[derive(Clone, Default, Debug)] +struct Skippy(Arc<AtomicUsize>); + +impl MsgFilter for Skippy { + type Message = CandidateValidationMessage; + + fn filter_in(&self, msg: FromOverseer<Self::Message>) -> Option<FromOverseer<Self::Message>> { + if self.0.fetch_add(1, Ordering::Relaxed) % 2 == 0 { + Some(msg) + } else { + None + } + } + fn filter_out(&self, msg: AllMessages) -> Option<AllMessages> { + Some(msg) + } +} + +/// Generates an overseer that exposes bad behavior. +struct BehaveMaleficient; + +impl OverseerGen for BehaveMaleficient { + fn generate<'a, Spawner, RuntimeClient>( + &self, + args: OverseerGenArgs<'a, Spawner, RuntimeClient>, + ) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error> + where + RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore, + RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, + Spawner: 'static + SpawnNamed + Clone + Unpin, + { + let spawner = args.spawner.clone(); + let leaves = args.leaves.clone(); + let runtime_client = args.runtime_client.clone(); + let registry = args.registry.clone(); + let candidate_validation_config = args.candidate_validation_config.clone(); + // modify the subsystem(s) as needed: + let all_subsystems = create_default_subsystems(args)?.replace_candidate_validation( + // create the filtered subsystem + FilteredSubsystem::new( + CandidateValidationSubsystem::with_config( + candidate_validation_config, + Metrics::register(registry)?, + ), + Skippy::default(), + ), + ); + + Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner) + .map_err(|e| e.into()) + } +} + +fn main() -> eyre::Result<()> { + color_eyre::install()?; + let cli = Cli::from_args(); + assert_matches::assert_matches!(cli.subcommand, None); + polkadot_cli::run_node(cli, BehaveMaleficient)?; + Ok(()) +} diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index a35e6cdd686..29ba03c5322 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -159,3 +159,4 @@ try-runtime = [ "westend-runtime/try-runtime", "rococo-runtime/try-runtime", ] +malus = ["full-node"] diff --git a/polkadot/node/service/src/grandpa_support.rs b/polkadot/node/service/src/grandpa_support.rs index f2d87a8d563..f2c1145cf3a 100644 --- a/polkadot/node/service/src/grandpa_support.rs +++ b/polkadot/node/service/src/grandpa_support.rs @@ -18,15 +18,13 @@ use std::sync::Arc; -use polkadot_primitives::v1::{BlockNumber, Hash}; - use sp_runtime::traits::{Block as BlockT, NumberFor}; use sp_runtime::generic::BlockId; use sp_runtime::traits::Header as _; #[cfg(feature = "full-node")] use { - polkadot_primitives::v1::{Block as PolkadotBlock, Header as PolkadotHeader}, + polkadot_primitives::v1::{Hash, Block as PolkadotBlock, Header as PolkadotHeader}, polkadot_subsystem::messages::ApprovalVotingMessage, prometheus_endpoint::{self, Registry}, polkadot_overseer::OverseerHandler, @@ -71,9 +69,10 @@ impl ApprovalCheckingVotingRule { } } +#[cfg(feature = "full-node")] #[derive(Debug, PartialEq)] +/// Vote explicitly on the given hash. enum ParachainVotingRuleTarget<H, N> { - /// Vote explicitly on the given hash. Explicit((H, N)), /// Vote on the current target. Current, @@ -81,6 +80,7 @@ enum ParachainVotingRuleTarget<H, N> { Base, } +#[cfg(feature = "full-node")] fn approval_checking_vote_to_grandpa_vote<H, N: PartialOrd>( approval_checking_vote: Option<(H, N)>, current_number: N, @@ -99,7 +99,8 @@ fn approval_checking_vote_to_grandpa_vote<H, N: PartialOrd>( /// The maximum amount of unfinalized blocks we are willing to allow due to approval checking lag. /// This is a safety net that should be removed at some point in the future. -const MAX_APPROVAL_CHECKING_FINALITY_LAG: BlockNumber = 50; +#[cfg(feature = "full-node")] +const MAX_APPROVAL_CHECKING_FINALITY_LAG: polkadot_primitives::v1::BlockNumber = 50; #[cfg(feature = "full-node")] impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingVotingRule diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index f3074ac863f..be78081ba1d 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -22,6 +22,17 @@ pub mod chain_spec; mod grandpa_support; mod parachains_db; +#[cfg(feature = "full-node")] +mod overseer; + +#[cfg(feature = "full-node")] +pub use self::overseer::{ + OverseerGen, + OverseerGenArgs, + RealOverseerGen, + create_default_subsystems, +}; + #[cfg(feature = "full-node")] use { tracing::info, @@ -30,22 +41,26 @@ use { polkadot_node_core_av_store::Error as AvailabilityError, polkadot_node_core_approval_voting::Config as ApprovalVotingConfig, polkadot_node_core_candidate_validation::Config as CandidateValidationConfig, - polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandler}, - polkadot_primitives::v1::ParachainHost, - sc_authority_discovery::Service as AuthorityDiscoveryService, - sp_authority_discovery::AuthorityDiscoveryApi, - sp_blockchain::HeaderBackend, + polkadot_overseer::BlockInfo, sp_trie::PrefixedMemoryDB, - sc_client_api::{AuxStore, ExecutorProvider}, - sc_keystore::LocalKeystore, - sp_consensus_babe::BabeApi, + sc_client_api::ExecutorProvider, grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}, beefy_primitives::ecdsa::AuthoritySignature as BeefySignature, sp_runtime::traits::Header as HeaderT, }; -use sp_core::traits::SpawnNamed; +#[cfg(feature = "full-node")] +pub use { + sp_blockchain::HeaderBackend, + sp_consensus_babe::BabeApi, + sp_authority_discovery::AuthorityDiscoveryApi, + sc_client_api::AuxStore, + polkadot_primitives::v1::ParachainHost, + polkadot_overseer::{Overseer, OverseerHandler}, +}; +pub use sp_core::traits::SpawnNamed; +#[cfg(feature = "full-node")] use polkadot_subsystem::jaeger; use std::sync::Arc; @@ -53,7 +68,9 @@ use std::time::Duration; use prometheus_endpoint::Registry; use service::RpcHandlers; -use telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle}; +#[cfg(feature = "full-node")] +use telemetry::{Telemetry, TelemetryWorkerHandle}; +use telemetry::TelemetryWorker; #[cfg(feature = "rococo-native")] pub use polkadot_client::RococoExecutor; @@ -92,6 +109,7 @@ pub use rococo_runtime; pub use westend_runtime; /// The maximum number of active leaves we forward to the [`Overseer`] on startup. +#[cfg(any(test,feature = "full-node"))] const MAX_ACTIVE_LEAVES: usize = 4; #[derive(thiserror::Error, Debug)] @@ -182,6 +200,7 @@ fn set_prometheus_registry(config: &mut Configuration) -> Result<(), Error> { /// Initialize the `Jeager` collector. The destination must listen /// on the given address and port for `UDP` packets. +#[cfg(any(test,feature = "full-node"))] fn jaeger_launch_collector_with_agent(spawner: impl SpawnNamed, config: &Configuration, agent: Option<std::net::SocketAddr>) -> Result<(), Error> { if let Some(agent) = agent { let cfg = jaeger::JaegerConfig::builder() @@ -404,145 +423,6 @@ fn new_partial<RuntimeApi, Executor>( }) } -#[cfg(feature = "full-node")] -fn real_overseer<Spawner, RuntimeClient>( - leaves: impl IntoIterator<Item = BlockInfo>, - keystore: Arc<LocalKeystore>, - runtime_client: Arc<RuntimeClient>, - parachains_db: Arc<dyn kvdb::KeyValueDB>, - availability_config: AvailabilityConfig, - approval_voting_config: ApprovalVotingConfig, - network_service: Arc<sc_network::NetworkService<Block, Hash>>, - authority_discovery: AuthorityDiscoveryService, - request_multiplexer: RequestMultiplexer, - registry: Option<&Registry>, - spawner: Spawner, - is_collator: IsCollator, - candidate_validation_config: CandidateValidationConfig, -) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error> -where - RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore, - RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, - Spawner: 'static + SpawnNamed + Clone + Unpin, -{ - use polkadot_node_subsystem_util::metrics::Metrics; - - use polkadot_availability_distribution::AvailabilityDistributionSubsystem; - use polkadot_node_core_av_store::AvailabilityStoreSubsystem; - use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem; - use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem; - use polkadot_node_core_backing::CandidateBackingSubsystem; - use polkadot_node_core_candidate_validation::CandidateValidationSubsystem; - use polkadot_node_core_chain_api::ChainApiSubsystem; - use polkadot_node_collation_generation::CollationGenerationSubsystem; - use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide}; - use polkadot_network_bridge::NetworkBridge as NetworkBridgeSubsystem; - use polkadot_node_core_provisioner::ProvisioningSubsystem as ProvisionerSubsystem; - use polkadot_node_core_runtime_api::RuntimeApiSubsystem; - use polkadot_statement_distribution::StatementDistribution as StatementDistributionSubsystem; - use polkadot_availability_recovery::AvailabilityRecoverySubsystem; - use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem; - use polkadot_node_core_approval_voting::ApprovalVotingSubsystem; - use polkadot_gossip_support::GossipSupport as GossipSupportSubsystem; - - let all_subsystems = AllSubsystems { - availability_distribution: AvailabilityDistributionSubsystem::new( - keystore.clone(), - Metrics::register(registry)?, - ), - availability_recovery: AvailabilityRecoverySubsystem::with_chunks_only( - ), - availability_store: AvailabilityStoreSubsystem::new( - parachains_db.clone(), - availability_config, - Metrics::register(registry)?, - ), - bitfield_distribution: BitfieldDistributionSubsystem::new( - Metrics::register(registry)?, - ), - bitfield_signing: BitfieldSigningSubsystem::new( - spawner.clone(), - keystore.clone(), - Metrics::register(registry)?, - ), - candidate_backing: CandidateBackingSubsystem::new( - spawner.clone(), - keystore.clone(), - Metrics::register(registry)?, - ), - candidate_validation: CandidateValidationSubsystem::with_config( - candidate_validation_config, - Metrics::register(registry)?, - ), - chain_api: ChainApiSubsystem::new( - runtime_client.clone(), - Metrics::register(registry)?, - ), - collation_generation: CollationGenerationSubsystem::new( - Metrics::register(registry)?, - ), - collator_protocol: { - let side = match is_collator { - IsCollator::Yes(collator_pair) => ProtocolSide::Collator( - network_service.local_peer_id().clone(), - collator_pair, - Metrics::register(registry)?, - ), - IsCollator::No => ProtocolSide::Validator { - keystore: keystore.clone(), - eviction_policy: Default::default(), - metrics: Metrics::register(registry)?, - }, - }; - CollatorProtocolSubsystem::new( - side, - ) - }, - network_bridge: NetworkBridgeSubsystem::new( - network_service.clone(), - authority_discovery, - request_multiplexer, - Box::new(network_service.clone()), - Metrics::register(registry)?, - ), - provisioner: ProvisionerSubsystem::new( - spawner.clone(), - (), - Metrics::register(registry)?, - ), - runtime_api: RuntimeApiSubsystem::new( - runtime_client.clone(), - Metrics::register(registry)?, - spawner.clone(), - ), - statement_distribution: StatementDistributionSubsystem::new( - keystore.clone(), - Metrics::register(registry)?, - ), - approval_distribution: ApprovalDistributionSubsystem::new( - Metrics::register(registry)?, - ), - approval_voting: ApprovalVotingSubsystem::with_config( - approval_voting_config, - parachains_db, - keystore.clone(), - Box::new(network_service.clone()), - Metrics::register(registry)?, - ), - gossip_support: GossipSupportSubsystem::new( - keystore.clone(), - ), - }; - - Overseer::new( - leaves, - all_subsystems, - registry, - runtime_client.clone(), - spawner, - ).map_err(|e| e.into()) -} - #[cfg(feature = "full-node")] pub struct NewFull<C> { pub task_manager: TaskManager, @@ -652,7 +532,7 @@ where /// This is an advanced feature and not recommended for general use. Generally, `build_full` is /// a better choice. #[cfg(feature = "full-node")] -pub fn new_full<RuntimeApi, Executor>( +pub fn new_full<RuntimeApi, Executor, OverseerGenerator>( mut config: Configuration, is_collator: IsCollator, grandpa_pause: Option<(u32, u32)>, @@ -660,12 +540,14 @@ pub fn new_full<RuntimeApi, Executor>( jaeger_agent: Option<std::net::SocketAddr>, telemetry_worker_handle: Option<TelemetryWorkerHandle>, program_path: Option<std::path::PathBuf>, + overseer_gen: OverseerGenerator, ) -> Result<NewFull<Arc<FullClient<RuntimeApi, Executor>>>, Error> where RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi, Executor>> + Send + Sync + 'static, RuntimeApi::RuntimeApi: RuntimeApiCollection<StateBackend = sc_client_api::StateBackendFor<FullBackend, Block>>, Executor: NativeExecutionDispatch + 'static, + OverseerGenerator: OverseerGen, { let role = config.role.clone(); let force_authoring = config.force_authoring; @@ -840,20 +722,25 @@ pub fn new_full<RuntimeApi, Executor>( .and_then(move |k| authority_discovery_service.map(|a| (a, k))); let overseer_handler = if let Some((authority_discovery_service, keystore)) = maybe_params { - let (overseer, overseer_handler) = real_overseer( - active_leaves, - keystore, - overseer_client.clone(), - parachains_db, - availability_config, - approval_voting_config, - network.clone(), - authority_discovery_service, - request_multiplexer, - prometheus_registry.as_ref(), - spawner, - is_collator, - candidate_validation_config, + let (overseer, overseer_handler) = overseer_gen.generate::< + service::SpawnTaskHandle, + FullClient<RuntimeApi, Executor>, + >( + OverseerGenArgs { + leaves: active_leaves, + keystore, + runtime_client: overseer_client.clone(), + parachains_db, + availability_config, + approval_voting_config, + network_service: network.clone(), + authority_discovery_service, + request_multiplexer, + registry: prometheus_registry.as_ref(), + spawner, + is_collator, + candidate_validation_config, + } )?; let overseer_handler_clone = overseer_handler.clone(); @@ -1285,10 +1172,11 @@ pub fn build_full( disable_beefy: bool, jaeger_agent: Option<std::net::SocketAddr>, telemetry_worker_handle: Option<TelemetryWorkerHandle>, + overseer_gen: impl OverseerGen, ) -> Result<NewFull<Client>, Error> { #[cfg(feature = "rococo-native")] if config.chain_spec.is_rococo() || config.chain_spec.is_wococo() { - return new_full::<rococo_runtime::RuntimeApi, RococoExecutor>( + return new_full::<rococo_runtime::RuntimeApi, RococoExecutor, _>( config, is_collator, grandpa_pause, @@ -1296,12 +1184,13 @@ pub fn build_full( jaeger_agent, telemetry_worker_handle, None, + overseer_gen, ).map(|full| full.with_client(Client::Rococo)) } #[cfg(feature = "kusama-native")] if config.chain_spec.is_kusama() { - return new_full::<kusama_runtime::RuntimeApi, KusamaExecutor>( + return new_full::<kusama_runtime::RuntimeApi, KusamaExecutor, _>( config, is_collator, grandpa_pause, @@ -1309,12 +1198,13 @@ pub fn build_full( jaeger_agent, telemetry_worker_handle, None, + overseer_gen, ).map(|full| full.with_client(Client::Kusama)) } #[cfg(feature = "westend-native")] if config.chain_spec.is_westend() { - return new_full::<westend_runtime::RuntimeApi, WestendExecutor>( + return new_full::<westend_runtime::RuntimeApi, WestendExecutor, _>( config, is_collator, grandpa_pause, @@ -1322,10 +1212,11 @@ pub fn build_full( jaeger_agent, telemetry_worker_handle, None, + overseer_gen, ).map(|full| full.with_client(Client::Westend)) } - new_full::<polkadot_runtime::RuntimeApi, PolkadotExecutor>( + new_full::<polkadot_runtime::RuntimeApi, PolkadotExecutor, _>( config, is_collator, grandpa_pause, @@ -1333,5 +1224,6 @@ pub fn build_full( jaeger_agent, telemetry_worker_handle, None, + overseer_gen, ).map(|full| full.with_client(Client::Polkadot)) } diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs new file mode 100644 index 00000000000..968240074b8 --- /dev/null +++ b/polkadot/node/service/src/overseer.rs @@ -0,0 +1,280 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +use super::{ + Error, + Registry, + IsCollator, + Block, + SpawnNamed, + Hash, + AuthorityDiscoveryApi, +}; +use std::sync::Arc; +use polkadot_network_bridge::RequestMultiplexer; +use polkadot_node_core_av_store::Config as AvailabilityConfig; +use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig; +use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig; +use polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandler}; +use polkadot_primitives::v1::ParachainHost; +use sc_authority_discovery::Service as AuthorityDiscoveryService; +use sp_api::ProvideRuntimeApi; +use sp_blockchain::HeaderBackend; +use sc_client_api::AuxStore; +use sc_keystore::LocalKeystore; +use sp_consensus_babe::BabeApi; + +pub use polkadot_availability_distribution::AvailabilityDistributionSubsystem; +pub use polkadot_node_core_av_store::AvailabilityStoreSubsystem; +pub use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem; +pub use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem; +pub use polkadot_node_core_backing::CandidateBackingSubsystem; +pub use polkadot_node_core_candidate_validation::CandidateValidationSubsystem; +pub use polkadot_node_core_chain_api::ChainApiSubsystem; +pub use polkadot_node_collation_generation::CollationGenerationSubsystem; +pub use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide}; +pub use polkadot_network_bridge::NetworkBridge as NetworkBridgeSubsystem; +pub use polkadot_node_core_provisioner::ProvisioningSubsystem as ProvisionerSubsystem; +pub use polkadot_node_core_runtime_api::RuntimeApiSubsystem; +pub use polkadot_statement_distribution::StatementDistribution as StatementDistributionSubsystem; +pub use polkadot_availability_recovery::AvailabilityRecoverySubsystem; +pub use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem; +pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem; +pub use polkadot_gossip_support::GossipSupport as GossipSupportSubsystem; + +/// Arguments passed for overseer construction. +pub struct OverseerGenArgs<'a, Spawner, RuntimeClient> where + RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore, + RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, + Spawner: 'static + SpawnNamed + Clone + Unpin, +{ + /// Set of initial relay chain leaves to track. + pub leaves: Vec<BlockInfo>, + /// The keystore to use for i.e. validator keys. + pub keystore: Arc<LocalKeystore>, + /// Runtime client generic, providing the `ProvieRuntimeApi` trait besides others. + pub runtime_client: Arc<RuntimeClient>, + /// The underlying key value store for the parachains. + pub parachains_db: Arc<dyn kvdb::KeyValueDB>, + /// Configuration for the availability store subsystem. + pub availability_config: AvailabilityConfig, + /// Configuration for the approval voting subsystem. + pub approval_voting_config: ApprovalVotingConfig, + /// Underlying network service implementation. + pub network_service: Arc<sc_network::NetworkService<Block, Hash>>, + /// Underlying authority discovery service. + pub authority_discovery_service: AuthorityDiscoveryService, + /// A multiplexer to arbitrate incoming `IncomingRequest`s from the network. + pub request_multiplexer: RequestMultiplexer, + /// Prometheus registry, commonly used for production systems, less so for test. + pub registry: Option<&'a Registry>, + /// Task spawner to be used throughout the overseer and the APIs it provides. + pub spawner: Spawner, + /// Determines the behavior of the collator. + pub is_collator: IsCollator, + /// Configuration for the candidate validation subsystem. + pub candidate_validation_config: CandidateValidationConfig, +} + +/// Create a default, unaltered set of subsystems. +/// +/// A convenience for usage with malus, to avoid +/// repetitive code across multiple behavior strain implementations. +pub fn create_default_subsystems<'a, Spawner, RuntimeClient> +( + OverseerGenArgs { + keystore, + runtime_client, + parachains_db, + availability_config, + approval_voting_config, + network_service, + authority_discovery_service, + request_multiplexer, + registry, + spawner, + is_collator, + candidate_validation_config, + .. + } : OverseerGenArgs<'a, Spawner, RuntimeClient> +) -> Result< + AllSubsystems< + CandidateValidationSubsystem, + CandidateBackingSubsystem<Spawner>, + StatementDistributionSubsystem, + AvailabilityDistributionSubsystem, + AvailabilityRecoverySubsystem, + BitfieldSigningSubsystem<Spawner>, + BitfieldDistributionSubsystem, + ProvisionerSubsystem<Spawner>, + RuntimeApiSubsystem<RuntimeClient>, + AvailabilityStoreSubsystem, + NetworkBridgeSubsystem<Arc<sc_network::NetworkService<Block, Hash>>, AuthorityDiscoveryService>, + ChainApiSubsystem<RuntimeClient>, + CollationGenerationSubsystem, + CollatorProtocolSubsystem, + ApprovalDistributionSubsystem, + ApprovalVotingSubsystem, + GossipSupportSubsystem, +>, + Error +> +where + RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore, + RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, + Spawner: 'static + SpawnNamed + Clone + Unpin +{ + use polkadot_node_subsystem_util::metrics::Metrics; + + let all_subsystems = AllSubsystems { + availability_distribution: AvailabilityDistributionSubsystem::new( + keystore.clone(), + Metrics::register(registry)?, + ), + availability_recovery: AvailabilityRecoverySubsystem::with_chunks_only( + ), + availability_store: AvailabilityStoreSubsystem::new( + parachains_db.clone(), + availability_config, + Metrics::register(registry)?, + ), + bitfield_distribution: BitfieldDistributionSubsystem::new( + Metrics::register(registry)?, + ), + bitfield_signing: BitfieldSigningSubsystem::new( + spawner.clone(), + keystore.clone(), + Metrics::register(registry)?, + ), + candidate_backing: CandidateBackingSubsystem::new( + spawner.clone(), + keystore.clone(), + Metrics::register(registry)?, + ), + candidate_validation: CandidateValidationSubsystem::with_config( + candidate_validation_config, + Metrics::register(registry)?, + ), + chain_api: ChainApiSubsystem::new( + runtime_client.clone(), + Metrics::register(registry)?, + ), + collation_generation: CollationGenerationSubsystem::new( + Metrics::register(registry)?, + ), + collator_protocol: { + let side = match is_collator { + IsCollator::Yes(collator_pair) => ProtocolSide::Collator( + network_service.local_peer_id().clone(), + collator_pair, + Metrics::register(registry)?, + ), + IsCollator::No => ProtocolSide::Validator { + keystore: keystore.clone(), + eviction_policy: Default::default(), + metrics: Metrics::register(registry)?, + }, + }; + CollatorProtocolSubsystem::new( + side, + ) + }, + network_bridge: NetworkBridgeSubsystem::new( + network_service.clone(), + authority_discovery_service, + request_multiplexer, + Box::new(network_service.clone()), + Metrics::register(registry)?, + ), + provisioner: ProvisionerSubsystem::new( + spawner.clone(), + (), + Metrics::register(registry)?, + ), + runtime_api: RuntimeApiSubsystem::new( + runtime_client.clone(), + Metrics::register(registry)?, + spawner.clone(), + ), + statement_distribution: StatementDistributionSubsystem::new( + keystore.clone(), + Metrics::register(registry)?, + ), + approval_distribution: ApprovalDistributionSubsystem::new( + Metrics::register(registry)?, + ), + approval_voting: ApprovalVotingSubsystem::with_config( + approval_voting_config, + parachains_db, + keystore.clone(), + Box::new(network_service.clone()), + Metrics::register(registry)?, + ), + gossip_support: GossipSupportSubsystem::new( + keystore.clone(), + ), + }; + Ok(all_subsystems) +} + + +/// Trait for the `fn` generating the overseer. +/// +/// Default behavior is to create an unmodified overseer, as `RealOverseerGen` +/// would do. +pub trait OverseerGen { + /// Overwrite the full generation of the overseer, including the subsystems. + fn generate<'a, Spawner, RuntimeClient>(&self, args: OverseerGenArgs<'a, Spawner, RuntimeClient>) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error> + where + RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore, + RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, + Spawner: 'static + SpawnNamed + Clone + Unpin { + let gen = RealOverseerGen; + RealOverseerGen::generate::<Spawner, RuntimeClient>(&gen, args) + } + // It would be nice to make `create_subsystems` part of this trait, + // but the amount of generic arguments that would be required as + // as consequence make this rather annoying to implement and use. +} + +/// The regular set of subsystems. +pub struct RealOverseerGen; + +impl OverseerGen for RealOverseerGen { + fn generate<'a, Spawner, RuntimeClient>(&self, + args : OverseerGenArgs<'a, Spawner, RuntimeClient> + ) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error> + where + RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore, + RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, + Spawner: 'static + SpawnNamed + Clone + Unpin + { + let spawner = args.spawner.clone(); + let leaves = args.leaves.clone(); + let runtime_client = args.runtime_client.clone(); + let registry = args.registry.clone(); + + let all_subsystems = create_default_subsystems::<Spawner, RuntimeClient>(args)?; + + Overseer::new( + leaves, + all_subsystems, + registry, + runtime_client, + spawner, + ).map_err(|e| e.into()) + } +} diff --git a/polkadot/node/service/src/parachains_db/mod.rs b/polkadot/node/service/src/parachains_db/mod.rs index 8853a6aebad..c9c86fad964 100644 --- a/polkadot/node/service/src/parachains_db/mod.rs +++ b/polkadot/node/service/src/parachains_db/mod.rs @@ -24,6 +24,7 @@ use { mod upgrade; +#[cfg(any(test,feature = "full-node"))] mod columns { pub const NUM_COLUMNS: u32 = 3; @@ -34,6 +35,7 @@ mod columns { } /// Columns used by different subsystems. +#[cfg(any(test,feature = "full-node"))] #[derive(Debug, Clone)] pub struct ColumnsConfig { /// The column used by the av-store for data. @@ -45,6 +47,7 @@ pub struct ColumnsConfig { } /// The real columns used by the parachains DB. +#[cfg(any(test,feature = "full-node"))] pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig { col_availability_data: columns::COL_AVAILABILITY_DATA, col_availability_meta: columns::COL_AVAILABILITY_META, diff --git a/polkadot/node/test/service/src/lib.rs b/polkadot/node/test/service/src/lib.rs index c345c96875d..bb3bff46a46 100644 --- a/polkadot/node/test/service/src/lib.rs +++ b/polkadot/node/test/service/src/lib.rs @@ -78,7 +78,7 @@ pub fn new_full( NewFull<Arc<Client>>, Error, > { - polkadot_service::new_full::<polkadot_test_runtime::RuntimeApi, PolkadotTestExecutor>( + polkadot_service::new_full::<polkadot_test_runtime::RuntimeApi, PolkadotTestExecutor, _>( config, is_collator, None, @@ -86,6 +86,7 @@ pub fn new_full( None, None, worker_program_path, + polkadot_service::RealOverseerGen, ) } diff --git a/polkadot/parachain/test-parachains/adder/collator/src/main.rs b/polkadot/parachain/test-parachains/adder/collator/src/main.rs index 68a89dc3f40..2b6b219f0d3 100644 --- a/polkadot/parachain/test-parachains/adder/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/adder/collator/src/main.rs @@ -65,6 +65,7 @@ fn main() -> Result<()> { true, None, None, + polkadot_service::RealOverseerGen, ).map_err(|e| e.to_string())?; let mut overseer_handler = full_node .overseer_handler -- GitLab