From 98082c53269242cc979de8665edba1b6da454cc6 Mon Sep 17 00:00:00 2001 From: Andronik Ordian <write@reusable.software> Date: Thu, 1 Apr 2021 23:51:01 +0200 Subject: [PATCH] gossip: move authorities request to runtime api subsystem (#2798) --- polkadot/Cargo.lock | 3 +- polkadot/node/core/runtime-api/Cargo.toml | 1 + polkadot/node/core/runtime-api/src/cache.rs | 23 ++++++++ polkadot/node/core/runtime-api/src/lib.rs | 47 +++++++++++++++-- .../node/network/gossip-support/Cargo.toml | 2 - .../node/network/gossip-support/src/lib.rs | 52 ++++++------------- polkadot/node/service/src/lib.rs | 1 - polkadot/node/subsystem-util/src/lib.rs | 3 ++ polkadot/node/subsystem/src/messages.rs | 2 + 9 files changed, 87 insertions(+), 47 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 49ee4697f8b..5138e3b7ab9 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5636,9 +5636,7 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-util", "polkadot-primitives", - "sp-api", "sp-application-crypto", - "sp-authority-discovery", "sp-keystore", "tracing", ] @@ -5890,6 +5888,7 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-primitives", "sp-api", + "sp-authority-discovery", "sp-consensus-babe", "sp-core", "tracing", diff --git a/polkadot/node/core/runtime-api/Cargo.toml b/polkadot/node/core/runtime-api/Cargo.toml index 5d0889bf5a5..c552a019ee7 100644 --- a/polkadot/node/core/runtime-api/Cargo.toml +++ b/polkadot/node/core/runtime-api/Cargo.toml @@ -11,6 +11,7 @@ memory-lru = "0.1.0" parity-util-mem = { version = "0.9.0", default-features = false } sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/core/runtime-api/src/cache.rs b/polkadot/node/core/runtime-api/src/cache.rs index 1dce87e68c2..1bc6f682ff6 100644 --- a/polkadot/node/core/runtime-api/src/cache.rs +++ b/polkadot/node/core/runtime-api/src/cache.rs @@ -19,6 +19,7 @@ use polkadot_primitives::v1::{ CoreState, GroupRotationInfo, InboundDownwardMessage, InboundHrmpMessage, Hash, PersistedValidationData, Id as ParaId, OccupiedCoreAssumption, SessionIndex, SessionInfo, ValidationCode, ValidatorId, ValidatorIndex, + AuthorityDiscoveryId, }; use sp_consensus_babe::Epoch; use parity_util_mem::{MallocSizeOf, MallocSizeOfExt}; @@ -28,6 +29,7 @@ use memory_lru::{MemoryLruCache, ResidentSize}; use std::collections::btree_map::BTreeMap; +const AUTHORITIES_CACHE_SIZE: usize = 128 * 1024; const VALIDATORS_CACHE_SIZE: usize = 64 * 1024; const VALIDATOR_GROUPS_CACHE_SIZE: usize = 64 * 1024; const AVAILABILITY_CORES_CACHE_SIZE: usize = 64 * 1024; @@ -59,7 +61,18 @@ impl<T> ResidentSize for DoesNotAllocate<T> { } } +// this is an ugly workaround for `AuthorityDiscoveryId` +// not implementing `MallocSizeOf` +struct VecOfDoesNotAllocate<T>(Vec<T>); + +impl<T> ResidentSize for VecOfDoesNotAllocate<T> { + fn resident_size(&self) -> usize { + std::mem::size_of::<T>() * self.0.capacity() + } +} + pub(crate) struct RequestResultCache { + authorities: MemoryLruCache<Hash, VecOfDoesNotAllocate<AuthorityDiscoveryId>>, validators: MemoryLruCache<Hash, ResidentSizeOf<Vec<ValidatorId>>>, validator_groups: MemoryLruCache<Hash, ResidentSizeOf<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>>, availability_cores: MemoryLruCache<Hash, ResidentSizeOf<Vec<CoreState>>>, @@ -79,6 +92,7 @@ pub(crate) struct RequestResultCache { impl Default for RequestResultCache { fn default() -> Self { Self { + authorities: MemoryLruCache::new(AUTHORITIES_CACHE_SIZE), validators: MemoryLruCache::new(VALIDATORS_CACHE_SIZE), validator_groups: MemoryLruCache::new(VALIDATOR_GROUPS_CACHE_SIZE), availability_cores: MemoryLruCache::new(AVAILABILITY_CORES_CACHE_SIZE), @@ -98,6 +112,14 @@ impl Default for RequestResultCache { } impl RequestResultCache { + pub(crate) fn authorities(&mut self, relay_parent: &Hash) -> Option<&Vec<AuthorityDiscoveryId>> { + self.authorities.get(relay_parent).map(|v| &v.0) + } + + pub(crate) fn cache_authorities(&mut self, relay_parent: Hash, authorities: Vec<AuthorityDiscoveryId>) { + self.authorities.insert(relay_parent, VecOfDoesNotAllocate(authorities)); + } + pub(crate) fn validators(&mut self, relay_parent: &Hash) -> Option<&Vec<ValidatorId>> { self.validators.get(relay_parent).map(|v| &v.0) } @@ -212,6 +234,7 @@ impl RequestResultCache { } pub(crate) enum RequestResult { + Authorities(Hash, Vec<AuthorityDiscoveryId>), Validators(Hash, Vec<ValidatorId>), ValidatorGroups(Hash, (Vec<Vec<ValidatorIndex>>, GroupRotationInfo)), AvailabilityCores(Hash, Vec<CoreState>), diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index d7dfa8b162f..0e10ac25b5e 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -34,6 +34,7 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost}; use sp_api::ProvideRuntimeApi; +use sp_authority_discovery::AuthorityDiscoveryApi; use sp_core::traits::SpawnNamed; use sp_consensus_babe::BabeApi; @@ -83,7 +84,7 @@ impl<Client> RuntimeApiSubsystem<Client> { impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where Client: ProvideRuntimeApi<Block> + Send + 'static + Sync, - Client::Api: ParachainHost<Block> + BabeApi<Block>, + Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, Context: SubsystemContext<Message = RuntimeApiMessage> { fn start(self, ctx: Context) -> SpawnedSubsystem { @@ -96,12 +97,14 @@ impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where impl<Client> RuntimeApiSubsystem<Client> where Client: ProvideRuntimeApi<Block> + Send + 'static + Sync, - Client::Api: ParachainHost<Block> + BabeApi<Block>, + Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, { fn store_cache(&mut self, result: RequestResult) { use RequestResult::*; match result { + Authorities(relay_parent, authorities) => + self.requests_cache.cache_authorities(relay_parent, authorities), Validators(relay_parent, validators) => self.requests_cache.cache_validators(relay_parent, validators), ValidatorGroups(relay_parent, groups) => @@ -160,6 +163,8 @@ impl<Client> RuntimeApiSubsystem<Client> where } match request { + Request::Authorities(sender) => query!(authorities(), sender) + .map(|sender| Request::Authorities(sender)), Request::Validators(sender) => query!(validators(), sender) .map(|sender| Request::Validators(sender)), Request::ValidatorGroups(sender) => query!(validator_groups(), sender) @@ -263,7 +268,7 @@ async fn run<Client>( mut subsystem: RuntimeApiSubsystem<Client>, ) -> SubsystemResult<()> where Client: ProvideRuntimeApi<Block> + Send + Sync + 'static, - Client::Api: ParachainHost<Block> + BabeApi<Block>, + Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, { loop { select! { @@ -291,7 +296,7 @@ fn make_runtime_api_request<Client>( ) -> Option<RequestResult> where Client: ProvideRuntimeApi<Block>, - Client::Api: ParachainHost<Block> + BabeApi<Block>, + Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, { let _timer = metrics.time_make_runtime_api_request(); @@ -327,6 +332,7 @@ where } match request { + Request::Authorities(sender) => query!(Authorities, authorities(), sender), Request::Validators(sender) => query!(Validators, validators(), sender), Request::ValidatorGroups(sender) => query!(ValidatorGroups, validator_groups(), sender), Request::AvailabilityCores(sender) => query!(AvailabilityCores, availability_cores(), sender), @@ -416,7 +422,7 @@ mod tests { ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData, Id as ParaId, OccupiedCoreAssumption, SessionIndex, ValidationCode, CommittedCandidateReceipt, CandidateEvent, InboundDownwardMessage, - BlockNumber, InboundHrmpMessage, SessionInfo, + BlockNumber, InboundHrmpMessage, SessionInfo, AuthorityDiscoveryId, }; use polkadot_node_subsystem_test_helpers as test_helpers; use sp_core::testing::TaskExecutor; @@ -428,6 +434,7 @@ mod tests { #[derive(Default, Clone)] struct MockRuntimeApi { + authorities: Vec<AuthorityDiscoveryId>, validators: Vec<ValidatorId>, validator_groups: Vec<Vec<ValidatorIndex>>, availability_cores: Vec<CoreState>, @@ -582,6 +589,36 @@ mod tests { None } } + + impl AuthorityDiscoveryApi<Block> for MockRuntimeApi { + fn authorities(&self) -> Vec<AuthorityDiscoveryId> { + self.authorities.clone() + } + } + } + + #[test] + fn requests_authorities() { + let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); + let runtime_api = Arc::new(MockRuntimeApi::default()); + let relay_parent = [1; 32].into(); + let spawner = sp_core::testing::TaskExecutor::new(); + + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner); + let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); + let test_task = async move { + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request(relay_parent, Request::Authorities(tx)) + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), runtime_api.authorities); + + ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::executor::block_on(future::join(subsystem_task, test_task)); } #[test] diff --git a/polkadot/node/network/gossip-support/Cargo.toml b/polkadot/node/network/gossip-support/Cargo.toml index b1606173434..0de20c53781 100644 --- a/polkadot/node/network/gossip-support/Cargo.toml +++ b/polkadot/node/network/gossip-support/Cargo.toml @@ -5,10 +5,8 @@ authors = ["Parity Technologies <admin@parity.io>"] edition = "2018" [dependencies] -sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } -sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-node-network-protocol = { path = "../protocol" } polkadot-node-subsystem = { path = "../../subsystem" } diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 0f68f5af714..1b25e6971e2 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -19,9 +19,6 @@ //! the gossiping subsystems on every new session. use futures::{channel::mpsc, FutureExt as _}; -use std::sync::Arc; -use sp_api::ProvideRuntimeApi; -use sp_authority_discovery::AuthorityDiscoveryApi; use polkadot_node_subsystem::{ messages::{ GossipSupportMessage, @@ -34,7 +31,7 @@ use polkadot_node_subsystem_util::{ self as util, }; use polkadot_primitives::v1::{ - Hash, SessionIndex, AuthorityDiscoveryId, Block, BlockId, + Hash, SessionIndex, AuthorityDiscoveryId, }; use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId}; use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; @@ -43,8 +40,7 @@ use sp_application_crypto::{Public, AppKey}; const LOG_TARGET: &str = "parachain::gossip-support"; /// The Gossip Support subsystem. -pub struct GossipSupport<Client> { - client: Arc<Client>, +pub struct GossipSupport { keystore: SyncCryptoStorePtr, } @@ -55,15 +51,10 @@ struct State { _last_connection_request: Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>, } -impl<Client> GossipSupport<Client> -where - Client: ProvideRuntimeApi<Block>, - Client::Api: AuthorityDiscoveryApi<Block>, -{ +impl GossipSupport { /// Create a new instance of the [`GossipSupport`] subsystem. - pub fn new(keystore: SyncCryptoStorePtr, client: Arc<Client>) -> Self { + pub fn new(keystore: SyncCryptoStorePtr) -> Self { Self { - client, keystore, } } @@ -74,7 +65,7 @@ where Context: SubsystemContext<Message = GossipSupportMessage>, { let mut state = State::default(); - let Self { client, keystore } = self; + let Self { keystore } = self; loop { let message = match ctx.recv().await { Ok(message) => message, @@ -96,7 +87,7 @@ where tracing::trace!(target: LOG_TARGET, "active leaves signal"); let leaves = activated.into_iter().map(|a| a.hash); - if let Err(e) = state.handle_active_leaves(&mut ctx, client.clone(), &keystore, leaves).await { + if let Err(e) = state.handle_active_leaves(&mut ctx, &keystore, leaves).await { tracing::debug!(target: LOG_TARGET, error = ?e); } } @@ -109,18 +100,12 @@ where } } -async fn determine_relevant_authorities<Client>( - client: Arc<Client>, +async fn determine_relevant_authorities( + ctx: &mut impl SubsystemContext, relay_parent: Hash, -) -> Result<Vec<AuthorityDiscoveryId>, util::Error> -where - Client: ProvideRuntimeApi<Block>, - Client::Api: AuthorityDiscoveryApi<Block>, -{ - let api = client.runtime_api(); - let result = api.authorities(&BlockId::Hash(relay_parent)) - .map_err(|e| util::Error::RuntimeApi(format!("{:?}", e).into())); - result +) -> Result<Vec<AuthorityDiscoveryId>, util::Error> { + let authorities = util::request_authorities_ctx(relay_parent, ctx).await?.await??; + Ok(authorities) } /// Return an error if we're not a validator in the given set (do not have keys). @@ -143,17 +128,12 @@ impl State { /// 1. Determine if the current session index has changed. /// 2. If it has, determine relevant validators /// and issue a connection request. - async fn handle_active_leaves<Client>( + async fn handle_active_leaves( &mut self, ctx: &mut impl SubsystemContext, - client: Arc<Client>, keystore: &SyncCryptoStorePtr, leaves: impl Iterator<Item = Hash>, - ) -> Result<(), util::Error> - where - Client: ProvideRuntimeApi<Block>, - Client::Api: AuthorityDiscoveryApi<Block>, - { + ) -> Result<(), util::Error> { for leaf in leaves { let current_index = util::request_session_index_for_child_ctx(leaf, ctx).await?.await??; let maybe_new_session = match self.last_session_index { @@ -163,7 +143,7 @@ impl State { if let Some((new_session, relay_parent)) = maybe_new_session { tracing::debug!(target: LOG_TARGET, %new_session, "New session detected"); - let authorities = determine_relevant_authorities(client.clone(), relay_parent).await?; + let authorities = determine_relevant_authorities(ctx, relay_parent).await?; ensure_i_am_an_authority(keystore, &authorities).await?; tracing::debug!(target: LOG_TARGET, num = ?authorities.len(), "Issuing a connection request"); @@ -182,11 +162,9 @@ impl State { } } -impl<Client, Context> Subsystem<Context> for GossipSupport<Client> +impl<Context> Subsystem<Context> for GossipSupport where Context: SubsystemContext<Message = GossipSupportMessage> + Sync + Send, - Client: ProvideRuntimeApi<Block> + Send + 'static + Sync, - Client::Api: AuthorityDiscoveryApi<Block>, { fn start(self, ctx: Context) -> SpawnedSubsystem { let future = self.run(ctx) diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 0f909492648..f51f204b7c1 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -570,7 +570,6 @@ where ), gossip_support: GossipSupportSubsystem::new( keystore.clone(), - runtime_client.clone(), ), }; diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 47c4406a95c..fbcc6588bc3 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -39,6 +39,7 @@ use polkadot_primitives::v1::{ CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData, GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption, SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, SessionInfo, + AuthorityDiscoveryId, }; use sp_core::{traits::SpawnNamed, Public}; use sp_application_crypto::AppKey; @@ -166,6 +167,7 @@ macro_rules! specialize_requests { } specialize_requests! { + fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities; fn request_validators() -> Vec<ValidatorId>; Validators; fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups; fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores; @@ -247,6 +249,7 @@ macro_rules! specialize_requests_ctx { } specialize_requests_ctx! { + fn request_authorities_ctx() -> Vec<AuthorityDiscoveryId>; Authorities; fn request_validators_ctx() -> Vec<ValidatorId>; Validators; fn request_validator_groups_ctx() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups; fn request_availability_cores_ctx() -> Vec<CoreState>; AvailabilityCores; diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index f80cd77a4c1..4910d32e9f3 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -437,6 +437,8 @@ pub type RuntimeApiSender<T> = oneshot::Sender<Result<T, crate::errors::RuntimeA /// A request to the Runtime API subsystem. #[derive(Debug)] pub enum RuntimeApiRequest { + /// Get the next, current and some previous authority discovery set deduplicated. + Authorities(RuntimeApiSender<Vec<AuthorityDiscoveryId>>), /// Get the current validator set. Validators(RuntimeApiSender<Vec<ValidatorId>>), /// Get the validator groups and group rotation info. -- GitLab