From b8da8faa0a675afbed1c9ed5d524a674e93910b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= <git@kchr.de> Date: Fri, 13 Dec 2024 11:31:14 +0100 Subject: [PATCH] slot-based-collator: Implement dedicated block import (#6481) The `SlotBasedBlockImport` job is to collect the storage proofs of all blocks getting imported. These storage proofs alongside the block are being forwarded to the collation task. Right now they are just being thrown away. More logic will follow later. Basically this will be required to include multiple blocks into one `PoV` which will then be done by the collation task. --------- Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Co-authored-by: GitHub Action <action@github.com> --- Cargo.lock | 2 + cumulus/client/consensus/aura/Cargo.toml | 1 + .../src/collators/slot_based/block_import.rs | 144 ++++++++++++++++++ .../collators/slot_based/collation_task.rs | 41 +++-- .../aura/src/collators/slot_based/mod.rs | 11 +- cumulus/polkadot-omni-node/lib/Cargo.toml | 1 + .../polkadot-omni-node/lib/src/common/spec.rs | 74 +++++++-- .../lib/src/common/types.rs | 14 +- .../polkadot-omni-node/lib/src/nodes/aura.rs | 119 ++++++++++++--- .../lib/src/nodes/manual_seal.rs | 18 ++- cumulus/test/service/src/lib.rs | 20 ++- prdoc/pr_6481.prdoc | 10 ++ 12 files changed, 395 insertions(+), 60 deletions(-) create mode 100644 cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs create mode 100644 prdoc/pr_6481.prdoc diff --git a/Cargo.lock b/Cargo.lock index f2379d4ee6d..d0abba9d4cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4656,6 +4656,7 @@ dependencies = [ "sp-runtime 31.0.1", "sp-state-machine 0.35.0", "sp-timestamp 26.0.0", + "sp-trie 29.0.0", "substrate-prometheus-endpoint", "tokio", "tracing", @@ -18135,6 +18136,7 @@ dependencies = [ "serde_json", "sp-api 26.0.0", "sp-block-builder 26.0.0", + "sp-consensus", "sp-consensus-aura 0.32.0", "sp-core 28.0.0", "sp-crypto-hashing 0.1.0", diff --git a/cumulus/client/consensus/aura/Cargo.toml b/cumulus/client/consensus/aura/Cargo.toml index 6e0c124591c..33f24e30ccf 100644 --- a/cumulus/client/consensus/aura/Cargo.toml +++ b/cumulus/client/consensus/aura/Cargo.toml @@ -35,6 +35,7 @@ sp-blockchain = { workspace = true, default-features = true } sp-consensus = { workspace = true, default-features = true } sp-consensus-aura = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } +sp-trie = { workspace = true, default-features = true } sp-inherents = { workspace = true, default-features = true } sp-keystore = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs new file mode 100644 index 00000000000..9c53da6a6b7 --- /dev/null +++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs @@ -0,0 +1,144 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see <http://www.gnu.org/licenses/>. + +use futures::{stream::FusedStream, StreamExt}; +use sc_consensus::{BlockImport, StateAction}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use sp_api::{ApiExt, CallApiAt, CallContext, Core, ProvideRuntimeApi, StorageProof}; +use sp_runtime::traits::{Block as BlockT, Header as _}; +use sp_trie::proof_size_extension::ProofSizeExt; +use std::sync::Arc; + +/// Handle for receiving the block and the storage proof from the [`SlotBasedBlockImport`]. +/// +/// This handle should be passed to [`Params`](super::Params) or can also be dropped if the node is +/// not running as collator. +pub struct SlotBasedBlockImportHandle<Block> { + receiver: TracingUnboundedReceiver<(Block, StorageProof)>, +} + +impl<Block> SlotBasedBlockImportHandle<Block> { + /// Returns the next item. + /// + /// The future will never return when the internal channel is closed. + pub async fn next(&mut self) -> (Block, StorageProof) { + loop { + if self.receiver.is_terminated() { + futures::pending!() + } else if let Some(res) = self.receiver.next().await { + return res + } + } + } +} + +/// Special block import for the slot based collator. +pub struct SlotBasedBlockImport<Block, BI, Client> { + inner: BI, + client: Arc<Client>, + sender: TracingUnboundedSender<(Block, StorageProof)>, +} + +impl<Block, BI, Client> SlotBasedBlockImport<Block, BI, Client> { + /// Create a new instance. + /// + /// The returned [`SlotBasedBlockImportHandle`] needs to be passed to the + /// [`Params`](super::Params), so that this block import instance can communicate with the + /// collation task. If the node is not running as a collator, just dropping the handle is fine. + pub fn new(inner: BI, client: Arc<Client>) -> (Self, SlotBasedBlockImportHandle<Block>) { + let (sender, receiver) = tracing_unbounded("SlotBasedBlockImportChannel", 1000); + + (Self { sender, client, inner }, SlotBasedBlockImportHandle { receiver }) + } +} + +impl<Block, BI: Clone, Client> Clone for SlotBasedBlockImport<Block, BI, Client> { + fn clone(&self) -> Self { + Self { inner: self.inner.clone(), client: self.client.clone(), sender: self.sender.clone() } + } +} + +#[async_trait::async_trait] +impl<Block, BI, Client> BlockImport<Block> for SlotBasedBlockImport<Block, BI, Client> +where + Block: BlockT, + BI: BlockImport<Block> + Send + Sync, + BI::Error: Into<sp_consensus::Error>, + Client: ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync, + Client::StateBackend: Send, + Client::Api: Core<Block>, +{ + type Error = sp_consensus::Error; + + async fn check_block( + &self, + block: sc_consensus::BlockCheckParams<Block>, + ) -> Result<sc_consensus::ImportResult, Self::Error> { + self.inner.check_block(block).await.map_err(Into::into) + } + + async fn import_block( + &self, + mut params: sc_consensus::BlockImportParams<Block>, + ) -> Result<sc_consensus::ImportResult, Self::Error> { + // If the channel exists and it is required to execute the block, we will execute the block + // here. This is done to collect the storage proof and to prevent re-execution, we push + // downwards the state changes. `StateAction::ApplyChanges` is ignored, because it either + // means that the node produced the block itself or the block was imported via state sync. + if !self.sender.is_closed() && !matches!(params.state_action, StateAction::ApplyChanges(_)) + { + let mut runtime_api = self.client.runtime_api(); + + runtime_api.set_call_context(CallContext::Onchain); + + runtime_api.record_proof(); + let recorder = runtime_api + .proof_recorder() + .expect("Proof recording is enabled in the line above; qed."); + runtime_api.register_extension(ProofSizeExt::new(recorder)); + + let parent_hash = *params.header.parent_hash(); + + let block = Block::new(params.header.clone(), params.body.clone().unwrap_or_default()); + + runtime_api + .execute_block(parent_hash, block.clone()) + .map_err(|e| Box::new(e) as Box<_>)?; + + let storage_proof = + runtime_api.extract_proof().expect("Proof recording was enabled above; qed"); + + let state = self.client.state_at(parent_hash).map_err(|e| Box::new(e) as Box<_>)?; + let gen_storage_changes = runtime_api + .into_storage_changes(&state, parent_hash) + .map_err(sp_consensus::Error::ChainLookup)?; + + if params.header.state_root() != &gen_storage_changes.transaction_storage_root { + return Err(sp_consensus::Error::Other(Box::new( + sp_blockchain::Error::InvalidStateRoot, + ))) + } + + params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes( + gen_storage_changes, + )); + + let _ = self.sender.unbounded_send((block, storage_proof)); + } + + self.inner.import_block(params).await.map_err(Into::into) + } +} diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs index 5b8151f6302..abaeb8319a4 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs @@ -47,6 +47,8 @@ pub struct Params<Block: BlockT, RClient, CS> { pub collator_service: CS, /// Receiver channel for communication with the block builder task. pub collator_receiver: TracingUnboundedReceiver<CollatorMessage<Block>>, + /// The handle from the special slot based block import. + pub block_import_handle: super::SlotBasedBlockImportHandle<Block>, } /// Asynchronously executes the collation task for a parachain. @@ -55,28 +57,49 @@ pub struct Params<Block: BlockT, RClient, CS> { /// collations to the relay chain. It listens for new best relay chain block notifications and /// handles collator messages. If our parachain is scheduled on a core and we have a candidate, /// the task will build a collation and send it to the relay chain. -pub async fn run_collation_task<Block, RClient, CS>(mut params: Params<Block, RClient, CS>) -where +pub async fn run_collation_task<Block, RClient, CS>( + Params { + relay_client, + collator_key, + para_id, + reinitialize, + collator_service, + mut collator_receiver, + mut block_import_handle, + }: Params<Block, RClient, CS>, +) where Block: BlockT, CS: CollatorServiceInterface<Block> + Send + Sync + 'static, RClient: RelayChainInterface + Clone + 'static, { - let Ok(mut overseer_handle) = params.relay_client.overseer_handle() else { + let Ok(mut overseer_handle) = relay_client.overseer_handle() else { tracing::error!(target: LOG_TARGET, "Failed to get overseer handle."); return }; cumulus_client_collator::initialize_collator_subsystems( &mut overseer_handle, - params.collator_key, - params.para_id, - params.reinitialize, + collator_key, + para_id, + reinitialize, ) .await; - let collator_service = params.collator_service; - while let Some(collator_message) = params.collator_receiver.next().await { - handle_collation_message(collator_message, &collator_service, &mut overseer_handle).await; + loop { + futures::select! { + collator_message = collator_receiver.next() => { + let Some(message) = collator_message else { + return; + }; + + handle_collation_message(message, &collator_service, &mut overseer_handle).await; + }, + block_import_msg = block_import_handle.next().fuse() => { + // TODO: Implement me. + // Issue: https://github.com/paritytech/polkadot-sdk/issues/6495 + let _ = block_import_msg; + } + } } } diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs index 18e63681d57..09afa18e6fb 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs @@ -54,11 +54,14 @@ use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, Member}; use std::{sync::Arc, time::Duration}; +pub use block_import::{SlotBasedBlockImport, SlotBasedBlockImportHandle}; + mod block_builder_task; +mod block_import; mod collation_task; /// Parameters for [`run`]. -pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner> { +pub struct Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner> { /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. /// the timestamp, slot, and paras inherents should be omitted, as they are set by this /// collator. @@ -90,6 +93,8 @@ pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner /// Drift slots by a fixed duration. This can be used to create more preferrable authoring /// timings. pub slot_drift: Duration, + /// The handle returned by [`SlotBasedBlockImport`]. + pub block_import_handle: SlotBasedBlockImportHandle<Block>, /// Spawner for spawning futures. pub spawner: Spawner, } @@ -111,8 +116,9 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw authoring_duration, reinitialize, slot_drift, + block_import_handle, spawner, - }: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>, + }: Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>, ) where Block: BlockT, Client: ProvideRuntimeApi<Block> @@ -147,6 +153,7 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw reinitialize, collator_service: collator_service.clone(), collator_receiver: rx, + block_import_handle, }; let collation_task_fut = run_collation_task::<Block, _, _>(collator_task_params); diff --git a/cumulus/polkadot-omni-node/lib/Cargo.toml b/cumulus/polkadot-omni-node/lib/Cargo.toml index 4d003a69456..afbe03ada89 100644 --- a/cumulus/polkadot-omni-node/lib/Cargo.toml +++ b/cumulus/polkadot-omni-node/lib/Cargo.toml @@ -67,6 +67,7 @@ pallet-transaction-payment = { workspace = true, default-features = true } pallet-transaction-payment-rpc-runtime-api = { workspace = true, default-features = true } sp-inherents = { workspace = true, default-features = true } sp-api = { workspace = true, default-features = true } +sp-consensus = { workspace = true, default-features = true } sp-consensus-aura = { workspace = true, default-features = true } sp-io = { workspace = true, default-features = true } sp-wasm-interface = { workspace = true, default-features = true } diff --git a/cumulus/polkadot-omni-node/lib/src/common/spec.rs b/cumulus/polkadot-omni-node/lib/src/common/spec.rs index 38f0e7d7288..868368f3ca1 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/spec.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/spec.rs @@ -44,23 +44,28 @@ use sc_transaction_pool::TransactionPoolHandle; use sp_keystore::KeystorePtr; use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; -pub(crate) trait BuildImportQueue<Block: BlockT, RuntimeApi> { +pub(crate) trait BuildImportQueue< + Block: BlockT, + RuntimeApi, + BlockImport: sc_consensus::BlockImport<Block>, +> +{ fn build_import_queue( client: Arc<ParachainClient<Block, RuntimeApi>>, - block_import: ParachainBlockImport<Block, RuntimeApi>, + block_import: ParachainBlockImport<Block, BlockImport>, config: &Configuration, telemetry_handle: Option<TelemetryHandle>, task_manager: &TaskManager, ) -> sc_service::error::Result<DefaultImportQueue<Block>>; } -pub(crate) trait StartConsensus<Block: BlockT, RuntimeApi> +pub(crate) trait StartConsensus<Block: BlockT, RuntimeApi, BI, BIAuxiliaryData> where RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>, { fn start_consensus( client: Arc<ParachainClient<Block, RuntimeApi>>, - block_import: ParachainBlockImport<Block, RuntimeApi>, + block_import: ParachainBlockImport<Block, BI>, prometheus_registry: Option<&Registry>, telemetry: Option<TelemetryHandle>, task_manager: &TaskManager, @@ -74,6 +79,7 @@ where announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>, backend: Arc<ParachainBackend<Block>>, node_extra_args: NodeExtraArgs, + block_import_extra_return_value: BIAuxiliaryData, ) -> Result<(), sc_service::Error>; } @@ -92,6 +98,31 @@ fn warn_if_slow_hardware(hwbench: &sc_sysinfo::HwBench) { } } +pub(crate) trait InitBlockImport<Block: BlockT, RuntimeApi> { + type BlockImport: sc_consensus::BlockImport<Block> + Clone + Send + Sync; + type BlockImportAuxiliaryData; + + fn init_block_import( + client: Arc<ParachainClient<Block, RuntimeApi>>, + ) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)>; +} + +pub(crate) struct ClientBlockImport; + +impl<Block: BlockT, RuntimeApi> InitBlockImport<Block, RuntimeApi> for ClientBlockImport +where + RuntimeApi: Send + ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>, +{ + type BlockImport = Arc<ParachainClient<Block, RuntimeApi>>; + type BlockImportAuxiliaryData = (); + + fn init_block_import( + client: Arc<ParachainClient<Block, RuntimeApi>>, + ) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)> { + Ok((client.clone(), ())) + } +} + pub(crate) trait BaseNodeSpec { type Block: NodeBlock; @@ -100,7 +131,13 @@ pub(crate) trait BaseNodeSpec { ParachainClient<Self::Block, Self::RuntimeApi>, >; - type BuildImportQueue: BuildImportQueue<Self::Block, Self::RuntimeApi>; + type BuildImportQueue: BuildImportQueue< + Self::Block, + Self::RuntimeApi, + <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport, + >; + + type InitBlockImport: self::InitBlockImport<Self::Block, Self::RuntimeApi>; /// Starts a `ServiceBuilder` for a full service. /// @@ -108,7 +145,14 @@ pub(crate) trait BaseNodeSpec { /// be able to perform chain operations. fn new_partial( config: &Configuration, - ) -> sc_service::error::Result<ParachainService<Self::Block, Self::RuntimeApi>> { + ) -> sc_service::error::Result< + ParachainService< + Self::Block, + Self::RuntimeApi, + <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport, + <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImportAuxiliaryData + > + >{ let telemetry = config .telemetry_endpoints .clone() @@ -160,7 +204,10 @@ pub(crate) trait BaseNodeSpec { .build(), ); - let block_import = ParachainBlockImport::new(client.clone(), backend.clone()); + let (block_import, block_import_auxiliary_data) = + Self::InitBlockImport::init_block_import(client.clone())?; + + let block_import = ParachainBlockImport::new(block_import, backend.clone()); let import_queue = Self::BuildImportQueue::build_import_queue( client.clone(), @@ -178,7 +225,7 @@ pub(crate) trait BaseNodeSpec { task_manager, transaction_pool, select_chain: (), - other: (block_import, telemetry, telemetry_worker_handle), + other: (block_import, telemetry, telemetry_worker_handle, block_import_auxiliary_data), }) } } @@ -190,7 +237,12 @@ pub(crate) trait NodeSpec: BaseNodeSpec { TransactionPoolHandle<Self::Block, ParachainClient<Self::Block, Self::RuntimeApi>>, >; - type StartConsensus: StartConsensus<Self::Block, Self::RuntimeApi>; + type StartConsensus: StartConsensus< + Self::Block, + Self::RuntimeApi, + <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport, + <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImportAuxiliaryData, + >; const SYBIL_RESISTANCE: CollatorSybilResistance; @@ -212,7 +264,8 @@ pub(crate) trait NodeSpec: BaseNodeSpec { let parachain_config = prepare_node_config(parachain_config); let params = Self::new_partial(¶chain_config)?; - let (block_import, mut telemetry, telemetry_worker_handle) = params.other; + let (block_import, mut telemetry, telemetry_worker_handle, block_import_auxiliary_data) = + params.other; let client = params.client.clone(); let backend = params.backend.clone(); let mut task_manager = params.task_manager; @@ -340,6 +393,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec { announce_block, backend.clone(), node_extra_args, + block_import_auxiliary_data, )?; } diff --git a/cumulus/polkadot-omni-node/lib/src/common/types.rs b/cumulus/polkadot-omni-node/lib/src/common/types.rs index 4bc58dc9db7..978368be258 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/types.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/types.rs @@ -22,7 +22,6 @@ use sc_service::{PartialComponents, TFullBackend, TFullClient}; use sc_telemetry::{Telemetry, TelemetryWorkerHandle}; use sc_transaction_pool::TransactionPoolHandle; use sp_runtime::{generic, traits::BlakeTwo256}; -use std::sync::Arc; pub use parachains_common::{AccountId, Balance, Hash, Nonce}; @@ -42,15 +41,20 @@ pub type ParachainClient<Block, RuntimeApi> = pub type ParachainBackend<Block> = TFullBackend<Block>; -pub type ParachainBlockImport<Block, RuntimeApi> = - TParachainBlockImport<Block, Arc<ParachainClient<Block, RuntimeApi>>, ParachainBackend<Block>>; +pub type ParachainBlockImport<Block, BI> = + TParachainBlockImport<Block, BI, ParachainBackend<Block>>; /// Assembly of PartialComponents (enough to run chain ops subcommands) -pub type ParachainService<Block, RuntimeApi> = PartialComponents< +pub type ParachainService<Block, RuntimeApi, BI, BIExtraReturnValue> = PartialComponents< ParachainClient<Block, RuntimeApi>, ParachainBackend<Block>, (), DefaultImportQueue<Block>, TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>, - (ParachainBlockImport<Block, RuntimeApi>, Option<Telemetry>, Option<TelemetryWorkerHandle>), + ( + ParachainBlockImport<Block, BI>, + Option<Telemetry>, + Option<TelemetryWorkerHandle>, + BIExtraReturnValue, + ), >; diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs index 0b2c230f695..816f76117a2 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs @@ -18,7 +18,10 @@ use crate::{ common::{ aura::{AuraIdT, AuraRuntimeApi}, rpc::BuildParachainRpcExtensions, - spec::{BaseNodeSpec, BuildImportQueue, NodeSpec, StartConsensus}, + spec::{ + BaseNodeSpec, BuildImportQueue, ClientBlockImport, InitBlockImport, NodeSpec, + StartConsensus, + }, types::{ AccountId, Balance, Hash, Nonce, ParachainBackend, ParachainBlockImport, ParachainClient, @@ -30,11 +33,14 @@ use crate::{ use cumulus_client_collator::service::{ CollatorService, ServiceInterface as CollatorServiceInterface, }; -use cumulus_client_consensus_aura::collators::lookahead::{self as aura, Params as AuraParams}; #[docify::export(slot_based_colator_import)] use cumulus_client_consensus_aura::collators::slot_based::{ self as slot_based, Params as SlotBasedParams, }; +use cumulus_client_consensus_aura::collators::{ + lookahead::{self as aura, Params as AuraParams}, + slot_based::{SlotBasedBlockImport, SlotBasedBlockImportHandle}, +}; use cumulus_client_consensus_proposer::{Proposer, ProposerInterface}; use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier; #[allow(deprecated)] @@ -91,20 +97,23 @@ where /// Build the import queue for parachain runtimes that started with relay chain consensus and /// switched to aura. -pub(crate) struct BuildRelayToAuraImportQueue<Block, RuntimeApi, AuraId>( - PhantomData<(Block, RuntimeApi, AuraId)>, +pub(crate) struct BuildRelayToAuraImportQueue<Block, RuntimeApi, AuraId, BlockImport>( + PhantomData<(Block, RuntimeApi, AuraId, BlockImport)>, ); -impl<Block: BlockT, RuntimeApi, AuraId> BuildImportQueue<Block, RuntimeApi> - for BuildRelayToAuraImportQueue<Block, RuntimeApi, AuraId> +impl<Block: BlockT, RuntimeApi, AuraId, BlockImport> + BuildImportQueue<Block, RuntimeApi, BlockImport> + for BuildRelayToAuraImportQueue<Block, RuntimeApi, AuraId, BlockImport> where RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>, RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>, AuraId: AuraIdT + Sync, + BlockImport: + sc_consensus::BlockImport<Block, Error = sp_consensus::Error> + Send + Sync + 'static, { fn build_import_queue( client: Arc<ParachainClient<Block, RuntimeApi>>, - block_import: ParachainBlockImport<Block, RuntimeApi>, + block_import: ParachainBlockImport<Block, BlockImport>, config: &Configuration, telemetry_handle: Option<TelemetryHandle>, task_manager: &TaskManager, @@ -159,20 +168,20 @@ where /// Uses the lookahead collator to support async backing. /// /// Start an aura powered parachain node. Some system chains use this. -pub(crate) struct AuraNode<Block, RuntimeApi, AuraId, StartConsensus>( - pub PhantomData<(Block, RuntimeApi, AuraId, StartConsensus)>, +pub(crate) struct AuraNode<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport>( + pub PhantomData<(Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport)>, ); -impl<Block, RuntimeApi, AuraId, StartConsensus> Default - for AuraNode<Block, RuntimeApi, AuraId, StartConsensus> +impl<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport> Default + for AuraNode<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport> { fn default() -> Self { Self(Default::default()) } } -impl<Block, RuntimeApi, AuraId, StartConsensus> BaseNodeSpec - for AuraNode<Block, RuntimeApi, AuraId, StartConsensus> +impl<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport> BaseNodeSpec + for AuraNode<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport> where Block: NodeBlock, RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>, @@ -180,14 +189,19 @@ where + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance> + substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>, AuraId: AuraIdT + Sync, + InitBlockImport: self::InitBlockImport<Block, RuntimeApi> + Send, + InitBlockImport::BlockImport: + sc_consensus::BlockImport<Block, Error = sp_consensus::Error> + 'static, { type Block = Block; type RuntimeApi = RuntimeApi; - type BuildImportQueue = BuildRelayToAuraImportQueue<Block, RuntimeApi, AuraId>; + type BuildImportQueue = + BuildRelayToAuraImportQueue<Block, RuntimeApi, AuraId, InitBlockImport::BlockImport>; + type InitBlockImport = InitBlockImport; } -impl<Block, RuntimeApi, AuraId, StartConsensus> NodeSpec - for AuraNode<Block, RuntimeApi, AuraId, StartConsensus> +impl<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport> NodeSpec + for AuraNode<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport> where Block: NodeBlock, RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>, @@ -195,7 +209,15 @@ where + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance> + substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>, AuraId: AuraIdT + Sync, - StartConsensus: self::StartConsensus<Block, RuntimeApi> + 'static, + StartConsensus: self::StartConsensus< + Block, + RuntimeApi, + InitBlockImport::BlockImport, + InitBlockImport::BlockImportAuxiliaryData, + > + 'static, + InitBlockImport: self::InitBlockImport<Block, RuntimeApi> + Send, + InitBlockImport::BlockImport: + sc_consensus::BlockImport<Block, Error = sp_consensus::Error> + 'static, { type BuildRpcExtensions = BuildParachainRpcExtensions<Block, RuntimeApi>; type StartConsensus = StartConsensus; @@ -219,6 +241,7 @@ where RuntimeApi, AuraId, StartSlotBasedAuraConsensus<Block, RuntimeApi, AuraId>, + StartSlotBasedAuraConsensus<Block, RuntimeApi, AuraId>, >::default()) } else { Box::new(AuraNode::< @@ -226,6 +249,7 @@ where RuntimeApi, AuraId, StartLookaheadAuraConsensus<Block, RuntimeApi, AuraId>, + ClientBlockImport, >::default()) } } @@ -245,7 +269,15 @@ where #[docify::export_content] fn launch_slot_based_collator<CIDP, CHP, Proposer, CS, Spawner>( params: SlotBasedParams< - ParachainBlockImport<Block, RuntimeApi>, + Block, + ParachainBlockImport< + Block, + SlotBasedBlockImport< + Block, + Arc<ParachainClient<Block, RuntimeApi>>, + ParachainClient<Block, RuntimeApi>, + >, + >, CIDP, ParachainClient<Block, RuntimeApi>, ParachainBackend<Block>, @@ -267,8 +299,17 @@ where } } -impl<Block: BlockT<Hash = DbHash>, RuntimeApi, AuraId> StartConsensus<Block, RuntimeApi> - for StartSlotBasedAuraConsensus<Block, RuntimeApi, AuraId> +impl<Block: BlockT<Hash = DbHash>, RuntimeApi, AuraId> + StartConsensus< + Block, + RuntimeApi, + SlotBasedBlockImport< + Block, + Arc<ParachainClient<Block, RuntimeApi>>, + ParachainClient<Block, RuntimeApi>, + >, + SlotBasedBlockImportHandle<Block>, + > for StartSlotBasedAuraConsensus<Block, RuntimeApi, AuraId> where RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>, RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>, @@ -276,7 +317,14 @@ where { fn start_consensus( client: Arc<ParachainClient<Block, RuntimeApi>>, - block_import: ParachainBlockImport<Block, RuntimeApi>, + block_import: ParachainBlockImport< + Block, + SlotBasedBlockImport< + Block, + Arc<ParachainClient<Block, RuntimeApi>>, + ParachainClient<Block, RuntimeApi>, + >, + >, prometheus_registry: Option<&Registry>, telemetry: Option<TelemetryHandle>, task_manager: &TaskManager, @@ -290,6 +338,7 @@ where announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>, backend: Arc<ParachainBackend<Block>>, _node_extra_args: NodeExtraArgs, + block_import_handle: SlotBasedBlockImportHandle<Block>, ) -> Result<(), Error> { let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( task_manager.spawn_handle(), @@ -325,6 +374,7 @@ where authoring_duration: Duration::from_millis(2000), reinitialize: false, slot_drift: Duration::from_secs(1), + block_import_handle, spawner: task_manager.spawn_handle(), }; @@ -336,6 +386,27 @@ where } } +impl<Block: BlockT<Hash = DbHash>, RuntimeApi, AuraId> InitBlockImport<Block, RuntimeApi> + for StartSlotBasedAuraConsensus<Block, RuntimeApi, AuraId> +where + RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>, + RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>, + AuraId: AuraIdT + Sync, +{ + type BlockImport = SlotBasedBlockImport< + Block, + Arc<ParachainClient<Block, RuntimeApi>>, + ParachainClient<Block, RuntimeApi>, + >; + type BlockImportAuxiliaryData = SlotBasedBlockImportHandle<Block>; + + fn init_block_import( + client: Arc<ParachainClient<Block, RuntimeApi>>, + ) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)> { + Ok(SlotBasedBlockImport::new(client.clone(), client)) + } +} + /// Wait for the Aura runtime API to appear on chain. /// This is useful for chains that started out without Aura. Components that /// are depending on Aura functionality will wait until Aura appears in the runtime. @@ -364,7 +435,8 @@ pub(crate) struct StartLookaheadAuraConsensus<Block, RuntimeApi, AuraId>( PhantomData<(Block, RuntimeApi, AuraId)>, ); -impl<Block: BlockT<Hash = DbHash>, RuntimeApi, AuraId> StartConsensus<Block, RuntimeApi> +impl<Block: BlockT<Hash = DbHash>, RuntimeApi, AuraId> + StartConsensus<Block, RuntimeApi, Arc<ParachainClient<Block, RuntimeApi>>, ()> for StartLookaheadAuraConsensus<Block, RuntimeApi, AuraId> where RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>, @@ -373,7 +445,7 @@ where { fn start_consensus( client: Arc<ParachainClient<Block, RuntimeApi>>, - block_import: ParachainBlockImport<Block, RuntimeApi>, + block_import: ParachainBlockImport<Block, Arc<ParachainClient<Block, RuntimeApi>>>, prometheus_registry: Option<&Registry>, telemetry: Option<TelemetryHandle>, task_manager: &TaskManager, @@ -387,6 +459,7 @@ where announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>, backend: Arc<ParachainBackend<Block>>, node_extra_args: NodeExtraArgs, + _: (), ) -> Result<(), Error> { let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( task_manager.spawn_handle(), diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs b/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs index 7e36ce735af..8b7921da30c 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs @@ -16,7 +16,7 @@ use crate::common::{ rpc::BuildRpcExtensions as BuildRpcExtensionsT, - spec::{BaseNodeSpec, BuildImportQueue, NodeSpec as NodeSpecT}, + spec::{BaseNodeSpec, BuildImportQueue, ClientBlockImport, NodeSpec as NodeSpecT}, types::{Hash, ParachainBlockImport, ParachainClient}, }; use codec::Encode; @@ -32,12 +32,19 @@ use std::{marker::PhantomData, sync::Arc}; pub struct ManualSealNode<NodeSpec>(PhantomData<NodeSpec>); -impl<NodeSpec: NodeSpecT> BuildImportQueue<NodeSpec::Block, NodeSpec::RuntimeApi> - for ManualSealNode<NodeSpec> +impl<NodeSpec: NodeSpecT> + BuildImportQueue< + NodeSpec::Block, + NodeSpec::RuntimeApi, + Arc<ParachainClient<NodeSpec::Block, NodeSpec::RuntimeApi>>, + > for ManualSealNode<NodeSpec> { fn build_import_queue( client: Arc<ParachainClient<NodeSpec::Block, NodeSpec::RuntimeApi>>, - _block_import: ParachainBlockImport<NodeSpec::Block, NodeSpec::RuntimeApi>, + _block_import: ParachainBlockImport< + NodeSpec::Block, + Arc<ParachainClient<NodeSpec::Block, NodeSpec::RuntimeApi>>, + >, config: &Configuration, _telemetry_handle: Option<TelemetryHandle>, task_manager: &TaskManager, @@ -54,6 +61,7 @@ impl<NodeSpec: NodeSpecT> BaseNodeSpec for ManualSealNode<NodeSpec> { type Block = NodeSpec::Block; type RuntimeApi = NodeSpec::RuntimeApi; type BuildImportQueue = Self; + type InitBlockImport = ClientBlockImport; } impl<NodeSpec: NodeSpecT> ManualSealNode<NodeSpec> { @@ -78,7 +86,7 @@ impl<NodeSpec: NodeSpecT> ManualSealNode<NodeSpec> { keystore_container, select_chain: _, transaction_pool, - other: (_, mut telemetry, _), + other: (_, mut telemetry, _, _), } = Self::new_partial(&config)?; let select_chain = LongestChain::new(backend.clone()); diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs index f01da9becef..2c13d20333a 100644 --- a/cumulus/test/service/src/lib.rs +++ b/cumulus/test/service/src/lib.rs @@ -27,7 +27,10 @@ use cumulus_client_collator::service::CollatorService; use cumulus_client_consensus_aura::{ collators::{ lookahead::{self as aura, Params as AuraParams}, - slot_based::{self as slot_based, Params as SlotBasedParams}, + slot_based::{ + self as slot_based, Params as SlotBasedParams, SlotBasedBlockImport, + SlotBasedBlockImportHandle, + }, }, ImportQueueParams, }; @@ -131,7 +134,8 @@ pub type Client = TFullClient<runtime::NodeBlock, runtime::RuntimeApi, WasmExecu pub type Backend = TFullBackend<Block>; /// The block-import type being used by the test service. -pub type ParachainBlockImport = TParachainBlockImport<Block, Arc<Client>, Backend>; +pub type ParachainBlockImport = + TParachainBlockImport<Block, SlotBasedBlockImport<Block, Arc<Client>, Client>, Backend>; /// Transaction pool type used by the test service pub type TransactionPool = Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>; @@ -184,7 +188,7 @@ pub type Service = PartialComponents< (), sc_consensus::import_queue::BasicQueue<Block>, sc_transaction_pool::TransactionPoolHandle<Block, Client>, - ParachainBlockImport, + (ParachainBlockImport, SlotBasedBlockImportHandle<Block>), >; /// Starts a `ServiceBuilder` for a full service. @@ -217,7 +221,9 @@ pub fn new_partial( )?; let client = Arc::new(client); - let block_import = ParachainBlockImport::new(client.clone(), backend.clone()); + let (block_import, slot_based_handle) = + SlotBasedBlockImport::new(client.clone(), client.clone()); + let block_import = ParachainBlockImport::new(block_import, backend.clone()); let transaction_pool = Arc::from( sc_transaction_pool::Builder::new( @@ -260,7 +266,7 @@ pub fn new_partial( task_manager, transaction_pool, select_chain: (), - other: block_import, + other: (block_import, slot_based_handle), }; Ok(params) @@ -349,7 +355,8 @@ where let client = params.client.clone(); let backend = params.backend.clone(); - let block_import = params.other; + let block_import = params.other.0; + let slot_based_handle = params.other.1; let relay_chain_interface = build_relay_chain_interface( relay_chain_config, parachain_config.prometheus_registry(), @@ -497,6 +504,7 @@ where authoring_duration: Duration::from_millis(2000), reinitialize: false, slot_drift: Duration::from_secs(1), + block_import_handle: slot_based_handle, spawner: task_manager.spawn_handle(), }; diff --git a/prdoc/pr_6481.prdoc b/prdoc/pr_6481.prdoc new file mode 100644 index 00000000000..83ba0a32eb2 --- /dev/null +++ b/prdoc/pr_6481.prdoc @@ -0,0 +1,10 @@ +title: 'slot-based-collator: Implement dedicated block import' +doc: +- audience: Node Dev + description: |- + The `SlotBasedBlockImport` job is to collect the storage proofs of all blocks getting imported. These storage proofs alongside the block are being forwarded to the collation task. Right now they are just being thrown away. More logic will follow later. Basically this will be required to include multiple blocks into one `PoV` which will then be done by the collation task. +crates: +- name: cumulus-client-consensus-aura + bump: major +- name: polkadot-omni-node-lib + bump: major -- GitLab