From 4c1de66d5dfffed20a40ca95961fde745d9efd33 Mon Sep 17 00:00:00 2001 From: Andronik Ordian <write@reusable.software> Date: Tue, 2 Mar 2021 11:40:06 +0100 Subject: [PATCH] subsystem for issuing background connection requests (#2538) * initial subsystem for issuing connection requests * finish the initial impl * integrate with the overseer * rename to gossip-support * fix renamings leftover * remove run_inner * fix compilation * random subset of sqrt --- polkadot/Cargo.lock | 15 ++ polkadot/Cargo.toml | 1 + polkadot/node/network/bridge/src/lib.rs | 1 + .../node/network/gossip-support/Cargo.toml | 16 ++ .../node/network/gossip-support/src/lib.rs | 170 ++++++++++++++++++ polkadot/node/overseer/src/lib.rs | 132 ++++++++++---- polkadot/node/service/Cargo.toml | 2 + polkadot/node/service/src/lib.rs | 2 + polkadot/node/subsystem/src/messages.rs | 8 + 9 files changed, 317 insertions(+), 30 deletions(-) create mode 100644 polkadot/node/network/gossip-support/Cargo.toml create mode 100644 polkadot/node/network/gossip-support/src/lib.rs diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 9cdadaa65f8..cc85da55c13 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5273,6 +5273,20 @@ dependencies = [ "thiserror", ] +[[package]] +name = "polkadot-gossip-support" +version = "0.1.0" +dependencies = [ + "futures 0.3.12", + "polkadot-node-network-protocol", + "polkadot-node-subsystem", + "polkadot-node-subsystem-util", + "polkadot-primitives", + "rand 0.8.3", + "tracing", + "tracing-futures", +] + [[package]] name = "polkadot-network-bridge" version = "0.1.0" @@ -5994,6 +6008,7 @@ dependencies = [ "polkadot-availability-distribution", "polkadot-availability-recovery", "polkadot-collator-protocol", + "polkadot-gossip-support", "polkadot-network-bridge", "polkadot-node-collation-generation", "polkadot-node-core-approval-voting", diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index 080126db433..b1b5068fb67 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -64,6 +64,7 @@ members = [ "node/network/availability-distribution", "node/network/availability-recovery", "node/network/collator-protocol", + "node/network/gossip-support", "node/overseer", "node/primitives", "node/service", diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 25baae8b467..a49363846ce 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -1547,6 +1547,7 @@ mod tests { AllMessages::CollationGeneration(_) => unreachable!("Not interested in network events"), AllMessages::ApprovalVoting(_) => unreachable!("Not interested in network events"), AllMessages::ApprovalDistribution(_) => { cnt += 1; } + AllMessages::GossipSupport(_) => unreachable!("Not interested in network events"), // Add variants here as needed, `{ cnt += 1; }` for those that need to be // notified, `unreachable!()` for those that should not. } diff --git a/polkadot/node/network/gossip-support/Cargo.toml b/polkadot/node/network/gossip-support/Cargo.toml new file mode 100644 index 00000000000..8400f031b83 --- /dev/null +++ b/polkadot/node/network/gossip-support/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "polkadot-gossip-support" +version = "0.1.0" +authors = ["Parity Technologies <admin@parity.io>"] +edition = "2018" + +[dependencies] +polkadot-node-network-protocol = { path = "../protocol" } +polkadot-node-subsystem = { path = "../../subsystem" } +polkadot-node-subsystem-util = { path = "../../subsystem-util" } +polkadot-primitives = { path = "../../../primitives" } + +futures = "0.3.8" +tracing = "0.1.22" +tracing-futures = "0.2.4" +rand = "0.8.3" diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs new file mode 100644 index 00000000000..8ade01a0ced --- /dev/null +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -0,0 +1,170 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! This subsystem is responsible for keeping track of session changes +//! and issuing a connection request to the validators relevant to +//! the gossiping subsystems on every new session. + +use futures::FutureExt as _; +use rand::seq::SliceRandom as _; +use polkadot_node_subsystem::{ + messages::{ + GossipSupportMessage, + }, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, + Subsystem, SpawnedSubsystem, SubsystemContext, +}; +use polkadot_node_subsystem_util::{ + validator_discovery::{ConnectionRequest, self}, + self as util, +}; +use polkadot_primitives::v1::{ + Hash, ValidatorId, SessionIndex, +}; +use polkadot_node_network_protocol::peer_set::PeerSet; + +const LOG_TARGET: &str = "gossip_support"; + +/// The Gossip Support subsystem. +pub struct GossipSupport {} + +#[derive(Default)] +struct State { + last_session_index: Option<SessionIndex>, + /// when we overwrite this, it automatically drops the previous request + last_connection_request: Option<ConnectionRequest>, +} + +impl GossipSupport { + /// Create a new instance of the [`GossipSupport`] subsystem. + pub fn new() -> Self { + Self {} + } + + #[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))] + async fn run<Context>(self, mut ctx: Context) + where + Context: SubsystemContext<Message = GossipSupportMessage>, + { + let mut state = State::default(); + loop { + let message = match ctx.recv().await { + Ok(message) => message, + Err(e) => { + tracing::debug!( + target: LOG_TARGET, + err = ?e, + "Failed to receive a message from Overseer, exiting" + ); + return; + }, + }; + match message { + FromOverseer::Communication { .. } => {}, + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated, + .. + })) => { + tracing::trace!(target: LOG_TARGET, "active leaves signal"); + + let leaves = activated.into_iter().map(|(h, _)| h); + if let Err(e) = state.handle_active_leaves(&mut ctx, leaves).await { + tracing::debug!(target: LOG_TARGET, "Error {}", e); + } + } + FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, _number)) => {}, + FromOverseer::Signal(OverseerSignal::Conclude) => { + return; + } + } + } + } +} + +async fn determine_relevant_validators( + ctx: &mut impl SubsystemContext, + relay_parent: Hash, + _session: SessionIndex, +) -> Result<Vec<ValidatorId>, util::Error> { + let validators = util::request_validators_ctx(relay_parent, ctx).await?.await??; + Ok(validators) +} + +// chooses a random subset of sqrt(v.len()), but at least 25 elements +fn choose_random_subset<T>(mut v: Vec<T>) -> Vec<T> { + let mut rng = rand::thread_rng(); + v.shuffle(&mut rng); + + let sqrt = (v.len() as f64).sqrt() as usize; + let len = std::cmp::max(25, sqrt); + v.truncate(len); + v +} + +impl State { + /// 1. Determine if the current session index has changed. + /// 2. If it has, determine relevant validators + /// and issue a connection request. + async fn handle_active_leaves( + &mut self, + ctx: &mut impl SubsystemContext, + leaves: impl Iterator<Item = Hash>, + ) -> Result<(), util::Error> { + for leaf in leaves { + let current_index = util::request_session_index_for_child_ctx(leaf, ctx).await?.await??; + let maybe_new_session = match self.last_session_index { + Some(i) if i <= current_index => None, + _ => Some((current_index, leaf)), + }; + + if let Some((new_session, relay_parent)) = maybe_new_session { + tracing::debug!(target: LOG_TARGET, "New session detected {}", new_session); + let validators = determine_relevant_validators(ctx, relay_parent, new_session).await?; + let validators = choose_random_subset(validators); + tracing::debug!(target: LOG_TARGET, "Issuing a connection request to {:?}", validators); + + let request = validator_discovery::connect_to_validators_in_session( + ctx, + relay_parent, + validators, + PeerSet::Validation, + new_session, + ).await?; + + self.last_session_index = Some(new_session); + self.last_connection_request = Some(request); + } + } + + Ok(()) + } +} + +impl<C> Subsystem<C> for GossipSupport +where + C: SubsystemContext<Message = GossipSupportMessage> + Sync + Send, +{ + fn start(self, ctx: C) -> SpawnedSubsystem { + let future = self.run(ctx) + .map(|_| Ok(())) + .boxed(); + + SpawnedSubsystem { + name: "gossip-support-subsystem", + future, + } + } +} diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index c7578d671a7..ae88e53805d 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -86,7 +86,7 @@ use polkadot_subsystem::messages::{ ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, CollatorProtocolMessage, AvailabilityRecoveryMessage, ApprovalDistributionMessage, - ApprovalVotingMessage, + ApprovalVotingMessage, GossipSupportMessage, }; pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, @@ -565,6 +565,9 @@ pub struct Overseer<S> { /// An Approval Voting subsystem. approval_voting_subsystem: OverseenSubsystem<ApprovalVotingMessage>, + /// A Gossip Support subsystem. + gossip_support_subsystem: OverseenSubsystem<GossipSupportMessage>, + /// Spawner to spawn tasks to. s: S, @@ -606,6 +609,7 @@ pub struct Overseer<S> { pub struct AllSubsystems< CV = (), CB = (), CS = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (), PoVD = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), ApV = (), + GS = (), > { /// A candidate validation subsystem. pub candidate_validation: CV, @@ -643,10 +647,12 @@ pub struct AllSubsystems< pub approval_distribution: ApD, /// An Approval Voting subsystem. pub approval_voting: ApV, + /// A Connection Request Issuer subsystem. + pub gossip_support: GS, } -impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> - AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> +impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> + AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { /// Create a new instance of [`AllSubsystems`]. /// @@ -679,6 +685,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> DummySubsystem, DummySubsystem, DummySubsystem, + DummySubsystem, > { AllSubsystems { candidate_validation: DummySubsystem, @@ -699,6 +706,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: DummySubsystem, approval_distribution: DummySubsystem, approval_voting: DummySubsystem, + gossip_support: DummySubsystem, } } @@ -706,7 +714,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_candidate_validation<NEW>( self, candidate_validation: NEW, - ) -> AllSubsystems<NEW, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<NEW, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation, candidate_backing: self.candidate_backing, @@ -726,6 +734,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -733,7 +742,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_candidate_backing<NEW>( self, candidate_backing: NEW, - ) -> AllSubsystems<CV, NEW, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, NEW, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing, @@ -753,6 +762,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -760,7 +770,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_candidate_selection<NEW>( self, candidate_selection: NEW, - ) -> AllSubsystems<CV, CB, NEW, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, NEW, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -780,6 +790,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -787,7 +798,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_statement_distribution<NEW>( self, statement_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, NEW, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, NEW, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -807,6 +818,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -814,7 +826,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_availability_distribution<NEW>( self, availability_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, NEW, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, NEW, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -834,6 +846,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -841,7 +854,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_availability_recovery<NEW>( self, availability_recovery: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, NEW, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, NEW, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -861,6 +874,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -868,7 +882,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_bitfield_signing<NEW>( self, bitfield_signing: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, NEW, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, NEW, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -888,6 +902,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -895,7 +910,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_bitfield_distribution<NEW>( self, bitfield_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, NEW, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, NEW, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -915,6 +930,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -922,7 +938,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_provisioner<NEW>( self, provisioner: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, NEW, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, NEW, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -942,6 +958,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -949,7 +966,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_pov_distribution<NEW>( self, pov_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, NEW, RA, AS, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, NEW, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -969,6 +986,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -976,7 +994,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_runtime_api<NEW>( self, runtime_api: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, NEW, AS, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, NEW, AS, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -996,6 +1014,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -1003,7 +1022,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_availability_store<NEW>( self, availability_store: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, NEW, NB, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, NEW, NB, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -1023,6 +1042,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -1030,7 +1050,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_network_bridge<NEW>( self, network_bridge: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NEW, CA, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NEW, CA, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -1050,6 +1070,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -1057,7 +1078,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_chain_api<NEW>( self, chain_api: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, NEW, CG, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, NEW, CG, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -1077,6 +1098,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -1084,7 +1106,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_collation_generation<NEW>( self, collation_generation: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, NEW, CP, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, NEW, CP, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -1104,6 +1126,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -1111,7 +1134,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_collator_protocol<NEW>( self, collator_protocol: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, NEW, ApD, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, NEW, ApD, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -1131,6 +1154,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -1138,7 +1162,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_approval_distribution<NEW>( self, approval_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, NEW, ApV> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, NEW, ApV, GS> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -1158,6 +1182,7 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collator_protocol: self.collator_protocol, approval_distribution, approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } @@ -1165,7 +1190,35 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> pub fn replace_approval_voting<NEW>( self, approval_voting: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, NEW> { + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, NEW, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + pov_distribution: self.pov_distribution, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting, + gossip_support: self.gossip_support, + } + } + + /// Replace the `gossip_support` instance in `self`. + pub fn replace_gossip_support<NEW>( + self, + gossip_support: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, NEW> { AllSubsystems { candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, @@ -1184,7 +1237,8 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV> collation_generation: self.collation_generation, collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, - approval_voting + approval_voting: self.approval_voting, + gossip_support, } } } @@ -1395,9 +1449,9 @@ where /// # /// # }); } /// ``` - pub fn new<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV>( + pub fn new<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>( leaves: impl IntoIterator<Item = BlockInfo>, - all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV>, + all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>, prometheus_registry: Option<&prometheus::Registry>, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> @@ -1420,6 +1474,7 @@ where CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send, ApD: Subsystem<OverseerSubsystemContext<ApprovalDistributionMessage>> + Send, ApV: Subsystem<OverseerSubsystemContext<ApprovalVotingMessage>> + Send, + GS: Subsystem<OverseerSubsystemContext<GossipSupportMessage>> + Send, { let (events_tx, events_rx) = metered::channel(CHANNEL_CAPACITY, "overseer_events"); @@ -1631,6 +1686,16 @@ where TaskKind::Blocking, )?; + let gossip_support_subsystem = spawn( + &mut s, + &mut running_subsystems, + metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + all_subsystems.gossip_support, + &metrics, + &mut seed, + TaskKind::Regular, + )?; + let leaves = leaves .into_iter() .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)) @@ -1658,6 +1723,7 @@ where collator_protocol_subsystem, approval_distribution_subsystem, approval_voting_subsystem, + gossip_support_subsystem, s, running_subsystems, to_overseer_rx: to_overseer_rx.fuse(), @@ -1692,6 +1758,7 @@ where let _ = self.collation_generation_subsystem.send_signal(OverseerSignal::Conclude).await; let _ = self.approval_distribution_subsystem.send_signal(OverseerSignal::Conclude).await; let _ = self.approval_voting_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.gossip_support_subsystem.send_signal(OverseerSignal::Conclude).await; let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse(); @@ -1861,7 +1928,8 @@ where self.collator_protocol_subsystem.send_signal(signal.clone()).await?; self.collation_generation_subsystem.send_signal(signal.clone()).await?; self.approval_distribution_subsystem.send_signal(signal.clone()).await?; - self.approval_voting_subsystem.send_signal(signal).await?; + self.approval_voting_subsystem.send_signal(signal.clone()).await?; + self.gossip_support_subsystem.send_signal(signal).await?; Ok(()) } @@ -1925,6 +1993,9 @@ where AllMessages::ApprovalVoting(msg) => { self.approval_voting_subsystem.send_message(msg).await?; }, + AllMessages::GossipSupport(msg) => { + self.gossip_support_subsystem.send_message(msg).await?; + }, } Ok(()) @@ -2824,6 +2895,7 @@ mod tests { chain_api: subsystem.clone(), approval_distribution: subsystem.clone(), approval_voting: subsystem.clone(), + gossip_support: subsystem.clone(), }; let (overseer, mut handler) = Overseer::new( vec![], @@ -2843,7 +2915,7 @@ mod tests { }).await; // send a msg to each subsystem - // except for BitfieldSigning as the message is not instantiable + // except for BitfieldSigning and GossipSupport as the messages are not instantiable handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await; handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await; @@ -2852,6 +2924,7 @@ mod tests { handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await; handler.send_msg(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await; // handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await; + // handler.send_msg(AllMessages::GossipSupport(test_bitfield_signing_msg())).await; handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await; handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await; handler.send_msg(AllMessages::PoVDistribution(test_pov_distribution_msg())).await; @@ -2867,13 +2940,12 @@ mod tests { select! { res = overseer_fut => { - const NUM_SUBSYSTEMS: usize = 18; + const NUM_SUBSYSTEMS: usize = 19; assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); - // x2 because of broadcast_signal on startup assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); - // -2 for BitfieldSigning and Availability distribution - assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 2); + // -3 for BitfieldSigning, GossipSupport and AvailabilityDistribution + assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 3); assert!(res.is_ok()); }, diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index 650768b50c2..c41d19fa659 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -83,6 +83,7 @@ polkadot-availability-bitfield-distribution = { path = "../network/bitfield-dist polkadot-availability-distribution = { path = "../network/availability-distribution", optional = true } polkadot-availability-recovery = { path = "../network/availability-recovery", optional = true } polkadot-collator-protocol = { path = "../network/collator-protocol", optional = true } +polkadot-gossip-support = { path = "../network/gossip-support", optional = true } polkadot-network-bridge = { path = "../network/bridge", optional = true } polkadot-node-collation-generation = { path = "../collation-generation", optional = true } polkadot-node-core-av-store = { path = "../core/av-store", optional = true } @@ -128,6 +129,7 @@ real-overseer = [ "polkadot-availability-distribution", "polkadot-availability-recovery", "polkadot-collator-protocol", + "polkadot-gossip-support", "polkadot-network-bridge", "polkadot-node-collation-generation", "polkadot-node-core-backing", diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 20598ef4424..17d255c5e60 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -418,6 +418,7 @@ where use polkadot_availability_recovery::AvailabilityRecoverySubsystem; use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem; use polkadot_node_core_approval_voting::ApprovalVotingSubsystem; + use polkadot_gossip_support::GossipSupport as GossipSupportSubsystem; let all_subsystems = AllSubsystems { availability_distribution: AvailabilityDistributionSubsystem::new( @@ -497,6 +498,7 @@ where approval_voting_config, keystore.clone(), )?, + gossip_support: GossipSupportSubsystem::new(), }; Overseer::new( diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index b5985fb16d3..7800b463170 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -682,6 +682,11 @@ pub enum ApprovalDistributionMessage { NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::ApprovalDistributionMessage>), } +/// Message to the Gossip Support subsystem. +#[derive(Debug)] +pub enum GossipSupportMessage { +} + /// A message type tying together all message types that are used across Subsystems. #[subsystem_dispatch_gen(NetworkBridgeEvent<protocol_v1::ValidationProtocol>)] #[derive(Debug, derive_more::From)] @@ -735,6 +740,9 @@ pub enum AllMessages { ApprovalVoting(ApprovalVotingMessage), /// Message for the Approval Distribution subsystem. ApprovalDistribution(ApprovalDistributionMessage), + /// Message for the Gossip Support subsystem. + #[skip] + GossipSupport(GossipSupportMessage), } impl From<IncomingRequest<req_res_v1::AvailabilityFetchingRequest>> for AllMessages { -- GitLab