diff --git a/.gitlab/pipeline/zombienet/polkadot.yml b/.gitlab/pipeline/zombienet/polkadot.yml
index 6b72075c513b73c075d1dc10c90d0461bf0e1a82..38c5332f309703dab881d1df88709fc4fe95e49c 100644
--- a/.gitlab/pipeline/zombienet/polkadot.yml
+++ b/.gitlab/pipeline/zombienet/polkadot.yml
@@ -176,6 +176,14 @@ zombienet-polkadot-elastic-scaling-0002-elastic-scaling-doesnt-break-parachains:
       --local-dir="${LOCAL_DIR}/elastic_scaling"
       --test="0002-elastic-scaling-doesnt-break-parachains.zndsl"
 
+zombienet-polkadot-functional-0012-spam-statement-distribution-requests:
+  extends:
+    - .zombienet-polkadot-common
+  script:
+    - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
+      --local-dir="${LOCAL_DIR}/functional"
+      --test="0012-spam-statement-distribution-requests.zndsl"
+
 zombienet-polkadot-smoke-0001-parachains-smoke-test:
   extends:
     - .zombienet-polkadot-common
diff --git a/Cargo.lock b/Cargo.lock
index 1fe4012070a02e3e85aaed5167680d78c03d2ca0..c3bf3f26385be8967cb86ca6186d3948df8ee7d1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -14272,6 +14272,7 @@ dependencies = [
  "polkadot-node-core-pvf-common",
  "polkadot-node-core-pvf-execute-worker",
  "polkadot-node-core-pvf-prepare-worker",
+ "polkadot-node-network-protocol",
  "polkadot-node-primitives",
  "polkadot-node-subsystem",
  "polkadot-node-subsystem-test-helpers",
diff --git a/polkadot/node/malus/Cargo.toml b/polkadot/node/malus/Cargo.toml
index 2f63c2f0938d72bea717ac5aff0b2faace47808b..750074fa9b3cb6209d640b75189768fd94faccf2 100644
--- a/polkadot/node/malus/Cargo.toml
+++ b/polkadot/node/malus/Cargo.toml
@@ -37,6 +37,7 @@ polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator"
 polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" }
 polkadot-node-core-backing = { path = "../core/backing" }
 polkadot-node-primitives = { path = "../primitives" }
+polkadot-node-network-protocol = { path = "../network/protocol" }
 polkadot-primitives = { path = "../../primitives" }
 color-eyre = { version = "0.6.1", default-features = false }
 assert_matches = "1.5"
diff --git a/polkadot/node/malus/src/malus.rs b/polkadot/node/malus/src/malus.rs
index 7a9e320e27368e51da0508910fea5b7c54f2d6c7..6257201537c8d417fc9abf972951340b144d110d 100644
--- a/polkadot/node/malus/src/malus.rs
+++ b/polkadot/node/malus/src/malus.rs
@@ -40,6 +40,8 @@ enum NemesisVariant {
 	DisputeAncestor(DisputeAncestorOptions),
 	/// Delayed disputing of finalized candidates.
 	DisputeFinalizedCandidates(DisputeFinalizedCandidatesOptions),
+	/// Spam many request statements instead of sending a single one.
+	SpamStatementRequests(SpamStatementRequestsOptions),
 }
 
 #[derive(Debug, Parser)]
@@ -98,6 +100,11 @@ impl MalusCli {
 					finality_delay,
 				)?
 			},
+			NemesisVariant::SpamStatementRequests(opts) => {
+				let SpamStatementRequestsOptions { spam_factor, cli } = opts;
+
+				polkadot_cli::run_node(cli, SpamStatementRequests { spam_factor }, finality_delay)?
+			},
 		}
 		Ok(())
 	}
diff --git a/polkadot/node/malus/src/variants/mod.rs b/polkadot/node/malus/src/variants/mod.rs
index 3ca1bf4b4696843476c3de2b6dc441224b0ea9bd..ec945ae1945735b368b173647a4135ac408ca392 100644
--- a/polkadot/node/malus/src/variants/mod.rs
+++ b/polkadot/node/malus/src/variants/mod.rs
@@ -20,6 +20,7 @@ mod back_garbage_candidate;
 mod common;
 mod dispute_finalized_candidates;
 mod dispute_valid_candidates;
+mod spam_statement_requests;
 mod suggest_garbage_candidate;
 mod support_disabled;
 
@@ -27,6 +28,7 @@ pub(crate) use self::{
 	back_garbage_candidate::{BackGarbageCandidateOptions, BackGarbageCandidates},
 	dispute_finalized_candidates::{DisputeFinalizedCandidates, DisputeFinalizedCandidatesOptions},
 	dispute_valid_candidates::{DisputeAncestorOptions, DisputeValidCandidates},
+	spam_statement_requests::{SpamStatementRequests, SpamStatementRequestsOptions},
 	suggest_garbage_candidate::{SuggestGarbageCandidateOptions, SuggestGarbageCandidates},
 	support_disabled::{SupportDisabled, SupportDisabledOptions},
 };
diff --git a/polkadot/node/malus/src/variants/spam_statement_requests.rs b/polkadot/node/malus/src/variants/spam_statement_requests.rs
new file mode 100644
index 0000000000000000000000000000000000000000..c5970c988ac2a3417ba7a9974e6e44f32cf9b854
--- /dev/null
+++ b/polkadot/node/malus/src/variants/spam_statement_requests.rs
@@ -0,0 +1,155 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! A malicious node variant that attempts spam statement requests.
+//!
+//! This malus variant behaves honestly in everything except when propagating statement distribution
+//! requests through the network bridge subsystem. Instead of sending a single request when it needs
+//! something it attempts to spam the peer with multiple requests.
+//!
+//! Attention: For usage with `zombienet` only!
+
+#![allow(missing_docs)]
+
+use polkadot_cli::{
+	service::{
+		AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
+		OverseerGenArgs, OverseerHandle,
+	},
+	validator_overseer_builder, Cli,
+};
+use polkadot_node_network_protocol::request_response::{outgoing::Requests, OutgoingRequest};
+use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, SpawnGlue};
+use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
+use sp_core::traits::SpawnNamed;
+
+// Filter wrapping related types.
+use crate::{interceptor::*, shared::MALUS};
+
+use std::sync::Arc;
+
+/// Wraps around network bridge and replaces it.
+#[derive(Clone)]
+struct RequestSpammer {
+	spam_factor: u32, // How many statement distribution requests to send.
+}
+
+impl<Sender> MessageInterceptor<Sender> for RequestSpammer
+where
+	Sender: overseer::NetworkBridgeTxSenderTrait + Clone + Send + 'static,
+{
+	type Message = NetworkBridgeTxMessage;
+
+	/// Intercept NetworkBridgeTxMessage::SendRequests with Requests::AttestedCandidateV2 inside and
+	/// duplicate that request
+	fn intercept_incoming(
+		&self,
+		_subsystem_sender: &mut Sender,
+		msg: FromOrchestra<Self::Message>,
+	) -> Option<FromOrchestra<Self::Message>> {
+		match msg {
+			FromOrchestra::Communication {
+				msg: NetworkBridgeTxMessage::SendRequests(requests, if_disconnected),
+			} => {
+				let mut new_requests = Vec::new();
+
+				for request in requests {
+					match request {
+						Requests::AttestedCandidateV2(ref req) => {
+							// Temporarily store peer and payload for duplication
+							let peer_to_duplicate = req.peer.clone();
+							let payload_to_duplicate = req.payload.clone();
+							// Push the original request
+							new_requests.push(request);
+
+							// Duplicate for spam purposes
+							gum::info!(
+								target: MALUS,
+								"😈 Duplicating AttestedCandidateV2 request extra {:?} times to peer: {:?}.", self.spam_factor, peer_to_duplicate,
+							);
+							new_requests.extend((0..self.spam_factor - 1).map(|_| {
+								let (new_outgoing_request, _) = OutgoingRequest::new(
+									peer_to_duplicate.clone(),
+									payload_to_duplicate.clone(),
+								);
+								Requests::AttestedCandidateV2(new_outgoing_request)
+							}));
+						},
+						_ => {
+							new_requests.push(request);
+						},
+					}
+				}
+
+				// Passthrough the message with a potentially modified number of requests
+				Some(FromOrchestra::Communication {
+					msg: NetworkBridgeTxMessage::SendRequests(new_requests, if_disconnected),
+				})
+			},
+			FromOrchestra::Communication { msg } => Some(FromOrchestra::Communication { msg }),
+			FromOrchestra::Signal(signal) => Some(FromOrchestra::Signal(signal)),
+		}
+	}
+}
+
+//----------------------------------------------------------------------------------
+
+#[derive(Debug, clap::Parser)]
+#[clap(rename_all = "kebab-case")]
+#[allow(missing_docs)]
+pub struct SpamStatementRequestsOptions {
+	/// How many statement distribution requests to send.
+	#[clap(long, ignore_case = true, default_value_t = 1000, value_parser = clap::value_parser!(u32).range(0..=10000000))]
+	pub spam_factor: u32,
+
+	#[clap(flatten)]
+	pub cli: Cli,
+}
+
+/// SpamStatementRequests implementation wrapper which implements `OverseerGen` glue.
+pub(crate) struct SpamStatementRequests {
+	/// How many statement distribution requests to send.
+	pub spam_factor: u32,
+}
+
+impl OverseerGen for SpamStatementRequests {
+	fn generate<Spawner, RuntimeClient>(
+		&self,
+		connector: OverseerConnector,
+		args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
+		ext_args: Option<ExtendedOverseerGenArgs>,
+	) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
+	where
+		RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
+		Spawner: 'static + SpawnNamed + Clone + Unpin,
+	{
+		gum::info!(
+			target: MALUS,
+			"😈 Started Malus node that duplicates each statement distribution request spam_factor = {:?} times.",
+			&self.spam_factor,
+		);
+
+		let request_spammer = RequestSpammer { spam_factor: self.spam_factor };
+
+		validator_overseer_builder(
+			args,
+			ext_args.expect("Extended arguments required to build validator overseer are provided"),
+		)?
+		.replace_network_bridge_tx(move |cb| InterceptedSubsystem::new(cb, request_spammer))
+		.build_with_connector(connector)
+		.map_err(|e| e.into())
+	}
+}
diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs
index 7e91d2849120283c3fcefc4193a6e3d8bff809a1..4ca199c3378bfc80eeee9c5f2985b1ad570e5736 100644
--- a/polkadot/node/network/statement-distribution/src/lib.rs
+++ b/polkadot/node/network/statement-distribution/src/lib.rs
@@ -207,6 +207,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
 			v2::respond_task(
 				self.req_receiver.take().expect("Mandatory argument to new. qed"),
 				res_sender.clone(),
+				self.metrics.clone(),
 			)
 			.boxed(),
 		)
diff --git a/polkadot/node/network/statement-distribution/src/metrics.rs b/polkadot/node/network/statement-distribution/src/metrics.rs
index b9a51dc89d61afd9d2805f294b208f92572dd754..1bc994174263905d3058154ead29eaaa16bd2ad4 100644
--- a/polkadot/node/network/statement-distribution/src/metrics.rs
+++ b/polkadot/node/network/statement-distribution/src/metrics.rs
@@ -24,14 +24,19 @@ const HISTOGRAM_LATENCY_BUCKETS: &[f64] = &[
 
 #[derive(Clone)]
 struct MetricsInner {
+	// V1
 	statements_distributed: prometheus::Counter<prometheus::U64>,
 	sent_requests: prometheus::Counter<prometheus::U64>,
 	received_responses: prometheus::CounterVec<prometheus::U64>,
-	active_leaves_update: prometheus::Histogram,
-	share: prometheus::Histogram,
 	network_bridge_update: prometheus::HistogramVec,
 	statements_unexpected: prometheus::CounterVec<prometheus::U64>,
 	created_message_size: prometheus::Gauge<prometheus::U64>,
+	// V1+
+	active_leaves_update: prometheus::Histogram,
+	share: prometheus::Histogram,
+	// V2+
+	peer_rate_limit_request_drop: prometheus::Counter<prometheus::U64>,
+	max_parallel_requests_reached: prometheus::Counter<prometheus::U64>,
 }
 
 /// Statement Distribution metrics.
@@ -114,6 +119,23 @@ impl Metrics {
 			metrics.created_message_size.set(size as u64);
 		}
 	}
+
+	/// Update sent dropped requests counter when request dropped because
+	/// of peer rate limit
+	pub fn on_request_dropped_peer_rate_limit(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.peer_rate_limit_request_drop.inc();
+		}
+	}
+
+	/// Update max parallel requests reached counter
+	/// This counter is updated when the maximum number of parallel requests is reached
+	/// and we are waiting for one of the requests to finish
+	pub fn on_max_parallel_requests_reached(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.max_parallel_requests_reached.inc();
+		}
+	}
 }
 
 impl metrics::Metrics for Metrics {
@@ -193,6 +215,20 @@ impl metrics::Metrics for Metrics {
 				))?,
 				registry,
 			)?,
+			peer_rate_limit_request_drop: prometheus::register(
+				prometheus::Counter::new(
+					"polkadot_parachain_statement_distribution_peer_rate_limit_request_drop_total",
+					"Number of statement distribution requests dropped because of the peer rate limiting.",
+				)?,
+				registry,
+			)?,
+			max_parallel_requests_reached: prometheus::register(
+				prometheus::Counter::new(
+					"polkadot_parachain_statement_distribution_max_parallel_requests_reached_total",
+					"Number of times the maximum number of parallel requests was reached.",
+				)?,
+				registry,
+			)?,
 		};
 		Ok(Metrics(Some(metrics)))
 	}
diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs
index 118e34e92063e5571f637d9e44460b5b78c4fd8a..8579ac15cbc13f5186acea86f4dab492b943c4aa 100644
--- a/polkadot/node/network/statement-distribution/src/v2/mod.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs
@@ -59,6 +59,8 @@ use sp_keystore::KeystorePtr;
 use fatality::Nested;
 use futures::{
 	channel::{mpsc, oneshot},
+	future::FutureExt,
+	select,
 	stream::FuturesUnordered,
 	SinkExt, StreamExt,
 };
@@ -73,6 +75,7 @@ use std::{
 
 use crate::{
 	error::{JfyiError, JfyiErrorResult},
+	metrics::Metrics,
 	LOG_TARGET,
 };
 use candidates::{BadAdvertisement, Candidates, PostConfirmation};
@@ -3423,35 +3426,61 @@ pub(crate) struct ResponderMessage {
 pub(crate) async fn respond_task(
 	mut receiver: IncomingRequestReceiver<AttestedCandidateRequest>,
 	mut sender: mpsc::Sender<ResponderMessage>,
+	metrics: Metrics,
 ) {
 	let mut pending_out = FuturesUnordered::new();
+	let mut active_peers = HashSet::new();
+
 	loop {
-		// Ensure we are not handling too many requests in parallel.
-		if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
-			// Wait for one to finish:
-			pending_out.next().await;
-		}
+		select! {
+			// New request
+			request_result = receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse() => {
+				let request = match request_result.into_nested() {
+					Ok(Ok(v)) => v,
+					Err(fatal) => {
+						gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
+						return
+					},
+					Ok(Err(jfyi)) => {
+						gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
+						continue
+					},
+				};
 
-		let req = match receiver.recv(|| vec![COST_INVALID_REQUEST]).await.into_nested() {
-			Ok(Ok(v)) => v,
-			Err(fatal) => {
-				gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
-				return
+				// If peer currently being served drop request
+				if active_peers.contains(&request.peer) {
+					gum::trace!(target: LOG_TARGET, "Peer already being served, dropping request");
+					metrics.on_request_dropped_peer_rate_limit();
+					continue
+				}
+
+				// If we are over parallel limit wait for one to finish
+				if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
+					gum::trace!(target: LOG_TARGET, "Over max parallel requests, waiting for one to finish");
+					metrics.on_max_parallel_requests_reached();
+					let (_, peer) = pending_out.select_next_some().await;
+					active_peers.remove(&peer);
+				}
+
+				// Start serving the request
+				let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
+				let peer = request.peer;
+				if let Err(err) = sender
+					.feed(ResponderMessage { request, sent_feedback: pending_sent_tx })
+					.await
+				{
+					gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
+					return
+				}
+				let future_with_peer = pending_sent_rx.map(move |result| (result, peer));
+				pending_out.push(future_with_peer);
+				active_peers.insert(peer);
 			},
-			Ok(Err(jfyi)) => {
-				gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
-				continue
+			// Request served/finished
+			result = pending_out.select_next_some() => {
+				let (_, peer) = result;
+				active_peers.remove(&peer);
 			},
-		};
-
-		let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
-		if let Err(err) = sender
-			.feed(ResponderMessage { request: req, sent_feedback: pending_sent_tx })
-			.await
-		{
-			gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
-			return
 		}
-		pending_out.push(pending_sent_rx);
 	}
 }
diff --git a/polkadot/node/network/statement-distribution/src/v2/requests.rs b/polkadot/node/network/statement-distribution/src/v2/requests.rs
index 1ed18ffd42a9fa6eca92284b097f69b9caa12bb1..b8ed34d26c8a5272273297203b82e9ce263a107c 100644
--- a/polkadot/node/network/statement-distribution/src/v2/requests.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/requests.rs
@@ -366,6 +366,7 @@ impl RequestManager {
 				id,
 				&props,
 				&peer_advertised,
+				&response_manager,
 			) {
 				None => continue,
 				Some(t) => t,
@@ -387,14 +388,17 @@ impl RequestManager {
 			);
 
 			let stored_id = id.clone();
-			response_manager.push(Box::pin(async move {
-				TaggedResponse {
-					identifier: stored_id,
-					requested_peer: target,
-					props,
-					response: response_fut.await,
-				}
-			}));
+			response_manager.push(
+				Box::pin(async move {
+					TaggedResponse {
+						identifier: stored_id,
+						requested_peer: target,
+						props,
+						response: response_fut.await,
+					}
+				}),
+				target,
+			);
 
 			entry.in_flight = true;
 
@@ -422,28 +426,35 @@ impl RequestManager {
 /// A manager for pending responses.
 pub struct ResponseManager {
 	pending_responses: FuturesUnordered<BoxFuture<'static, TaggedResponse>>,
+	active_peers: HashSet<PeerId>,
 }
 
 impl ResponseManager {
 	pub fn new() -> Self {
-		Self { pending_responses: FuturesUnordered::new() }
+		Self { pending_responses: FuturesUnordered::new(), active_peers: HashSet::new() }
 	}
 
 	/// Await the next incoming response to a sent request, or immediately
 	/// return `None` if there are no pending responses.
 	pub async fn incoming(&mut self) -> Option<UnhandledResponse> {
-		self.pending_responses
-			.next()
-			.await
-			.map(|response| UnhandledResponse { response })
+		self.pending_responses.next().await.map(|response| {
+			self.active_peers.remove(&response.requested_peer);
+			UnhandledResponse { response }
+		})
 	}
 
 	fn len(&self) -> usize {
 		self.pending_responses.len()
 	}
 
-	fn push(&mut self, response: BoxFuture<'static, TaggedResponse>) {
+	fn push(&mut self, response: BoxFuture<'static, TaggedResponse>, target: PeerId) {
 		self.pending_responses.push(response);
+		self.active_peers.insert(target);
+	}
+
+	/// Returns true if we are currently sending a request to the peer.
+	fn is_sending_to(&self, peer: &PeerId) -> bool {
+		self.active_peers.contains(peer)
 	}
 }
 
@@ -471,10 +482,16 @@ fn find_request_target_with_update(
 	candidate_identifier: &CandidateIdentifier,
 	props: &RequestProperties,
 	peer_advertised: impl Fn(&CandidateIdentifier, &PeerId) -> Option<StatementFilter>,
+	response_manager: &ResponseManager,
 ) -> Option<PeerId> {
 	let mut prune = Vec::new();
 	let mut target = None;
 	for (i, p) in known_by.iter().enumerate() {
+		// If we are already sending to that peer, skip for now
+		if response_manager.is_sending_to(p) {
+			continue
+		}
+
 		let mut filter = match peer_advertised(candidate_identifier, p) {
 			None => {
 				prune.push(i);
@@ -1002,7 +1019,8 @@ mod tests {
 		candidate_receipt.descriptor.persisted_validation_data_hash =
 			persisted_validation_data.hash();
 		let candidate = candidate_receipt.hash();
-		let requested_peer = PeerId::random();
+		let requested_peer_1 = PeerId::random();
+		let requested_peer_2 = PeerId::random();
 
 		let identifier1 = request_manager
 			.get_or_insert(relay_parent, candidate, 1.into())
@@ -1010,14 +1028,14 @@ mod tests {
 			.clone();
 		request_manager
 			.get_or_insert(relay_parent, candidate, 1.into())
-			.add_peer(requested_peer);
+			.add_peer(requested_peer_1);
 		let identifier2 = request_manager
 			.get_or_insert(relay_parent, candidate, 2.into())
 			.identifier
 			.clone();
 		request_manager
 			.get_or_insert(relay_parent, candidate, 2.into())
-			.add_peer(requested_peer);
+			.add_peer(requested_peer_2);
 
 		assert_ne!(identifier1, identifier2);
 		assert_eq!(request_manager.requests.len(), 2);
@@ -1053,7 +1071,7 @@ mod tests {
 			let response = UnhandledResponse {
 				response: TaggedResponse {
 					identifier: identifier1,
-					requested_peer,
+					requested_peer: requested_peer_1,
 					props: request_properties.clone(),
 					response: Ok(AttestedCandidateResponse {
 						candidate_receipt: candidate_receipt.clone(),
@@ -1076,13 +1094,13 @@ mod tests {
 			assert_eq!(
 				output,
 				ResponseValidationOutput {
-					requested_peer,
+					requested_peer: requested_peer_1,
 					request_status: CandidateRequestStatus::Complete {
 						candidate: candidate_receipt.clone(),
 						persisted_validation_data: persisted_validation_data.clone(),
 						statements,
 					},
-					reputation_changes: vec![(requested_peer, BENEFIT_VALID_RESPONSE)],
+					reputation_changes: vec![(requested_peer_1, BENEFIT_VALID_RESPONSE)],
 				}
 			);
 		}
@@ -1093,7 +1111,7 @@ mod tests {
 			let response = UnhandledResponse {
 				response: TaggedResponse {
 					identifier: identifier2,
-					requested_peer,
+					requested_peer: requested_peer_2,
 					props: request_properties,
 					response: Ok(AttestedCandidateResponse {
 						candidate_receipt: candidate_receipt.clone(),
@@ -1115,12 +1133,14 @@ mod tests {
 			assert_eq!(
 				output,
 				ResponseValidationOutput {
-					requested_peer,
+					requested_peer: requested_peer_2,
 					request_status: CandidateRequestStatus::Outdated,
 					reputation_changes: vec![],
 				}
 			);
 		}
+
+		assert_eq!(request_manager.requests.len(), 0);
 	}
 
 	// Test case where we had a request in-flight and the request entry was garbage-collected on
@@ -1293,4 +1313,140 @@ mod tests {
 		assert_eq!(request_manager.requests.len(), 0);
 		assert_eq!(request_manager.by_priority.len(), 0);
 	}
+
+	// Test case where we queue 2 requests to be sent to the same peer and 1 request to another
+	// peer. Same peer requests should be served one at a time but they should not block the other
+	// peer request.
+	#[test]
+	fn rate_limit_requests_to_same_peer() {
+		let mut request_manager = RequestManager::new();
+		let mut response_manager = ResponseManager::new();
+
+		let relay_parent = Hash::from_low_u64_le(1);
+
+		// Create 3 candidates
+		let mut candidate_receipt_1 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
+		let persisted_validation_data_1 = dummy_pvd();
+		candidate_receipt_1.descriptor.persisted_validation_data_hash =
+			persisted_validation_data_1.hash();
+		let candidate_1 = candidate_receipt_1.hash();
+
+		let mut candidate_receipt_2 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
+		let persisted_validation_data_2 = dummy_pvd();
+		candidate_receipt_2.descriptor.persisted_validation_data_hash =
+			persisted_validation_data_2.hash();
+		let candidate_2 = candidate_receipt_2.hash();
+
+		let mut candidate_receipt_3 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
+		let persisted_validation_data_3 = dummy_pvd();
+		candidate_receipt_3.descriptor.persisted_validation_data_hash =
+			persisted_validation_data_3.hash();
+		let candidate_3 = candidate_receipt_3.hash();
+
+		// Create 2 peers
+		let requested_peer_1 = PeerId::random();
+		let requested_peer_2 = PeerId::random();
+
+		let group_size = 3;
+		let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
+		let unwanted_mask = StatementFilter::blank(group_size);
+		let disabled_mask: BitVec<u8, Lsb0> = Default::default();
+		let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
+		let request_props = |_identifier: &CandidateIdentifier| Some((&request_properties).clone());
+		let peer_advertised =
+			|_identifier: &CandidateIdentifier, _peer: &_| Some(StatementFilter::full(group_size));
+
+		// Add request for candidate 1 from peer 1
+		let identifier1 = request_manager
+			.get_or_insert(relay_parent, candidate_1, 1.into())
+			.identifier
+			.clone();
+		request_manager
+			.get_or_insert(relay_parent, candidate_1, 1.into())
+			.add_peer(requested_peer_1);
+
+		// Add request for candidate 3 from peer 2 (this one can be served in parallel)
+		let _identifier3 = request_manager
+			.get_or_insert(relay_parent, candidate_3, 1.into())
+			.identifier
+			.clone();
+		request_manager
+			.get_or_insert(relay_parent, candidate_3, 1.into())
+			.add_peer(requested_peer_2);
+
+		// Successfully dispatch request for candidate 1 from peer 1 and candidate 3 from peer 2
+		for _ in 0..2 {
+			let outgoing =
+				request_manager.next_request(&mut response_manager, request_props, peer_advertised);
+			assert!(outgoing.is_some());
+		}
+		assert_eq!(response_manager.active_peers.len(), 2);
+		assert!(response_manager.is_sending_to(&requested_peer_1));
+		assert!(response_manager.is_sending_to(&requested_peer_2));
+		assert_eq!(request_manager.requests.len(), 2);
+
+		// Add request for candidate 2 from peer 1
+		let _identifier2 = request_manager
+			.get_or_insert(relay_parent, candidate_2, 1.into())
+			.identifier
+			.clone();
+		request_manager
+			.get_or_insert(relay_parent, candidate_2, 1.into())
+			.add_peer(requested_peer_1);
+
+		// Do not dispatch the request for the second candidate from peer 1 (already serving that
+		// peer)
+		let outgoing =
+			request_manager.next_request(&mut response_manager, request_props, peer_advertised);
+		assert!(outgoing.is_none());
+		assert_eq!(response_manager.active_peers.len(), 2);
+		assert!(response_manager.is_sending_to(&requested_peer_1));
+		assert!(response_manager.is_sending_to(&requested_peer_2));
+		assert_eq!(request_manager.requests.len(), 3);
+
+		// Manually mark response received (response future resolved)
+		response_manager.active_peers.remove(&requested_peer_1);
+		response_manager.pending_responses = FuturesUnordered::new();
+
+		// Validate first response (candidate 1 from peer 1)
+		{
+			let statements = vec![];
+			let response = UnhandledResponse {
+				response: TaggedResponse {
+					identifier: identifier1,
+					requested_peer: requested_peer_1,
+					props: request_properties.clone(),
+					response: Ok(AttestedCandidateResponse {
+						candidate_receipt: candidate_receipt_1.clone(),
+						persisted_validation_data: persisted_validation_data_1.clone(),
+						statements,
+					}),
+				},
+			};
+			let validator_key_lookup = |_v| None;
+			let allowed_para_lookup = |_para, _g_index| true;
+			let _output = response.validate_response(
+				&mut request_manager,
+				group,
+				0,
+				validator_key_lookup,
+				allowed_para_lookup,
+				disabled_mask.clone(),
+			);
+
+			// First request served successfully
+			assert_eq!(request_manager.requests.len(), 2);
+			assert_eq!(response_manager.active_peers.len(), 1);
+			assert!(response_manager.is_sending_to(&requested_peer_2));
+		}
+
+		// Check if the request that was ignored previously will be served now
+		let outgoing =
+			request_manager.next_request(&mut response_manager, request_props, peer_advertised);
+		assert!(outgoing.is_some());
+		assert_eq!(response_manager.active_peers.len(), 2);
+		assert!(response_manager.is_sending_to(&requested_peer_1));
+		assert!(response_manager.is_sending_to(&requested_peer_2));
+		assert_eq!(request_manager.requests.len(), 2);
+	}
 }
diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
index 8cf139802148c0a6e720f6918c61824d5ef96575..c9de42d2c4681b0b8cd13677964a85474b183201 100644
--- a/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/tests/requests.rs
@@ -1891,7 +1891,7 @@ fn local_node_sanity_checks_incoming_requests() {
 			let mask = StatementFilter::blank(state.config.group_size + 1);
 			let response = state
 				.send_request(
-					peer_c,
+					peer_a,
 					request_v2::AttestedCandidateRequest { candidate_hash: candidate.hash(), mask },
 				)
 				.await
diff --git a/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.toml b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.toml
new file mode 100644
index 0000000000000000000000000000000000000000..14208425d62bac4a7c227211c99da4e01dd89866
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.toml
@@ -0,0 +1,43 @@
+[settings]
+timeout = 1000
+
+[relaychain.genesis.runtimeGenesis.patch.configuration.config]
+  needed_approvals = 2
+
+[relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params]
+  max_validators_per_core = 5
+
+[relaychain]
+default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}"
+chain = "rococo-local"
+default_command = "polkadot"
+
+[relaychain.default_resources]
+limits = { memory = "4G", cpu = "2" }
+requests = { memory = "2G", cpu = "1" }
+
+  [[relaychain.node_groups]]
+  name = "honest"
+  count = 4
+  args = ["-lparachain=debug,parachain::statement-distribution=trace"]
+
+  [[relaychain.nodes]]
+  image = "{{MALUS_IMAGE}}"
+  name = "malus"
+  command = "malus spam-statement-requests"
+  args = [ "--alice", "-lparachain=debug,MALUS=trace", "--spam-factor=1000" ]
+
+{% for id in range(2000,2001) %}
+[[parachains]]
+id = {{id}}
+  [parachains.collator]
+  image = "{{COL_IMAGE}}"
+  name = "collator"
+  command = "undying-collator"
+  args = ["-lparachain=debug"]
+{% endfor %}
+
+[types.Header]
+number = "u64"
+parent_hash = "Hash"
+post_state = "Hash"
diff --git a/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.zndsl b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.zndsl
new file mode 100644
index 0000000000000000000000000000000000000000..9985dd24ee38a56c823e07d07556fd77eb7a1ac5
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0012-spam-statement-distribution-requests.zndsl
@@ -0,0 +1,27 @@
+Description: Test if parachains progress when group is getting spammed by statement distribution requests.
+Network: ./0012-spam-statement-distribution-requests.toml
+Creds: config
+
+# Check authority status and peers.
+malus: reports node_roles is 4
+honest: reports node_roles is 4
+
+# Ensure parachains are registered.
+honest: parachain 2000 is registered within 60 seconds
+
+# Ensure that malus is already attempting to DoS
+malus: log line contains "😈 Duplicating AttestedCandidateV2 request" within 90 seconds
+
+# Ensure parachains made progress.
+honest: parachain 2000 block height is at least 10 within 200 seconds
+
+# Ensure that honest nodes drop extra requests
+honest: log line contains "Peer already being served, dropping request" within 60 seconds
+
+# Check lag - approval
+honest: reports polkadot_parachain_approval_checking_finality_lag is 0
+
+# Check lag - dispute conclusion
+honest: reports polkadot_parachain_disputes_finality_lag is 0
+
+
diff --git a/prdoc/pr_3444.prdoc b/prdoc/pr_3444.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..3afb38106417bdb3784fba0f7306b04c7705b9cf
--- /dev/null
+++ b/prdoc/pr_3444.prdoc
@@ -0,0 +1,25 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Rate-limiting of statement distribution v2 requests to 1 per peer
+
+doc:
+  - audience: Node Dev
+    description: |
+      A new malicious node variant that sends duplicate statement
+      distribution messages to spam other peers.
+  
+  - audience: Node Operator
+    description: |
+      Added rate-limiting in the statement distribution request-response
+      protocol. Requesters will not issue another request to a peer if one
+      is already pending with that peer and receiving nodes will reject
+      requests from peers that they are currently serving. 
+      This should reduce the risk of validator-validator DoS attacks and
+      better load-balance statement distribution.
+
+crates: 
+  - name: polkadot-test-malus
+    bump: minor
+  - name: polkadot-statement-distribution
+    bump: minor