diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs
index 33af9c9533388a1e4d5832a390c8eb96da756905..119bf270c63abe44fa9a59d78cc2f093b58cf18c 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/api.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs
@@ -47,14 +47,14 @@ pub trait TransactionBroadcastApi {
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[method(name = "transaction_unstable_broadcast")]
-	fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>>;
+	#[method(name = "transaction_unstable_broadcast", raw_method)]
+	async fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>>;
 
 	/// Broadcast an extrinsic to the chain.
 	///
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[method(name = "transaction_unstable_stop")]
-	fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>;
+	#[method(name = "transaction_unstable_stop", raw_method)]
+	async fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>;
 }
diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs
index 4a15657a7f69e290e0d1cccdf7fc3082671a9ede..570174a3db6434fec278225fbb6b390cd3a5f07a 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs
@@ -67,6 +67,7 @@ fn maintained_pool(
 
 pub fn setup_api(
 	options: Options,
+	max_tx_per_connection: usize,
 ) -> (
 	Arc<TestApi>,
 	Arc<MiddlewarePool>,
@@ -85,9 +86,13 @@ pub fn setup_api(
 
 	let (task_executor, executor_recv) = TaskExecutorBroadcast::new();
 
-	let tx_api =
-		RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor))
-			.into_rpc();
+	let tx_api = RpcTransactionBroadcast::new(
+		client_mock.clone(),
+		pool.clone(),
+		Arc::new(task_executor),
+		max_tx_per_connection,
+	)
+	.into_rpc();
 
 	(api, pool, client_mock, tx_api, executor_recv, pool_state)
 }
diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs
index 14e188b6a87300e6c0d7cf4c53d4f401663f696b..f4a69bd6ed4782f49e15f098a1f4e7ad67b8bd1a 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs
@@ -26,6 +26,8 @@ use std::sync::Arc;
 use substrate_test_runtime_client::AccountKeyring::*;
 use substrate_test_runtime_transaction_pool::uxt;
 
+const MAX_TX_PER_CONNECTION: usize = 4;
+
 // Test helpers.
 use crate::transaction::tests::{
 	middleware_pool::{MiddlewarePoolEvent, TxStatusTypeTest},
@@ -35,7 +37,7 @@ use crate::transaction::tests::{
 #[tokio::test]
 async fn tx_broadcast_enters_pool() {
 	let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
-		setup_api(Default::default());
+		setup_api(Default::default(), MAX_TX_PER_CONNECTION);
 
 	// Start at block 1.
 	let block_1_header = api.push_block(1, vec![], true);
@@ -94,7 +96,8 @@ async fn tx_broadcast_enters_pool() {
 
 #[tokio::test]
 async fn tx_broadcast_invalid_tx() {
-	let (_, pool, _, tx_api, exec_middleware, _) = setup_api(Default::default());
+	let (_, pool, _, tx_api, exec_middleware, _) =
+		setup_api(Default::default(), MAX_TX_PER_CONNECTION);
 
 	// Invalid parameters.
 	let err = tx_api
@@ -131,7 +134,7 @@ async fn tx_broadcast_invalid_tx() {
 
 #[tokio::test]
 async fn tx_stop_with_invalid_operation_id() {
-	let (_, _, _, tx_api, _, _) = setup_api(Default::default());
+	let (_, _, _, tx_api, _, _) = setup_api(Default::default(), MAX_TX_PER_CONNECTION);
 
 	// Make an invalid stop call.
 	let err = tx_api
@@ -146,7 +149,7 @@ async fn tx_stop_with_invalid_operation_id() {
 #[tokio::test]
 async fn tx_broadcast_resubmits_future_nonce_tx() {
 	let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
-		setup_api(Default::default());
+		setup_api(Default::default(), MAX_TX_PER_CONNECTION);
 
 	// Start at block 1.
 	let block_1_header = api.push_block(1, vec![], true);
@@ -237,7 +240,7 @@ async fn tx_broadcast_resubmits_future_nonce_tx() {
 #[tokio::test]
 async fn tx_broadcast_stop_after_broadcast_finishes() {
 	let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
-		setup_api(Default::default());
+		setup_api(Default::default(), MAX_TX_PER_CONNECTION);
 
 	// Start at block 1.
 	let block_1_header = api.push_block(1, vec![], true);
@@ -320,7 +323,7 @@ async fn tx_broadcast_resubmits_invalid_tx() {
 	};
 
 	let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
-		setup_api(options);
+		setup_api(options, MAX_TX_PER_CONNECTION);
 
 	let uxt = uxt(Alice, ALICE_NONCE);
 	let xt = hex_string(&uxt.encode());
@@ -439,7 +442,8 @@ async fn tx_broadcast_resubmits_dropped_tx() {
 		ban_time: std::time::Duration::ZERO,
 	};
 
-	let (api, pool, client_mock, tx_api, _, mut pool_middleware) = setup_api(options);
+	let (api, pool, client_mock, tx_api, _, mut pool_middleware) =
+		setup_api(options, MAX_TX_PER_CONNECTION);
 
 	let current_uxt = uxt(Alice, ALICE_NONCE);
 	let current_xt = hex_string(&current_uxt.encode());
@@ -518,3 +522,53 @@ async fn tx_broadcast_resubmits_dropped_tx() {
 	// The dropped transaction was resubmitted.
 	assert_eq!(events.get(&future_xt).unwrap(), &vec![TxStatusTypeTest::Ready]);
 }
+
+#[tokio::test]
+async fn tx_broadcast_limit_reached() {
+	// One operation per connection.
+	let (api, _pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
+		setup_api(Default::default(), 1);
+
+	// Start at block 1.
+	let block_1_header = api.push_block(1, vec![], true);
+	let uxt = uxt(Alice, ALICE_NONCE);
+	let xt = hex_string(&uxt.encode());
+
+	let operation_id: String =
+		tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
+
+	// Announce block 1 to `transaction_unstable_broadcast`.
+	client_mock.trigger_import_stream(block_1_header).await;
+
+	// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
+	let event = get_next_event!(&mut pool_middleware);
+	assert_eq!(
+		event,
+		MiddlewarePoolEvent::TransactionStatus {
+			transaction: xt.clone(),
+			status: TxStatusTypeTest::Ready
+		}
+	);
+	assert_eq!(1, exec_middleware.num_tasks());
+
+	let operation_id_limit_reached: Option<String> =
+		tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
+	assert!(operation_id_limit_reached.is_none(), "No operation ID => tx was rejected");
+
+	// We still have in flight one operation.
+	assert_eq!(1, exec_middleware.num_tasks());
+
+	// Force the future to exit by calling stop.
+	let _: () = tx_api
+		.call("transaction_unstable_stop", rpc_params![&operation_id])
+		.await
+		.unwrap();
+
+	// Ensure the broadcast future finishes.
+	let _ = get_next_event!(&mut exec_middleware.recv);
+	assert_eq!(0, exec_middleware.num_tasks());
+
+	// Can resubmit again now.
+	let _operation_id: String =
+		tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
+}
diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs
index 6a7c69b8f7d1e765d0ce64ff590e191e97935d30..723440d1b11101b71bcc1602e4da3d15ac0e931e 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs
@@ -26,6 +26,7 @@ use crate::{
 	},
 	SubscriptionTaskExecutor,
 };
+
 use codec::Decode;
 use futures::{StreamExt, TryFutureExt};
 use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs
index ef1a426865d5d4a57aa189889f136b104c5b1d33..68c19010e31c5653509c380f6be69c072e6fe9bb 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs
@@ -18,11 +18,17 @@
 
 //! API implementation for broadcasting transactions.
 
-use crate::{transaction::api::TransactionBroadcastApiServer, SubscriptionTaskExecutor};
+use crate::{
+	common::connections::RpcConnections, transaction::api::TransactionBroadcastApiServer,
+	SubscriptionTaskExecutor,
+};
 use codec::Decode;
 use futures::{FutureExt, Stream, StreamExt};
 use futures_util::stream::AbortHandle;
-use jsonrpsee::core::{async_trait, RpcResult};
+use jsonrpsee::{
+	core::{async_trait, RpcResult},
+	ConnectionDetails,
+};
 use parking_lot::RwLock;
 use rand::{distributions::Alphanumeric, Rng};
 use sc_client_api::BlockchainEvents;
@@ -46,6 +52,8 @@ pub struct TransactionBroadcast<Pool: TransactionPool, Client> {
 	executor: SubscriptionTaskExecutor,
 	/// The broadcast operation IDs.
 	broadcast_ids: Arc<RwLock<HashMap<String, BroadcastState<Pool>>>>,
+	/// Keep track of how many concurrent operations are active for each connection.
+	rpc_connections: RpcConnections,
 }
 
 /// The state of a broadcast operation.
@@ -58,8 +66,19 @@ struct BroadcastState<Pool: TransactionPool> {
 
 impl<Pool: TransactionPool, Client> TransactionBroadcast<Pool, Client> {
 	/// Creates a new [`TransactionBroadcast`].
-	pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
-		TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() }
+	pub fn new(
+		client: Arc<Client>,
+		pool: Arc<Pool>,
+		executor: SubscriptionTaskExecutor,
+		max_transactions_per_connection: usize,
+	) -> Self {
+		TransactionBroadcast {
+			client,
+			pool,
+			executor,
+			broadcast_ids: Default::default(),
+			rpc_connections: RpcConnections::new(max_transactions_per_connection),
+		}
 	}
 
 	/// Generate an unique operation ID for the `transaction_broadcast` RPC method.
@@ -102,12 +121,26 @@ where
 	<Pool::Block as BlockT>::Hash: Unpin,
 	Client: HeaderBackend<Pool::Block> + BlockchainEvents<Pool::Block> + Send + Sync + 'static,
 {
-	fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>> {
+	async fn broadcast(
+		&self,
+		connection_details: ConnectionDetails,
+		bytes: Bytes,
+	) -> RpcResult<Option<String>> {
 		let pool = self.pool.clone();
 
 		// The unique ID of this operation.
 		let id = self.generate_unique_id();
 
+		// Ensure that the connection has not reached the maximum number of active operations.
+		let Some(reserved_connection) = self.rpc_connections.reserve_space(connection_details.id())
+		else {
+			return Ok(None)
+		};
+		let Some(reserved_identifier) = reserved_connection.register(id.clone()) else {
+			// This can only happen if the generated operation ID is not unique.
+			return Ok(None)
+		};
+
 		// The JSON-RPC server might check whether the transaction is valid before broadcasting it.
 		// If it does so and if the transaction is invalid, the server should silently do nothing
 		// and the JSON-RPC client is not informed of the problem. Invalid transactions should still
@@ -118,7 +151,11 @@ where
 		// Save the tx hash to remove it later.
 		let tx_hash = pool.hash_of(&decoded_extrinsic);
 
-		let mut best_block_import_stream =
+		// The compiler can no longer deduce the type of the stream and complains
+		// about `one type is more general than the other`.
+		let mut best_block_import_stream: std::pin::Pin<
+			Box<dyn Stream<Item = <Pool::Block as BlockT>::Hash> + Send>,
+		> =
 			Box::pin(self.client.import_notification_stream().filter_map(
 				|notification| async move { notification.is_new_best.then_some(notification.hash) },
 			));
@@ -180,6 +217,9 @@ where
 		// The future expected by the executor must be `Future<Output = ()>` instead of
 		// `Future<Output = Result<(), Aborted>>`.
 		let fut = fut.map(move |result| {
+			// Connection space is cleaned when this object is dropped.
+			drop(reserved_identifier);
+
 			// Remove the entry from the broadcast IDs map.
 			let Some(broadcast_state) = broadcast_ids.write().remove(&drop_id) else { return };
 
@@ -203,7 +243,16 @@ where
 		Ok(Some(id))
 	}
 
-	fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast> {
+	async fn stop_broadcast(
+		&self,
+		connection_details: ConnectionDetails,
+		operation_id: String,
+	) -> Result<(), ErrorBroadcast> {
+		// The operation ID must correlate to the same connection ID.
+		if !self.rpc_connections.contains_identifier(connection_details.id(), &operation_id) {
+			return Err(ErrorBroadcast::InvalidOperationID)
+		}
+
 		let mut broadcast_ids = self.broadcast_ids.write();
 
 		let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else {
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index 830f9884719dcf1fd1b024906f830b18a154a323..d0d7cba3862409c2df0d71ea9c6758dd83105340 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -644,10 +644,13 @@ where
 		(chain, state, child_state)
 	};
 
+	const MAX_TRANSACTION_PER_CONNECTION: usize = 16;
+
 	let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
 		client.clone(),
 		transaction_pool.clone(),
 		task_executor.clone(),
+		MAX_TRANSACTION_PER_CONNECTION,
 	)
 	.into_rpc();