From 99164122a21c14f55dcf8cb96cff27fcace8b3a7 Mon Sep 17 00:00:00 2001
From: Sebastian Kunert <skunert49@gmail.com>
Date: Fri, 21 Mar 2025 10:53:24 +0100
Subject: [PATCH] [stable-2503] Backport #7569 and #7585 (#7883)

This PR backports #7569 and #7585.

---------

Co-authored-by: Mrisho Lukamba <69342343+MrishoLukamba@users.noreply.github.com>
---
 .github/workflows/build-publish-images.yml    |  63 ++++
 .../zombienet-reusable-preflight.yml          |   2 +-
 .github/workflows/zombienet_cumulus.yml       |  54 ++++
 .github/zombienet-env                         |   2 +-
 Cargo.toml                                    |   3 +
 cumulus/client/consensus/aura/Cargo.toml      |   3 +-
 .../consensus/aura/src/collators/lookahead.rs |  51 +---
 .../slot_based/block_builder_task.rs          | 105 ++-----
 .../collators/slot_based/collation_task.rs    |  30 +-
 .../aura/src/collators/slot_based/mod.rs      | 115 +++++---
 .../src/collators/slot_based/slot_timer.rs    | 269 +++++++++++++++++
 cumulus/client/consensus/aura/src/lib.rs      |  42 ++-
 cumulus/pallets/aura-ext/Cargo.toml           |   1 +
 .../pallets/aura-ext/src/consensus_hook.rs    |  36 ++-
 cumulus/pallets/aura-ext/src/test.rs          | 169 ++++++-----
 cumulus/polkadot-omni-node/lib/src/cli.rs     |  44 ++-
 cumulus/polkadot-omni-node/lib/src/command.rs |   8 +
 .../polkadot-omni-node/lib/src/common/mod.rs  |   6 +-
 .../polkadot-omni-node/lib/src/nodes/aura.rs  |  18 +-
 cumulus/test/runtime/Cargo.toml               |   5 +
 cumulus/test/runtime/build.rs                 |   8 +
 cumulus/test/runtime/src/lib.rs               |  40 ++-
 cumulus/test/service/src/chain_spec.rs        |   9 +
 cumulus/test/service/src/cli.rs               |  44 ++-
 cumulus/test/service/src/lib.rs               |   4 +-
 cumulus/test/service/src/main.rs              |  20 +-
 .../tests/0008-elastic_authoring.toml         |  52 ++--
 .../tests/0009-elastic_pov_recovery.toml      |  83 +++---
 .../zombienet-sdk-helpers/Cargo.toml          |  20 ++
 .../zombienet-sdk-helpers/src/lib.rs          | 279 ++++++++++++++++++
 cumulus/zombienet/zombienet-sdk/Cargo.toml    |  25 ++
 cumulus/zombienet/zombienet-sdk/README.md     |  19 ++
 cumulus/zombienet/zombienet-sdk/src/lib.rs    |   2 +
 ...lastic_scaling_multiple_blocks_per_slot.rs | 137 +++++++++
 .../tests/elastic_scaling/mod.rs              |   4 +
 cumulus/zombienet/zombienet-sdk/tests/lib.rs  |   5 +
 .../src/guides/enable_elastic_scaling_mvp.rs  |   4 +-
 polkadot/zombienet-sdk-tests/Cargo.toml       |   1 +
 .../elastic_scaling/slot_based_12cores.rs     |  32 +-
 .../elastic_scaling/slot_based_3cores.rs      |  61 +---
 .../0018-shared-core-idle-parachain.toml      |  34 +--
 ...-coretime-collation-fetching-fairness.toml |  52 ++--
 prdoc/pr_7569.prdoc                           |  25 ++
 prdoc/pr_7585.prdoc                           |  11 +
 44 files changed, 1531 insertions(+), 466 deletions(-)
 create mode 100644 cumulus/client/consensus/aura/src/collators/slot_based/slot_timer.rs
 create mode 100644 cumulus/zombienet/zombienet-sdk-helpers/Cargo.toml
 create mode 100644 cumulus/zombienet/zombienet-sdk-helpers/src/lib.rs
 create mode 100644 cumulus/zombienet/zombienet-sdk/Cargo.toml
 create mode 100644 cumulus/zombienet/zombienet-sdk/README.md
 create mode 100644 cumulus/zombienet/zombienet-sdk/src/lib.rs
 create mode 100644 cumulus/zombienet/zombienet-sdk/tests/elastic_scaling/elastic_scaling_multiple_blocks_per_slot.rs
 create mode 100644 cumulus/zombienet/zombienet-sdk/tests/elastic_scaling/mod.rs
 create mode 100644 cumulus/zombienet/zombienet-sdk/tests/lib.rs
 create mode 100644 prdoc/pr_7569.prdoc
 create mode 100644 prdoc/pr_7585.prdoc

diff --git a/.github/workflows/build-publish-images.yml b/.github/workflows/build-publish-images.yml
index 0ce33e2864a..ee10e9334c9 100644
--- a/.github/workflows/build-publish-images.yml
+++ b/.github/workflows/build-publish-images.yml
@@ -266,6 +266,9 @@ jobs:
           path: artifacts.tar
           retention-days: 1
 
+
+  ### Build zombienet test artifacts ########################
+
   #
   #
   #
@@ -296,6 +299,66 @@ jobs:
           path: artifacts.tar
           retention-days: 1
 
+  #
+  #
+  #
+  prepare-polkadot-zombienet-artifacts:
+    needs: [preflight]
+    runs-on: ${{ needs.preflight.outputs.RUNNER }}
+    timeout-minutes: 60
+    container:
+      image: ${{ needs.preflight.outputs.IMAGE }}
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: build
+        run: |
+          forklift cargo nextest --manifest-path polkadot/zombienet-sdk-tests/Cargo.toml archive --locked --features zombie-metadata --archive-file polkadot-zombienet-tests.tar.zst
+      - name: pack artifacts
+        run: |
+          mkdir -p artifacts
+          cp polkadot-zombienet-tests.tar.zst ./artifacts
+
+      - name: tar
+        run: tar -cvf artifacts.tar artifacts
+
+      - name: upload artifacts
+        uses: actions/upload-artifact@v4
+        with:
+          name: ${{ github.job }}-${{ needs.preflight.outputs.SOURCE_REF_SLUG }}
+          path: artifacts.tar
+          retention-days: 1
+
+  #
+  #
+  #
+  prepare-cumulus-zombienet-artifacts:
+    needs: [preflight]
+    runs-on: ${{ needs.preflight.outputs.RUNNER }}
+    timeout-minutes: 60
+    container:
+      image: ${{ needs.preflight.outputs.IMAGE }}
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: build
+        run: |
+          forklift cargo nextest --manifest-path cumulus/zombienet/zombienet-sdk/Cargo.toml archive --locked --features zombie-ci --archive-file cumulus-zombienet-tests.tar.zst
+      - name: pack artifacts
+        run: |
+          mkdir -p artifacts
+          cp cumulus-zombienet-tests.tar.zst ./artifacts
+
+      - name: tar
+        run: tar -cvf artifacts.tar artifacts
+
+      - name: upload artifacts
+        uses: actions/upload-artifact@v4
+        with:
+          name: ${{ github.job }}-${{ needs.preflight.outputs.SOURCE_REF_SLUG }}
+          path: artifacts.tar
+          retention-days: 1
+
   ### Publish ########################
 
   #
diff --git a/.github/workflows/zombienet-reusable-preflight.yml b/.github/workflows/zombienet-reusable-preflight.yml
index decbe11a460..114aa99e3c1 100644
--- a/.github/workflows/zombienet-reusable-preflight.yml
+++ b/.github/workflows/zombienet-reusable-preflight.yml
@@ -220,7 +220,7 @@ jobs:
   wait_build_images:
     needs: [ci-env]
     runs-on: ubuntu-latest
-    timeout-minutes: 30
+    timeout-minutes: 60
     outputs:
       BUILD_RUN_ID: ${{ steps.wait_build.outputs.BUILD_RUN_ID }}
     steps:
diff --git a/.github/workflows/zombienet_cumulus.yml b/.github/workflows/zombienet_cumulus.yml
index 492c63b302a..9ac076ef14e 100644
--- a/.github/workflows/zombienet_cumulus.yml
+++ b/.github/workflows/zombienet_cumulus.yml
@@ -315,3 +315,57 @@ jobs:
           name: zombienet-logs-${{ github.job }}-${{ github.sha }}
           path: |
             /tmp/zombie*/logs/*
+
+  zombienet-cumulus-0010-elastic_scaling_multiple_block_per_slot:
+    needs: [preflight]
+    if: ${{ needs.preflight.outputs.changes_substrate || needs.preflight.outputs.changes_cumulus || needs.preflight.outputs.changes_polkadot }}
+    runs-on: ${{ needs.preflight.outputs.ZOMBIENET_RUNNER }} # NOTE: should be zombienet-arc-runner (without quotes)
+    timeout-minutes: 60
+    container:
+      image: ${{ needs.preflight.outputs.ZOMBIENET_IMAGE }}
+    env:
+      # sdk tests are looking for POLKADOT_IMAGE
+      POLKADOT_IMAGE: "${{ needs.preflight.outputs.TEMP_IMAGES_BASE }}/polkadot-debug:${{ needs.preflight.outputs.DOCKER_IMAGES_VERSION }}"
+      CUMULUS_IMAGE: "${{ needs.preflight.outputs.TEMP_IMAGES_BASE }}/test-parachain:${{ needs.preflight.outputs.DOCKER_IMAGES_VERSION }}"
+      RUST_LOG: ${{ needs.preflight.outputs.RUST_LOG }}
+      ZOMBIE_PROVIDER: ${{ needs.preflight.outputs.ZOMBIE_PROVIDER }}
+      # don't retry sdk tests
+      NEXTEST_RETRIES: 0
+
+    steps:
+      - name: k8s_auth
+        shell: bash
+        run: |
+          . /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
+          k8s_auth
+
+      - name: Checkout
+        uses: actions/checkout@v4
+
+      - uses: actions/download-artifact@v4.1.8
+        with:
+          name: prepare-cumulus-zombienet-artifacts-${{ needs.preflight.outputs.SOURCE_REF_SLUG }}
+          github-token: ${{ secrets.GITHUB_TOKEN }}
+          run-id: ${{ needs.preflight.outputs.BUILD_RUN_ID }}
+
+      - name: tar
+        run: tar -xvf artifacts.tar
+
+      - name: script
+        run: |
+          echo "POLKADOT_IMAGE: $POLKADOT_IMAGE"
+          echo "CUMULUS_IMAGE: $CUMULUS_IMAGE"
+          ls -ltr ./artifacts
+          # use spot by default
+          export X_INFRA_INSTANCE=spot
+          # we want to use `--no-capture` in zombienet tests.
+          unset NEXTEST_FAILURE_OUTPUT
+          unset NEXTEST_SUCCESS_OUTPUT
+          cargo nextest run --archive-file ./artifacts/cumulus-zombienet-tests.tar.zst --no-capture -- elastic_scaling::elastic_scaling_multiple_blocks_per_slot::elastic_scaling_multiple_block_per_slot
+
+      - name: upload logs
+        uses: actions/upload-artifact@v4
+        with:
+          name: zombienet-logs-${{ github.job }}-${{ github.sha }}
+          path: |
+            /tmp/zombie*/logs/*
diff --git a/.github/zombienet-env b/.github/zombienet-env
index 565a91a8d71..7c4737feb21 100644
--- a/.github/zombienet-env
+++ b/.github/zombienet-env
@@ -1,4 +1,4 @@
-ZOMBIENET_IMAGE=docker.io/paritytech/zombienet:v1.3.119
+ZOMBIENET_IMAGE=docker.io/paritytech/zombienet:v1.3.126
 ZOMBIENET_RUNNER=zombienet-arc-runner
 PUSHGATEWAY_URL=http://zombienet-prometheus-pushgateway.managed-monitoring:9091/metrics/job/zombie-metrics
 DEBUG=zombie,zombie::network-node,zombie::kube::client::logs
diff --git a/Cargo.toml b/Cargo.toml
index 9ffddc8e059..2189d78e44b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -149,6 +149,8 @@ members = [
 	"cumulus/test/runtime",
 	"cumulus/test/service",
 	"cumulus/xcm/xcm-emulator",
+	"cumulus/zombienet/zombienet-sdk",
+	"cumulus/zombienet/zombienet-sdk-helpers",
 	"docs/sdk",
 	"docs/sdk/packages/guides/first-pallet",
 	"docs/sdk/packages/guides/first-runtime",
@@ -741,6 +743,7 @@ cumulus-test-client = { path = "cumulus/test/client" }
 cumulus-test-relay-sproof-builder = { path = "cumulus/test/relay-sproof-builder", default-features = false, version = "0.18.0-rc1" }
 cumulus-test-runtime = { path = "cumulus/test/runtime", version = "0.1.0" }
 cumulus-test-service = { path = "cumulus/test/service", version = "0.1.0" }
+cumulus-zombienet-sdk-helpers = { path = "cumulus/zombienet/zombienet-sdk-helpers", default-features = false }
 curve25519-dalek = { version = "4.1.3" }
 derive-syn-parse = { version = "0.2.0" }
 derive-where = { version = "1.2.7" }
diff --git a/cumulus/client/consensus/aura/Cargo.toml b/cumulus/client/consensus/aura/Cargo.toml
index 8ac6be9e0aa..60ba4ec2287 100644
--- a/cumulus/client/consensus/aura/Cargo.toml
+++ b/cumulus/client/consensus/aura/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "cumulus-client-consensus-aura"
 description = "AURA consensus algorithm for parachains"
-version = "0.22.0-rc1"
+version = "0.7.0"
 authors.workspace = true
 edition.workspace = true
 license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
@@ -90,6 +90,7 @@ polkadot-primitives.default-features = true
 cumulus-test-client = { path = "../../../test/client" }
 cumulus-test-relay-sproof-builder = { path = "../../../test/relay-sproof-builder", default-features = false }
 sp-keyring = { path = "../../../../substrate/primitives/keyring", default-features = false }
+rstest = { workspace = true }
 
 [features]
 # Allows collator to use full PoV size for block building
diff --git a/cumulus/client/consensus/aura/src/collators/lookahead.rs b/cumulus/client/consensus/aura/src/collators/lookahead.rs
index 8427a404199..17c4059b114 100644
--- a/cumulus/client/consensus/aura/src/collators/lookahead.rs
+++ b/cumulus/client/consensus/aura/src/collators/lookahead.rs
@@ -40,14 +40,14 @@ use cumulus_primitives_aura::AuraUnincludedSegmentApi;
 use cumulus_primitives_core::{ClaimQueueOffset, CollectCollationInfo, PersistedValidationData};
 use cumulus_relay_chain_interface::RelayChainInterface;
 
-use polkadot_node_primitives::{PoV, SubmitCollationParams};
+use polkadot_node_primitives::SubmitCollationParams;
 use polkadot_node_subsystem::messages::CollationGenerationMessage;
 use polkadot_overseer::Handle as OverseerHandle;
 use polkadot_primitives::{
-	vstaging::DEFAULT_CLAIM_QUEUE_OFFSET, BlockNumber as RBlockNumber, CollatorPair, Hash as RHash,
-	HeadData, Id as ParaId, OccupiedCoreAssumption,
+	vstaging::DEFAULT_CLAIM_QUEUE_OFFSET, CollatorPair, Id as ParaId, OccupiedCoreAssumption,
 };
 
+use crate::{collator as collator_util, export_pov_to_path};
 use futures::prelude::*;
 use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
 use sc_consensus::BlockImport;
@@ -58,49 +58,8 @@ use sp_consensus_aura::{AuraApi, Slot};
 use sp_core::crypto::Pair;
 use sp_inherents::CreateInherentDataProviders;
 use sp_keystore::KeystorePtr;
-use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, NumberFor};
-use std::{
-	fs::{self, File},
-	path::PathBuf,
-	sync::Arc,
-	time::Duration,
-};
-
-use crate::{collator as collator_util, LOG_TARGET};
-
-/// Export the given `pov` to the file system at `path`.
-///
-/// The file will be named `block_hash_block_number.pov`.
-///
-/// The `parent_header`, `relay_parent_storage_root` and `relay_parent_number` will also be
-/// stored in the file alongside the `pov`. This enables stateless validation of the `pov`.
-fn export_pov_to_path<Block: BlockT>(
-	path: PathBuf,
-	pov: PoV,
-	block_hash: Block::Hash,
-	block_number: NumberFor<Block>,
-	parent_header: Block::Header,
-	relay_parent_storage_root: RHash,
-	relay_parent_number: RBlockNumber,
-) {
-	if let Err(error) = fs::create_dir_all(&path) {
-		tracing::error!(target: LOG_TARGET, %error, path = %path.display(), "Failed to create PoV export directory");
-		return
-	}
-
-	let mut file = match File::create(path.join(format!("{block_hash:?}_{block_number}.pov"))) {
-		Ok(f) => f,
-		Err(error) => {
-			tracing::error!(target: LOG_TARGET, %error, "Failed to export PoV.");
-			return
-		},
-	};
-
-	pov.encode_to(&mut file);
-	HeadData(parent_header.encode()).encode_to(&mut file);
-	relay_parent_storage_root.encode_to(&mut file);
-	relay_parent_number.encode_to(&mut file);
-}
+use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
+use std::{path::PathBuf, sync::Arc, time::Duration};
 
 /// Parameters for [`run`].
 pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs
index 9754f98da4c..fa6c8f21c6b 100644
--- a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs
+++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs
@@ -26,20 +26,6 @@ use cumulus_relay_chain_interface::RelayChainInterface;
 
 use polkadot_primitives::{Block as RelayBlock, Id as ParaId};
 
-use futures::prelude::*;
-use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
-use sc_consensus::BlockImport;
-use sp_api::ProvideRuntimeApi;
-use sp_application_crypto::AppPublic;
-use sp_blockchain::HeaderBackend;
-use sp_consensus_aura::{AuraApi, Slot};
-use sp_core::crypto::Pair;
-use sp_inherents::CreateInherentDataProviders;
-use sp_keystore::KeystorePtr;
-use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
-use sp_timestamp::Timestamp;
-use std::{sync::Arc, time::Duration};
-
 use super::CollatorMessage;
 use crate::{
 	collator::{self as collator_util},
@@ -48,10 +34,23 @@ use crate::{
 		slot_based::{
 			core_selector,
 			relay_chain_data_cache::{RelayChainData, RelayChainDataCache},
+			slot_timer::SlotTimer,
 		},
 	},
 	LOG_TARGET,
 };
+use futures::prelude::*;
+use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
+use sc_consensus::BlockImport;
+use sp_api::ProvideRuntimeApi;
+use sp_application_crypto::AppPublic;
+use sp_blockchain::HeaderBackend;
+use sp_consensus_aura::AuraApi;
+use sp_core::crypto::Pair;
+use sp_inherents::CreateInherentDataProviders;
+use sp_keystore::KeystorePtr;
+use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
+use std::{sync::Arc, time::Duration};
 
 /// Parameters for [`run_block_builder`].
 pub struct BuilderTaskParams<
@@ -91,71 +90,16 @@ pub struct BuilderTaskParams<
 	pub authoring_duration: Duration,
 	/// Channel to send built blocks to the collation task.
 	pub collator_sender: sc_utils::mpsc::TracingUnboundedSender<CollatorMessage<Block>>,
-	/// Drift every slot by this duration.
+	/// Slot duration of the relay chain.
+	pub relay_chain_slot_duration: Duration,
+	/// Offset all time operations by this duration.
+	///
 	/// This is a time quantity that is subtracted from the actual timestamp when computing
 	/// the time left to enter a new slot. In practice, this *left-shifts* the clock time with the
 	/// intent to keep our "clock" slightly behind the relay chain one and thus reducing the
 	/// likelihood of encountering unfavorable notification arrival timings (i.e. we don't want to
 	/// wait for relay chain notifications because we woke up too early).
-	pub slot_drift: Duration,
-}
-
-#[derive(Debug)]
-struct SlotInfo {
-	pub timestamp: Timestamp,
-	pub slot: Slot,
-}
-
-#[derive(Debug)]
-struct SlotTimer<Block, Client, P> {
-	client: Arc<Client>,
-	drift: Duration,
-	_marker: std::marker::PhantomData<(Block, Box<dyn Fn(P) + Send + Sync + 'static>)>,
-}
-
-/// Returns current duration since Unix epoch.
-fn duration_now() -> Duration {
-	use std::time::SystemTime;
-	let now = SystemTime::now();
-	now.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_else(|e| {
-		panic!("Current time {:?} is before Unix epoch. Something is wrong: {:?}", now, e)
-	})
-}
-
-/// Returns the duration until the next slot from now.
-fn time_until_next_slot(slot_duration: Duration, drift: Duration) -> Duration {
-	let now = duration_now().as_millis() - drift.as_millis();
-
-	let next_slot = (now + slot_duration.as_millis()) / slot_duration.as_millis();
-	let remaining_millis = next_slot * slot_duration.as_millis() - now;
-	Duration::from_millis(remaining_millis as u64)
-}
-
-impl<Block, Client, P> SlotTimer<Block, Client, P>
-where
-	Block: BlockT,
-	Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + UsageProvider<Block>,
-	Client::Api: AuraApi<Block, P::Public>,
-	P: Pair,
-	P::Public: AppPublic + Member + Codec,
-	P::Signature: TryFrom<Vec<u8>> + Member + Codec,
-{
-	pub fn new_with_drift(client: Arc<Client>, drift: Duration) -> Self {
-		Self { client, drift, _marker: Default::default() }
-	}
-
-	/// Returns a future that resolves when the next slot arrives.
-	pub async fn wait_until_next_slot(&self) -> Result<SlotInfo, ()> {
-		let Ok(slot_duration) = crate::slot_duration(&*self.client) else {
-			tracing::error!(target: crate::LOG_TARGET, "Failed to fetch slot duration from runtime.");
-			return Err(())
-		};
-
-		let time_until_next_slot = time_until_next_slot(slot_duration.as_duration(), self.drift);
-		tokio::time::sleep(time_until_next_slot).await;
-		let timestamp = sp_timestamp::Timestamp::current();
-		Ok(SlotInfo { slot: Slot::from_timestamp(timestamp, slot_duration), timestamp })
-	}
+	pub slot_offset: Duration,
 }
 
 /// Run block-builder.
@@ -201,11 +145,16 @@ where
 			collator_sender,
 			code_hash_provider,
 			authoring_duration,
+			relay_chain_slot_duration,
 			para_backend,
-			slot_drift,
+			slot_offset,
 		} = params;
 
-		let slot_timer = SlotTimer::<_, _, P>::new_with_drift(para_client.clone(), slot_drift);
+		let mut slot_timer = SlotTimer::<_, _, P>::new_with_offset(
+			para_client.clone(),
+			slot_offset,
+			relay_chain_slot_duration,
+		);
 
 		let mut collator = {
 			let params = collator_util::Params {
@@ -225,7 +174,7 @@ where
 
 		loop {
 			// We wait here until the next slot arrives.
-			let Ok(para_slot) = slot_timer.wait_until_next_slot().await else {
+			let Some(para_slot) = slot_timer.wait_until_next_slot().await else {
 				return;
 			};
 
@@ -281,6 +230,8 @@ where
 				);
 			}
 
+			slot_timer.update_scheduling(scheduled_cores.len() as u32);
+
 			let core_selector = core_selector.0 as usize % scheduled_cores.len();
 			let Some(core_index) = scheduled_cores.get(core_selector) else {
 				// This cannot really happen, as we modulo the core selector with the
diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs
index eb48494cf6d..d92cfb044b8 100644
--- a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs
+++ b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs
@@ -16,6 +16,7 @@
 // along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
 
 use codec::Encode;
+use std::path::PathBuf;
 
 use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
 use cumulus_relay_chain_interface::RelayChainInterface;
@@ -25,8 +26,10 @@ use polkadot_node_subsystem::messages::CollationGenerationMessage;
 use polkadot_overseer::Handle as OverseerHandle;
 use polkadot_primitives::{CollatorPair, Id as ParaId};
 
+use cumulus_primitives_core::relay_chain::BlockId;
 use futures::prelude::*;
 
+use crate::export_pov_to_path;
 use sc_utils::mpsc::TracingUnboundedReceiver;
 use sp_runtime::traits::{Block as BlockT, Header};
 
@@ -50,6 +53,8 @@ pub struct Params<Block: BlockT, RClient, CS> {
 	pub collator_receiver: TracingUnboundedReceiver<CollatorMessage<Block>>,
 	/// The handle from the special slot based block import.
 	pub block_import_handle: super::SlotBasedBlockImportHandle<Block>,
+	/// When set, the collator will export every produced `POV` to this folder.
+	pub export_pov: Option<PathBuf>,
 }
 
 /// Asynchronously executes the collation task for a parachain.
@@ -67,6 +72,7 @@ pub async fn run_collation_task<Block, RClient, CS>(
 		collator_service,
 		mut collator_receiver,
 		mut block_import_handle,
+		export_pov,
 	}: Params<Block, RClient, CS>,
 ) where
 	Block: BlockT,
@@ -93,7 +99,7 @@ pub async fn run_collation_task<Block, RClient, CS>(
 					return;
 				};
 
-				handle_collation_message(message, &collator_service, &mut overseer_handle).await;
+				handle_collation_message(message, &collator_service, &mut overseer_handle,relay_client.clone(),export_pov.clone()).await;
 			},
 			block_import_msg = block_import_handle.next().fuse() => {
 				// TODO: Implement me.
@@ -107,10 +113,12 @@ pub async fn run_collation_task<Block, RClient, CS>(
 /// Handle an incoming collation message from the block builder task.
 /// This builds the collation from the [`CollatorMessage`] and submits it to
 /// the collation-generation subsystem of the relay chain.
-async fn handle_collation_message<Block: BlockT>(
+async fn handle_collation_message<Block: BlockT, RClient: RelayChainInterface + Clone + 'static>(
 	message: CollatorMessage<Block>,
 	collator_service: &impl CollatorServiceInterface<Block>,
 	overseer_handle: &mut OverseerHandle,
+	relay_client: RClient,
+	export_pov: Option<PathBuf>,
 ) {
 	let CollatorMessage {
 		parent_header,
@@ -140,6 +148,24 @@ async fn handle_collation_message<Block: BlockT>(
 	);
 
 	if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
+		if let Some(pov_path) = export_pov {
+			if let Ok(Some(relay_parent_header)) =
+				relay_client.header(BlockId::Hash(relay_parent)).await
+			{
+				export_pov_to_path::<Block>(
+					pov_path.clone(),
+					pov.clone(),
+					block_data.header().hash(),
+					*block_data.header().number(),
+					parent_header.clone(),
+					relay_parent_header.state_root,
+					relay_parent_header.number,
+				);
+			} else {
+				tracing::error!(target: LOG_TARGET, "Failed to get relay parent header from hash: {relay_parent:?}");
+			}
+		}
+
 		tracing::info!(
 			target: LOG_TARGET,
 			"Compressed PoV size: {}kb",
diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs
index f72960fe4c2..f845f46fd96 100644
--- a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs
+++ b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs
@@ -15,19 +15,56 @@
 // You should have received a copy of the GNU General Public License
 // along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
 
-//! A collator for Aura that looks ahead of the most recently included parachain block
-//! when determining what to build upon.
+//! # Architecture Overview
 //!
-//! The block building mechanism consists of two parts:
-//! 	1. A block-builder task that builds parachain blocks at each of our slots.
-//! 	2. A collator task that transforms the blocks into a collation and submits them to the relay
-//!     chain.
+//! The block building mechanism operates through two coordinated tasks:
 //!
-//! Blocks are built on every parachain slot if there is a core scheduled on the relay chain. At the
-//! beginning of each block building loop, we determine how many blocks we expect to build per relay
-//! chain block. The collator implementation then expects that we have that many cores scheduled
-//! during the relay chain block. After the block is built, the block builder task sends it to
-//! the collation task which compresses it and submits it to the collation-generation subsystem.
+//! 1. **Block Builder Task**: Orchestrates the timing and execution of parachain block production
+//! 2. **Collator Task**: Processes built blocks into collations for relay chain submission
+//!
+//! # Block Builder Task Details
+//!
+//! The block builder task manages block production timing and execution through an iterative
+//! process:
+//!
+//! 1. Awaits the next production signal from the internal timer
+//! 2. Retrieves the current best relay chain block and identifies a valid parent block (see
+//!    [find_potential_parents][cumulus_client_consensus_common::find_potential_parents] for parent
+//!    selection criteria)
+//! 3. Validates that:
+//!    - The parachain has an assigned core on the relay chain
+//!    - No block has been previously built on the target core
+//! 4. Executes block building and import operations
+//! 5. Transmits the completed block to the collator task
+//!
+//! # Block Production Timing
+//!
+//! When a block is produced is determined by the following parameters:
+//!
+//! - Parachain slot duration
+//! - Number of assigned parachain cores
+//! - Parachain runtime configuration
+//!
+//! ## Timing Examples
+//!
+//! The following table demonstrates various timing configurations and their effects. The "AURA
+//! Slot" column shows which author is responsible for the block.
+//!
+//! | Slot Duration (ms) | Cores | Production Attempts (ms) | AURA Slot  |
+//! |-------------------|--------|-------------------------|------------|
+//! | 2000              | 3      | 0, 2000, 4000, 6000    | 0, 1, 2, 3 |
+//! | 6000              | 1      | 0, 6000, 12000, 18000  | 0, 1, 2, 3 |
+//! | 6000              | 3      | 0, 2000, 4000, 6000    | 0, 0, 0, 1 |
+//! | 12000             | 1      | 0, 6000, 12000, 18000  | 0, 0, 1, 1 |
+//! | 12000             | 3      | 0, 2000, 4000, 6000    | 0, 0, 0, 0 |
+//!
+//! # Collator Task Details
+//!
+//! The collator task receives built blocks from the block builder task and performs two primary
+//! functions:
+//!
+//! 1. Block compression
+//! 2. Submission to the collation-generation subsystem
 
 use self::{block_builder_task::run_block_builder, collation_task::run_collation_task};
 use codec::Codec;
@@ -54,7 +91,7 @@ use sp_core::{crypto::Pair, traits::SpawnNamed, U256};
 use sp_inherents::CreateInherentDataProviders;
 use sp_keystore::KeystorePtr;
 use sp_runtime::traits::{Block as BlockT, Member, NumberFor, One};
-use std::{sync::Arc, time::Duration};
+use std::{path::PathBuf, sync::Arc, time::Duration};
 
 pub use block_import::{SlotBasedBlockImport, SlotBasedBlockImportHandle};
 
@@ -63,6 +100,8 @@ mod block_import;
 mod collation_task;
 mod relay_chain_data_cache;
 
+mod slot_timer;
+
 /// Parameters for [`run`].
 pub struct Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner> {
 	/// Inherent data providers. Only non-consensus inherent data should be provided, i.e.
@@ -93,35 +132,22 @@ pub struct Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS,
 	pub authoring_duration: Duration,
 	/// Whether we should reinitialize the collator config (i.e. we are transitioning to aura).
 	pub reinitialize: bool,
-	/// Drift slots by a fixed duration. This can be used to create more preferrable authoring
+	/// Offset slots by a fixed duration. This can be used to create more preferrable authoring
 	/// timings.
-	pub slot_drift: Duration,
+	pub slot_offset: Duration,
 	/// The handle returned by [`SlotBasedBlockImport`].
 	pub block_import_handle: SlotBasedBlockImportHandle<Block>,
 	/// Spawner for spawning futures.
 	pub spawner: Spawner,
+	/// Slot duration of the relay chain
+	pub relay_chain_slot_duration: Duration,
+	/// When set, the collator will export every produced `POV` to this folder.
+	pub export_pov: Option<PathBuf>,
 }
 
 /// Run aura-based block building and collation task.
 pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>(
-	Params {
-		create_inherent_data_providers,
-		block_import,
-		para_client,
-		para_backend,
-		relay_client,
-		code_hash_provider,
-		keystore,
-		collator_key,
-		para_id,
-		proposer,
-		collator_service,
-		authoring_duration,
-		reinitialize,
-		slot_drift,
-		block_import_handle,
-		spawner,
-	}: Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>,
+	params: Params<Block, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>,
 ) where
 	Block: BlockT,
 	Client: ProvideRuntimeApi<Block>
@@ -148,6 +174,27 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw
 	P::Signature: TryFrom<Vec<u8>> + Member + Codec,
 	Spawner: SpawnNamed,
 {
+	let Params {
+		create_inherent_data_providers,
+		block_import,
+		para_client,
+		para_backend,
+		relay_client,
+		code_hash_provider,
+		keystore,
+		collator_key,
+		para_id,
+		proposer,
+		collator_service,
+		authoring_duration,
+		reinitialize,
+		slot_offset,
+		block_import_handle,
+		spawner,
+		export_pov,
+		relay_chain_slot_duration,
+	} = params;
+
 	let (tx, rx) = tracing_unbounded("mpsc_builder_to_collator", 100);
 	let collator_task_params = collation_task::Params {
 		relay_client: relay_client.clone(),
@@ -157,6 +204,7 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw
 		collator_service: collator_service.clone(),
 		collator_receiver: rx,
 		block_import_handle,
+		export_pov,
 	};
 
 	let collation_task_fut = run_collation_task::<Block, _, _>(collator_task_params);
@@ -174,7 +222,8 @@ pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spaw
 		collator_service,
 		authoring_duration,
 		collator_sender: tx,
-		slot_drift,
+		relay_chain_slot_duration,
+		slot_offset,
 	};
 
 	let block_builder_fut =
diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/slot_timer.rs b/cumulus/client/consensus/aura/src/collators/slot_based/slot_timer.rs
new file mode 100644
index 00000000000..fb76089cdb0
--- /dev/null
+++ b/cumulus/client/consensus/aura/src/collators/slot_based/slot_timer.rs
@@ -0,0 +1,269 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Cumulus.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// Cumulus is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Cumulus is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
+
+use crate::LOG_TARGET;
+use codec::Codec;
+use cumulus_primitives_aura::Slot;
+use cumulus_primitives_core::BlockT;
+use sc_client_api::UsageProvider;
+use sc_consensus_aura::SlotDuration;
+use sp_api::ProvideRuntimeApi;
+use sp_application_crypto::AppPublic;
+use sp_consensus_aura::AuraApi;
+use sp_core::Pair;
+use sp_runtime::traits::Member;
+use sp_timestamp::Timestamp;
+use std::{
+	cmp::{max, min},
+	sync::Arc,
+	time::Duration,
+};
+
+/// Lower limit of the allowed block production interval.
+/// Defensive mechanism, corresponds to 12 cores at 6 second block time.
+const BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS: Duration = Duration::from_millis(500);
+
+#[derive(Debug)]
+pub(crate) struct SlotInfo {
+	pub timestamp: Timestamp,
+	pub slot: Slot,
+}
+
+/// Manages block-production timings based on chain parameters and assigned cores.
+#[derive(Debug)]
+pub(crate) struct SlotTimer<Block, Client, P> {
+	/// Client that is used for runtime calls
+	client: Arc<Client>,
+	/// Offset the current time by this duration.
+	time_offset: Duration,
+	/// Last reported core count.
+	last_reported_core_num: Option<u32>,
+	/// Slot duration of the relay chain. This is used to compute how many block-production
+	/// attempts we should trigger per relay chain block.
+	relay_slot_duration: Duration,
+	_marker: std::marker::PhantomData<(Block, Box<dyn Fn(P) + Send + Sync + 'static>)>,
+}
+
+/// Compute when to try block-authoring next.
+/// The exact time point is determined by the slot duration of relay- and parachain as
+/// well as the last observed core count. If more cores are available, we attempt to author blocks
+/// for them.
+///
+/// Returns a tuple with:
+/// - `Duration`: How long to wait until the next slot.
+/// - `Timestamp`: The timestamp to pass to the inherent
+/// - `Slot`: The AURA slot used for authoring
+fn compute_next_wake_up_time(
+	para_slot_duration: SlotDuration,
+	relay_slot_duration: Duration,
+	core_count: Option<u32>,
+	time_now: Duration,
+	time_offset: Duration,
+) -> (Duration, Timestamp, Slot) {
+	let para_slots_per_relay_block =
+		(relay_slot_duration.as_millis() / para_slot_duration.as_millis() as u128) as u32;
+	let assigned_core_num = core_count.unwrap_or(1);
+
+	// Trigger at least once per relay block, if we have for example 12 second slot duration,
+	// we should still produce two blocks if we are scheduled on every relay block.
+	let mut block_production_interval = min(para_slot_duration.as_duration(), relay_slot_duration);
+
+	if assigned_core_num > para_slots_per_relay_block &&
+		para_slot_duration.as_duration() >= relay_slot_duration
+	{
+		block_production_interval =
+			max(relay_slot_duration / assigned_core_num, BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS);
+		tracing::debug!(
+			target: LOG_TARGET,
+			?block_production_interval,
+			"Expected to produce for {assigned_core_num} cores but only have {para_slots_per_relay_block} slots. Attempting to produce multiple blocks per slot."
+		);
+	}
+
+	let (duration, timestamp) =
+		time_until_next_attempt(time_now, block_production_interval, time_offset);
+	let aura_slot = Slot::from_timestamp(timestamp, para_slot_duration);
+	(duration, timestamp, aura_slot)
+}
+
+/// Returns current duration since Unix epoch.
+fn duration_now() -> Duration {
+	use std::time::SystemTime;
+	let now = SystemTime::now();
+	now.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_else(|e| {
+		panic!("Current time {:?} is before Unix epoch. Something is wrong: {:?}", now, e)
+	})
+}
+
+/// Returns the duration until the next block production should be attempted.
+/// Returns:
+/// - Duration: The duration until the next attempt.
+/// - Timestamp: The time at which the attempt will take place.
+fn time_until_next_attempt(
+	now: Duration,
+	block_production_interval: Duration,
+	offset: Duration,
+) -> (Duration, Timestamp) {
+	let now = now.as_millis().saturating_sub(offset.as_millis());
+
+	let next_slot_time = ((now + block_production_interval.as_millis()) /
+		block_production_interval.as_millis()) *
+		block_production_interval.as_millis();
+	let remaining_millis = next_slot_time - now;
+	(Duration::from_millis(remaining_millis as u64), Timestamp::from(next_slot_time as u64))
+}
+
+impl<Block, Client, P> SlotTimer<Block, Client, P>
+where
+	Block: BlockT,
+	Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + UsageProvider<Block>,
+	Client::Api: AuraApi<Block, P::Public>,
+	P: Pair,
+	P::Public: AppPublic + Member + Codec,
+	P::Signature: TryFrom<Vec<u8>> + Member + Codec,
+{
+	/// Create a new slot timer.
+	pub fn new_with_offset(
+		client: Arc<Client>,
+		time_offset: Duration,
+		relay_slot_duration: Duration,
+	) -> Self {
+		Self {
+			client,
+			time_offset,
+			last_reported_core_num: None,
+			relay_slot_duration,
+			_marker: Default::default(),
+		}
+	}
+
+	/// Inform the slot timer about the last seen number of cores.
+	pub fn update_scheduling(&mut self, num_cores_next_block: u32) {
+		self.last_reported_core_num = Some(num_cores_next_block);
+	}
+
+	/// Returns a future that resolves when the next block production should be attempted.
+	pub async fn wait_until_next_slot(&self) -> Option<SlotInfo> {
+		let Ok(slot_duration) = crate::slot_duration(&*self.client) else {
+			tracing::error!(target: LOG_TARGET, "Failed to fetch slot duration from runtime.");
+			return None
+		};
+
+		let (time_until_next_attempt, timestamp, aura_slot) = compute_next_wake_up_time(
+			slot_duration,
+			self.relay_slot_duration,
+			self.last_reported_core_num,
+			duration_now(),
+			self.time_offset,
+		);
+
+		tokio::time::sleep(time_until_next_attempt).await;
+
+		tracing::debug!(
+			target: LOG_TARGET,
+			?slot_duration,
+			?timestamp,
+			?aura_slot,
+			"New block production opportunity."
+		);
+		Some(SlotInfo { slot: aura_slot, timestamp })
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use rstest::rstest;
+	use sc_consensus_aura::SlotDuration;
+	const RELAY_CHAIN_SLOT_DURATION: u64 = 6000;
+
+	#[rstest]
+	// Test that different now timestamps have correct impact
+	//                    ||||
+	#[case(6000, Some(1), 1000, 0, 5000, 6000, 1)]
+	#[case(6000, Some(1), 0, 0, 6000, 6000, 1)]
+	#[case(6000, Some(1), 6000, 0, 6000, 12000, 2)]
+	#[case(6000, Some(0), 6000, 0, 6000, 12000, 2)]
+	// Test that `None` core defaults to 1
+	//           ||||
+	#[case(6000, None, 1000, 0, 5000, 6000, 1)]
+	#[case(6000, None, 0, 0, 6000, 6000, 1)]
+	#[case(6000, None, 6000, 0, 6000, 12000, 2)]
+	// Test that offset affects the current time correctly
+	//                          ||||
+	#[case(6000, Some(1), 1000, 1000, 6000, 6000, 1)]
+	#[case(6000, Some(1), 12000, 2000, 2000, 12000, 2)]
+	#[case(6000, Some(1), 12000, 6000, 6000, 12000, 2)]
+	#[case(6000, Some(1), 12000, 7000, 1000, 6000, 1)]
+	// Test that number of cores affects the block production interval
+	//           |||||||
+	#[case(6000, Some(3), 12000, 0, 2000, 14000, 2)]
+	#[case(6000, Some(2), 12000, 0, 3000, 15000, 2)]
+	#[case(6000, Some(3), 11999, 0, 1, 12000, 2)]
+	// High core count
+	//           ||||||||
+	#[case(6000, Some(12), 0, 0, 500, 500, 0)]
+	// Test that the minimum block interval is respected
+	// at high core counts.
+	//           |||||||||
+	#[case(6000, Some(100), 0, 0, 500, 500, 0)]
+	// Test that slot_duration works correctly
+	//     ||||
+	#[case(2000, Some(1), 1000, 0, 1000, 2000, 1)]
+	#[case(2000, Some(1), 3000, 0, 1000, 4000, 2)]
+	#[case(2000, Some(1), 10000, 0, 2000, 12000, 6)]
+	#[case(2000, Some(2), 1000, 0, 1000, 2000, 1)]
+	// Cores are ignored if relay_slot_duration != para_slot_duration
+	//           |||||||
+	#[case(2000, Some(3), 3000, 0, 1000, 4000, 2)]
+	// For long slot durations, we should still check
+	// every relay chain block for the slot.
+	//     |||||
+	#[case(12000, None, 0, 0, 6000, 6000, 0)]
+	#[case(12000, None, 6100, 0, 5900, 12000, 1)]
+	#[case(12000, None, 6000, 2000, 2000, 6000, 0)]
+	#[case(12000, Some(2), 6000, 0, 3000, 9000, 0)]
+	#[case(12000, Some(3), 6000, 0, 2000, 8000, 0)]
+	#[case(12000, Some(3), 8100, 0, 1900, 10000, 0)]
+	fn test_get_next_slot(
+		#[case] para_slot_millis: u64,
+		#[case] core_count: Option<u32>,
+		#[case] time_now: u64,
+		#[case] offset_millis: u64,
+		#[case] expected_wait_duration: u128,
+		#[case] expected_timestamp: u64,
+		#[case] expected_slot: u64,
+	) {
+		let para_slot_duration = SlotDuration::from_millis(para_slot_millis);
+		let relay_slot_duration = Duration::from_millis(RELAY_CHAIN_SLOT_DURATION);
+		let time_now = Duration::from_millis(time_now);
+		let offset = Duration::from_millis(offset_millis);
+		let expected_slot = Slot::from(expected_slot);
+
+		let (wait_duration, timestamp, aura_slot) = compute_next_wake_up_time(
+			para_slot_duration,
+			relay_slot_duration,
+			core_count,
+			time_now,
+			offset,
+		);
+
+		assert_eq!(wait_duration.as_millis(), expected_wait_duration, "Wait time mismatch.");
+		assert_eq!(timestamp.as_millis(), expected_timestamp, "Timestamp mismatch.");
+		assert_eq!(aura_slot, expected_slot, "AURA slot mismatch.");
+	}
+}
diff --git a/cumulus/client/consensus/aura/src/lib.rs b/cumulus/client/consensus/aura/src/lib.rs
index 0e404541ab9..2e9b4b70234 100644
--- a/cumulus/client/consensus/aura/src/lib.rs
+++ b/cumulus/client/consensus/aura/src/lib.rs
@@ -23,13 +23,15 @@
 //!
 //! For more information about AuRa, the Substrate crate should be checked.
 
-use codec::Codec;
+use codec::{Codec, Encode};
 use cumulus_client_consensus_common::{
 	ParachainBlockImportMarker, ParachainCandidate, ParachainConsensus,
 };
 use cumulus_primitives_core::{relay_chain::Hash as PHash, PersistedValidationData};
 
+use cumulus_primitives_core::relay_chain::HeadData;
 use futures::lock::Mutex;
+use polkadot_primitives::{BlockNumber as RBlockNumber, Hash as RHash};
 use sc_client_api::{backend::AuxStore, BlockOf};
 use sc_consensus::BlockImport;
 use sc_consensus_slots::{BackoffAuthoringBlocksStrategy, SimpleSlotWorker, SlotInfo};
@@ -45,7 +47,10 @@ use sp_keystore::KeystorePtr;
 use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, NumberFor};
 use std::{
 	convert::TryFrom,
+	fs,
+	fs::File,
 	marker::PhantomData,
+	path::PathBuf,
 	sync::{
 		atomic::{AtomicU64, Ordering},
 		Arc,
@@ -55,6 +60,7 @@ use std::{
 mod import_queue;
 
 pub use import_queue::{build_verifier, import_queue, BuildVerifierParams, ImportQueueParams};
+use polkadot_node_primitives::PoV;
 pub use sc_consensus_aura::{
 	slot_duration, standalone::slot_duration_at, AuraVerifier, BuildAuraWorkerParams,
 	SlotProportion,
@@ -252,3 +258,37 @@ where
 		Some(ParachainCandidate { block: res.block, proof: res.storage_proof })
 	}
 }
+
+/// Export the given `pov` to the file system at `path`.
+///
+/// The file will be named `block_hash_block_number.pov`.
+///
+/// The `parent_header`, `relay_parent_storage_root` and `relay_parent_number` will also be
+/// stored in the file alongside the `pov`. This enables stateless validation of the `pov`.
+pub(crate) fn export_pov_to_path<Block: BlockT>(
+	path: PathBuf,
+	pov: PoV,
+	block_hash: Block::Hash,
+	block_number: NumberFor<Block>,
+	parent_header: Block::Header,
+	relay_parent_storage_root: RHash,
+	relay_parent_number: RBlockNumber,
+) {
+	if let Err(error) = fs::create_dir_all(&path) {
+		tracing::error!(target: LOG_TARGET, %error, path = %path.display(), "Failed to create PoV export directory");
+		return
+	}
+
+	let mut file = match File::create(path.join(format!("{block_hash:?}_{block_number}.pov"))) {
+		Ok(f) => f,
+		Err(error) => {
+			tracing::error!(target: LOG_TARGET, %error, "Failed to export PoV.");
+			return
+		},
+	};
+
+	pov.encode_to(&mut file);
+	HeadData(parent_header.encode()).encode_to(&mut file);
+	relay_parent_storage_root.encode_to(&mut file);
+	relay_parent_number.encode_to(&mut file);
+}
diff --git a/cumulus/pallets/aura-ext/Cargo.toml b/cumulus/pallets/aura-ext/Cargo.toml
index 5cf965bd201..58819bdd9a2 100644
--- a/cumulus/pallets/aura-ext/Cargo.toml
+++ b/cumulus/pallets/aura-ext/Cargo.toml
@@ -24,6 +24,7 @@ sp-runtime.workspace = true
 cumulus-pallet-parachain-system.workspace = true
 
 [dev-dependencies]
+rstest = { workspace = true }
 # Cumulus
 cumulus-pallet-parachain-system = { default-features = true, path = "../parachain-system" }
 cumulus-primitives-core = { default-features = true, path = "../../primitives/core" }
diff --git a/cumulus/pallets/aura-ext/src/consensus_hook.rs b/cumulus/pallets/aura-ext/src/consensus_hook.rs
index 52a7a500a1f..1c3e373ef85 100644
--- a/cumulus/pallets/aura-ext/src/consensus_hook.rs
+++ b/cumulus/pallets/aura-ext/src/consensus_hook.rs
@@ -16,8 +16,6 @@
 
 //! The definition of a [`FixedVelocityConsensusHook`] for consensus logic to manage
 //! block velocity.
-//!
-//! The velocity `V` refers to the rate of block processing by the relay chain.
 use super::{pallet, Aura};
 use core::{marker::PhantomData, num::NonZeroU32};
 use cumulus_pallet_parachain_system::{
@@ -28,9 +26,25 @@ use cumulus_pallet_parachain_system::{
 use frame_support::pallet_prelude::*;
 use sp_consensus_aura::{Slot, SlotDuration};
 
-/// A consensus hook for a fixed block processing velocity and unincluded segment capacity.
+/// A consensus hook that enforces fixed block production velocity and unincluded segment capacity.
 ///
-/// Relay chain slot duration must be provided in milliseconds.
+/// It keeps track of relay chain slot information and parachain blocks authored per relay chain
+/// slot.
+///
+/// # Type Parameters
+/// - `T` - The runtime configuration trait
+/// - `RELAY_CHAIN_SLOT_DURATION_MILLIS` - Duration of relay chain slots in milliseconds
+/// - `V` - Maximum number of blocks that can be authored per relay chain parent (velocity)
+/// - `C` - Maximum capacity of unincluded segment
+///
+/// # Example Configuration
+/// ```ignore
+/// type ConsensusHook = FixedVelocityConsensusHook<Runtime, 6000, 2, 8>;
+/// ```
+/// This configures:
+/// - 6 second relay chain slots
+/// - Maximum 2 blocks per slot
+/// - Maximum 8 blocks in unincluded segment
 pub struct FixedVelocityConsensusHook<
 	T,
 	const RELAY_CHAIN_SLOT_DURATION_MILLIS: u32,
@@ -47,7 +61,15 @@ impl<
 where
 	<T as pallet_timestamp::Config>::Moment: Into<u64>,
 {
-	// Validates the number of authored blocks within the slot with respect to the `V + 1` limit.
+	/// Consensus hook that performs validations on the provided relay chain state
+	/// proof:
+	/// - Ensures blocks are not produced faster than the specified velocity `V`
+	/// - Verifies parachain slot alignment with relay chain slot
+	///
+	/// # Panics
+	/// - When the relay chain slot from the state is smaller than the slot from the proof
+	/// - When the number of authored blocks exceeds velocity limit
+	/// - When parachain slot is ahead of the calculated slot from relay chain
 	fn on_state_proof(state_proof: &RelayChainStateProof) -> (Weight, UnincludedSegmentCapacity) {
 		// Ensure velocity is non-zero.
 		let velocity = V.max(1);
@@ -79,9 +101,7 @@ where
 		let para_slot_from_relay =
 			Slot::from_timestamp(relay_chain_timestamp.into(), para_slot_duration);
 
-		// Check that we are not too far in the future. Since we expect `V` parachain blocks
-		// during the relay chain slot, we can allow for `V` parachain slots into the future.
-		if *para_slot > *para_slot_from_relay + u64::from(velocity) {
+		if *para_slot > *para_slot_from_relay {
 			panic!(
 				"Parachain slot is too far in the future: parachain_slot={:?}, derived_from_relay_slot={:?} velocity={:?}, relay_chain_slot={:?}",
 				para_slot,
diff --git a/cumulus/pallets/aura-ext/src/test.rs b/cumulus/pallets/aura-ext/src/test.rs
index 8ac3c74bddd..9c6eeeff7a1 100644
--- a/cumulus/pallets/aura-ext/src/test.rs
+++ b/cumulus/pallets/aura-ext/src/test.rs
@@ -28,10 +28,12 @@ use frame_support::{
 	derive_impl,
 	pallet_prelude::ConstU32,
 	parameter_types,
-	traits::{ConstBool, ConstU64, EnqueueWithOrigin},
+	traits::{ConstBool, EnqueueWithOrigin},
 };
+use sp_core::Get;
 use sp_io::TestExternalities;
 use sp_version::RuntimeVersion;
+use std::cell::RefCell;
 
 type Block = frame_system::mocking::MockBlock<Test>;
 
@@ -67,12 +69,29 @@ impl frame_system::Config for Test {
 
 impl crate::Config for Test {}
 
+std::thread_local! {
+	pub static PARA_SLOT_DURATION: RefCell<u64> = RefCell::new(6000);
+}
+
+pub struct TestSlotDuration;
+
+impl TestSlotDuration {
+	pub fn set_slot_duration(slot_duration: u64) {
+		PARA_SLOT_DURATION.with(|v| *v.borrow_mut() = slot_duration);
+	}
+}
+impl Get<u64> for TestSlotDuration {
+	fn get() -> u64 {
+		PARA_SLOT_DURATION.with(|v| v.clone().into_inner())
+	}
+}
+
 impl pallet_aura::Config for Test {
 	type AuthorityId = sp_consensus_aura::sr25519::AuthorityId;
 	type MaxAuthorities = ConstU32<100_000>;
 	type DisabledValidators = ();
 	type AllowMultipleBlocksPerSlot = ConstBool<true>;
-	type SlotDuration = ConstU64<6000>;
+	type SlotDuration = TestSlotDuration;
 }
 
 impl pallet_timestamp::Config for Test {
@@ -104,6 +123,7 @@ mod test {
 	use cumulus_pallet_parachain_system::{
 		Ancestor, ConsensusHook, RelayChainStateProof, UsedBandwidth,
 	};
+	use rstest::rstest;
 	use sp_core::H256;
 
 	fn set_ancestors() {
@@ -149,9 +169,11 @@ mod test {
 		);
 	}
 
+	const DEFAULT_TEST_VELOCITY: u32 = 2;
+
 	#[test]
 	fn test_velocity() {
-		type Hook = FixedVelocityConsensusHook<Test, 6000, 2, 1>;
+		type Hook = FixedVelocityConsensusHook<Test, 6000, DEFAULT_TEST_VELOCITY, 1>;
 
 		new_test_ext(1).execute_with(|| {
 			let state_proof = relay_chain_state_proof(10);
@@ -165,15 +187,30 @@ mod test {
 		});
 	}
 
+	#[test]
+	fn test_velocity_2() {
+		type Hook = FixedVelocityConsensusHook<Test, 6000, DEFAULT_TEST_VELOCITY, 3>;
+
+		new_test_ext(1).execute_with(|| {
+			let state_proof = relay_chain_state_proof(10);
+			let (_, capacity) = Hook::on_state_proof(&state_proof);
+			assert_eq!(capacity, NonZeroU32::new(3).unwrap().into());
+			assert_slot_info(10, 1);
+
+			let (_, capacity) = Hook::on_state_proof(&state_proof);
+			assert_eq!(capacity, NonZeroU32::new(3).unwrap().into());
+			assert_slot_info(10, 2);
+		});
+	}
+
 	#[test]
 	#[should_panic(expected = "authored blocks limit is reached for the slot")]
 	fn test_exceeding_velocity_limit() {
-		const VELOCITY: u32 = 2;
-		type Hook = FixedVelocityConsensusHook<Test, 6000, VELOCITY, 1>;
+		type Hook = FixedVelocityConsensusHook<Test, 6000, DEFAULT_TEST_VELOCITY, 1>;
 
 		new_test_ext(1).execute_with(|| {
 			let state_proof = relay_chain_state_proof(10);
-			for authored in 0..=VELOCITY + 1 {
+			for authored in 0..=DEFAULT_TEST_VELOCITY + 1 {
 				Hook::on_state_proof(&state_proof);
 				assert_slot_info(10, authored + 1);
 			}
@@ -182,15 +219,44 @@ mod test {
 
 	#[test]
 	fn test_para_slot_calculated_from_slot_duration() {
-		const VELOCITY: u32 = 2;
-		type Hook = FixedVelocityConsensusHook<Test, 3000, VELOCITY, 1>;
+		type Hook = FixedVelocityConsensusHook<Test, 3000, DEFAULT_TEST_VELOCITY, 1>;
 
-		new_test_ext(6).execute_with(|| {
+		new_test_ext(5).execute_with(|| {
 			let state_proof = relay_chain_state_proof(10);
 			Hook::on_state_proof(&state_proof);
+		});
+	}
 
-			let para_slot = Slot::from(7);
-			pallet_aura::CurrentSlot::<Test>::put(para_slot);
+	#[rstest]
+	#[should_panic(
+		expected = "too far in the future: parachain_slot=Slot(31), derived_from_relay_slot=Slot(30)"
+	)]
+	#[case::short_para_slot(2000, 31, 10)]
+	#[should_panic(
+		expected = "too far in the future: parachain_slot=Slot(32), derived_from_relay_slot=Slot(30)"
+	)]
+	#[case::short_para_slot(2000, 32, 10)]
+	#[case::short_para_slot(2000, 30, 10)]
+	#[case::short_para_slot(2000, 33, 11)]
+	#[case::short_para_slot(2000, 29, 10)]
+	#[case::short_para_slot(2000, 1, 10)]
+	#[case::normal_para_slot(6000, 1, 10)]
+	#[case::normal_para_slot(6000, 9, 10)]
+	#[case::normal_para_slot(6000, 10, 10)]
+	#[should_panic(
+		expected = "too far in the future: parachain_slot=Slot(11), derived_from_relay_slot=Slot(10)"
+	)]
+	#[case::normal_para_slot(6000, 11, 10)]
+	fn test_para_slot_too_high(
+		#[case] para_slot_duration: u64,
+		#[case] para_slot: u64,
+		#[case] relay_slot: u64,
+	) {
+		type Hook = FixedVelocityConsensusHook<Test, 6000, DEFAULT_TEST_VELOCITY, 1>;
+
+		TestSlotDuration::set_slot_duration(para_slot_duration);
+		new_test_ext(para_slot).execute_with(|| {
+			let state_proof = relay_chain_state_proof(relay_slot);
 			Hook::on_state_proof(&state_proof);
 		});
 	}
@@ -212,8 +278,8 @@ mod test {
 		expected = "Parachain slot is too far in the future: parachain_slot=Slot(8), derived_from_relay_slot=Slot(5) velocity=2"
 	)]
 	fn test_para_slot_calculated_from_slot_duration_2() {
-		const VELOCITY: u32 = 2;
-		type Hook = FixedVelocityConsensusHook<Test, 3000, VELOCITY, 1>;
+		// Note: In contrast to tests below, relay chain slot duration is 3000 here.
+		type Hook = FixedVelocityConsensusHook<Test, 3000, DEFAULT_TEST_VELOCITY, 1>;
 
 		new_test_ext(8).execute_with(|| {
 			let state_proof = relay_chain_state_proof(10);
@@ -223,18 +289,17 @@ mod test {
 
 	#[test]
 	fn test_velocity_resets_on_new_relay_slot() {
-		const VELOCITY: u32 = 2;
-		type Hook = FixedVelocityConsensusHook<Test, 6000, VELOCITY, 1>;
+		type Hook = FixedVelocityConsensusHook<Test, 6000, DEFAULT_TEST_VELOCITY, 1>;
 
 		new_test_ext(1).execute_with(|| {
 			let state_proof = relay_chain_state_proof(10);
-			for authored in 0..=VELOCITY {
+			for authored in 0..=DEFAULT_TEST_VELOCITY {
 				Hook::on_state_proof(&state_proof);
 				assert_slot_info(10, authored + 1);
 			}
 
 			let state_proof = relay_chain_state_proof(11);
-			for authored in 0..=VELOCITY {
+			for authored in 0..=DEFAULT_TEST_VELOCITY {
 				Hook::on_state_proof(&state_proof);
 				assert_slot_info(11, authored + 1);
 			}
@@ -258,23 +323,9 @@ mod test {
 		});
 	}
 
-	#[test]
-	#[should_panic(
-		expected = "Parachain slot is too far in the future: parachain_slot=Slot(13), derived_from_relay_slot=Slot(10) velocity=2"
-	)]
-	fn test_future_parachain_slot_errors() {
-		type Hook = FixedVelocityConsensusHook<Test, 6000, 2, 1>;
-
-		new_test_ext(13).execute_with(|| {
-			let state_proof = relay_chain_state_proof(10);
-			Hook::on_state_proof(&state_proof);
-		});
-	}
-
 	#[test]
 	fn test_can_build_upon_true_when_empty() {
-		const VELOCITY: u32 = 2;
-		type Hook = FixedVelocityConsensusHook<Test, 6000, VELOCITY, 1>;
+		type Hook = FixedVelocityConsensusHook<Test, 6000, DEFAULT_TEST_VELOCITY, 1>;
 
 		new_test_ext(1).execute_with(|| {
 			let hash = H256::repeat_byte(0x1);
@@ -282,57 +333,43 @@ mod test {
 		});
 	}
 
-	#[test]
-	fn test_can_build_upon_respects_velocity() {
-		const VELOCITY: u32 = 2;
-		type Hook = FixedVelocityConsensusHook<Test, 6000, VELOCITY, 10>;
-
-		new_test_ext(1).execute_with(|| {
-			let hash = H256::repeat_byte(0x1);
-			let relay_slot = Slot::from(10);
-
-			set_relay_slot(10, VELOCITY - 1);
-			assert!(Hook::can_build_upon(hash, relay_slot));
-
-			set_relay_slot(10, VELOCITY);
-			assert!(Hook::can_build_upon(hash, relay_slot));
-
-			set_relay_slot(10, VELOCITY + 1);
-			// Velocity too high
-			assert!(!Hook::can_build_upon(hash, relay_slot));
-		});
-	}
-
-	#[test]
-	fn test_can_build_upon_slot_can_not_decrease() {
-		const VELOCITY: u32 = 2;
-		type Hook = FixedVelocityConsensusHook<Test, 6000, VELOCITY, 10>;
+	#[rstest]
+	#[case::slot_higher_ok(10, 11, DEFAULT_TEST_VELOCITY, true)]
+	#[case::slot_same_ok(10, 10, DEFAULT_TEST_VELOCITY, true)]
+	#[case::slot_decrease_illegal(10, 9, DEFAULT_TEST_VELOCITY, false)]
+	#[case::velocity_small_ok(10, 10, DEFAULT_TEST_VELOCITY - 1 , true)]
+	#[case::velocity_small_ok(10, 10, DEFAULT_TEST_VELOCITY - 2 , true)]
+	#[case::velocity_too_high_illegal(10, 10, DEFAULT_TEST_VELOCITY + 1 , false)]
+	fn test_can_build_upon_slot_can_not_decrease(
+		#[case] state_relay_slot: u64,
+		#[case] test_relay_slot: u64,
+		#[case] authored_in_slot: u32,
+		#[case] expected_result: bool,
+	) {
+		type Hook = FixedVelocityConsensusHook<Test, 6000, DEFAULT_TEST_VELOCITY, 10>;
 
 		new_test_ext(1).execute_with(|| {
 			let hash = H256::repeat_byte(0x1);
 
-			set_relay_slot(10, VELOCITY);
+			set_relay_slot(state_relay_slot, authored_in_slot);
 			// Slot moves backwards
-			assert!(!Hook::can_build_upon(hash, Slot::from(9)));
+			assert_eq!(Hook::can_build_upon(hash, Slot::from(test_relay_slot)), expected_result);
 		});
 	}
 
 	#[test]
 	fn test_can_build_upon_unincluded_segment_size() {
-		const VELOCITY: u32 = 2;
-		type Hook = FixedVelocityConsensusHook<Test, 6000, VELOCITY, 2>;
+		type Hook = FixedVelocityConsensusHook<Test, 6000, DEFAULT_TEST_VELOCITY, 2>;
 
 		new_test_ext(1).execute_with(|| {
 			let relay_slot = Slot::from(10);
 
-			set_relay_slot(10, VELOCITY);
+			set_relay_slot(10, DEFAULT_TEST_VELOCITY);
 			// Size after included is two, we can not build
-			let hash = H256::repeat_byte(0x1);
-			assert!(!Hook::can_build_upon(hash, relay_slot));
+			assert!(!Hook::can_build_upon(H256::repeat_byte(0x1), relay_slot));
 
 			// Size after included is one, we can build
-			let hash = H256::repeat_byte(0x2);
-			assert!(Hook::can_build_upon(hash, relay_slot));
+			assert!(Hook::can_build_upon(H256::repeat_byte(0x2), relay_slot));
 		});
 	}
 }
diff --git a/cumulus/polkadot-omni-node/lib/src/cli.rs b/cumulus/polkadot-omni-node/lib/src/cli.rs
index 1c47eae5773..69fe91751d0 100644
--- a/cumulus/polkadot-omni-node/lib/src/cli.rs
+++ b/cumulus/polkadot-omni-node/lib/src/cli.rs
@@ -23,15 +23,18 @@ use crate::{
 		NodeExtraArgs,
 	},
 };
-use clap::{Command, CommandFactory, FromArgMatches};
+use clap::{Command, CommandFactory, FromArgMatches, ValueEnum};
 use sc_chain_spec::ChainSpec;
 use sc_cli::{
 	CliConfiguration, DefaultConfigurationValues, ImportParams, KeystoreParams, NetworkParams,
 	RpcEndpoint, SharedParams, SubstrateCli,
 };
 use sc_service::{config::PrometheusConfig, BasePath};
-use std::{fmt::Debug, marker::PhantomData, path::PathBuf};
-
+use std::{
+	fmt::{Debug, Display, Formatter},
+	marker::PhantomData,
+	path::PathBuf,
+};
 /// Trait that can be used to customize some of the customer-facing info related to the node binary
 /// that is being built using this library.
 ///
@@ -137,12 +140,17 @@ pub struct Cli<Config: CliConfig> {
 	#[arg(long)]
 	pub dev_block_time: Option<u64>,
 
-	/// EXPERIMENTAL: Use slot-based collator which can handle elastic scaling.
+	/// DEPRECATED: This feature has been stabilized, pLease use `--authoring slot-based` instead.
 	///
+	/// Use slot-based collator which can handle elastic scaling.
 	/// Use with care, this flag is unstable and subject to change.
-	#[arg(long)]
+	#[arg(long, conflicts_with = "authoring")]
 	pub experimental_use_slot_based: bool,
 
+	/// Authoring style to use.
+	#[arg(long, default_value_t = AuthoringPolicy::Lookahead)]
+	pub authoring: AuthoringPolicy,
+
 	/// Disable automatic hardware benchmarks.
 	///
 	/// By default these benchmarks are automatically ran at startup and measure
@@ -168,10 +176,34 @@ pub struct Cli<Config: CliConfig> {
 	pub(crate) _phantom: PhantomData<Config>,
 }
 
+/// Collator implementation to use.
+#[derive(PartialEq, Debug, ValueEnum, Clone, Copy)]
+pub enum AuthoringPolicy {
+	/// Use the lookahead collator. Builds a block once per imported relay chain block and
+	/// on relay chain forks. Default for asynchronous backing chains.
+	Lookahead,
+	/// Use the slot-based collator. Builds a block based on time. Can utilize multiple cores,
+	/// always builds on the best relay chain block available. Should be used with elastic-scaling
+	/// chains.
+	SlotBased,
+}
+
+impl Display for AuthoringPolicy {
+	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+		match self {
+			AuthoringPolicy::Lookahead => write!(f, "lookahead"),
+			AuthoringPolicy::SlotBased => write!(f, "slot-based"),
+		}
+	}
+}
+
 impl<Config: CliConfig> Cli<Config> {
 	pub(crate) fn node_extra_args(&self) -> NodeExtraArgs {
 		NodeExtraArgs {
-			use_slot_based_consensus: self.experimental_use_slot_based,
+			authoring_policy: self
+				.experimental_use_slot_based
+				.then(|| AuthoringPolicy::SlotBased)
+				.unwrap_or(self.authoring),
 			export_pov: self.export_pov_to_path.clone(),
 		}
 	}
diff --git a/cumulus/polkadot-omni-node/lib/src/command.rs b/cumulus/polkadot-omni-node/lib/src/command.rs
index bf0d264e8c9..818d3400288 100644
--- a/cumulus/polkadot-omni-node/lib/src/command.rs
+++ b/cumulus/polkadot-omni-node/lib/src/command.rs
@@ -223,6 +223,14 @@ pub fn run<CliConfig: crate::cli::CliConfig>(cmd_config: RunConfig) -> Result<()
 				RelayChainCli::<CliConfig>::new(runner.config(), cli.relay_chain_args.iter());
 			let collator_options = cli.run.collator_options();
 
+			if cli.experimental_use_slot_based {
+				log::warn!(
+					"Deprecated: The flag --experimental-use-slot-based is no longer \
+				supported. Please use --authoring slot-based instead. This feature will be removed \
+				after May 2025."
+				);
+			}
+
 			runner.run_node_until_exit(|config| async move {
 				let node_spec =
 					new_node_spec(&config, &cmd_config.runtime_resolver, &cli.node_extra_args())?;
diff --git a/cumulus/polkadot-omni-node/lib/src/common/mod.rs b/cumulus/polkadot-omni-node/lib/src/common/mod.rs
index af003b87e3d..256aef02b77 100644
--- a/cumulus/polkadot-omni-node/lib/src/common/mod.rs
+++ b/cumulus/polkadot-omni-node/lib/src/common/mod.rs
@@ -26,6 +26,7 @@ pub mod runtime;
 pub mod spec;
 pub mod types;
 
+use crate::cli::AuthoringPolicy;
 use cumulus_primitives_core::{CollectCollationInfo, GetCoreSelectorApi};
 use sc_client_db::DbHash;
 use sc_offchain::OffchainWorkerApi;
@@ -104,7 +105,10 @@ where
 
 /// Extra args that are passed when creating a new node spec.
 pub struct NodeExtraArgs {
-	pub use_slot_based_consensus: bool,
+	/// The authoring policy to use.
+	///
+	/// Can be used to influence details of block production.
+	pub authoring_policy: AuthoringPolicy,
 
 	/// If set, each `PoV` build by the node will be exported to this folder.
 	pub export_pov: Option<PathBuf>,
diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs
index 0d526b09834..3444b21c8aa 100644
--- a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs
+++ b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs
@@ -15,6 +15,7 @@
 // limitations under the License.
 
 use crate::{
+	cli::AuthoringPolicy,
 	common::{
 		aura::{AuraIdT, AuraRuntimeApi},
 		rpc::BuildParachainRpcExtensions,
@@ -217,7 +218,7 @@ where
 		+ substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>,
 	AuraId: AuraIdT + Sync,
 {
-	if extra_args.use_slot_based_consensus {
+	if extra_args.authoring_policy == AuthoringPolicy::SlotBased {
 		Box::new(AuraNode::<
 			Block,
 			RuntimeApi,
@@ -250,7 +251,7 @@ where
 {
 	#[docify::export_content]
 	fn launch_slot_based_collator<CIDP, CHP, Proposer, CS, Spawner>(
-		params: SlotBasedParams<
+		params_with_export: SlotBasedParams<
 			Block,
 			ParachainBlockImport<
 				Block,
@@ -277,7 +278,9 @@ where
 		CS: CollatorServiceInterface<Block> + Send + Sync + Clone + 'static,
 		Spawner: SpawnNamed,
 	{
-		slot_based::run::<Block, <AuraId as AppCrypto>::Pair, _, _, _, _, _, _, _, _, _>(params);
+		slot_based::run::<Block, <AuraId as AppCrypto>::Pair, _, _, _, _, _, _, _, _, _>(
+			params_with_export,
+		);
 	}
 }
 
@@ -313,13 +316,13 @@ where
 		relay_chain_interface: Arc<dyn RelayChainInterface>,
 		transaction_pool: Arc<TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>>,
 		keystore: KeystorePtr,
-		_relay_chain_slot_duration: Duration,
+		relay_chain_slot_duration: Duration,
 		para_id: ParaId,
 		collator_key: CollatorPair,
 		_overseer_handle: OverseerHandle,
 		announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
 		backend: Arc<ParachainBackend<Block>>,
-		_node_extra_args: NodeExtraArgs,
+		node_extra_args: NodeExtraArgs,
 		block_import_handle: SlotBasedBlockImportHandle<Block>,
 	) -> Result<(), Error> {
 		let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
@@ -345,6 +348,7 @@ where
 			para_client: client.clone(),
 			para_backend: backend.clone(),
 			relay_client: relay_chain_interface,
+			relay_chain_slot_duration,
 			code_hash_provider: move |block_hash| {
 				client_for_aura.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
 			},
@@ -355,13 +359,15 @@ where
 			collator_service,
 			authoring_duration: Duration::from_millis(2000),
 			reinitialize: false,
-			slot_drift: Duration::from_secs(1),
+			slot_offset: Duration::from_secs(1),
 			block_import_handle,
 			spawner: task_manager.spawn_handle(),
+			export_pov: node_extra_args.export_pov,
 		};
 
 		// We have a separate function only to be able to use `docify::export` on this piece of
 		// code.
+
 		Self::launch_slot_based_collator(params);
 
 		Ok(())
diff --git a/cumulus/test/runtime/Cargo.toml b/cumulus/test/runtime/Cargo.toml
index e24424a07bd..8f1dc4c2b37 100644
--- a/cumulus/test/runtime/Cargo.toml
+++ b/cumulus/test/runtime/Cargo.toml
@@ -89,6 +89,11 @@ std = [
 	"substrate-wasm-builder",
 ]
 increment-spec-version = []
+# A runtime with elastic-scaling configuration.
 elastic-scaling = []
+# A runtime with low slot duration of 500ms for low-latency testing with 12 cores.
 elastic-scaling-500ms = []
+# A runtime with a slot duration of 6s but parameters that allow multiple blocks per slot.
+elastic-scaling-multi-block-slot = []
+# A runtime with 6s slot duration which sends RFC-103 compatible UMP signals.
 experimental-ump-signals = ["cumulus-pallet-parachain-system/experimental-ump-signals"]
diff --git a/cumulus/test/runtime/build.rs b/cumulus/test/runtime/build.rs
index 99d30ce6dc3..b56ba647020 100644
--- a/cumulus/test/runtime/build.rs
+++ b/cumulus/test/runtime/build.rs
@@ -47,6 +47,14 @@ fn main() {
 		.import_memory()
 		.set_file_name("wasm_binary_elastic_scaling_500ms.rs")
 		.build();
+
+	WasmBuilder::new()
+		.with_current_project()
+		.enable_feature("elastic-scaling-multi-block-slot")
+		.enable_feature("experimental-ump-signals")
+		.import_memory()
+		.set_file_name("wasm_binary_elastic_scaling_multi_block_slot.rs")
+		.build();
 }
 
 #[cfg(not(feature = "std"))]
diff --git a/cumulus/test/runtime/src/lib.rs b/cumulus/test/runtime/src/lib.rs
index 09e1361603a..5575fbbd840 100644
--- a/cumulus/test/runtime/src/lib.rs
+++ b/cumulus/test/runtime/src/lib.rs
@@ -41,6 +41,11 @@ pub mod elastic_scaling {
 	include!(concat!(env!("OUT_DIR"), "/wasm_binary_elastic_scaling.rs"));
 }
 
+pub mod elastic_scaling_multi_block_slot {
+	#[cfg(feature = "std")]
+	include!(concat!(env!("OUT_DIR"), "/wasm_binary_elastic_scaling_multi_block_slot.rs"));
+}
+
 mod genesis_config_presets;
 mod test_pallet;
 
@@ -102,22 +107,32 @@ impl_opaque_keys! {
 /// The para-id used in this runtime.
 pub const PARACHAIN_ID: u32 = 100;
 
-#[cfg(not(any(feature = "elastic-scaling", feature = "elastic-scaling-500ms")))]
-pub const MILLISECS_PER_BLOCK: u64 = 6000;
+#[cfg(all(
+	feature = "elastic-scaling-multi-block-slot",
+	not(any(feature = "elastic-scaling", feature = "elastic-scaling-500ms"))
+))]
+pub const BLOCK_PROCESSING_VELOCITY: u32 = 6;
 
-#[cfg(all(feature = "elastic-scaling", not(feature = "elastic-scaling-500ms")))]
-pub const MILLISECS_PER_BLOCK: u64 = 2000;
+#[cfg(all(
+	feature = "elastic-scaling-500ms",
+	not(any(feature = "elastic-scaling", feature = "elastic-scaling-multi-block-slot"))
+))]
+pub const BLOCK_PROCESSING_VELOCITY: u32 = 12;
 
-#[cfg(feature = "elastic-scaling-500ms")]
-pub const MILLISECS_PER_BLOCK: u64 = 500;
+#[cfg(feature = "elastic-scaling")]
+pub const BLOCK_PROCESSING_VELOCITY: u32 = 3;
 
-const BLOCK_PROCESSING_VELOCITY: u32 =
-	RELAY_CHAIN_SLOT_DURATION_MILLIS / (MILLISECS_PER_BLOCK as u32);
+#[cfg(not(any(
+	feature = "elastic-scaling",
+	feature = "elastic-scaling-500ms",
+	feature = "elastic-scaling-multi-block-slot"
+)))]
+pub const BLOCK_PROCESSING_VELOCITY: u32 = 1;
 
 // The `+2` shouldn't be needed, https://github.com/paritytech/polkadot-sdk/issues/5260
 const UNINCLUDED_SEGMENT_CAPACITY: u32 = BLOCK_PROCESSING_VELOCITY * 2 + 2;
 
-pub const SLOT_DURATION: u64 = MILLISECS_PER_BLOCK;
+pub const SLOT_DURATION: u64 = 6000;
 
 const RELAY_CHAIN_SLOT_DURATION_MILLIS: u32 = 6000;
 
@@ -163,7 +178,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion {
 pub const EPOCH_DURATION_IN_BLOCKS: u32 = 10 * MINUTES;
 
 // These time units are defined in number of blocks.
-pub const MINUTES: BlockNumber = 60_000 / (MILLISECS_PER_BLOCK as BlockNumber);
+pub const MINUTES: BlockNumber = 60_000 / (SLOT_DURATION as BlockNumber);
 pub const HOURS: BlockNumber = MINUTES * 60;
 pub const DAYS: BlockNumber = HOURS * 24;
 
@@ -241,7 +256,10 @@ impl cumulus_pallet_weight_reclaim::Config for Runtime {
 }
 
 parameter_types! {
-	pub const MinimumPeriod: u64 = SLOT_DURATION / 2;
+	pub const MinimumPeriod: u64 = 0;
+}
+
+parameter_types! {
 	pub const PotId: PalletId = PalletId(*b"PotStake");
 	pub const SessionLength: BlockNumber = 10 * MINUTES;
 	pub const Offset: u32 = 0;
diff --git a/cumulus/test/service/src/chain_spec.rs b/cumulus/test/service/src/chain_spec.rs
index b59bd7ab46b..ecac18f2ed9 100644
--- a/cumulus/test/service/src/chain_spec.rs
+++ b/cumulus/test/service/src/chain_spec.rs
@@ -136,3 +136,12 @@ pub fn get_elastic_scaling_mvp_chain_spec(id: Option<ParaId>) -> ChainSpec {
 			.expect("WASM binary was not built, please build it!"),
 	)
 }
+
+pub fn get_elastic_scaling_multi_block_slot_chain_spec(id: Option<ParaId>) -> ChainSpec {
+	get_chain_spec_with_extra_endowed(
+		id,
+		Default::default(),
+		cumulus_test_runtime::elastic_scaling_multi_block_slot::WASM_BINARY
+			.expect("WASM binary was not built, please build it!"),
+	)
+}
diff --git a/cumulus/test/service/src/cli.rs b/cumulus/test/service/src/cli.rs
index 7909ffbf714..eee592f3599 100644
--- a/cumulus/test/service/src/cli.rs
+++ b/cumulus/test/service/src/cli.rs
@@ -14,8 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
 
-use std::path::PathBuf;
-
+use clap::ValueEnum;
 use cumulus_client_cli::{ExportGenesisHeadCommand, ExportGenesisWasmCommand};
 use polkadot_service::{ChainSpec, ParaId, PrometheusConfig};
 use sc_cli::{
@@ -23,6 +22,10 @@ use sc_cli::{
 	Result as CliResult, RpcEndpoint, SharedParams, SubstrateCli,
 };
 use sc_service::BasePath;
+use std::{
+	fmt::{Display, Formatter},
+	path::PathBuf,
+};
 
 #[derive(Debug, clap::Parser)]
 #[command(
@@ -51,11 +54,29 @@ pub struct TestCollatorCli {
 	#[arg(long)]
 	pub fail_pov_recovery: bool,
 
-	/// EXPERIMENTAL: Use slot-based collator which can handle elastic scaling.
-	///
-	/// Use with care, this flag is unstable and subject to change.
-	#[arg(long)]
-	pub experimental_use_slot_based: bool,
+	/// Authoring style to use.
+	#[arg(long, default_value_t = AuthoringPolicy::Lookahead)]
+	pub authoring: AuthoringPolicy,
+}
+
+/// Collator implementation to use.
+#[derive(PartialEq, Debug, ValueEnum, Clone, Copy)]
+pub enum AuthoringPolicy {
+	/// Use the lookahead collator. Builds blocks once per relay chain block,
+	/// builds on relay chain forks.
+	Lookahead,
+	/// Use the slot-based collator which can handle elastic-scaling. Builds blocks based on time
+	/// and can utilize multiple cores, always builds on the best relay chain block available.
+	SlotBased,
+}
+
+impl Display for AuthoringPolicy {
+	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+		match self {
+			AuthoringPolicy::Lookahead => write!(f, "lookahead"),
+			AuthoringPolicy::SlotBased => write!(f, "slot-based"),
+		}
+	}
 }
 
 #[derive(Debug, clap::Subcommand)]
@@ -280,9 +301,14 @@ impl SubstrateCli for TestCollatorCli {
 					ParaId::from(2300),
 				))) as Box<_>
 			},
+			"elastic-scaling-multi-block-slot" => {
+				tracing::info!("Using elastic-scaling multi-block-slot chain spec.");
+				Box::new(cumulus_test_service::get_elastic_scaling_multi_block_slot_chain_spec(
+					Some(ParaId::from(2400)),
+				)) as Box<_>
+			},
 			path => {
-				let chain_spec =
-					cumulus_test_service::chain_spec::ChainSpec::from_json_file(path.into())?;
+				let chain_spec = cumulus_test_service::ChainSpec::from_json_file(path.into())?;
 				Box::new(chain_spec)
 			},
 		})
diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs
index eaffbfdcadc..e8870422a6d 100644
--- a/cumulus/test/service/src/lib.rs
+++ b/cumulus/test/service/src/lib.rs
@@ -498,14 +498,16 @@ where
 					},
 					keystore,
 					collator_key,
+					relay_chain_slot_duration,
 					para_id,
 					proposer,
 					collator_service,
 					authoring_duration: Duration::from_millis(2000),
 					reinitialize: false,
-					slot_drift: Duration::from_secs(1),
+					slot_offset: Duration::from_secs(1),
 					block_import_handle: slot_based_handle,
 					spawner: task_manager.spawn_handle(),
+					export_pov: None,
 				};
 
 				slot_based::run::<Block, AuthorityPair, _, _, _, _, _, _, _, _, _>(params);
diff --git a/cumulus/test/service/src/main.rs b/cumulus/test/service/src/main.rs
index df331577af5..14486431e1b 100644
--- a/cumulus/test/service/src/main.rs
+++ b/cumulus/test/service/src/main.rs
@@ -18,7 +18,7 @@ mod cli;
 
 use std::sync::Arc;
 
-use cli::{RelayChainCli, Subcommand, TestCollatorCli};
+use cli::{AuthoringPolicy, RelayChainCli, Subcommand, TestCollatorCli};
 use cumulus_primitives_core::relay_chain::CollatorPair;
 use cumulus_test_service::{chain_spec, new_partial, AnnounceBlockFn};
 use polkadot_service::IdentifyNetworkBackend;
@@ -56,7 +56,7 @@ fn main() -> Result<(), sc_cli::Error> {
 		None => {
 			let log_filters = cli.run.normalize().log_filters();
 			let mut builder = sc_cli::LoggerBuilder::new(log_filters.unwrap_or_default());
-			builder.with_colors(true);
+			builder.with_colors(false);
 			let _ = builder.init();
 
 			let collator_options = cli.run.collator_options();
@@ -103,12 +103,12 @@ fn main() -> Result<(), sc_cli::Error> {
 					cumulus_test_service::Consensus::Null
 				})
 				.unwrap_or(cumulus_test_service::Consensus::Aura);
-
-			// If the network backend is unspecified, use the default for the given chain.
-			let default_backend = relay_chain_config.chain_spec.network_backend();
-			let network_backend =
-				relay_chain_config.network.network_backend.unwrap_or(default_backend);
-			let (mut task_manager, _, _, _, _, _) = tokio_runtime
+			let use_slot_based_collator = cli.authoring == AuthoringPolicy::SlotBased;
+			// If the network backend is unspecified, use the default for the given chain.
+			let default_backend = relay_chain_config.chain_spec.network_backend();
+			let network_backend =
+				relay_chain_config.network.network_backend.unwrap_or(default_backend);
+			let (mut task_manager, _, _, _, _, _) = tokio_runtime
 				.block_on(async move {
 					match network_backend {
 						sc_network::config::NetworkBackendType::Libp2p =>
@@ -126,7 +126,7 @@ fn main() -> Result<(), sc_cli::Error> {
 								consensus,
 								collator_options,
 								true,
-								cli.experimental_use_slot_based,
+								use_slot_based_collator,
 							)
 							.await,
 						sc_network::config::NetworkBackendType::Litep2p =>
@@ -144,7 +144,7 @@ fn main() -> Result<(), sc_cli::Error> {
 								consensus,
 								collator_options,
 								true,
-								cli.experimental_use_slot_based,
+								use_slot_based_collator,
 							)
 							.await,
 					}
diff --git a/cumulus/zombienet/tests/0008-elastic_authoring.toml b/cumulus/zombienet/tests/0008-elastic_authoring.toml
index 516c152471b..ff0e13bf1a7 100644
--- a/cumulus/zombienet/tests/0008-elastic_authoring.toml
+++ b/cumulus/zombienet/tests/0008-elastic_authoring.toml
@@ -2,45 +2,53 @@
 timeout = 1000
 
 [relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params]
-  max_validators_per_core = 1
-  num_cores = 4
+max_validators_per_core = 1
+num_cores = 4
 
 [relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params]
-  max_approval_coalesce_count = 5
+max_approval_coalesce_count = 5
 
 [relaychain]
-default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}"
+default_image = "{{RELAY_IMAGE}}"
 chain = "rococo-local"
 command = "polkadot"
 
-  [[relaychain.nodes]]
-  name = "alice"
-  args = ["" ]
+[[relaychain.nodes]]
+name = "alice"
+args = [""]
 
-  [[relaychain.node_groups]]
-  name = "validator"
-  args = ["-lruntime=debug,parachain=trace" ]
-  count = 8
+[[relaychain.node_groups]]
+name = "validator"
+args = ["-lruntime=debug,parachain=trace"]
+count = 8
 
-# Slot based authoring with 3 cores and 2s slot duration
+# Slot based authoring with 3 cores and 6s slot duration
 [[parachains]]
 id = 2100
 chain = "elastic-scaling"
 add_to_genesis = true
 
-  [[parachains.collators]]
-  name = "collator-elastic"
-  image = "{{COL_IMAGE}}"
-  command = "test-parachain"
-  args = ["-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug", "--force-authoring", "--experimental-use-slot-based"]
+[[parachains.collators]]
+name = "collator-elastic"
+image = "{{COL_IMAGE}}"
+command = "test-parachain"
+args = [
+	"--force-authoring",
+	"--authoring=slot-based",
+	"-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug"
+]
 
 # Slot based authoring with 1 core and 6s slot duration
 [[parachains]]
 id = 2000
 add_to_genesis = true
 
-  [[parachains.collators]]
-  name = "collator-single-core"
-  image = "{{COL_IMAGE}}"
-  command = "test-parachain"
-  args = ["-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug", "--force-authoring", "--experimental-use-slot-based"]
+[[parachains.collators]]
+name = "collator-single-core"
+image = "{{COL_IMAGE}}"
+command = "test-parachain"
+args = [
+	"--force-authoring",
+	"--authoring=slot-based",
+	"-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug"
+]
diff --git a/cumulus/zombienet/tests/0009-elastic_pov_recovery.toml b/cumulus/zombienet/tests/0009-elastic_pov_recovery.toml
index b65ed77ec1b..19b001d87b5 100644
--- a/cumulus/zombienet/tests/0009-elastic_pov_recovery.toml
+++ b/cumulus/zombienet/tests/0009-elastic_pov_recovery.toml
@@ -10,59 +10,60 @@ limits = { memory = "4G", cpu = "2" }
 requests = { memory = "2G", cpu = "1" }
 
 [relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params]
-  max_validators_per_core = 1
-  num_cores = 4
+max_validators_per_core = 1
+num_cores = 4
 
 [relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params]
-  max_approval_coalesce_count = 5
+max_approval_coalesce_count = 5
 
 [relaychain]
-default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}"
+default_image = "{{RELAY_IMAGE}}"
 chain = "rococo-local"
 command = "polkadot"
 
-  [[relaychain.nodes]]
-  name = "alice"
-  args = ["" ]
+[[relaychain.nodes]]
+name = "alice"
+args = [""]
 
-  [[relaychain.node_groups]]
-  name = "validator"
-  args = [
-    "-lruntime=debug,parachain=trace",
-    "--reserved-only",
-    "--reserved-nodes {{'alice'|zombie('multiAddress')}}"
-  ]
-  count = 8
+[[relaychain.node_groups]]
+name = "validator"
+args = [
+	"-lruntime=debug,parachain=trace",
+	"--reserved-only",
+	"--reserved-nodes {{'alice'|zombie('multiAddress')}}"
+]
+count = 8
 
-# Slot based authoring with 3 cores and 2s slot duration
+# Slot based authoring with 3 cores and 6s slot duration
 [[parachains]]
 id = 2100
 chain = "elastic-scaling"
 add_to_genesis = false
 
-  # run 'recovery-target' as a parachain full node
-  [[parachains.collators]]
-  name = "recovery-target"
-  validator = false # full node
-  image = "{{COL_IMAGE}}"
-  command = "test-parachain"
-  args = [
-    "-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug",
-    "--disable-block-announcements",
-    "--in-peers 0",
-    "--out-peers 0",
-    "--",
-    "--reserved-only",
-    "--reserved-nodes {{'alice'|zombie('multiAddress')}}"]
+# run 'recovery-target' as a parachain full node
+[[parachains.collators]]
+name = "recovery-target"
+validator = false # full node
+image = "{{COL_IMAGE}}"
+command = "test-parachain"
+args = [
+	"-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug",
+	"--disable-block-announcements",
+	"--in-peers 0",
+	"--out-peers 0",
+	"--",
+	"--reserved-only",
+	"--reserved-nodes {{'alice'|zombie('multiAddress')}}"]
 
-  # Slot based authoring with 3 cores and 2s slot duration
-  [[parachains.collators]]
-  name = "collator-elastic"
-  image = "{{COL_IMAGE}}"
-  command = "test-parachain"
-  args = [
-    "-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug",
-    "--disable-block-announcements",
-    "--force-authoring",
-    "--experimental-use-slot-based"
-  ]
+# Slot based authoring with 3 cores and 6s slot duration
+[[parachains.collators]]
+name = "collator-elastic"
+image = "{{COL_IMAGE}}"
+command = "test-parachain"
+args = [
+	"-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug",
+	"--disable-block-announcements",
+	"--force-authoring",
+	"--authoring",
+	"slot-based"
+]
diff --git a/cumulus/zombienet/zombienet-sdk-helpers/Cargo.toml b/cumulus/zombienet/zombienet-sdk-helpers/Cargo.toml
new file mode 100644
index 00000000000..1c064c2c3c6
--- /dev/null
+++ b/cumulus/zombienet/zombienet-sdk-helpers/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "cumulus-zombienet-sdk-helpers"
+version = "0.1.0"
+description = "Zombienet-sdk helpers for parachain related tests."
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+publish = false
+
+[dependencies]
+anyhow = { workspace = true, default-features = true }
+codec = { workspace = true, features = ["derive"] }
+log = { workspace = true }
+polkadot-primitives = { workspace = true, default-features = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+subxt = { workspace = true, features = ["native"] }
+subxt-signer = { workspace = true }
+tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
+
diff --git a/cumulus/zombienet/zombienet-sdk-helpers/src/lib.rs b/cumulus/zombienet/zombienet-sdk-helpers/src/lib.rs
new file mode 100644
index 00000000000..2a04f65d9cd
--- /dev/null
+++ b/cumulus/zombienet/zombienet-sdk-helpers/src/lib.rs
@@ -0,0 +1,279 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+use anyhow::anyhow;
+use codec::Decode;
+use polkadot_primitives::{vstaging::CandidateReceiptV2, Id as ParaId};
+use std::{
+	collections::{HashMap, HashSet},
+	ops::Range,
+};
+use subxt::{
+	blocks::Block, events::Events, ext::scale_value::value, tx::DynamicPayload, utils::H256,
+	OnlineClient, PolkadotConfig,
+};
+use tokio::join;
+
+// Maximum number of blocks to wait for a session change.
+// If it does not arrive for whatever reason, we should not wait forever.
+const WAIT_MAX_BLOCKS_FOR_SESSION: u32 = 50;
+
+/// Create a batch call to assign cores to a parachain.
+pub fn create_assign_core_call(core_and_para: &[(u32, u32)]) -> DynamicPayload {
+	let mut assign_cores = vec![];
+	for (core, para_id) in core_and_para.iter() {
+		assign_cores.push(value! {
+			Coretime(assign_core { core : *core, begin: 0, assignment: ((Task(*para_id), 57600)), end_hint: None() })
+		});
+	}
+
+	subxt::tx::dynamic(
+		"Sudo",
+		"sudo",
+		vec![value! {
+			Utility(batch { calls: assign_cores })
+		}],
+	)
+}
+
+/// Find an event in subxt `Events` and attempt to decode the fields of the event.
+fn find_event_and_decode_fields<T: Decode>(
+	events: &Events<PolkadotConfig>,
+	pallet: &str,
+	variant: &str,
+) -> Result<Vec<T>, anyhow::Error> {
+	let mut result = vec![];
+	for event in events.iter() {
+		let event = event?;
+		if event.pallet_name() == pallet && event.variant_name() == variant {
+			let field_bytes = event.field_bytes().to_vec();
+			result.push(T::decode(&mut &field_bytes[..])?);
+		}
+	}
+	Ok(result)
+}
+
+// Helper function for asserting the throughput of parachains (total number of backed candidates in
+// a window of relay chain blocks), after the first session change.
+// Blocks with session changes are generally ignored.
+pub async fn assert_finalized_para_throughput(
+	relay_client: &OnlineClient<PolkadotConfig>,
+	stop_after: u32,
+	expected_candidate_ranges: HashMap<ParaId, Range<u32>>,
+) -> Result<(), anyhow::Error> {
+	let mut blocks_sub = relay_client.blocks().subscribe_finalized().await?;
+	let mut candidate_count: HashMap<ParaId, u32> = HashMap::new();
+	let mut current_block_count = 0;
+
+	let valid_para_ids: Vec<ParaId> = expected_candidate_ranges.keys().cloned().collect();
+
+	// Wait for the first session, block production on the parachain will start after that.
+	wait_for_first_session_change(&mut blocks_sub).await?;
+
+	while let Some(block) = blocks_sub.next().await {
+		let block = block?;
+		log::debug!("Finalized relay chain block {}", block.number());
+		let events = block.events().await?;
+		let is_session_change = events.iter().any(|event| {
+			event.as_ref().is_ok_and(|event| {
+				event.pallet_name() == "Session" && event.variant_name() == "NewSession"
+			})
+		});
+
+		// Do not count blocks with session changes, no backed blocks there.
+		if is_session_change {
+			continue
+		}
+
+		current_block_count += 1;
+
+		let receipts = find_event_and_decode_fields::<CandidateReceiptV2<H256>>(
+			&events,
+			"ParaInclusion",
+			"CandidateBacked",
+		)?;
+
+		for receipt in receipts {
+			let para_id = receipt.descriptor.para_id();
+			log::debug!("Block backed for para_id {para_id}");
+			if !valid_para_ids.contains(&para_id) {
+				return Err(anyhow!("Invalid ParaId detected: {}", para_id));
+			};
+			*(candidate_count.entry(para_id).or_default()) += 1;
+		}
+
+		if current_block_count == stop_after {
+			break;
+		}
+	}
+
+	log::info!(
+		"Reached {} finalized relay chain blocks that contain backed candidates. The per-parachain distribution is: {:#?}",
+		stop_after,
+		candidate_count
+	);
+
+	for (para_id, expected_candidate_range) in expected_candidate_ranges {
+		let actual = candidate_count
+			.get(&para_id)
+			.expect("ParaId did not have any backed candidates");
+		assert!(
+			expected_candidate_range.contains(actual),
+			"Candidate count {actual} not within range {expected_candidate_range:?}"
+		);
+	}
+
+	Ok(())
+}
+// Helper function for asserting the throughput of parachains (total number of backed candidates in
+// a window of relay chain blocks), after the first session change.
+// Blocks with session changes are generally ignored.
+pub async fn assert_para_throughput(
+	relay_client: &OnlineClient<PolkadotConfig>,
+	stop_after: u32,
+	expected_candidate_ranges: HashMap<ParaId, Range<u32>>,
+) -> Result<(), anyhow::Error> {
+	// Check on backed blocks in all imported relay chain blocks. The slot-based collator
+	// builds on the best fork currently. It can happen that it builds on a fork which is not
+	// getting finalized, in which case we will lose some blocks. This makes it harder to build
+	// stable asserts. Once we are building on older relay parents, this can be changed to
+	// finalized blocks again.
+	let mut blocks_sub = relay_client.blocks().subscribe_all().await?;
+	let mut candidate_count: HashMap<ParaId, (u32, u32)> = HashMap::new();
+	let mut start_height: Option<u32> = None;
+
+	let valid_para_ids: Vec<ParaId> = expected_candidate_ranges.keys().cloned().collect();
+
+	// Wait for the first session, block production on the parachain will start after that.
+	wait_for_first_session_change(&mut blocks_sub).await?;
+
+	let mut session_change_seen_at = 0u32;
+	while let Some(block) = blocks_sub.next().await {
+		let block = block?;
+		let block_number = Into::<u32>::into(block.number());
+
+		let events = block.events().await?;
+		let mut para_ids_to_increment: HashSet<ParaId> = Default::default();
+		let is_session_change = events.iter().any(|event| {
+			event.as_ref().is_ok_and(|event| {
+				event.pallet_name() == "Session" && event.variant_name() == "NewSession"
+			})
+		});
+
+		// Do not count blocks with session changes, no backed blocks there.
+		if is_session_change {
+			if block_number == session_change_seen_at {
+				continue;
+			}
+
+			// Increment the start height to account for a block level that has no
+			// backed blocks.
+			start_height = start_height.map(|h| h + 1);
+			session_change_seen_at = block_number;
+			continue;
+		}
+
+		let receipts = find_event_and_decode_fields::<CandidateReceiptV2<H256>>(
+			&events,
+			"ParaInclusion",
+			"CandidateBacked",
+		)?;
+
+		for receipt in receipts {
+			let para_id = receipt.descriptor.para_id();
+			if !valid_para_ids.contains(&para_id) {
+				return Err(anyhow!("Invalid ParaId detected: {}", para_id));
+			};
+			log::debug!(
+				"Block backed for para_id {para_id} at relay: #{} ({})",
+				block.number(),
+				block.hash()
+			);
+			let (counter, accounted_block_height) = candidate_count.entry(para_id).or_default();
+			if block_number > *accounted_block_height {
+				*counter += 1;
+				// Increment later to count multiple descriptors in the same block.
+				para_ids_to_increment.insert(para_id);
+			}
+		}
+
+		for para_id in para_ids_to_increment.iter() {
+			candidate_count.entry(*para_id).or_default().1 = block_number;
+		}
+
+		if block_number - *start_height.get_or_insert_with(|| block_number - 1) >= stop_after {
+			log::info!(
+				"Finished condition: block_height: {:?}, start_height: {:?}",
+				block.number(),
+				start_height
+			);
+			break;
+		}
+	}
+
+	log::info!(
+		"Reached {} relay chain blocks that contain backed candidates. The per-parachain distribution is: {:#?}",
+		stop_after,
+		candidate_count
+	);
+
+	for (para_id, expected_candidate_range) in expected_candidate_ranges {
+		let actual = candidate_count
+			.get(&para_id)
+			.expect("ParaId did not have any backed candidates");
+		assert!(
+			expected_candidate_range.contains(&actual.0),
+			"Candidate count {} not within range {expected_candidate_range:?}",
+			actual.0
+		);
+	}
+
+	Ok(())
+}
+
+/// Wait for the first block with a session change.
+///
+/// The session change is detected by inspecting the events in the block.
+async fn wait_for_first_session_change(
+	blocks_sub: &mut subxt::backend::StreamOfResults<
+		Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
+	>,
+) -> Result<(), anyhow::Error> {
+	let mut waited_block_num = 0;
+	while let Some(block) = blocks_sub.next().await {
+		let block = block?;
+		log::debug!("Finalized relay chain block {}", block.number());
+		let events = block.events().await?;
+		let is_session_change = events.iter().any(|event| {
+			event.as_ref().is_ok_and(|event| {
+				event.pallet_name() == "Session" && event.variant_name() == "NewSession"
+			})
+		});
+
+		if is_session_change {
+			return Ok(())
+		}
+
+		if waited_block_num >= WAIT_MAX_BLOCKS_FOR_SESSION {
+			return Err(anyhow::format_err!("Waited for {WAIT_MAX_BLOCKS_FOR_SESSION} blocks, a new session should have arrived by now."));
+		}
+
+		waited_block_num += 1;
+	}
+	Ok(())
+}
+
+// Helper function that asserts the maximum finality lag.
+pub async fn assert_finality_lag(
+	client: &OnlineClient<PolkadotConfig>,
+	maximum_lag: u32,
+) -> Result<(), anyhow::Error> {
+	let mut best_stream = client.blocks().subscribe_best().await?;
+	let mut fut_stream = client.blocks().subscribe_finalized().await?;
+	let (Some(Ok(best)), Some(Ok(finalized))) = join!(best_stream.next(), fut_stream.next()) else {
+		return Err(anyhow::format_err!("Unable to fetch best and finalized block!"));
+	};
+	let finality_lag = best.number() - finalized.number();
+	assert!(finality_lag <= maximum_lag, "Expected finality to lag by a maximum of {maximum_lag} blocks, but was lagging by {finality_lag} blocks.");
+	Ok(())
+}
diff --git a/cumulus/zombienet/zombienet-sdk/Cargo.toml b/cumulus/zombienet/zombienet-sdk/Cargo.toml
new file mode 100644
index 00000000000..da8a884378c
--- /dev/null
+++ b/cumulus/zombienet/zombienet-sdk/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "cumulus-zombienet-sdk-tests"
+version = "0.1.0"
+description = "Zombienet-sdk tests for cumulus."
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+publish = false
+
+[dependencies]
+anyhow = { workspace = true }
+codec = { workspace = true, features = ["derive"] }
+env_logger = { workspace = true }
+log = { workspace = true }
+polkadot-primitives = { workspace = true, default-features = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+subxt = { workspace = true }
+subxt-signer = { workspace = true }
+tokio = { workspace = true, features = ["rt-multi-thread"] }
+zombienet-sdk = { workspace = true }
+cumulus-zombienet-sdk-helpers = { workspace = true }
+
+[features]
+zombie-ci = []
diff --git a/cumulus/zombienet/zombienet-sdk/README.md b/cumulus/zombienet/zombienet-sdk/README.md
new file mode 100644
index 00000000000..19acdbc0dbc
--- /dev/null
+++ b/cumulus/zombienet/zombienet-sdk/README.md
@@ -0,0 +1,19 @@
+# How to run locally
+
+As a prerequisite, the `test-parachain` and `polkadot` binaries need to be installed or available under `$PATH`.
+
+```
+# install test-parachain
+cargo install --path ./cumulus/test/service --locked --release
+# install polkadot
+cargo install --path ./polkadot --locked --release
+```
+
+The following command launches the tests:
+
+```
+ZOMBIE_PROVIDER=native cargo test --release -p cumulus-zombienet-sdk-tests
+```
+
+In addition, you can specify a base directory with `ZOMBIENET_SDK_BASE_DIR=/my/dir/of/choice`. All chain files and logs
+will be placed in that directory.
diff --git a/cumulus/zombienet/zombienet-sdk/src/lib.rs b/cumulus/zombienet/zombienet-sdk/src/lib.rs
new file mode 100644
index 00000000000..fe0aa995d77
--- /dev/null
+++ b/cumulus/zombienet/zombienet-sdk/src/lib.rs
@@ -0,0 +1,2 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
diff --git a/cumulus/zombienet/zombienet-sdk/tests/elastic_scaling/elastic_scaling_multiple_blocks_per_slot.rs b/cumulus/zombienet/zombienet-sdk/tests/elastic_scaling/elastic_scaling_multiple_blocks_per_slot.rs
new file mode 100644
index 00000000000..660440fb799
--- /dev/null
+++ b/cumulus/zombienet/zombienet-sdk/tests/elastic_scaling/elastic_scaling_multiple_blocks_per_slot.rs
@@ -0,0 +1,137 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+use anyhow::anyhow;
+
+use cumulus_zombienet_sdk_helpers::{
+	assert_finality_lag, assert_para_throughput, create_assign_core_call,
+};
+use polkadot_primitives::Id as ParaId;
+use serde_json::json;
+use subxt::{OnlineClient, PolkadotConfig};
+use subxt_signer::sr25519::dev;
+use zombienet_sdk::{NetworkConfig, NetworkConfigBuilder};
+
+const PARA_ID: u32 = 2400;
+
+/// This test spawns a parachain network.
+/// Initially, one core is assigned. We expect the parachain to produce one block per relay chain block.
+/// As we increase the number of cores via `assign_core`, we expect the block pace to increase too.
+/// **Note:** The runtime in use here has 6s slot duration, so multiple blocks will be produced per
+/// slot.
+#[tokio::test(flavor = "multi_thread")]
+async fn elastic_scaling_multiple_block_per_slot() -> Result<(), anyhow::Error> {
+	let _ = env_logger::try_init_from_env(
+		env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
+	);
+
+	let config = build_network_config().await?;
+
+	let spawn_fn = zombienet_sdk::environment::get_spawn_fn();
+	let network = spawn_fn(config).await?;
+
+	let relay_node = network.get_node("validator-0")?;
+	let para_node_elastic = network.get_node("collator-1")?;
+
+	let relay_client: OnlineClient<PolkadotConfig> = relay_node.wait_client().await?;
+	let alice = dev::alice();
+	assert_para_throughput(
+		&relay_client,
+		10,
+		[(ParaId::from(PARA_ID), 8..11)].into_iter().collect(),
+	)
+	.await?;
+	assert_finality_lag(&para_node_elastic.wait_client().await?, 5).await?;
+
+	let assign_cores_call = create_assign_core_call(&[(2, PARA_ID), (3, PARA_ID)]);
+
+	relay_client
+		.tx()
+		.sign_and_submit_then_watch_default(&assign_cores_call, &alice)
+		.await
+		.inspect(|_| log::info!("Tx send, waiting for finalization"))?
+		.wait_for_finalized_success()
+		.await?;
+	log::info!("2 more cores assigned to each parachain");
+
+	assert_para_throughput(
+		&relay_client,
+		15,
+		[(ParaId::from(PARA_ID), 39..46)].into_iter().collect(),
+	)
+	.await?;
+	assert_finality_lag(&para_node_elastic.wait_client().await?, 20).await?;
+
+	let assign_cores_call = create_assign_core_call(&[(4, PARA_ID), (5, PARA_ID), (6, PARA_ID)]);
+	// Assign three more cores (4, 5 and 6) to the parachain.
+	relay_client
+		.tx()
+		.sign_and_submit_then_watch_default(&assign_cores_call, &alice)
+		.await?
+		.wait_for_finalized_success()
+		.await?;
+	log::info!("3 more cores assigned to each parachain");
+
+	assert_para_throughput(
+		&relay_client,
+		10,
+		[(ParaId::from(PARA_ID), 52..61)].into_iter().collect(),
+	)
+	.await?;
+	assert_finality_lag(&para_node_elastic.wait_client().await?, 30).await?;
+	log::info!("Test finished successfully");
+	Ok(())
+}
+
+async fn build_network_config() -> Result<NetworkConfig, anyhow::Error> {
+	let images = zombienet_sdk::environment::get_images_from_env();
+	log::info!("Using images: {images:?}");
+	NetworkConfigBuilder::new()
+		.with_relaychain(|r| {
+			let r = r
+				.with_chain("rococo-local")
+				.with_default_command("polkadot")
+				.with_default_image(images.polkadot.as_str())
+				.with_default_args(vec![("-lparachain=trace").into()])
+				.with_default_resources(|resources| {
+					resources.with_request_cpu(2).with_request_memory("2G")
+				})
+				.with_genesis_overrides(json!({
+					"configuration": {
+						"config": {
+							"scheduler_params": {
+								"num_cores": 7,
+								"max_validators_per_core": 1
+							}
+						}
+					}
+				}))
+				// Have to set a `with_node` outside of the loop below, so that `r` has the right
+				// type.
+				.with_node(|node| node.with_name("validator-0"));
+			(1..9).fold(r, |acc, i| acc.with_node(|node| node.with_name(&format!("validator-{i}"))))
+		})
+		.with_parachain(|p| {
+			p.with_id(PARA_ID)
+				.with_default_command("test-parachain")
+				.with_default_image(images.cumulus.as_str())
+				.with_chain("elastic-scaling-multi-block-slot")
+				.with_default_args(vec![
+					("--authoring").into(),
+					("slot-based").into(),
+					("-lparachain=trace,aura=debug").into(),
+				])
+				.with_collator(|n| n.with_name("collator-0"))
+				.with_collator(|n| n.with_name("collator-1"))
+				.with_collator(|n| n.with_name("collator-2"))
+		})
+		.with_global_settings(|global_settings| match std::env::var("ZOMBIENET_SDK_BASE_DIR") {
+			Ok(val) => global_settings.with_base_dir(val),
+			_ => global_settings,
+		})
+		.build()
+		.map_err(|e| {
+			let errs = e.into_iter().map(|e| e.to_string()).collect::<Vec<_>>().join(" ");
+			anyhow!("config errs: {errs}")
+		})
+}
diff --git a/cumulus/zombienet/zombienet-sdk/tests/elastic_scaling/mod.rs b/cumulus/zombienet/zombienet-sdk/tests/elastic_scaling/mod.rs
new file mode 100644
index 00000000000..658c4af684e
--- /dev/null
+++ b/cumulus/zombienet/zombienet-sdk/tests/elastic_scaling/mod.rs
@@ -0,0 +1,4 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+mod elastic_scaling_multiple_blocks_per_slot;
diff --git a/cumulus/zombienet/zombienet-sdk/tests/lib.rs b/cumulus/zombienet/zombienet-sdk/tests/lib.rs
new file mode 100644
index 00000000000..55df3e6c0bd
--- /dev/null
+++ b/cumulus/zombienet/zombienet-sdk/tests/lib.rs
@@ -0,0 +1,5 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+#[cfg(feature = "zombie-ci")]
+mod elastic_scaling;
diff --git a/docs/sdk/src/guides/enable_elastic_scaling_mvp.rs b/docs/sdk/src/guides/enable_elastic_scaling_mvp.rs
index 2339088abed..1047af86de3 100644
--- a/docs/sdk/src/guides/enable_elastic_scaling_mvp.rs
+++ b/docs/sdk/src/guides/enable_elastic_scaling_mvp.rs
@@ -72,7 +72,7 @@
 //!
 //! <div class="warning">Phase 1 is NOT needed if using the <code>polkadot-parachain</code> or
 //! <code>polkadot-omni-node</code> binary, or <code>polkadot-omni-node-lib</code> built from the
-//! latest polkadot-sdk release! Simply pass the <code>--experimental-use-slot-based</code>
+//! latest polkadot-sdk release! Simply pass the <code>--authoring slot-based</code>
 //! ([`polkadot_omni_node_lib::cli::Cli::experimental_use_slot_based`]) parameter to the command
 //! line and jump to Phase 2.</div>
 //!
@@ -92,7 +92,7 @@
 //!     - Remove the `overseer_handle` param (also remove the
 //!     `OverseerHandle` type import if it’s not used elsewhere).
 //!     - Rename `AuraParams` to `SlotBasedParams`, remove the `overseer_handle` field and add a
-//!     `slot_drift` field with a   value of `Duration::from_secs(1)`.
+//!     `slot_offset` field with a value of `Duration::from_secs(1)`.
 //!     - Replace the single future returned by `aura::run` with the two futures returned by it and
 //!     spawn them as separate tasks:
 #![doc = docify::embed!("../../cumulus/polkadot-omni-node/lib/src/nodes/aura.rs", launch_slot_based_collator)]
diff --git a/polkadot/zombienet-sdk-tests/Cargo.toml b/polkadot/zombienet-sdk-tests/Cargo.toml
index 1c88455cff4..35717cdc679 100644
--- a/polkadot/zombienet-sdk-tests/Cargo.toml
+++ b/polkadot/zombienet-sdk-tests/Cargo.toml
@@ -10,6 +10,7 @@ publish = false
 [dependencies]
 anyhow = { workspace = true }
 codec = { workspace = true, features = ["derive"] }
+cumulus-zombienet-sdk-helpers = { workspace = true }
 env_logger = { workspace = true }
 log = { workspace = true }
 polkadot-primitives.workspace = true
diff --git a/polkadot/zombienet-sdk-tests/tests/elastic_scaling/slot_based_12cores.rs b/polkadot/zombienet-sdk-tests/tests/elastic_scaling/slot_based_12cores.rs
index 4d0e1adad08..ed9bad30340 100644
--- a/polkadot/zombienet-sdk-tests/tests/elastic_scaling/slot_based_12cores.rs
+++ b/polkadot/zombienet-sdk-tests/tests/elastic_scaling/slot_based_12cores.rs
@@ -6,12 +6,8 @@
 
 use anyhow::anyhow;
 
-use crate::helpers::{
-	assert_finalized_block_height, assert_para_throughput, rococo,
-	rococo::runtime_types::{
-		pallet_broker::coretime_interface::CoreAssignment,
-		polkadot_runtime_parachains::assigner_coretime::PartsOf57600,
-	},
+use cumulus_zombienet_sdk_helpers::{
+	assert_finality_lag, assert_para_throughput, create_assign_core_call,
 };
 use polkadot_primitives::Id as ParaId;
 use serde_json::json;
@@ -61,7 +57,7 @@ async fn slot_based_12cores_test() -> Result<(), anyhow::Error> {
 				.with_default_image(images.cumulus.as_str())
 				.with_chain("elastic-scaling-500ms")
 				.with_default_args(vec![
-					("--experimental-use-slot-based").into(),
+					"--authoring=slot-based".into(),
 					("-lparachain=debug,aura=debug").into(),
 				])
 				.with_collator(|n| n.with_name("collator-elastic"))
@@ -82,26 +78,12 @@ async fn slot_based_12cores_test() -> Result<(), anyhow::Error> {
 	let alice = dev::alice();
 
 	// Assign 11 extra cores to the parachain.
+	let cores = (0..11).map(|idx| (idx, 2300)).collect::<Vec<(u32, u32)>>();
 
+	let assign_cores_call = create_assign_core_call(&cores);
 	relay_client
 		.tx()
-		.sign_and_submit_then_watch_default(
-			&rococo::tx()
-				.sudo()
-				.sudo(rococo::runtime_types::rococo_runtime::RuntimeCall::Utility(
-					rococo::runtime_types::pallet_utility::pallet::Call::batch {
-						calls: (0..11).map(|idx| rococo::runtime_types::rococo_runtime::RuntimeCall::Coretime(
-                            rococo::runtime_types::polkadot_runtime_parachains::coretime::pallet::Call::assign_core {
-                                core: idx,
-                                begin: 0,
-                                assignment: vec![(CoreAssignment::Task(2300), PartsOf57600(57600))],
-                                end_hint: None
-                            }
-                        )).collect()
-					},
-				)),
-			&alice,
-		)
+		.sign_and_submit_then_watch_default(&assign_cores_call, &alice)
 		.await?
 		.wait_for_finalized_success()
 		.await?;
@@ -121,7 +103,7 @@ async fn slot_based_12cores_test() -> Result<(), anyhow::Error> {
 
 	// Assert the parachain finalized block height is also on par with the number of backed
 	// candidates.
-	assert_finalized_block_height(&para_node.wait_client().await?, 158..181).await?;
+	assert_finality_lag(&para_node.wait_client().await?, 60).await?;
 
 	log::info!("Test finished successfully");
 
diff --git a/polkadot/zombienet-sdk-tests/tests/elastic_scaling/slot_based_3cores.rs b/polkadot/zombienet-sdk-tests/tests/elastic_scaling/slot_based_3cores.rs
index aa1e54d7da5..422feb4d20c 100644
--- a/polkadot/zombienet-sdk-tests/tests/elastic_scaling/slot_based_3cores.rs
+++ b/polkadot/zombienet-sdk-tests/tests/elastic_scaling/slot_based_3cores.rs
@@ -6,12 +6,8 @@
 
 use anyhow::anyhow;
 
-use crate::helpers::{
-	assert_finalized_block_height, assert_para_throughput, rococo,
-	rococo::runtime_types::{
-		pallet_broker::coretime_interface::CoreAssignment,
-		polkadot_runtime_parachains::assigner_coretime::PartsOf57600,
-	},
+use cumulus_zombienet_sdk_helpers::{
+	assert_finality_lag, assert_para_throughput, create_assign_core_call,
 };
 use polkadot_primitives::Id as ParaId;
 use serde_json::json;
@@ -60,7 +56,7 @@ async fn slot_based_3cores_test() -> Result<(), anyhow::Error> {
 				.with_default_image(images.cumulus.as_str())
 				.with_chain("elastic-scaling-mvp")
 				.with_default_args(vec![
-					("--experimental-use-slot-based").into(),
+					"--authoring=slot-based".into(),
 					("-lparachain=debug,aura=debug").into(),
 				])
 				.with_collator(|n| n.with_name("collator-elastic-mvp"))
@@ -73,7 +69,7 @@ async fn slot_based_3cores_test() -> Result<(), anyhow::Error> {
 				.with_default_image(images.cumulus.as_str())
 				.with_chain("elastic-scaling")
 				.with_default_args(vec![
-					("--experimental-use-slot-based").into(),
+					"--authoring=slot-based".into(),
 					("-lparachain=debug,aura=debug").into(),
 				])
 				.with_collator(|n| n.with_name("collator-elastic"))
@@ -94,52 +90,11 @@ async fn slot_based_3cores_test() -> Result<(), anyhow::Error> {
 	let relay_client: OnlineClient<PolkadotConfig> = relay_node.wait_client().await?;
 	let alice = dev::alice();
 
+	let assign_cores_call = create_assign_core_call(&[(0, 2100), (1, 2100), (2, 2200), (3, 2200)]);
 	// Assign two extra cores to each parachain.
 	relay_client
 		.tx()
-		.sign_and_submit_then_watch_default(
-			&rococo::tx()
-				.sudo()
-				.sudo(rococo::runtime_types::rococo_runtime::RuntimeCall::Utility(
-					rococo::runtime_types::pallet_utility::pallet::Call::batch {
-						calls: vec![
-							rococo::runtime_types::rococo_runtime::RuntimeCall::Coretime(
-								rococo::runtime_types::polkadot_runtime_parachains::coretime::pallet::Call::assign_core {
-									core: 0,
-									begin: 0,
-									assignment: vec![(CoreAssignment::Task(2100), PartsOf57600(57600))],
-									end_hint: None
-								}
-							),
-							rococo::runtime_types::rococo_runtime::RuntimeCall::Coretime(
-								rococo::runtime_types::polkadot_runtime_parachains::coretime::pallet::Call::assign_core {
-									core: 1,
-									begin: 0,
-									assignment: vec![(CoreAssignment::Task(2100), PartsOf57600(57600))],
-									end_hint: None
-								}
-							),
-							rococo::runtime_types::rococo_runtime::RuntimeCall::Coretime(
-								rococo::runtime_types::polkadot_runtime_parachains::coretime::pallet::Call::assign_core {
-									core: 2,
-									begin: 0,
-									assignment: vec![(CoreAssignment::Task(2200), PartsOf57600(57600))],
-									end_hint: None
-								}
-							),
-							rococo::runtime_types::rococo_runtime::RuntimeCall::Coretime(
-								rococo::runtime_types::polkadot_runtime_parachains::coretime::pallet::Call::assign_core {
-									core: 3,
-									begin: 0,
-									assignment: vec![(CoreAssignment::Task(2200), PartsOf57600(57600))],
-									end_hint: None
-								}
-							)
-						],
-					},
-				)),
-			&alice,
-		)
+		.sign_and_submit_then_watch_default(&assign_cores_call, &alice)
 		.await?
 		.wait_for_finalized_success()
 		.await?;
@@ -161,8 +116,8 @@ async fn slot_based_3cores_test() -> Result<(), anyhow::Error> {
 
 	// Assert the parachain finalized block height is also on par with the number of backed
 	// candidates.
-	assert_finalized_block_height(&para_node_elastic.wait_client().await?, 36..46).await?;
-	assert_finalized_block_height(&para_node_elastic_mvp.wait_client().await?, 36..46).await?;
+	assert_finality_lag(&para_node_elastic.wait_client().await?, 15).await?;
+	assert_finality_lag(&para_node_elastic_mvp.wait_client().await?, 15).await?;
 
 	log::info!("Test finished successfully");
 
diff --git a/polkadot/zombienet_tests/functional/0018-shared-core-idle-parachain.toml b/polkadot/zombienet_tests/functional/0018-shared-core-idle-parachain.toml
index 050c1f01923..af4315879f6 100644
--- a/polkadot/zombienet_tests/functional/0018-shared-core-idle-parachain.toml
+++ b/polkadot/zombienet_tests/functional/0018-shared-core-idle-parachain.toml
@@ -2,23 +2,23 @@
 timeout = 1000
 
 [relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params]
-  max_validators_per_core = 2
-  num_cores = 4
-  group_rotation_frequency = 4
+max_validators_per_core = 2
+num_cores = 4
+group_rotation_frequency = 4
 
 
 [relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params]
-  needed_approvals = 3
+needed_approvals = 3
 
 [relaychain]
 default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}"
 chain = "rococo-local"
 command = "polkadot"
 
-  [[relaychain.node_groups]]
-  name = "validator"
-  args = ["-lruntime=debug,parachain=debug"]
-  count = 4
+[[relaychain.node_groups]]
+name = "validator"
+args = ["-lruntime=debug,parachain=debug"]
+count = 4
 
 [[parachains]]
 id = 2000
@@ -26,13 +26,13 @@ register_para = false
 onboard_as_parachain = false
 add_to_genesis = false
 chain = "glutton-westend-local-2000"
-    [parachains.genesis.runtimeGenesis.patch.glutton]
-    compute = "50000000"
-    storage = "2500000000"
-    trashDataCount = 5120
+[parachains.genesis.runtimeGenesis.patch.glutton]
+compute = "50000000"
+storage = "2500000000"
+trashDataCount = 5120
 
-    [parachains.collator]
-    name = "collator-2000"
-    image = "{{CUMULUS_IMAGE}}"
-    command = "polkadot-parachain"
-    args = ["-lparachain=debug", "--experimental-use-slot-based"]
+[parachains.collator]
+name = "collator-2000"
+image = "{{CUMULUS_IMAGE}}"
+command = "polkadot-parachain"
+args = ["-lparachain=debug", "--authoring", "slot-based"]
diff --git a/polkadot/zombienet_tests/functional/0019-coretime-collation-fetching-fairness.toml b/polkadot/zombienet_tests/functional/0019-coretime-collation-fetching-fairness.toml
index f9028b930cf..43a7d8fc747 100644
--- a/polkadot/zombienet_tests/functional/0019-coretime-collation-fetching-fairness.toml
+++ b/polkadot/zombienet_tests/functional/0019-coretime-collation-fetching-fairness.toml
@@ -2,21 +2,21 @@
 timeout = 1000
 
 [relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params]
-  max_validators_per_core = 4
-  num_cores = 1
+max_validators_per_core = 4
+num_cores = 1
 
 [relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params]
-  needed_approvals = 3
+needed_approvals = 3
 
 [relaychain]
 default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}"
 chain = "rococo-local"
 command = "polkadot"
 
-  [[relaychain.node_groups]]
-  name = "validator"
-  args = ["-lparachain=debug,parachain::collator-protocol=trace" ]
-  count = 4
+[[relaychain.node_groups]]
+name = "validator"
+args = ["-lparachain=debug,parachain::collator-protocol=trace"]
+count = 4
 
 [[parachains]]
 id = 2000
@@ -24,16 +24,16 @@ register_para = false
 onboard_as_parachain = false
 add_to_genesis = false
 chain = "glutton-westend-local-2000"
-    [parachains.genesis.runtimeGenesis.patch.glutton]
-    compute = "50000000"
-    storage = "2500000000"
-    trashDataCount = 5120
+[parachains.genesis.runtimeGenesis.patch.glutton]
+compute = "50000000"
+storage = "2500000000"
+trashDataCount = 5120
 
-    [parachains.collator]
-    name = "collator-2000"
-    image = "{{CUMULUS_IMAGE}}"
-    command = "polkadot-parachain"
-    args = ["-lparachain=debug,parachain::collator-protocol=trace", "--experimental-use-slot-based"]
+[parachains.collator]
+name = "collator-2000"
+image = "{{CUMULUS_IMAGE}}"
+command = "polkadot-parachain"
+args = ["-lparachain=debug,parachain::collator-protocol=trace", "--authoring", "slot-based"]
 
 [[parachains]]
 id = 2001
@@ -41,13 +41,13 @@ register_para = false
 onboard_as_parachain = false
 add_to_genesis = false
 chain = "glutton-westend-local-2001"
-    [parachains.genesis.runtimeGenesis.patch.glutton]
-    compute = "50000000"
-    storage = "2500000000"
-    trashDataCount = 5120
-
-    [parachains.collator]
-    name = "collator-2001"
-    image = "{{CUMULUS_IMAGE}}"
-    command = "polkadot-parachain"
-    args = ["-lparachain=debug"]
+[parachains.genesis.runtimeGenesis.patch.glutton]
+compute = "50000000"
+storage = "2500000000"
+trashDataCount = 5120
+
+[parachains.collator]
+name = "collator-2001"
+image = "{{CUMULUS_IMAGE}}"
+command = "polkadot-parachain"
+args = ["-lparachain=debug"]
diff --git a/prdoc/pr_7569.prdoc b/prdoc/pr_7569.prdoc
new file mode 100644
index 00000000000..69dba1251f2
--- /dev/null
+++ b/prdoc/pr_7569.prdoc
@@ -0,0 +1,25 @@
+title: 'slot-based-collator: Allow multiple blocks per slot'
+doc:
+- audience: Node Operator
+  description:
+    Adds multiple blocks per slot support to the slot-based collator. This PR deprecates
+    the `--experimental-use-slot-based` flag in favor of `--authoring slot-based`. The deprecated flag will be removed
+    in the next release.
+
+    Parachain runtimes using the `FixedVelocityConsensusHook` now no longer support building blocks with slots
+    shorter than 6 seconds. We advise elastic-scaling chains to use the mechanisms introduced in this PR and produce
+    multiple blocks in a single slot.
+crates:
+- name: cumulus-client-consensus-aura
+  bump: major
+- name: cumulus-pallet-aura-ext
+  bump: major
+  validate: false
+- name: cumulus-pov-validator
+  bump: none
+  validate: false
+- name: polkadot-omni-node-lib
+  bump: major
+- name: polkadot
+  bump: none
+  validate: false
diff --git a/prdoc/pr_7585.prdoc b/prdoc/pr_7585.prdoc
new file mode 100644
index 00000000000..1d6b69e8df6
--- /dev/null
+++ b/prdoc/pr_7585.prdoc
@@ -0,0 +1,11 @@
+title: 'Add export PoV on slot base collator'
+doc:
+- audience: [Node Dev, Node Operator]
+  description: Add functionality to export the Proof of Validity (PoV) when the slot-based collator is used.
+crates:
+- name: cumulus-test-service
+  bump: major
+- name: cumulus-client-consensus-aura
+  bump: major
+- name: polkadot-omni-node-lib
+  bump: major
\ No newline at end of file
-- 
GitLab