Skip to content
Snippets Groups Projects
Unverified Commit e64b53c2 authored by Mrisho Lukamba's avatar Mrisho Lukamba Committed by GitHub
Browse files

feat(collator) add export pov on slot base collator (#7585)

Closes #7573

@skunert  @bkchr



---------

Co-authored-by: default avatarBastian Köcher <git@kchr.de>
Co-authored-by: default avatarcmd[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: default avatarSebastian Kunert <skunert49@gmail.com>
parent f7e98b40
No related merge requests found
Pipeline #517221 waiting for manual action with stages
in 1 hour, 17 minutes, and 57 seconds
...@@ -40,14 +40,14 @@ use cumulus_primitives_aura::AuraUnincludedSegmentApi; ...@@ -40,14 +40,14 @@ use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::{ClaimQueueOffset, CollectCollationInfo, PersistedValidationData}; use cumulus_primitives_core::{ClaimQueueOffset, CollectCollationInfo, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface; use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_node_primitives::{PoV, SubmitCollationParams}; use polkadot_node_primitives::SubmitCollationParams;
use polkadot_node_subsystem::messages::CollationGenerationMessage; use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_overseer::Handle as OverseerHandle; use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{ use polkadot_primitives::{
vstaging::DEFAULT_CLAIM_QUEUE_OFFSET, BlockNumber as RBlockNumber, CollatorPair, Hash as RHash, vstaging::DEFAULT_CLAIM_QUEUE_OFFSET, CollatorPair, Id as ParaId, OccupiedCoreAssumption,
HeadData, Id as ParaId, OccupiedCoreAssumption,
}; };
use crate::{collator as collator_util, export_pov_to_path};
use futures::prelude::*; use futures::prelude::*;
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf}; use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
use sc_consensus::BlockImport; use sc_consensus::BlockImport;
...@@ -58,49 +58,8 @@ use sp_consensus_aura::{AuraApi, Slot}; ...@@ -58,49 +58,8 @@ use sp_consensus_aura::{AuraApi, Slot};
use sp_core::crypto::Pair; use sp_core::crypto::Pair;
use sp_inherents::CreateInherentDataProviders; use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr; use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, NumberFor}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
use std::{ use std::{path::PathBuf, sync::Arc, time::Duration};
fs::{self, File},
path::PathBuf,
sync::Arc,
time::Duration,
};
use crate::{collator as collator_util, LOG_TARGET};
/// Export the given `pov` to the file system at `path`.
///
/// The file will be named `block_hash_block_number.pov`.
///
/// The `parent_header`, `relay_parent_storage_root` and `relay_parent_number` will also be
/// stored in the file alongside the `pov`. This enables stateless validation of the `pov`.
fn export_pov_to_path<Block: BlockT>(
path: PathBuf,
pov: PoV,
block_hash: Block::Hash,
block_number: NumberFor<Block>,
parent_header: Block::Header,
relay_parent_storage_root: RHash,
relay_parent_number: RBlockNumber,
) {
if let Err(error) = fs::create_dir_all(&path) {
tracing::error!(target: LOG_TARGET, %error, path = %path.display(), "Failed to create PoV export directory");
return
}
let mut file = match File::create(path.join(format!("{block_hash:?}_{block_number}.pov"))) {
Ok(f) => f,
Err(error) => {
tracing::error!(target: LOG_TARGET, %error, "Failed to export PoV.");
return
},
};
pov.encode_to(&mut file);
HeadData(parent_header.encode()).encode_to(&mut file);
relay_parent_storage_root.encode_to(&mut file);
relay_parent_number.encode_to(&mut file);
}
/// Parameters for [`run`]. /// Parameters for [`run`].
pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> { pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
// along with Cumulus. If not, see <https://www.gnu.org/licenses/>. // along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
use codec::Encode; use codec::Encode;
use std::path::PathBuf;
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_relay_chain_interface::RelayChainInterface; use cumulus_relay_chain_interface::RelayChainInterface;
...@@ -25,8 +26,10 @@ use polkadot_node_subsystem::messages::CollationGenerationMessage; ...@@ -25,8 +26,10 @@ use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_overseer::Handle as OverseerHandle; use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{CollatorPair, Id as ParaId}; use polkadot_primitives::{CollatorPair, Id as ParaId};
use cumulus_primitives_core::relay_chain::BlockId;
use futures::prelude::*; use futures::prelude::*;
use crate::export_pov_to_path;
use sc_utils::mpsc::TracingUnboundedReceiver; use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_runtime::traits::{Block as BlockT, Header}; use sp_runtime::traits::{Block as BlockT, Header};
...@@ -50,6 +53,8 @@ pub struct Params<Block: BlockT, RClient, CS> { ...@@ -50,6 +53,8 @@ pub struct Params<Block: BlockT, RClient, CS> {
pub collator_receiver: TracingUnboundedReceiver<CollatorMessage<Block>>, pub collator_receiver: TracingUnboundedReceiver<CollatorMessage<Block>>,
/// The handle from the special slot based block import. /// The handle from the special slot based block import.
pub block_import_handle: super::SlotBasedBlockImportHandle<Block>, pub block_import_handle: super::SlotBasedBlockImportHandle<Block>,
/// When set, the collator will export every produced `POV` to this folder.
pub export_pov: Option<PathBuf>,
} }
/// Asynchronously executes the collation task for a parachain. /// Asynchronously executes the collation task for a parachain.
...@@ -67,6 +72,7 @@ pub async fn run_collation_task<Block, RClient, CS>( ...@@ -67,6 +72,7 @@ pub async fn run_collation_task<Block, RClient, CS>(
collator_service, collator_service,
mut collator_receiver, mut collator_receiver,
mut block_import_handle, mut block_import_handle,
export_pov,
}: Params<Block, RClient, CS>, }: Params<Block, RClient, CS>,
) where ) where
Block: BlockT, Block: BlockT,
...@@ -93,7 +99,7 @@ pub async fn run_collation_task<Block, RClient, CS>( ...@@ -93,7 +99,7 @@ pub async fn run_collation_task<Block, RClient, CS>(
return; return;
}; };
handle_collation_message(message, &collator_service, &mut overseer_handle).await; handle_collation_message(message, &collator_service, &mut overseer_handle,relay_client.clone(),export_pov.clone()).await;
}, },
block_import_msg = block_import_handle.next().fuse() => { block_import_msg = block_import_handle.next().fuse() => {
// TODO: Implement me. // TODO: Implement me.
...@@ -107,10 +113,12 @@ pub async fn run_collation_task<Block, RClient, CS>( ...@@ -107,10 +113,12 @@ pub async fn run_collation_task<Block, RClient, CS>(
/// Handle an incoming collation message from the block builder task. /// Handle an incoming collation message from the block builder task.
/// This builds the collation from the [`CollatorMessage`] and submits it to /// This builds the collation from the [`CollatorMessage`] and submits it to
/// the collation-generation subsystem of the relay chain. /// the collation-generation subsystem of the relay chain.
async fn handle_collation_message<Block: BlockT>( async fn handle_collation_message<Block: BlockT, RClient: RelayChainInterface + Clone + 'static>(
message: CollatorMessage<Block>, message: CollatorMessage<Block>,
collator_service: &impl CollatorServiceInterface<Block>, collator_service: &impl CollatorServiceInterface<Block>,
overseer_handle: &mut OverseerHandle, overseer_handle: &mut OverseerHandle,
relay_client: RClient,
export_pov: Option<PathBuf>,
) { ) {
let CollatorMessage { let CollatorMessage {
parent_header, parent_header,
...@@ -140,6 +148,24 @@ async fn handle_collation_message<Block: BlockT>( ...@@ -140,6 +148,24 @@ async fn handle_collation_message<Block: BlockT>(
); );
if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity { if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
if let Some(pov_path) = export_pov {
if let Ok(Some(relay_parent_header)) =
relay_client.header(BlockId::Hash(relay_parent)).await
{
export_pov_to_path::<Block>(
pov_path.clone(),
pov.clone(),
block_data.header().hash(),
*block_data.header().number(),
parent_header.clone(),
relay_parent_header.state_root,
relay_parent_header.number,
);
} else {
tracing::error!(target: LOG_TARGET, "Failed to get relay parent header from hash: {relay_parent:?}");
}
}
tracing::info!( tracing::info!(
target: LOG_TARGET, target: LOG_TARGET,
"Compressed PoV size: {}kb", "Compressed PoV size: {}kb",
......
...@@ -54,7 +54,7 @@ use sp_core::{crypto::Pair, traits::SpawnNamed, U256}; ...@@ -54,7 +54,7 @@ use sp_core::{crypto::Pair, traits::SpawnNamed, U256};
use sp_inherents::CreateInherentDataProviders; use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr; use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Member, NumberFor, One}; use sp_runtime::traits::{Block as BlockT, Member, NumberFor, One};
use std::{sync::Arc, time::Duration}; use std::{path::PathBuf, sync::Arc, time::Duration};
pub use block_import::{SlotBasedBlockImport, SlotBasedBlockImportHandle}; pub use block_import::{SlotBasedBlockImport, SlotBasedBlockImportHandle};
...@@ -100,28 +100,13 @@ pub struct Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, ...@@ -100,28 +100,13 @@ pub struct Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS,
pub block_import_handle: SlotBasedBlockImportHandle<Block>, pub block_import_handle: SlotBasedBlockImportHandle<Block>,
/// Spawner for spawning futures. /// Spawner for spawning futures.
pub spawner: Spawner, pub spawner: Spawner,
/// When set, the collator will export every produced `POV` to this folder.
pub export_pov: Option<PathBuf>,
} }
/// Run aura-based block building and collation task. /// Run aura-based block building and collation task.
pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>( pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>(
Params { params: Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>,
create_inherent_data_providers,
block_import,
para_client,
para_backend,
relay_client,
code_hash_provider,
keystore,
collator_key,
para_id,
proposer,
collator_service,
authoring_duration,
reinitialize,
slot_drift,
block_import_handle,
spawner,
}: Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>,
) where ) where
Block: BlockT, Block: BlockT,
Client: ProvideRuntimeApi<Block> Client: ProvideRuntimeApi<Block>
...@@ -148,6 +133,26 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw ...@@ -148,6 +133,26 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw
P::Signature: TryFrom<Vec<u8>> + Member + Codec, P::Signature: TryFrom<Vec<u8>> + Member + Codec,
Spawner: SpawnNamed, Spawner: SpawnNamed,
{ {
let Params {
create_inherent_data_providers,
block_import,
para_client,
para_backend,
relay_client,
code_hash_provider,
keystore,
collator_key,
para_id,
proposer,
collator_service,
authoring_duration,
reinitialize,
slot_drift,
block_import_handle,
spawner,
export_pov,
} = params;
let (tx, rx) = tracing_unbounded("mpsc_builder_to_collator", 100); let (tx, rx) = tracing_unbounded("mpsc_builder_to_collator", 100);
let collator_task_params = collation_task::Params { let collator_task_params = collation_task::Params {
relay_client: relay_client.clone(), relay_client: relay_client.clone(),
...@@ -157,6 +162,7 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw ...@@ -157,6 +162,7 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw
collator_service: collator_service.clone(), collator_service: collator_service.clone(),
collator_receiver: rx, collator_receiver: rx,
block_import_handle, block_import_handle,
export_pov,
}; };
let collation_task_fut = run_collation_task::<Block, _, _>(collator_task_params); let collation_task_fut = run_collation_task::<Block, _, _>(collator_task_params);
......
...@@ -23,13 +23,15 @@ ...@@ -23,13 +23,15 @@
//! //!
//! For more information about AuRa, the Substrate crate should be checked. //! For more information about AuRa, the Substrate crate should be checked.
use codec::Codec; use codec::{Codec, Encode};
use cumulus_client_consensus_common::{ use cumulus_client_consensus_common::{
ParachainBlockImportMarker, ParachainCandidate, ParachainConsensus, ParachainBlockImportMarker, ParachainCandidate, ParachainConsensus,
}; };
use cumulus_primitives_core::{relay_chain::Hash as PHash, PersistedValidationData}; use cumulus_primitives_core::{relay_chain::Hash as PHash, PersistedValidationData};
use cumulus_primitives_core::relay_chain::HeadData;
use futures::lock::Mutex; use futures::lock::Mutex;
use polkadot_primitives::{BlockNumber as RBlockNumber, Hash as RHash};
use sc_client_api::{backend::AuxStore, BlockOf}; use sc_client_api::{backend::AuxStore, BlockOf};
use sc_consensus::BlockImport; use sc_consensus::BlockImport;
use sc_consensus_slots::{BackoffAuthoringBlocksStrategy, SimpleSlotWorker, SlotInfo}; use sc_consensus_slots::{BackoffAuthoringBlocksStrategy, SimpleSlotWorker, SlotInfo};
...@@ -45,7 +47,10 @@ use sp_keystore::KeystorePtr; ...@@ -45,7 +47,10 @@ use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, NumberFor}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, NumberFor};
use std::{ use std::{
convert::TryFrom, convert::TryFrom,
fs,
fs::File,
marker::PhantomData, marker::PhantomData,
path::PathBuf,
sync::{ sync::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
Arc, Arc,
...@@ -55,6 +60,7 @@ use std::{ ...@@ -55,6 +60,7 @@ use std::{
mod import_queue; mod import_queue;
pub use import_queue::{build_verifier, import_queue, BuildVerifierParams, ImportQueueParams}; pub use import_queue::{build_verifier, import_queue, BuildVerifierParams, ImportQueueParams};
use polkadot_node_primitives::PoV;
pub use sc_consensus_aura::{ pub use sc_consensus_aura::{
slot_duration, standalone::slot_duration_at, AuraVerifier, BuildAuraWorkerParams, slot_duration, standalone::slot_duration_at, AuraVerifier, BuildAuraWorkerParams,
SlotProportion, SlotProportion,
...@@ -252,3 +258,37 @@ where ...@@ -252,3 +258,37 @@ where
Some(ParachainCandidate { block: res.block, proof: res.storage_proof }) Some(ParachainCandidate { block: res.block, proof: res.storage_proof })
} }
} }
/// Export the given `pov` to the file system at `path`.
///
/// The file will be named `block_hash_block_number.pov`.
///
/// The `parent_header`, `relay_parent_storage_root` and `relay_parent_number` will also be
/// stored in the file alongside the `pov`. This enables stateless validation of the `pov`.
pub(crate) fn export_pov_to_path<Block: BlockT>(
path: PathBuf,
pov: PoV,
block_hash: Block::Hash,
block_number: NumberFor<Block>,
parent_header: Block::Header,
relay_parent_storage_root: RHash,
relay_parent_number: RBlockNumber,
) {
if let Err(error) = fs::create_dir_all(&path) {
tracing::error!(target: LOG_TARGET, %error, path = %path.display(), "Failed to create PoV export directory");
return
}
let mut file = match File::create(path.join(format!("{block_hash:?}_{block_number}.pov"))) {
Ok(f) => f,
Err(error) => {
tracing::error!(target: LOG_TARGET, %error, "Failed to export PoV.");
return
},
};
pov.encode_to(&mut file);
HeadData(parent_header.encode()).encode_to(&mut file);
relay_parent_storage_root.encode_to(&mut file);
relay_parent_number.encode_to(&mut file);
}
...@@ -250,7 +250,7 @@ where ...@@ -250,7 +250,7 @@ where
{ {
#[docify::export_content] #[docify::export_content]
fn launch_slot_based_collator<CIDP, CHP, Proposer, CS, Spawner>( fn launch_slot_based_collator<CIDP, CHP, Proposer, CS, Spawner>(
params: SlotBasedParams< params_with_export: SlotBasedParams<
Block, Block,
ParachainBlockImport< ParachainBlockImport<
Block, Block,
...@@ -277,7 +277,9 @@ where ...@@ -277,7 +277,9 @@ where
CS: CollatorServiceInterface<Block> + Send + Sync + Clone + 'static, CS: CollatorServiceInterface<Block> + Send + Sync + Clone + 'static,
Spawner: SpawnNamed, Spawner: SpawnNamed,
{ {
slot_based::run::<Block, <AuraId as AppCrypto>::Pair, _, _, _, _, _, _, _, _, _>(params); slot_based::run::<Block, <AuraId as AppCrypto>::Pair, _, _, _, _, _, _, _, _, _>(
params_with_export,
);
} }
} }
...@@ -319,7 +321,7 @@ where ...@@ -319,7 +321,7 @@ where
_overseer_handle: OverseerHandle, _overseer_handle: OverseerHandle,
announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>, announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
backend: Arc<ParachainBackend<Block>>, backend: Arc<ParachainBackend<Block>>,
_node_extra_args: NodeExtraArgs, node_extra_args: NodeExtraArgs,
block_import_handle: SlotBasedBlockImportHandle<Block>, block_import_handle: SlotBasedBlockImportHandle<Block>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
...@@ -358,10 +360,12 @@ where ...@@ -358,10 +360,12 @@ where
slot_drift: Duration::from_secs(1), slot_drift: Duration::from_secs(1),
block_import_handle, block_import_handle,
spawner: task_manager.spawn_handle(), spawner: task_manager.spawn_handle(),
export_pov: node_extra_args.export_pov,
}; };
// We have a separate function only to be able to use `docify::export` on this piece of // We have a separate function only to be able to use `docify::export` on this piece of
// code. // code.
Self::launch_slot_based_collator(params); Self::launch_slot_based_collator(params);
Ok(()) Ok(())
......
...@@ -506,6 +506,7 @@ where ...@@ -506,6 +506,7 @@ where
slot_drift: Duration::from_secs(1), slot_drift: Duration::from_secs(1),
block_import_handle: slot_based_handle, block_import_handle: slot_based_handle,
spawner: task_manager.spawn_handle(), spawner: task_manager.spawn_handle(),
export_pov: None,
}; };
slot_based::run::<Block, AuthorityPair, _, _, _, _, _, _, _, _, _>(params); slot_based::run::<Block, AuthorityPair, _, _, _, _, _, _, _, _, _>(params);
......
title: 'Add export PoV on slot base collator'
doc:
- audience: [Node Dev, Node Operator]
description: Add functionality to export the Proof of Validity (PoV) when the slot-based collator is used.
crates:
- name: cumulus-test-service
bump: major
- name: cumulus-client-consensus-aura
bump: major
- name: polkadot-omni-node-lib
bump: major
\ No newline at end of file
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment