diff --git a/.gitlab/pipeline/zombienet/polkadot.yml b/.gitlab/pipeline/zombienet/polkadot.yml
index e09be328c81d07cd8d11cde93f02814b34a30475..4d8d4947daa5ca627c146779e91769e4b104dc5d 100644
--- a/.gitlab/pipeline/zombienet/polkadot.yml
+++ b/.gitlab/pipeline/zombienet/polkadot.yml
@@ -464,3 +464,19 @@ zombienet-polkadot-functional-async-backing-6-seconds-rate:
     - unset NEXTEST_FAILURE_OUTPUT
     - unset NEXTEST_SUCCESS_OUTPUT
     - cargo nextest run --archive-file ./artifacts/polkadot-zombienet-tests.tar.zst --no-capture -- functional::async_backing_6_seconds_rate::async_backing_6_seconds_rate_test
+
+zombienet-polkadot-functional-duplicate-collations:
+  extends:
+    - .zombienet-polkadot-common
+  needs:
+    - job: build-polkadot-zombienet-tests
+      artifacts: true
+  before_script:
+    - !reference [ ".zombienet-polkadot-common", "before_script" ]
+    - export POLKADOT_IMAGE="${ZOMBIENET_INTEGRATION_TEST_IMAGE}"
+    - export X_INFRA_INSTANCE=spot # use spot by default
+  script:
+    # we want to use `--no-capture` in zombienet tests.
+    - unset NEXTEST_FAILURE_OUTPUT
+    - unset NEXTEST_SUCCESS_OUTPUT
+    - cargo nextest run --archive-file ./artifacts/polkadot-zombienet-tests.tar.zst --no-capture -- functional::duplicate_collations::duplicate_collations_test
diff --git a/Cargo.lock b/Cargo.lock
index 054b03578f453d476fd6b06145a671227f3f3795..c5633437d42f72afccb05d8ac15d89ef750e5897 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -29527,6 +29527,7 @@ dependencies = [
  "log",
  "parity-scale-codec",
  "polkadot-parachain-primitives 6.0.0",
+ "polkadot-primitives 7.0.0",
  "sp-io 30.0.0",
  "substrate-wasm-builder 17.0.0",
  "tiny-keccak",
@@ -29542,6 +29543,7 @@ dependencies = [
  "log",
  "parity-scale-codec",
  "polkadot-cli",
+ "polkadot-erasure-coding",
  "polkadot-node-core-pvf",
  "polkadot-node-primitives",
  "polkadot-node-subsystem",
@@ -29550,6 +29552,7 @@ dependencies = [
  "polkadot-service",
  "polkadot-test-service",
  "sc-cli",
+ "sc-client-api",
  "sc-service",
  "sp-core 28.0.0",
  "sp-keyring 31.0.0",
diff --git a/polkadot/node/test/service/src/chain_spec.rs b/polkadot/node/test/service/src/chain_spec.rs
index ae4e84b7725e50348a2d635e0e340614dd445043..ef83c4795dc6889895dbf03e9e87c36161e6bb1e 100644
--- a/polkadot/node/test/service/src/chain_spec.rs
+++ b/polkadot/node/test/service/src/chain_spec.rs
@@ -18,7 +18,8 @@
 
 use pallet_staking::Forcing;
 use polkadot_primitives::{
-	AccountId, AssignmentId, SchedulerParams, ValidatorId, MAX_CODE_SIZE, MAX_POV_SIZE,
+	node_features, AccountId, AssignmentId, NodeFeatures, SchedulerParams, ValidatorId,
+	MAX_CODE_SIZE, MAX_POV_SIZE,
 };
 use polkadot_service::chain_spec::Extensions;
 use polkadot_test_runtime::BABE_GENESIS_EPOCH_CONFIG;
@@ -110,6 +111,11 @@ fn polkadot_testnet_genesis(
 	const ENDOWMENT: u128 = 1_000_000 * DOTS;
 	const STASH: u128 = 100 * DOTS;
 
+	// Prepare node features with V2 receipts enabled.
+	let mut node_features = NodeFeatures::new();
+	node_features.resize(node_features::FeatureIndex::CandidateReceiptV2 as usize + 1, false);
+	node_features.set(node_features::FeatureIndex::CandidateReceiptV2 as u8 as usize, true);
+
 	serde_json::json!({
 		"balances": {
 			"balances": endowed_accounts.iter().map(|k| (k.clone(), ENDOWMENT)).collect::<Vec<_>>(),
@@ -158,6 +164,7 @@ fn polkadot_testnet_genesis(
 				no_show_slots: 10,
 				minimum_validation_upgrade_delay: 5,
 				max_downward_message_size: 1024,
+				node_features,
 				scheduler_params: SchedulerParams {
 					group_rotation_frequency: 20,
 					paras_availability_period: 4,
diff --git a/polkadot/parachain/test-parachains/undying/Cargo.toml b/polkadot/parachain/test-parachains/undying/Cargo.toml
index 43b5a3352434feff95274d4c5b0435829e390573..f8dfc8936c0ef3bd7abd7e85fe3094c9318edb02 100644
--- a/polkadot/parachain/test-parachains/undying/Cargo.toml
+++ b/polkadot/parachain/test-parachains/undying/Cargo.toml
@@ -16,6 +16,7 @@ codec = { features = ["derive"], workspace = true }
 dlmalloc = { features = ["global"], workspace = true }
 log = { workspace = true }
 polkadot-parachain-primitives = { features = ["wasm-api"], workspace = true }
+polkadot-primitives = { workspace = true, default-features = false }
 tiny-keccak = { features = ["keccak"], workspace = true }
 
 # We need to make sure the global allocator is disabled until we have support of full substrate externalities
@@ -30,5 +31,6 @@ std = [
 	"codec/std",
 	"log/std",
 	"polkadot-parachain-primitives/std",
+	"polkadot-primitives/std",
 	"sp-io/std",
 ]
diff --git a/polkadot/parachain/test-parachains/undying/collator/Cargo.toml b/polkadot/parachain/test-parachains/undying/collator/Cargo.toml
index f4e6d4e585427f530dd28d631e7d022e81070b12..e26b9f59acd4bf7724c9308dda6827c95e8a8651 100644
--- a/polkadot/parachain/test-parachains/undying/collator/Cargo.toml
+++ b/polkadot/parachain/test-parachains/undying/collator/Cargo.toml
@@ -22,6 +22,7 @@ futures-timer = { workspace = true }
 log = { workspace = true, default-features = true }
 
 polkadot-cli = { workspace = true, default-features = true }
+polkadot-erasure-coding = { workspace = true, default-features = true }
 polkadot-node-primitives = { workspace = true, default-features = true }
 polkadot-node-subsystem = { workspace = true, default-features = true }
 polkadot-primitives = { workspace = true, default-features = true }
@@ -29,6 +30,7 @@ polkadot-service = { features = ["rococo-native"], workspace = true, default-fea
 test-parachain-undying = { workspace = true }
 
 sc-cli = { workspace = true, default-features = true }
+sc-client-api = { workspace = true, default-features = true }
 sc-service = { workspace = true, default-features = true }
 sp-core = { workspace = true, default-features = true }
 
diff --git a/polkadot/parachain/test-parachains/undying/collator/src/cli.rs b/polkadot/parachain/test-parachains/undying/collator/src/cli.rs
index 9572887a51a2a195a01e6ceced60ee711288ead2..a3de7c80d214a2e9c0669dacdd33be65bdf9ab18 100644
--- a/polkadot/parachain/test-parachains/undying/collator/src/cli.rs
+++ b/polkadot/parachain/test-parachains/undying/collator/src/cli.rs
@@ -61,6 +61,15 @@ pub struct ExportGenesisWasmCommand {
 	pub output: Option<PathBuf>,
 }
 
+/// Enum representing different types of malicious behaviors for collators.
+#[derive(Debug, Parser, Clone, PartialEq, clap::ValueEnum)]
+pub enum MalusType {
+	/// No malicious behavior.
+	None,
+	/// Submit the same collations to all assigned cores.
+	DuplicateCollations,
+}
+
 #[allow(missing_docs)]
 #[derive(Debug, Parser)]
 #[group(skip)]
@@ -81,6 +90,10 @@ pub struct RunCmd {
 	/// we compute per block.
 	#[arg(long, default_value_t = 1)]
 	pub pvf_complexity: u32,
+
+	/// Specifies the malicious behavior of the collator.
+	#[arg(long, value_enum, default_value_t = MalusType::None)]
+	pub malus_type: MalusType,
 }
 
 #[allow(missing_docs)]
diff --git a/polkadot/parachain/test-parachains/undying/collator/src/lib.rs b/polkadot/parachain/test-parachains/undying/collator/src/lib.rs
index 448c181ae062bbaac7dc1b8312d50d26f1643ea9..3d5724ae79ba8fcfd24f4bd56e6a7b0eaa23bdf4 100644
--- a/polkadot/parachain/test-parachains/undying/collator/src/lib.rs
+++ b/polkadot/parachain/test-parachains/undying/collator/src/lib.rs
@@ -17,14 +17,25 @@
 //! Collator for the `Undying` test parachain.
 
 use codec::{Decode, Encode};
-use futures::channel::oneshot;
+use futures::{channel::oneshot, StreamExt};
 use futures_timer::Delay;
+use polkadot_cli::ProvideRuntimeApi;
 use polkadot_node_primitives::{
-	maybe_compress_pov, Collation, CollationResult, CollationSecondedSignal, CollatorFn,
-	MaybeCompressedPoV, PoV, Statement,
+	maybe_compress_pov, AvailableData, Collation, CollationResult, CollationSecondedSignal,
+	CollatorFn, MaybeCompressedPoV, PoV, Statement, UpwardMessages,
 };
-use polkadot_primitives::{CollatorId, CollatorPair, Hash};
+use polkadot_node_subsystem::messages::CollatorProtocolMessage;
+use polkadot_primitives::{
+	vstaging::{
+		CandidateDescriptorV2, CandidateReceiptV2, ClaimQueueOffset, DEFAULT_CLAIM_QUEUE_OFFSET,
+	},
+	CandidateCommitments, CollatorId, CollatorPair, CoreIndex, Hash, Id as ParaId,
+	OccupiedCoreAssumption,
+};
+use polkadot_service::{Handle, NewFull, ParachainHost};
+use sc_client_api::client::BlockchainEvents;
 use sp_core::Pair;
+
 use std::{
 	collections::HashMap,
 	sync::{
@@ -37,6 +48,8 @@ use test_parachain_undying::{
 	execute, hash_state, BlockData, GraveyardState, HeadData, StateMismatch,
 };
 
+pub const LOG_TARGET: &str = "parachain::undying-collator";
+
 /// Default PoV size which also drives state size.
 const DEFAULT_POV_SIZE: usize = 1000;
 /// Default PVF time complexity - 1 signature per block.
@@ -52,19 +65,20 @@ fn calculate_head_and_state_for_number(
 	let mut graveyard = vec![0u8; graveyard_size * graveyard_size];
 	let zombies = 0;
 	let seal = [0u8; 32];
+	let core_selector_number = 0;
 
 	// Ensure a larger compressed PoV.
 	graveyard.iter_mut().enumerate().for_each(|(i, grave)| {
 		*grave = i as u8;
 	});
 
-	let mut state = GraveyardState { index, graveyard, zombies, seal };
+	let mut state = GraveyardState { index, graveyard, zombies, seal, core_selector_number };
 	let mut head =
 		HeadData { number: 0, parent_hash: Hash::default().into(), post_state: hash_state(&state) };
 
 	while head.number < number {
 		let block = BlockData { state, tombstones: 1_000, iterations: pvf_complexity };
-		let (new_head, new_state) = execute(head.hash(), head.clone(), block)?;
+		let (new_head, new_state, _) = execute(head.hash(), head.clone(), block)?;
 		head = new_head;
 		state = new_state;
 	}
@@ -99,13 +113,14 @@ impl State {
 		let mut graveyard = vec![0u8; graveyard_size * graveyard_size];
 		let zombies = 0;
 		let seal = [0u8; 32];
+		let core_selector_number = 0;
 
 		// Ensure a larger compressed PoV.
 		graveyard.iter_mut().enumerate().for_each(|(i, grave)| {
 			*grave = i as u8;
 		});
 
-		let state = GraveyardState { index, graveyard, zombies, seal };
+		let state = GraveyardState { index, graveyard, zombies, seal, core_selector_number };
 
 		let head_data =
 			HeadData { number: 0, parent_hash: Default::default(), post_state: hash_state(&state) };
@@ -123,7 +138,10 @@ impl State {
 	/// Advance the state and produce a new block based on the given `parent_head`.
 	///
 	/// Returns the new [`BlockData`] and the new [`HeadData`].
-	fn advance(&mut self, parent_head: HeadData) -> Result<(BlockData, HeadData), StateMismatch> {
+	fn advance(
+		&mut self,
+		parent_head: HeadData,
+	) -> Result<(BlockData, HeadData, UpwardMessages), StateMismatch> {
 		self.best_block = parent_head.number;
 
 		let state = if let Some(state) = self
@@ -144,14 +162,15 @@ impl State {
 		// Start with prev state and transaction to execute (place 1000 tombstones).
 		let block = BlockData { state, tombstones: 1000, iterations: self.pvf_complexity };
 
-		let (new_head, new_state) = execute(parent_head.hash(), parent_head, block.clone())?;
+		let (new_head, new_state, upward_messages) =
+			execute(parent_head.hash(), parent_head, block.clone())?;
 
 		let new_head_arc = Arc::new(new_head.clone());
 
 		self.head_to_state.insert(new_head_arc.clone(), new_state);
 		self.number_to_head.insert(new_head.number, new_head_arc);
 
-		Ok((block, new_head))
+		Ok((block, new_head, upward_messages))
 	}
 }
 
@@ -175,13 +194,18 @@ impl Collator {
 		let graveyard_size = ((pov_size / std::mem::size_of::<u8>()) as f64).sqrt().ceil() as usize;
 
 		log::info!(
+			target: LOG_TARGET,
 			"PoV target size: {} bytes. Graveyard size: ({} x {})",
 			pov_size,
 			graveyard_size,
-			graveyard_size
+			graveyard_size,
 		);
 
-		log::info!("PVF time complexity: {}", pvf_complexity);
+		log::info!(
+			target: LOG_TARGET,
+			"PVF time complexity: {}",
+			pvf_complexity,
+		);
 
 		Self {
 			state: Arc::new(Mutex::new(State::genesis(graveyard_size, pvf_complexity))),
@@ -232,21 +256,32 @@ impl Collator {
 		Box::new(move |relay_parent, validation_data| {
 			let parent = match HeadData::decode(&mut &validation_data.parent_head.0[..]) {
 				Err(err) => {
-					log::error!("Requested to build on top of malformed head-data: {:?}", err);
+					log::error!(
+						target: LOG_TARGET,
+						"Requested to build on top of malformed head-data: {:?}",
+						err,
+					);
 					return futures::future::ready(None).boxed()
 				},
 				Ok(p) => p,
 			};
 
-			let (block_data, head_data) = match state.lock().unwrap().advance(parent.clone()) {
-				Err(err) => {
-					log::error!("Unable to build on top of {:?}: {:?}", parent, err);
-					return futures::future::ready(None).boxed()
-				},
-				Ok(x) => x,
-			};
+			let (block_data, head_data, upward_messages) =
+				match state.lock().unwrap().advance(parent.clone()) {
+					Err(err) => {
+						log::error!(
+							target: LOG_TARGET,
+							"Unable to build on top of {:?}: {:?}",
+							parent,
+							err,
+						);
+						return futures::future::ready(None).boxed()
+					},
+					Ok(x) => x,
+				};
 
 			log::info!(
+				target: LOG_TARGET,
 				"created a new collation on relay-parent({}): {:?}",
 				relay_parent,
 				head_data,
@@ -256,7 +291,7 @@ impl Collator {
 			let pov = PoV { block_data: block_data.encode().into() };
 
 			let collation = Collation {
-				upward_messages: Default::default(),
+				upward_messages,
 				horizontal_messages: Default::default(),
 				new_validation_code: None,
 				head_data: head_data.encode().into(),
@@ -265,10 +300,15 @@ impl Collator {
 				hrmp_watermark: validation_data.relay_parent_number,
 			};
 
-			log::info!("Raw PoV size for collation: {} bytes", pov.block_data.0.len(),);
+			log::info!(
+				target: LOG_TARGET,
+				"Raw PoV size for collation: {} bytes",
+				pov.block_data.0.len(),
+			);
 			let compressed_pov = maybe_compress_pov(pov);
 
 			log::info!(
+				target: LOG_TARGET,
 				"Compressed PoV size for collation: {} bytes",
 				compressed_pov.block_data.0.len(),
 			);
@@ -285,8 +325,9 @@ impl Collator {
 							Statement::Seconded(s) if s.descriptor.pov_hash() == compressed_pov.hash(),
 						) {
 							log::error!(
+								target: LOG_TARGET,
 								"Seconded statement should match our collation: {:?}",
-								res.statement.payload()
+								res.statement.payload(),
 							);
 						}
 
@@ -330,6 +371,259 @@ impl Collator {
 			}
 		}
 	}
+
+	pub fn send_same_collations_to_all_assigned_cores(
+		&self,
+		full_node: &NewFull,
+		mut overseer_handle: Handle,
+		para_id: ParaId,
+	) {
+		let client = full_node.client.clone();
+
+		let collation_function =
+			self.create_collation_function(full_node.task_manager.spawn_handle());
+
+		full_node
+			.task_manager
+			.spawn_handle()
+			.spawn("malus-undying-collator", None, async move {
+				// Subscribe to relay chain block import notifications. In each iteration, build a
+				// collation in response to a block import notification and submits it to all cores
+				// assigned to the parachain.
+				let mut import_notifications = client.import_notification_stream();
+
+				while let Some(notification) = import_notifications.next().await {
+					let relay_parent = notification.hash;
+
+					// Get the list of cores assigned to the parachain.
+					let claim_queue = match client.runtime_api().claim_queue(relay_parent) {
+						Ok(claim_queue) => claim_queue,
+						Err(error) => {
+							log::error!(
+								target: LOG_TARGET,
+								"Failed to query claim queue runtime API: {error:?}",
+							);
+							continue;
+						},
+					};
+
+					let claim_queue_offset = ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET);
+
+					let scheduled_cores: Vec<CoreIndex> = claim_queue
+						.iter()
+						.filter_map(move |(core_index, paras)| {
+							paras.get(claim_queue_offset.0 as usize).and_then(|core_para_id| {
+								(core_para_id == &para_id).then_some(*core_index)
+							})
+						})
+						.collect();
+
+					if scheduled_cores.is_empty() {
+						log::info!(
+							target: LOG_TARGET,
+							"Scheduled cores is empty.",
+						);
+						continue;
+					}
+
+					if scheduled_cores.len() == 1 {
+						log::info!(
+							target: LOG_TARGET,
+							"Malus collator configured with duplicate collations, but only 1 core assigned. \
+							Collator will not do anything malicious.",
+						);
+					}
+
+					// Fetch validation data for the collation.
+					let validation_data = match client.runtime_api().persisted_validation_data(
+						relay_parent,
+						para_id,
+						OccupiedCoreAssumption::Included,
+					) {
+						Ok(Some(validation_data)) => validation_data,
+						Ok(None) => {
+							log::info!(
+								target: LOG_TARGET,
+								"Persisted validation data is None.",
+							);
+							continue;
+						},
+						Err(error) => {
+							log::error!(
+								target: LOG_TARGET,
+								"Failed to query persisted validation data runtime API: {error:?}",
+							);
+							continue;
+						},
+					};
+
+					// Generate the collation.
+					let collation =
+						match collation_function(relay_parent, &validation_data).await {
+							Some(collation) => collation,
+							None => {
+								log::info!(
+									target: LOG_TARGET,
+									"Collation result is None.",
+								);
+								continue;
+							},
+						}
+						.collation;
+
+					// Fetch the validation code hash.
+					let validation_code_hash = match client.runtime_api().validation_code_hash(
+						relay_parent,
+						para_id,
+						OccupiedCoreAssumption::Included,
+					) {
+						Ok(Some(validation_code_hash)) => validation_code_hash,
+						Ok(None) => {
+							log::info!(
+								target: LOG_TARGET,
+								"Validation code hash is None.",
+							);
+							continue;
+						},
+						Err(error) => {
+							log::error!(
+								target: LOG_TARGET,
+								"Failed to query validation code hash runtime API: {error:?}",
+							);
+							continue;
+						},
+					};
+
+					// Fetch the session index.
+					let session_index =
+						match client.runtime_api().session_index_for_child(relay_parent) {
+							Ok(session_index) => session_index,
+							Err(error) => {
+								log::error!(
+									target: LOG_TARGET,
+									"Failed to query session index for child runtime API: {error:?}",
+								);
+								continue;
+							},
+						};
+
+					let persisted_validation_data_hash = validation_data.hash();
+					let parent_head_data = validation_data.parent_head.clone();
+					let parent_head_data_hash = validation_data.parent_head.hash();
+
+					// Apply compression to the block data.
+					let pov = {
+						let pov = collation.proof_of_validity.into_compressed();
+						let encoded_size = pov.encoded_size();
+						let max_pov_size = validation_data.max_pov_size as usize;
+
+						// As long as `POV_BOMB_LIMIT` is at least `max_pov_size`, this ensures
+						// that honest collators never produce a PoV which is uncompressed.
+						//
+						// As such, honest collators never produce an uncompressed PoV which starts
+						// with a compression magic number, which would lead validators to
+						// reject the collation.
+						if encoded_size > max_pov_size {
+							log::error!(
+								target: LOG_TARGET,
+								"PoV size {encoded_size} exceeded maximum size of {max_pov_size}",
+							);
+							continue;
+						}
+
+						pov
+					};
+
+					let pov_hash = pov.hash();
+
+					// Fetch the session info.
+					let session_info =
+						match client.runtime_api().session_info(relay_parent, session_index) {
+							Ok(Some(session_info)) => session_info,
+							Ok(None) => {
+								log::info!(
+									target: LOG_TARGET,
+									"Session info is None.",
+								);
+								continue;
+							},
+							Err(error) => {
+								log::error!(
+									target: LOG_TARGET,
+									"Failed to query session info runtime API: {error:?}",
+								);
+								continue;
+							},
+						};
+
+					let n_validators = session_info.validators.len();
+
+					let available_data =
+						AvailableData { validation_data, pov: Arc::new(pov.clone()) };
+					let chunks = match polkadot_erasure_coding::obtain_chunks_v1(
+						n_validators,
+						&available_data,
+					) {
+						Ok(chunks) => chunks,
+						Err(error) => {
+							log::error!(
+								target: LOG_TARGET,
+								"Failed to obtain chunks v1: {error:?}",
+							);
+							continue;
+						},
+					};
+					let erasure_root = polkadot_erasure_coding::branches(&chunks).root();
+
+					let commitments = CandidateCommitments {
+						upward_messages: collation.upward_messages,
+						horizontal_messages: collation.horizontal_messages,
+						new_validation_code: collation.new_validation_code,
+						head_data: collation.head_data,
+						processed_downward_messages: collation.processed_downward_messages,
+						hrmp_watermark: collation.hrmp_watermark,
+					};
+
+					// Submit the same collation to all assigned cores.
+					for core_index in &scheduled_cores {
+						let candidate_receipt = CandidateReceiptV2 {
+							descriptor: CandidateDescriptorV2::new(
+								para_id,
+								relay_parent,
+								*core_index,
+								session_index,
+								persisted_validation_data_hash,
+								pov_hash,
+								erasure_root,
+								commitments.head_data.hash(),
+								validation_code_hash,
+							),
+							commitments_hash: commitments.hash(),
+						};
+
+						// We cannot use SubmitCollation here because it includes an additional
+						// check for the core index by calling `check_core_index`. This check
+						// enforces that the parachain always selects the correct core by comparing
+						// the descriptor and commitments core indexes. To bypass this check, we are
+						// simulating the behavior of SubmitCollation while skipping the core index
+						// validation.
+						overseer_handle
+							.send_msg(
+								CollatorProtocolMessage::DistributeCollation {
+									candidate_receipt,
+									parent_head_data_hash,
+									pov: pov.clone(),
+									parent_head_data: parent_head_data.clone(),
+									result_sender: None,
+									core_index: *core_index,
+								},
+								"Collator",
+							)
+							.await;
+					}
+				}
+			});
+	}
 }
 
 use sp_core::traits::SpawnNamed;
diff --git a/polkadot/parachain/test-parachains/undying/collator/src/main.rs b/polkadot/parachain/test-parachains/undying/collator/src/main.rs
index 017eefe5ee31e1695272ac63ad43567604152f03..9d993dd818b2f7486665103b83e7d613d8f86071 100644
--- a/polkadot/parachain/test-parachains/undying/collator/src/main.rs
+++ b/polkadot/parachain/test-parachains/undying/collator/src/main.rs
@@ -29,7 +29,7 @@ use std::{
 use test_parachain_undying_collator::Collator;
 
 mod cli;
-use cli::Cli;
+use cli::{Cli, MalusType};
 
 fn main() -> Result<()> {
 	let cli = Cli::from_args();
@@ -105,6 +105,7 @@ fn main() -> Result<()> {
 				.map_err(|e| e.to_string())?;
 				let mut overseer_handle = full_node
 					.overseer_handle
+					.clone()
 					.expect("Overseer handle should be initialized for collators");
 
 				let genesis_head_hex =
@@ -120,9 +121,16 @@ fn main() -> Result<()> {
 
 				let config = CollationGenerationConfig {
 					key: collator.collator_key(),
-					collator: Some(
-						collator.create_collation_function(full_node.task_manager.spawn_handle()),
-					),
+					// If the collator is malicious, disable the collation function
+					// (set to None) and manually handle collation submission later.
+					collator: if cli.run.malus_type == MalusType::None {
+						Some(
+							collator
+								.create_collation_function(full_node.task_manager.spawn_handle()),
+						)
+					} else {
+						None
+					},
 					para_id,
 				};
 				overseer_handle
@@ -133,6 +141,16 @@ fn main() -> Result<()> {
 					.send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator")
 					.await;
 
+				// If the collator is configured to behave maliciously, simulate the specified
+				// malicious behavior.
+				if cli.run.malus_type == MalusType::DuplicateCollations {
+					collator.send_same_collations_to_all_assigned_cores(
+						&full_node,
+						overseer_handle,
+						para_id,
+					);
+				}
+
 				Ok(full_node.task_manager)
 			})
 		},
diff --git a/polkadot/parachain/test-parachains/undying/collator/tests/integration.rs b/polkadot/parachain/test-parachains/undying/collator/tests/integration.rs
index b8e32b13bc9c78eed14b0a827212bd2316785802..866b2f888f84e86a67bb578269c3571dd84bf4f5 100644
--- a/polkadot/parachain/test-parachains/undying/collator/tests/integration.rs
+++ b/polkadot/parachain/test-parachains/undying/collator/tests/integration.rs
@@ -19,6 +19,12 @@
 
 // If this test is failing, make sure to run all tests with the `real-overseer` feature being
 // enabled.
+
+use polkadot_node_subsystem::TimeoutExt;
+use std::time::Duration;
+
+const TIMEOUT: Duration = Duration::from_secs(120);
+
 #[tokio::test(flavor = "multi_thread")]
 async fn collating_using_undying_collator() {
 	use polkadot_primitives::Id as ParaId;
@@ -82,8 +88,16 @@ async fn collating_using_undying_collator() {
 		.await;
 
 	// Wait until the parachain has 4 blocks produced.
-	collator.wait_for_blocks(4).await;
+	collator
+		.wait_for_blocks(4)
+		.timeout(TIMEOUT)
+		.await
+		.expect("Timed out waiting for 4 produced blocks");
 
 	// Wait until the collator received `12` seconded statements for its collations.
-	collator.wait_for_seconded_collations(12).await;
+	collator
+		.wait_for_seconded_collations(12)
+		.timeout(TIMEOUT)
+		.await
+		.expect("Timed out waiting for 12 seconded collations");
 }
diff --git a/polkadot/parachain/test-parachains/undying/src/lib.rs b/polkadot/parachain/test-parachains/undying/src/lib.rs
index e4ec7e99346bbef4f3ce2fdb03b6084c053926f8..4f014320d09bbd4f7045376016dfa564e65d1053 100644
--- a/polkadot/parachain/test-parachains/undying/src/lib.rs
+++ b/polkadot/parachain/test-parachains/undying/src/lib.rs
@@ -22,6 +22,10 @@ extern crate alloc;
 
 use alloc::vec::Vec;
 use codec::{Decode, Encode};
+use polkadot_parachain_primitives::primitives::UpwardMessages;
+use polkadot_primitives::vstaging::{
+	ClaimQueueOffset, CoreSelector, UMPSignal, DEFAULT_CLAIM_QUEUE_OFFSET, UMP_SEPARATOR,
+};
 use tiny_keccak::{Hasher as _, Keccak};
 
 #[cfg(not(feature = "std"))]
@@ -86,6 +90,8 @@ pub struct GraveyardState {
 	pub zombies: u64,
 	// Grave seal.
 	pub seal: [u8; 32],
+	// Increasing sequence number for core selector.
+	pub core_selector_number: u8,
 }
 
 /// Block data for this parachain.
@@ -119,6 +125,7 @@ pub fn execute_transaction(mut block_data: BlockData) -> GraveyardState {
 		// Chain hash the seals and burn CPU.
 		block_data.state.seal = hash_state(&block_data.state);
 	}
+	block_data.state.core_selector_number = block_data.state.core_selector_number.wrapping_add(1);
 
 	block_data.state
 }
@@ -133,7 +140,7 @@ pub fn execute(
 	parent_hash: [u8; 32],
 	parent_head: HeadData,
 	block_data: BlockData,
-) -> Result<(HeadData, GraveyardState), StateMismatch> {
+) -> Result<(HeadData, GraveyardState, UpwardMessages), StateMismatch> {
 	assert_eq!(parent_hash, parent_head.hash());
 
 	if hash_state(&block_data.state) != parent_head.post_state {
@@ -146,6 +153,16 @@ pub fn execute(
 		return Err(StateMismatch)
 	}
 
+	let mut upward_messages: UpwardMessages = Default::default();
+	upward_messages.force_push(UMP_SEPARATOR);
+	upward_messages.force_push(
+		UMPSignal::SelectCore(
+			CoreSelector(block_data.state.core_selector_number),
+			ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET),
+		)
+		.encode(),
+	);
+
 	// We need to clone the block data as the fn will mutate it's state.
 	let new_state = execute_transaction(block_data.clone());
 
@@ -156,5 +173,6 @@ pub fn execute(
 			post_state: hash_state(&new_state),
 		},
 		new_state,
+		upward_messages,
 	))
 }
diff --git a/polkadot/parachain/test-parachains/undying/src/wasm_validation.rs b/polkadot/parachain/test-parachains/undying/src/wasm_validation.rs
index 46b66aa518e490e117c6d190d52a4d4dc85574d7..42917484cfdc2f0622e30260cb0b7c979a64c9d9 100644
--- a/polkadot/parachain/test-parachains/undying/src/wasm_validation.rs
+++ b/polkadot/parachain/test-parachains/undying/src/wasm_validation.rs
@@ -31,13 +31,13 @@ pub extern "C" fn validate_block(params: *const u8, len: usize) -> u64 {
 
 	let parent_hash = crate::keccak256(&params.parent_head.0[..]);
 
-	let (new_head, _) =
+	let (new_head, _, upward_messages) =
 		crate::execute(parent_hash, parent_head, block_data).expect("Executes block");
 
 	polkadot_parachain_primitives::write_result(&ValidationResult {
 		head_data: GenericHeadData(new_head.encode()),
 		new_validation_code: None,
-		upward_messages: alloc::vec::Vec::new().try_into().expect("empty vec fits within bounds"),
+		upward_messages,
 		horizontal_messages: alloc::vec::Vec::new()
 			.try_into()
 			.expect("empty vec fits within bounds"),
diff --git a/polkadot/zombienet-sdk-tests/tests/functional/duplicate_collations.rs b/polkadot/zombienet-sdk-tests/tests/functional/duplicate_collations.rs
new file mode 100644
index 0000000000000000000000000000000000000000..43420692d32ed1999fc1d256b6baafa89389afba
--- /dev/null
+++ b/polkadot/zombienet-sdk-tests/tests/functional/duplicate_collations.rs
@@ -0,0 +1,154 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+// Test that a parachain using a malus undying collator, sending the same collation to all assigned
+// cores, does not break the relay chain and that blocks are included, backed by a normal collator.
+
+use anyhow::anyhow;
+
+use crate::helpers::{
+	assert_para_throughput, rococo,
+	rococo::runtime_types::{
+		pallet_broker::coretime_interface::CoreAssignment,
+		polkadot_runtime_parachains::assigner_coretime::PartsOf57600,
+	},
+};
+use polkadot_primitives::Id as ParaId;
+use serde_json::json;
+use subxt::{OnlineClient, PolkadotConfig};
+use subxt_signer::sr25519::dev;
+use zombienet_sdk::NetworkConfigBuilder;
+
+const VALIDATOR_COUNT: u8 = 3;
+
+#[tokio::test(flavor = "multi_thread")]
+async fn duplicate_collations_test() -> Result<(), anyhow::Error> {
+	let _ = env_logger::try_init_from_env(
+		env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
+	);
+
+	let images = zombienet_sdk::environment::get_images_from_env();
+
+	let config = NetworkConfigBuilder::new()
+		.with_relaychain(|r| {
+			let r = r
+				.with_chain("rococo-local")
+				.with_default_command("polkadot")
+				.with_default_image(images.polkadot.as_str())
+				.with_default_args(vec![("-lparachain=debug").into()])
+				.with_genesis_overrides(json!({
+					"configuration": {
+						"config": {
+							"scheduler_params": {
+								"num_cores": 2
+							},
+							"async_backing_params": {
+								"max_candidate_depth": 6
+							}
+						}
+					}
+				}))
+				// Have to set a `with_node` outside of the loop below, so that `r` has the right
+				// type.
+				.with_node(|node| node.with_name("validator-0"));
+
+			(1..VALIDATOR_COUNT)
+				.fold(r, |acc, i| acc.with_node(|node| node.with_name(&format!("validator-{i}"))))
+		})
+		.with_parachain(|p| {
+			p.with_id(2000)
+				.with_default_command("undying-collator")
+				.cumulus_based(false)
+				.with_default_image(
+					std::env::var("COL_IMAGE")
+						.unwrap_or("docker.io/paritypr/colander:latest".to_string())
+						.as_str(),
+				)
+				.with_collator(|n| {
+					n.with_name("normal-collator").with_args(vec![("-lparachain=debug").into()])
+				})
+				.with_collator(|n| {
+					n.with_name("malus-collator").with_args(vec![
+						("-lparachain=debug").into(),
+						("--malus-type=duplicate-collations").into(),
+					])
+				})
+		})
+		.build()
+		.map_err(|e| {
+			let errs = e.into_iter().map(|e| e.to_string()).collect::<Vec<_>>().join(" ");
+			anyhow!("config errs: {errs}")
+		})?;
+
+	let spawn_fn = zombienet_sdk::environment::get_spawn_fn();
+	let network = spawn_fn(config).await?;
+
+	let relay_node = network.get_node("validator-0")?;
+
+	let relay_client: OnlineClient<PolkadotConfig> = relay_node.wait_client().await?;
+	let alice = dev::alice();
+
+	// Assign two extra cores to parachain-2000.
+	relay_client
+		.tx()
+		.sign_and_submit_then_watch_default(
+			&rococo::tx()
+				.sudo()
+				.sudo(rococo::runtime_types::rococo_runtime::RuntimeCall::Utility(
+					rococo::runtime_types::pallet_utility::pallet::Call::batch {
+						calls: vec![
+							rococo::runtime_types::rococo_runtime::RuntimeCall::Coretime(
+								rococo::runtime_types::polkadot_runtime_parachains::coretime::pallet::Call::assign_core {
+									core: 0,
+									begin: 0,
+									assignment: vec![(CoreAssignment::Task(2000), PartsOf57600(57600))],
+									end_hint: None
+								}
+							),
+							rococo::runtime_types::rococo_runtime::RuntimeCall::Coretime(
+								rococo::runtime_types::polkadot_runtime_parachains::coretime::pallet::Call::assign_core {
+									core: 1,
+									begin: 0,
+									assignment: vec![(CoreAssignment::Task(2000), PartsOf57600(57600))],
+									end_hint: None
+								}
+							),
+						],
+					},
+				)),
+			&alice,
+		)
+		.await?
+		.wait_for_finalized_success()
+		.await?;
+
+	log::info!("2 more cores assigned to parachain-2000");
+
+	assert_para_throughput(&relay_client, 15, [(ParaId::from(2000), 40..46)].into_iter().collect())
+		.await?;
+
+	// Verify that all validators detect the malicious collator by checking their logs. This check
+	// must be performed after the para throughput check because the validator group needs to rotate
+	// at least once. This ensures that all validators have had a chance to detect the malicious
+	// behavior.
+	for i in 0..VALIDATOR_COUNT {
+		let validator_name = &format!("validator-{}", i);
+		let validator_node = network.get_node(validator_name)?;
+		validator_node
+			.wait_log_line_count_with_timeout(
+				"Candidate core index is invalid: The core index in commitments doesn't match the one in descriptor",
+				false,
+				1_usize,
+				// Since we have this check after the para throughput check, all validators
+				// should have already detected the malicious collator, and all expected logs
+				// should have already appeared, so there is no need to wait more than 1 second.
+				1_u64,
+			)
+			.await
+			.unwrap_or_else(|error| panic!("Expected log not found for {}: {:?}", validator_name, error));
+	}
+
+	log::info!("Test finished successfully");
+
+	Ok(())
+}
diff --git a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs
index ecdab38e1d2865faaa6957c5f2ce331dc96d61df..7e5d313ff68dd194db0d6cdaa2ad8154e6010359 100644
--- a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs
+++ b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs
@@ -2,4 +2,5 @@
 // SPDX-License-Identifier: Apache-2.0
 
 mod async_backing_6_seconds_rate;
+mod duplicate_collations;
 mod sync_backing;
diff --git a/polkadot/zombienet_tests/misc/0002-upgrade-node.toml b/polkadot/zombienet_tests/misc/0002-upgrade-node.toml
index 1edb18abcececa32cadcf3756ac11e66be5f12c6..5e5e3719936ab0432479fcd217945bf113659b6d 100644
--- a/polkadot/zombienet_tests/misc/0002-upgrade-node.toml
+++ b/polkadot/zombienet_tests/misc/0002-upgrade-node.toml
@@ -30,7 +30,7 @@ addToGenesis = true
   [parachains.collator]
   name = "collator01"
   image = "{{COL_IMAGE}}"
-  command = "undying-collator"
+  command = "adder-collator"
   args = ["-lparachain=debug"]
 
 [[parachains]]
@@ -40,7 +40,7 @@ addToGenesis = true
   [parachains.collator]
   name = "collator02"
   image = "{{COL_IMAGE}}"
-  command = "undying-collator"
+  command = "adder-collator"
   args = ["-lparachain=debug"]
 
 [types.Header]
diff --git a/prdoc/pr_6924.prdoc b/prdoc/pr_6924.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..dc27bb9adfcba0029d6438c62762134ede3595aa
--- /dev/null
+++ b/prdoc/pr_6924.prdoc
@@ -0,0 +1,19 @@
+title: "malus-collator: implement malicious collator submitting same collation to all backing groups"
+
+doc:
+  - audience: Node Dev
+    description: |
+      This PR modifies the undying collator to include a malus mode,
+      enabling it to submit the same collation to all assigned backing groups.
+
+      It also includes a test that spawns a network with the malus collator
+      and verifies that everything functions correctly.
+
+crates:
+  - name: polkadot
+    bump: none
+    validate: false
+  - name: test-parachain-undying
+    bump: patch
+  - name: test-parachain-undying-collator
+    bump: patch