From 247d05114a69d42576ae04a048671b3b82b769e8 Mon Sep 17 00:00:00 2001 From: Davide Galassi <davxy@datawok.net> Date: Fri, 21 Oct 2022 10:41:46 +0200 Subject: [PATCH] Single ParachainBlockImport instance (#1782) --- .../client/consensus/aura/src/import_queue.rs | 7 +- cumulus/client/consensus/aura/src/lib.rs | 34 ++-- cumulus/client/consensus/common/src/lib.rs | 6 + .../consensus/relay-chain/src/import_queue.rs | 12 +- .../client/consensus/relay-chain/src/lib.rs | 8 +- .../parachain-template/node/src/command.rs | 1 - .../parachain-template/node/src/service.rs | 24 ++- cumulus/polkadot-parachain/src/service.rs | 164 ++++++++++-------- cumulus/test/service/src/lib.rs | 19 +- 9 files changed, 154 insertions(+), 121 deletions(-) diff --git a/cumulus/client/consensus/aura/src/import_queue.rs b/cumulus/client/consensus/aura/src/import_queue.rs index 80b35a2bab9..2c21bf546b4 100644 --- a/cumulus/client/consensus/aura/src/import_queue.rs +++ b/cumulus/client/consensus/aura/src/import_queue.rs @@ -17,6 +17,7 @@ //! Parachain specific wrapper for the AuRa import queue. use codec::Codec; +use cumulus_client_consensus_common::ParachainBlockImport; use sc_client_api::{backend::AuxStore, BlockOf, UsageProvider}; use sc_consensus::{import_queue::DefaultImportQueue, BlockImport}; use sc_consensus_aura::AuraVerifier; @@ -33,10 +34,10 @@ use sp_runtime::traits::Block as BlockT; use std::{fmt::Debug, hash::Hash, sync::Arc}; use substrate_prometheus_endpoint::Registry; -/// Parameters of [`import_queue`]. +/// Parameters for [`import_queue`]. pub struct ImportQueueParams<'a, I, C, CIDP, S> { /// The block import to use. - pub block_import: I, + pub block_import: ParachainBlockImport<I>, /// The client to interact with the chain. pub client: Arc<C>, /// The inherent data providers, to create the inherent data. @@ -83,7 +84,7 @@ where CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync, { sc_consensus_aura::import_queue::<P, _, _, _, _, _>(sc_consensus_aura::ImportQueueParams { - block_import: cumulus_client_consensus_common::ParachainBlockImport::new(block_import), + block_import, justification_import: None, client, create_inherent_data_providers, diff --git a/cumulus/client/consensus/aura/src/lib.rs b/cumulus/client/consensus/aura/src/lib.rs index 1be562e37cb..5dbd9fcd51a 100644 --- a/cumulus/client/consensus/aura/src/lib.rs +++ b/cumulus/client/consensus/aura/src/lib.rs @@ -71,6 +71,22 @@ impl<B, CIDP, W> Clone for AuraConsensus<B, CIDP, W> { } } +/// Parameters of [`AuraConsensus::build`]. +pub struct BuildAuraConsensusParams<PF, BI, CIDP, Client, BS, SO> { + pub proposer_factory: PF, + pub create_inherent_data_providers: CIDP, + pub block_import: ParachainBlockImport<BI>, + pub para_client: Arc<Client>, + pub backoff_authoring_blocks: Option<BS>, + pub sync_oracle: SO, + pub keystore: SyncCryptoStorePtr, + pub force_authoring: bool, + pub slot_duration: SlotDuration, + pub telemetry: Option<TelemetryHandle>, + pub block_proposal_slot_portion: SlotProportion, + pub max_block_proposal_slot_portion: Option<SlotProportion>, +} + impl<B, CIDP> AuraConsensus<B, CIDP, ()> where B: BlockT, @@ -117,7 +133,7 @@ where let worker = sc_consensus_aura::build_aura_worker::<P, _, _, _, _, _, _, _, _>( BuildAuraWorkerParams { client: para_client, - block_import: ParachainBlockImport::new(block_import), + block_import, justification_sync_link: (), proposer_factory, sync_oracle, @@ -216,19 +232,3 @@ where Some(ParachainCandidate { block: res.block, proof: res.storage_proof }) } } - -/// Parameters of [`AuraConsensus::build`]. -pub struct BuildAuraConsensusParams<PF, BI, CIDP, Client, BS, SO> { - pub proposer_factory: PF, - pub create_inherent_data_providers: CIDP, - pub block_import: BI, - pub para_client: Arc<Client>, - pub backoff_authoring_blocks: Option<BS>, - pub sync_oracle: SO, - pub keystore: SyncCryptoStorePtr, - pub force_authoring: bool, - pub slot_duration: SlotDuration, - pub telemetry: Option<TelemetryHandle>, - pub block_proposal_slot_portion: SlotProportion, - pub max_block_proposal_slot_portion: Option<SlotProportion>, -} diff --git a/cumulus/client/consensus/common/src/lib.rs b/cumulus/client/consensus/common/src/lib.rs index 61098dfd434..d5d33585439 100644 --- a/cumulus/client/consensus/common/src/lib.rs +++ b/cumulus/client/consensus/common/src/lib.rs @@ -83,6 +83,12 @@ impl<I> ParachainBlockImport<I> { } } +impl<I: Clone> Clone for ParachainBlockImport<I> { + fn clone(&self) -> Self { + ParachainBlockImport(self.0.clone()) + } +} + #[async_trait::async_trait] impl<Block, I> BlockImport<Block> for ParachainBlockImport<I> where diff --git a/cumulus/client/consensus/relay-chain/src/import_queue.rs b/cumulus/client/consensus/relay-chain/src/import_queue.rs index 3792a9c04bb..e5a31e600b6 100644 --- a/cumulus/client/consensus/relay-chain/src/import_queue.rs +++ b/cumulus/client/consensus/relay-chain/src/import_queue.rs @@ -16,6 +16,8 @@ use std::{marker::PhantomData, sync::Arc}; +use cumulus_client_consensus_common::ParachainBlockImport; + use sc_consensus::{ import_queue::{BasicQueue, Verifier as VerifierT}, BlockImport, BlockImportParams, @@ -103,7 +105,7 @@ where /// Start an import queue for a Cumulus collator that does not uses any special authoring logic. pub fn import_queue<Client, Block: BlockT, I, CIDP>( client: Arc<Client>, - block_import: I, + block_import: ParachainBlockImport<I>, create_inherent_data_providers: CIDP, spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&substrate_prometheus_endpoint::Registry>, @@ -117,11 +119,5 @@ where { let verifier = Verifier::new(client, create_inherent_data_providers); - Ok(BasicQueue::new( - verifier, - Box::new(cumulus_client_consensus_common::ParachainBlockImport::new(block_import)), - None, - spawner, - registry, - )) + Ok(BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry)) } diff --git a/cumulus/client/consensus/relay-chain/src/lib.rs b/cumulus/client/consensus/relay-chain/src/lib.rs index 20d3b7cc757..decef373977 100644 --- a/cumulus/client/consensus/relay-chain/src/lib.rs +++ b/cumulus/client/consensus/relay-chain/src/lib.rs @@ -90,16 +90,14 @@ where para_id: ParaId, proposer_factory: PF, create_inherent_data_providers: CIDP, - block_import: BI, + block_import: ParachainBlockImport<BI>, relay_chain_interface: RCInterface, ) -> Self { Self { para_id, proposer_factory: Arc::new(Mutex::new(proposer_factory)), create_inherent_data_providers: Arc::new(create_inherent_data_providers), - block_import: Arc::new(futures::lock::Mutex::new(ParachainBlockImport::new( - block_import, - ))), + block_import: Arc::new(futures::lock::Mutex::new(block_import)), relay_chain_interface, _phantom: PhantomData, } @@ -222,7 +220,7 @@ pub struct BuildRelayChainConsensusParams<PF, BI, CIDP, RCInterface> { pub para_id: ParaId, pub proposer_factory: PF, pub create_inherent_data_providers: CIDP, - pub block_import: BI, + pub block_import: ParachainBlockImport<BI>, pub relay_chain_interface: RCInterface, } diff --git a/cumulus/parachain-template/node/src/command.rs b/cumulus/parachain-template/node/src/command.rs index 7301b6d570b..14a7e046bd1 100644 --- a/cumulus/parachain-template/node/src/command.rs +++ b/cumulus/parachain-template/node/src/command.rs @@ -217,7 +217,6 @@ pub fn run() -> Result<()> { let partials = new_partial(&config)?; let db = partials.backend.expose_db(); let storage = partials.backend.expose_storage(); - cmd.run(config, partials.client.clone(), db, storage) }), BenchmarkCmd::Machine(cmd) => diff --git a/cumulus/parachain-template/node/src/service.rs b/cumulus/parachain-template/node/src/service.rs index 0980bb2cf78..eab33374529 100644 --- a/cumulus/parachain-template/node/src/service.rs +++ b/cumulus/parachain-template/node/src/service.rs @@ -9,7 +9,9 @@ use parachain_template_runtime::{opaque::Block, Hash, RuntimeApi}; // Cumulus Imports use cumulus_client_consensus_aura::{AuraConsensus, BuildAuraConsensusParams, SlotProportion}; -use cumulus_client_consensus_common::ParachainConsensus; +use cumulus_client_consensus_common::{ + ParachainBlockImport as TParachainBlockImport, ParachainConsensus, +}; use cumulus_client_network::BlockAnnounceValidator; use cumulus_client_service::{ prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams, @@ -51,6 +53,8 @@ type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>; type ParachainBackend = TFullBackend<Block>; +type ParachainBlockImport = TParachainBlockImport<Arc<ParachainClient>>; + /// Starts a `ServiceBuilder` for a full service. /// /// Use this macro if you don't actually need the full service, but just the builder in order to @@ -64,7 +68,7 @@ pub fn new_partial( (), sc_consensus::DefaultImportQueue<Block, ParachainClient>, sc_transaction_pool::FullPool<Block, ParachainClient>, - (Option<Telemetry>, Option<TelemetryWorkerHandle>), + (ParachainBlockImport, Option<Telemetry>, Option<TelemetryWorkerHandle>), >, sc_service::Error, > { @@ -109,8 +113,11 @@ pub fn new_partial( client.clone(), ); + let block_import = ParachainBlockImport::new(client.clone()); + let import_queue = build_import_queue( client.clone(), + block_import.clone(), config, telemetry.as_ref().map(|telemetry| telemetry.handle()), &task_manager, @@ -124,7 +131,7 @@ pub fn new_partial( task_manager, transaction_pool, select_chain: (), - other: (telemetry, telemetry_worker_handle), + other: (block_import, telemetry, telemetry_worker_handle), }) } @@ -163,7 +170,7 @@ async fn start_node_impl( let parachain_config = prepare_node_config(parachain_config); let params = new_partial(¶chain_config)?; - let (mut telemetry, telemetry_worker_handle) = params.other; + let (block_import, mut telemetry, telemetry_worker_handle) = params.other; let client = params.client.clone(); let backend = params.backend.clone(); @@ -255,6 +262,7 @@ async fn start_node_impl( if validator { let parachain_consensus = build_consensus( client.clone(), + block_import, prometheus_registry.as_ref(), telemetry.as_ref().map(|t| t.handle()), &task_manager, @@ -304,6 +312,7 @@ async fn start_node_impl( /// Build the import queue for the parachain runtime. fn build_import_queue( client: Arc<ParachainClient>, + block_import: ParachainBlockImport, config: &Configuration, telemetry: Option<TelemetryHandle>, task_manager: &TaskManager, @@ -318,8 +327,8 @@ fn build_import_queue( _, _, >(cumulus_client_consensus_aura::ImportQueueParams { - block_import: client.clone(), - client: client.clone(), + block_import, + client, create_inherent_data_providers: move |_, _| async move { let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); @@ -340,6 +349,7 @@ fn build_import_queue( fn build_consensus( client: Arc<ParachainClient>, + block_import: ParachainBlockImport, prometheus_registry: Option<&Registry>, telemetry: Option<TelemetryHandle>, task_manager: &TaskManager, @@ -389,7 +399,7 @@ fn build_consensus( Ok((slot, timestamp, parachain_inherent)) } }, - block_import: client.clone(), + block_import, para_client: client, backoff_authoring_blocks: Option::<()>::None, sync_oracle, diff --git a/cumulus/polkadot-parachain/src/service.rs b/cumulus/polkadot-parachain/src/service.rs index 2f38da0bd5d..4c87c20d772 100644 --- a/cumulus/polkadot-parachain/src/service.rs +++ b/cumulus/polkadot-parachain/src/service.rs @@ -18,7 +18,7 @@ use codec::Codec; use cumulus_client_cli::CollatorOptions; use cumulus_client_consensus_aura::{AuraConsensus, BuildAuraConsensusParams, SlotProportion}; use cumulus_client_consensus_common::{ - ParachainBlockImport, ParachainCandidate, ParachainConsensus, + ParachainBlockImport as TParachainBlockImport, ParachainCandidate, ParachainConsensus, }; use cumulus_client_network::BlockAnnounceValidator; use cumulus_client_service::{ @@ -73,6 +73,8 @@ type ParachainClient<RuntimeApi> = TFullClient<Block, RuntimeApi, WasmExecutor<H type ParachainBackend = TFullBackend<Block>; +type ParachainBlockImport<RuntimeApi> = TParachainBlockImport<Arc<ParachainClient<RuntimeApi>>>; + /// Native executor instance. pub struct ShellRuntimeExecutor; @@ -162,7 +164,7 @@ pub fn new_partial<RuntimeApi, BIQ>( (), sc_consensus::DefaultImportQueue<Block, ParachainClient<RuntimeApi>>, sc_transaction_pool::FullPool<Block, ParachainClient<RuntimeApi>>, - (Option<Telemetry>, Option<TelemetryWorkerHandle>), + (ParachainBlockImport<RuntimeApi>, Option<Telemetry>, Option<TelemetryWorkerHandle>), >, sc_service::Error, > @@ -179,6 +181,7 @@ where sc_client_api::StateBackendFor<ParachainBackend, Block>: sp_api::StateBackend<BlakeTwo256>, BIQ: FnOnce( Arc<ParachainClient<RuntimeApi>>, + ParachainBlockImport<RuntimeApi>, &Configuration, Option<TelemetryHandle>, &TaskManager, @@ -229,8 +232,11 @@ where client.clone(), ); + let block_import = ParachainBlockImport::new(client.clone()); + let import_queue = build_import_queue( client.clone(), + block_import.clone(), config, telemetry.as_ref().map(|telemetry| telemetry.handle()), &task_manager, @@ -244,7 +250,7 @@ where task_manager, transaction_pool, select_chain: (), - other: (telemetry, telemetry_worker_handle), + other: (block_import, telemetry, telemetry_worker_handle), }; Ok(params) @@ -279,7 +285,7 @@ async fn start_shell_node_impl<RuntimeApi, RB, BIQ, BIC>( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, - id: ParaId, + para_id: ParaId, rpc_ext_builder: RB, build_import_queue: BIQ, build_consensus: BIC, @@ -298,10 +304,10 @@ where + cumulus_primitives_core::CollectCollationInfo<Block>, sc_client_api::StateBackendFor<ParachainBackend, Block>: sp_api::StateBackend<BlakeTwo256>, RB: Fn(Arc<ParachainClient<RuntimeApi>>) -> Result<jsonrpsee::RpcModule<()>, sc_service::Error> - + Send + 'static, BIQ: FnOnce( Arc<ParachainClient<RuntimeApi>>, + ParachainBlockImport<RuntimeApi>, &Configuration, Option<TelemetryHandle>, &TaskManager, @@ -311,6 +317,7 @@ where >, BIC: FnOnce( Arc<ParachainClient<RuntimeApi>>, + ParachainBlockImport<RuntimeApi>, Option<&Registry>, Option<TelemetryHandle>, &TaskManager, @@ -324,7 +331,7 @@ where let parachain_config = prepare_node_config(parachain_config); let params = new_partial::<RuntimeApi, BIQ>(¶chain_config, build_import_queue)?; - let (mut telemetry, telemetry_worker_handle) = params.other; + let (block_import, mut telemetry, telemetry_worker_handle) = params.other; let client = params.client.clone(); let backend = params.backend.clone(); @@ -345,7 +352,8 @@ where s => s.to_string().into(), })?; - let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id); + let block_announce_validator = + BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id); let force_authoring = parachain_config.force_authoring; let validator = parachain_config.role.is_authority(); @@ -405,6 +413,7 @@ where if validator { let parachain_consensus = build_consensus( client.clone(), + block_import, prometheus_registry.as_ref(), telemetry.as_ref().map(|t| t.handle()), &task_manager, @@ -418,7 +427,7 @@ where let spawner = task_manager.spawn_handle(); let params = StartCollatorParams { - para_id: id, + para_id, block_status: client.clone(), announce_block, client: client.clone(), @@ -437,7 +446,7 @@ where client: client.clone(), announce_block, task_manager: &mut task_manager, - para_id: id, + para_id, relay_chain_interface, relay_chain_slot_duration, import_queue, @@ -459,7 +468,7 @@ async fn start_node_impl<RuntimeApi, RB, BIQ, BIC>( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, - id: ParaId, + para_id: ParaId, _rpc_ext_builder: RB, build_import_queue: BIQ, build_consensus: BIC, @@ -479,20 +488,20 @@ where + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance> + frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>, sc_client_api::StateBackendFor<ParachainBackend, Block>: sp_api::StateBackend<BlakeTwo256>, - RB: Fn(Arc<ParachainClient<RuntimeApi>>) -> Result<jsonrpsee::RpcModule<()>, sc_service::Error> - + Send - + 'static, + RB: Fn(Arc<ParachainClient<RuntimeApi>>) -> Result<jsonrpsee::RpcModule<()>, sc_service::Error>, BIQ: FnOnce( - Arc<ParachainClient<RuntimeApi>>, - &Configuration, - Option<TelemetryHandle>, - &TaskManager, - ) -> Result< - sc_consensus::DefaultImportQueue<Block, ParachainClient<RuntimeApi>>, - sc_service::Error, - > + 'static, + Arc<ParachainClient<RuntimeApi>>, + ParachainBlockImport<RuntimeApi>, + &Configuration, + Option<TelemetryHandle>, + &TaskManager, + ) -> Result< + sc_consensus::DefaultImportQueue<Block, ParachainClient<RuntimeApi>>, + sc_service::Error, + >, BIC: FnOnce( Arc<ParachainClient<RuntimeApi>>, + ParachainBlockImport<RuntimeApi>, Option<&Registry>, Option<TelemetryHandle>, &TaskManager, @@ -506,7 +515,7 @@ where let parachain_config = prepare_node_config(parachain_config); let params = new_partial::<RuntimeApi, BIQ>(¶chain_config, build_import_queue)?; - let (mut telemetry, telemetry_worker_handle) = params.other; + let (block_import, mut telemetry, telemetry_worker_handle) = params.other; let client = params.client.clone(); let backend = params.backend.clone(); @@ -526,7 +535,8 @@ where s => s.to_string().into(), })?; - let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id); + let block_announce_validator = + BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id); let force_authoring = parachain_config.force_authoring; let validator = parachain_config.role.is_authority(); @@ -598,6 +608,7 @@ where if validator { let parachain_consensus = build_consensus( client.clone(), + block_import, prometheus_registry.as_ref(), telemetry.as_ref().map(|t| t.handle()), &task_manager, @@ -611,7 +622,7 @@ where let spawner = task_manager.spawn_handle(); let params = StartCollatorParams { - para_id: id, + para_id, block_status: client.clone(), announce_block, client: client.clone(), @@ -630,7 +641,7 @@ where client: client.clone(), announce_block, task_manager: &mut task_manager, - para_id: id, + para_id, relay_chain_interface, relay_chain_slot_duration, import_queue, @@ -647,6 +658,7 @@ where /// Build the import queue for the rococo parachain runtime. pub fn rococo_parachain_build_import_queue( client: Arc<ParachainClient<rococo_parachain_runtime::RuntimeApi>>, + block_import: ParachainBlockImport<rococo_parachain_runtime::RuntimeApi>, config: &Configuration, telemetry: Option<TelemetryHandle>, task_manager: &TaskManager, @@ -664,7 +676,7 @@ pub fn rococo_parachain_build_import_queue( _, _, >(cumulus_client_consensus_aura::ImportQueueParams { - block_import: client.clone(), + block_import, client, create_inherent_data_providers: move |_, _| async move { let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); @@ -689,7 +701,7 @@ pub async fn start_rococo_parachain_node( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, - id: ParaId, + para_id: ParaId, hwbench: Option<sc_sysinfo::HwBench>, ) -> sc_service::error::Result<( TaskManager, @@ -699,10 +711,11 @@ pub async fn start_rococo_parachain_node( parachain_config, polkadot_config, collator_options, - id, + para_id, |_| Ok(RpcModule::new(())), rococo_parachain_build_import_queue, |client, + block_import, prometheus_registry, telemetry, task_manager, @@ -733,7 +746,7 @@ pub async fn start_rococo_parachain_node( relay_parent, &relay_chain_interface, &validation_data, - id, + para_id, ).await; let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); @@ -753,7 +766,7 @@ pub async fn start_rococo_parachain_node( Ok((slot, timestamp, parachain_inherent)) } }, - block_import: client.clone(), + block_import, para_client: client, backoff_authoring_blocks: Option::<()>::None, sync_oracle, @@ -776,6 +789,7 @@ pub async fn start_rococo_parachain_node( /// Build the import queue for the shell runtime. pub fn shell_build_import_queue<RuntimeApi>( client: Arc<ParachainClient<RuntimeApi>>, + block_import: ParachainBlockImport<RuntimeApi>, config: &Configuration, _: Option<TelemetryHandle>, task_manager: &TaskManager, @@ -794,7 +808,7 @@ where { cumulus_client_consensus_relay_chain::import_queue( client.clone(), - client, + block_import, |_, _| async { Ok(()) }, &task_manager.spawn_essential_handle(), config.prometheus_registry(), @@ -807,7 +821,7 @@ pub async fn start_shell_node<RuntimeApi>( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, - id: ParaId, + para_id: ParaId, hwbench: Option<sc_sysinfo::HwBench>, ) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient<RuntimeApi>>)> where @@ -827,10 +841,11 @@ where parachain_config, polkadot_config, collator_options, - id, + para_id, |_| Ok(RpcModule::new(())), shell_build_import_queue, |client, + block_import, prometheus_registry, telemetry, task_manager, @@ -841,7 +856,7 @@ where _| { let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( task_manager.spawn_handle(), - client.clone(), + client, transaction_pool, prometheus_registry, telemetry, @@ -849,9 +864,9 @@ where Ok(cumulus_client_consensus_relay_chain::build_relay_chain_consensus( cumulus_client_consensus_relay_chain::BuildRelayChainConsensusParams { - para_id: id, + para_id, proposer_factory, - block_import: client, + block_import, relay_chain_interface: relay_chain_interface.clone(), create_inherent_data_providers: move |_, (relay_parent, validation_data)| { let relay_chain_interface = relay_chain_interface.clone(); @@ -861,7 +876,7 @@ where relay_parent, &relay_chain_interface, &validation_data, - id, + para_id, ).await; let parachain_inherent = parachain_inherent.ok_or_else(|| { Box::<dyn std::error::Error + Send + Sync>::from( @@ -989,6 +1004,7 @@ where /// Build the import queue for Statemint and other Aura-based runtimes. pub fn aura_build_import_queue<RuntimeApi, AuraId: AppKey>( client: Arc<ParachainClient<RuntimeApi>>, + block_import: ParachainBlockImport<RuntimeApi>, config: &Configuration, telemetry_handle: Option<TelemetryHandle>, task_manager: &TaskManager, @@ -1045,13 +1061,7 @@ where let registry = config.prometheus_registry(); let spawner = task_manager.spawn_essential_handle(); - Ok(BasicQueue::new( - verifier, - Box::new(ParachainBlockImport::new(client)), - None, - &spawner, - registry, - )) + Ok(BasicQueue::new(verifier, Box::new(block_import), None, &spawner, registry)) } /// Start an aura powered parachain node. @@ -1060,7 +1070,7 @@ pub async fn start_generic_aura_node<RuntimeApi, AuraId: AppKey>( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, - id: ParaId, + para_id: ParaId, hwbench: Option<sc_sysinfo::HwBench>, ) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient<RuntimeApi>>)> where @@ -1085,10 +1095,11 @@ where parachain_config, polkadot_config, collator_options, - id, + para_id, |_| Ok(RpcModule::new(())), aura_build_import_queue::<_, AuraId>, |client, + block_import, prometheus_registry, telemetry, task_manager, @@ -1097,8 +1108,9 @@ where sync_oracle, keystore, force_authoring| { - let client2 = client.clone(); let spawn_handle = task_manager.spawn_handle(); + let client2 = client.clone(); + let block_import2 = block_import.clone(); let transaction_pool2 = transaction_pool.clone(); let telemetry2 = telemetry.clone(); let prometheus_registry2 = prometheus_registry.map(|r| (*r).clone()); @@ -1127,7 +1139,7 @@ where relay_parent, &relay_chain_for_aura, &validation_data, - id, + para_id, ).await; let timestamp = @@ -1149,8 +1161,8 @@ where Ok((slot, timestamp, parachain_inherent)) } }, - block_import: client2.clone(), - para_client: client2.clone(), + block_import: block_import2, + para_client: client2, backoff_authoring_blocks: Option::<()>::None, sync_oracle, keystore, @@ -1176,9 +1188,9 @@ where let relay_chain_consensus = cumulus_client_consensus_relay_chain::build_relay_chain_consensus( cumulus_client_consensus_relay_chain::BuildRelayChainConsensusParams { - para_id: id, + para_id, proposer_factory, - block_import: client.clone(), + block_import, relay_chain_interface: relay_chain_interface.clone(), create_inherent_data_providers: move |_, (relay_parent, validation_data)| { @@ -1189,7 +1201,7 @@ where relay_parent, &relay_chain_interface, &validation_data, - id, + para_id, ).await; let parachain_inherent = parachain_inherent.ok_or_else(|| { @@ -1222,7 +1234,7 @@ async fn start_contracts_rococo_node_impl<RuntimeApi, RB, BIQ, BIC>( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, - id: ParaId, + para_id: ParaId, _rpc_ext_builder: RB, build_import_queue: BIQ, build_consensus: BIC, @@ -1242,20 +1254,20 @@ where + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance> + frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>, sc_client_api::StateBackendFor<ParachainBackend, Block>: sp_api::StateBackend<BlakeTwo256>, - RB: Fn(Arc<ParachainClient<RuntimeApi>>) -> Result<jsonrpsee::RpcModule<()>, sc_service::Error> - + Send - + 'static, + RB: Fn(Arc<ParachainClient<RuntimeApi>>) -> Result<jsonrpsee::RpcModule<()>, sc_service::Error>, BIQ: FnOnce( - Arc<ParachainClient<RuntimeApi>>, - &Configuration, - Option<TelemetryHandle>, - &TaskManager, - ) -> Result< - sc_consensus::DefaultImportQueue<Block, ParachainClient<RuntimeApi>>, - sc_service::Error, - > + 'static, + Arc<ParachainClient<RuntimeApi>>, + ParachainBlockImport<RuntimeApi>, + &Configuration, + Option<TelemetryHandle>, + &TaskManager, + ) -> Result< + sc_consensus::DefaultImportQueue<Block, ParachainClient<RuntimeApi>>, + sc_service::Error, + >, BIC: FnOnce( Arc<ParachainClient<RuntimeApi>>, + ParachainBlockImport<RuntimeApi>, Option<&Registry>, Option<TelemetryHandle>, &TaskManager, @@ -1269,7 +1281,7 @@ where let parachain_config = prepare_node_config(parachain_config); let params = new_partial::<RuntimeApi, BIQ>(¶chain_config, build_import_queue)?; - let (mut telemetry, telemetry_worker_handle) = params.other; + let (block_import, mut telemetry, telemetry_worker_handle) = params.other; let client = params.client.clone(); let backend = params.backend.clone(); @@ -1289,7 +1301,8 @@ where s => s.to_string().into(), })?; - let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id); + let block_announce_validator = + BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id); let force_authoring = parachain_config.force_authoring; let validator = parachain_config.role.is_authority(); @@ -1361,6 +1374,7 @@ where if validator { let parachain_consensus = build_consensus( client.clone(), + block_import, prometheus_registry.as_ref(), telemetry.as_ref().map(|t| t.handle()), &task_manager, @@ -1374,7 +1388,7 @@ where let spawner = task_manager.spawn_handle(); let params = StartCollatorParams { - para_id: id, + para_id, block_status: client.clone(), announce_block, client: client.clone(), @@ -1393,7 +1407,7 @@ where client: client.clone(), announce_block, task_manager: &mut task_manager, - para_id: id, + para_id, relay_chain_interface, relay_chain_slot_duration, import_queue, @@ -1410,6 +1424,7 @@ where #[allow(clippy::type_complexity)] pub fn contracts_rococo_build_import_queue( client: Arc<ParachainClient<contracts_rococo_runtime::RuntimeApi>>, + block_import: ParachainBlockImport<contracts_rococo_runtime::RuntimeApi>, config: &Configuration, telemetry: Option<TelemetryHandle>, task_manager: &TaskManager, @@ -1427,7 +1442,7 @@ pub fn contracts_rococo_build_import_queue( _, _, >(cumulus_client_consensus_aura::ImportQueueParams { - block_import: client.clone(), + block_import, client, create_inherent_data_providers: move |_, _| async move { let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); @@ -1452,7 +1467,7 @@ pub async fn start_contracts_rococo_node( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, - id: ParaId, + para_id: ParaId, hwbench: Option<sc_sysinfo::HwBench>, ) -> sc_service::error::Result<( TaskManager, @@ -1462,10 +1477,11 @@ pub async fn start_contracts_rococo_node( parachain_config, polkadot_config, collator_options, - id, + para_id, |_| Ok(RpcModule::new(())), contracts_rococo_build_import_queue, |client, + block_import, prometheus_registry, telemetry, task_manager, @@ -1495,7 +1511,7 @@ pub async fn start_contracts_rococo_node( relay_parent, &relay_chain_interface, &validation_data, - id, + para_id, ).await; let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); @@ -1515,7 +1531,7 @@ pub async fn start_contracts_rococo_node( Ok((slot, timestamp, parachain_inherent)) } }, - block_import: client.clone(), + block_import, para_client: client, backoff_authoring_blocks: Option::<()>::None, sync_oracle, diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs index 9e9f5883f97..29f37806ccf 100644 --- a/cumulus/test/service/src/lib.rs +++ b/cumulus/test/service/src/lib.rs @@ -30,7 +30,9 @@ use url::Url; use crate::runtime::Weight; use cumulus_client_cli::CollatorOptions; -use cumulus_client_consensus_common::{ParachainCandidate, ParachainConsensus}; +use cumulus_client_consensus_common::{ + ParachainBlockImport as TParachainBlockImport, ParachainCandidate, ParachainConsensus, +}; use cumulus_client_network::BlockAnnounceValidator; use cumulus_client_service::{ prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams, @@ -117,6 +119,8 @@ pub type Client = TFullClient< /// Transaction pool type used by the test service pub type TransactionPool = Arc<sc_transaction_pool::FullPool<Block, Client>>; +type ParachainBlockImport = TParachainBlockImport<Arc<Client>>; + /// Starts a `ServiceBuilder` for a full service. /// /// Use this macro if you don't actually need the full service, but just the builder in order to @@ -130,7 +134,7 @@ pub fn new_partial( (), sc_consensus::import_queue::BasicQueue<Block, PrefixedMemoryDB<BlakeTwo256>>, sc_transaction_pool::FullPool<Block, Client>, - (), + ParachainBlockImport, >, sc_service::Error, > { @@ -145,6 +149,8 @@ pub fn new_partial( sc_service::new_full_parts::<Block, RuntimeApi, _>(config, None, executor)?; let client = Arc::new(client); + let block_import = ParachainBlockImport::new(client.clone()); + let registry = config.prometheus_registry(); let transaction_pool = sc_transaction_pool::BasicPool::new_full( @@ -157,7 +163,7 @@ pub fn new_partial( let import_queue = cumulus_client_consensus_relay_chain::import_queue( client.clone(), - client.clone(), + block_import.clone(), |_, _| async { Ok(sp_timestamp::InherentDataProvider::from_system_time()) }, &task_manager.spawn_essential_handle(), registry, @@ -171,7 +177,7 @@ pub fn new_partial( task_manager, transaction_pool, select_chain: (), - other: (), + other: block_import, }; Ok(params) @@ -244,6 +250,8 @@ where let client = params.client.clone(); let backend = params.backend.clone(); + let block_import = params.other; + let relay_chain_interface = build_relay_chain_interface( relay_chain_config, collator_key.clone(), @@ -275,7 +283,6 @@ where let rpc_builder = { let client = client.clone(); - Box::new(move |_, _| rpc_ext_builder(client.clone())) }; @@ -338,7 +345,7 @@ where Ok((time, parachain_inherent)) } }, - client.clone(), + block_import, relay_chain_interface2, )) }, -- GitLab