diff --git a/Cargo.lock b/Cargo.lock
index f5b272d117566c9b0715d8dd8cae5e0adcb6ecd8..68d399f78a7703bf4b155215b5d5be3ece90b0c5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -16433,11 +16433,13 @@ dependencies = [
  "parity-scale-codec",
  "parking_lot 0.12.1",
  "pretty_assertions",
+ "rand",
  "sc-block-builder",
  "sc-chain-spec",
  "sc-client-api",
  "sc-rpc",
  "sc-service",
+ "sc-transaction-pool",
  "sc-transaction-pool-api",
  "sc-utils",
  "serde",
@@ -16453,6 +16455,7 @@ dependencies = [
  "sp-version",
  "substrate-test-runtime",
  "substrate-test-runtime-client",
+ "substrate-test-runtime-transaction-pool",
  "thiserror",
  "tokio",
  "tokio-stream",
diff --git a/prdoc/pr_3079.prdoc b/prdoc/pr_3079.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..c745c1ffbfe5775f205dc5510a637dbfd4e6f56c
--- /dev/null
+++ b/prdoc/pr_3079.prdoc
@@ -0,0 +1,15 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Implement transaction_unstable_broadcast and transaction_unstable_stop
+
+doc:
+  - audience: Node Dev
+    description: |
+      A new RPC class is added to handle transactions. The `transaction_unstable_broadcast` broadcasts
+      the provided transaction to the peers of the node, until the `transaction_unstable_stop` is called.
+      The APIs are marked as unstable and subject to change in the future.
+      To know if the transaction was added to the chain, users can decode the bodies of announced finalized blocks.
+      This is a low-level approach for `transactionWatch_unstable_submitAndWatch`.
+
+crates: [ ]
diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml
index 12a02e0b45083b18bf5bc34997288199eb2425a5..1b7870764dc361cf3c4114a1bfab1b59671f4610 100644
--- a/substrate/client/rpc-spec-v2/Cargo.toml
+++ b/substrate/client/rpc-spec-v2/Cargo.toml
@@ -41,12 +41,14 @@ tokio = { version = "1.22.0", features = ["sync"] }
 array-bytes = "6.1"
 log = { workspace = true, default-features = true }
 futures-util = { version = "0.3.30", default-features = false }
+rand = "0.8.5"
 
 [dev-dependencies]
 serde_json = "1.0.111"
 tokio = { version = "1.22.0", features = ["macros"] }
 substrate-test-runtime-client = { path = "../../test-utils/runtime/client" }
 substrate-test-runtime = { path = "../../test-utils/runtime" }
+substrate-test-runtime-transaction-pool = { path = "../../test-utils/runtime/transaction-pool" }
 sp-consensus = { path = "../../primitives/consensus/common" }
 sp-externalities = { path = "../../primitives/externalities" }
 sp-maybe-compressed-blob = { path = "../../primitives/maybe-compressed-blob" }
@@ -54,3 +56,4 @@ sc-block-builder = { path = "../block-builder" }
 sc-service = { path = "../service", features = ["test-helpers"] }
 assert_matches = "1.3.0"
 pretty_assertions = "1.2.1"
+sc-transaction-pool = { path = "../transaction-pool" }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
index 4cbbd00f64f31f04f44ff83f9e5980e7d22e7308..c9fe19aca2b1898da25e45019e1924256d732d9a 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
@@ -23,7 +23,7 @@
 //! Methods are prefixed by `chainHead`.
 
 #[cfg(test)]
-mod test_utils;
+pub mod test_utils;
 #[cfg(test)]
 mod tests;
 
diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs
index 53c83b662a35fdcb67a5e212307c6b5ec7c2a61f..33af9c9533388a1e4d5832a390c8eb96da756905 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/api.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs
@@ -18,8 +18,8 @@
 
 //! API trait for transactions.
 
-use crate::transaction::event::TransactionEvent;
-use jsonrpsee::proc_macros::rpc;
+use crate::transaction::{error::ErrorBroadcast, event::TransactionEvent};
+use jsonrpsee::{core::RpcResult, proc_macros::rpc};
 use sp_core::Bytes;
 
 #[rpc(client, server)]
@@ -28,6 +28,10 @@ pub trait TransactionApi<Hash: Clone> {
 	///
 	/// See [`TransactionEvent`](crate::transaction::event::TransactionEvent) for details on
 	/// transaction life cycle.
+	///
+	/// # Unstable
+	///
+	/// This method is unstable and subject to change in the future.
 	#[subscription(
 		name = "transactionWatch_unstable_submitAndWatch" => "transactionWatch_unstable_watchEvent",
 		unsubscribe = "transactionWatch_unstable_unwatch",
@@ -35,3 +39,22 @@ pub trait TransactionApi<Hash: Clone> {
 	)]
 	fn submit_and_watch(&self, bytes: Bytes);
 }
+
+#[rpc(client, server)]
+pub trait TransactionBroadcastApi {
+	/// Broadcast an extrinsic to the chain.
+	///
+	/// # Unstable
+	///
+	/// This method is unstable and subject to change in the future.
+	#[method(name = "transaction_unstable_broadcast")]
+	fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>>;
+
+	/// Broadcast an extrinsic to the chain.
+	///
+	/// # Unstable
+	///
+	/// This method is unstable and subject to change in the future.
+	#[method(name = "transaction_unstable_stop")]
+	fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>;
+}
diff --git a/substrate/client/rpc-spec-v2/src/transaction/error.rs b/substrate/client/rpc-spec-v2/src/transaction/error.rs
index d2de07afd5955cd2d41148bab0119384b117bf42..116977af66001096ff7cb14775b768451833a866 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/error.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/error.rs
@@ -21,6 +21,7 @@
 //! Errors are interpreted as transaction events for subscriptions.
 
 use crate::transaction::event::{TransactionError, TransactionEvent};
+use jsonrpsee::types::error::ErrorObject;
 use sc_transaction_pool_api::error::Error as PoolError;
 use sp_runtime::transaction_validity::InvalidTransaction;
 
@@ -98,3 +99,29 @@ impl<Hash> From<Error> for TransactionEvent<Hash> {
 		}
 	}
 }
+
+/// TransactionBroadcast error.
+#[derive(Debug, thiserror::Error)]
+pub enum ErrorBroadcast {
+	/// The provided operation ID is invalid.
+	#[error("Invalid operation id")]
+	InvalidOperationID,
+}
+
+/// General purpose errors, as defined in
+/// <https://www.jsonrpc.org/specification#error_object>.
+pub mod json_rpc_spec {
+	/// Invalid parameter error.
+	pub const INVALID_PARAM_ERROR: i32 = -32602;
+}
+
+impl From<ErrorBroadcast> for ErrorObject<'static> {
+	fn from(e: ErrorBroadcast) -> Self {
+		let msg = e.to_string();
+
+		match e {
+			ErrorBroadcast::InvalidOperationID =>
+				ErrorObject::owned(json_rpc_spec::INVALID_PARAM_ERROR, msg, None::<()>),
+		}
+	}
+}
diff --git a/substrate/client/rpc-spec-v2/src/transaction/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/mod.rs
index 212912ba1c728feb5e01614a74fcab09ce5bc079..74268a5372a37a7f40d79e50d2ecf6659315d534 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/mod.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/mod.rs
@@ -25,14 +25,19 @@
 //!
 //! Methods are prefixed by `transaction`.
 
+#[cfg(test)]
+mod tests;
+
 pub mod api;
 pub mod error;
 pub mod event;
 pub mod transaction;
+pub mod transaction_broadcast;
 
-pub use api::TransactionApiServer;
+pub use api::{TransactionApiServer, TransactionBroadcastApiServer};
 pub use event::{
 	TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError,
 	TransactionEvent,
 };
 pub use transaction::Transaction;
+pub use transaction_broadcast::TransactionBroadcast;
diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests.rs
new file mode 100644
index 0000000000000000000000000000000000000000..45477494768ae8dde854ba1190568d9db9cbe8f2
--- /dev/null
+++ b/substrate/client/rpc-spec-v2/src/transaction/tests.rs
@@ -0,0 +1,238 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use super::*;
+use crate::{
+	chain_head::test_utils::ChainHeadMockClient, hex_string,
+	transaction::TransactionBroadcast as RpcTransactionBroadcast,
+};
+use assert_matches::assert_matches;
+use codec::Encode;
+use futures::Future;
+use jsonrpsee::{core::error::Error, rpc_params, RpcModule};
+use sc_transaction_pool::*;
+use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool};
+use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
+use std::{pin::Pin, sync::Arc, time::Duration};
+use substrate_test_runtime_client::{prelude::*, AccountKeyring::*, Client};
+use substrate_test_runtime_transaction_pool::{uxt, TestApi};
+use tokio::sync::mpsc;
+
+type Block = substrate_test_runtime_client::runtime::Block;
+
+/// Wrap the `TaskExecutor` to know when the broadcast future is dropped.
+#[derive(Clone)]
+struct TaskExecutorBroadcast {
+	executor: TaskExecutor,
+	sender: mpsc::UnboundedSender<()>,
+}
+
+/// The channel that receives events when the broadcast futures are dropped.
+type TaskExecutorRecv = mpsc::UnboundedReceiver<()>;
+
+impl TaskExecutorBroadcast {
+	/// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures
+	/// are dropped.
+	fn new() -> (Self, TaskExecutorRecv) {
+		let (sender, recv) = mpsc::unbounded_channel();
+
+		(Self { executor: TaskExecutor::new(), sender }, recv)
+	}
+}
+
+impl SpawnNamed for TaskExecutorBroadcast {
+	fn spawn(
+		&self,
+		name: &'static str,
+		group: Option<&'static str>,
+		future: futures::future::BoxFuture<'static, ()>,
+	) {
+		let sender = self.sender.clone();
+		let future = Box::pin(async move {
+			future.await;
+			let _ = sender.send(());
+		});
+
+		self.executor.spawn(name, group, future)
+	}
+
+	fn spawn_blocking(
+		&self,
+		name: &'static str,
+		group: Option<&'static str>,
+		future: futures::future::BoxFuture<'static, ()>,
+	) {
+		let sender = self.sender.clone();
+		let future = Box::pin(async move {
+			future.await;
+			let _ = sender.send(());
+		});
+
+		self.executor.spawn_blocking(name, group, future)
+	}
+}
+
+/// Initial Alice account nonce.
+const ALICE_NONCE: u64 = 209;
+
+fn create_basic_pool_with_genesis(
+	test_api: Arc<TestApi>,
+) -> (BasicPool<TestApi, Block>, Pin<Box<dyn Future<Output = ()> + Send>>) {
+	let genesis_hash = {
+		test_api
+			.chain()
+			.read()
+			.block_by_number
+			.get(&0)
+			.map(|blocks| blocks[0].0.header.hash())
+			.expect("there is block 0. qed")
+	};
+	BasicPool::new_test(test_api, genesis_hash, genesis_hash)
+}
+
+fn maintained_pool() -> (BasicPool<TestApi, Block>, Arc<TestApi>, futures::executor::ThreadPool) {
+	let api = Arc::new(TestApi::with_alice_nonce(ALICE_NONCE));
+	let (pool, background_task) = create_basic_pool_with_genesis(api.clone());
+
+	let thread_pool = futures::executor::ThreadPool::new().unwrap();
+	thread_pool.spawn_ok(background_task);
+	(pool, api, thread_pool)
+}
+
+fn setup_api() -> (
+	Arc<TestApi>,
+	Arc<BasicPool<TestApi, Block>>,
+	Arc<ChainHeadMockClient<Client<Backend>>>,
+	RpcModule<
+		TransactionBroadcast<BasicPool<TestApi, Block>, ChainHeadMockClient<Client<Backend>>>,
+	>,
+	TaskExecutorRecv,
+) {
+	let (pool, api, _) = maintained_pool();
+	let pool = Arc::new(pool);
+
+	let builder = TestClientBuilder::new();
+	let client = Arc::new(builder.build());
+	let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));
+
+	let (task_executor, executor_recv) = TaskExecutorBroadcast::new();
+
+	let tx_api =
+		RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor))
+			.into_rpc();
+
+	(api, pool, client_mock, tx_api, executor_recv)
+}
+
+#[tokio::test]
+async fn tx_broadcast_enters_pool() {
+	let (api, pool, client_mock, tx_api, _) = setup_api();
+
+	// Start at block 1.
+	let block_1_header = api.push_block(1, vec![], true);
+
+	let uxt = uxt(Alice, ALICE_NONCE);
+	let xt = hex_string(&uxt.encode());
+
+	let operation_id: String =
+		tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
+
+	// Announce block 1 to `transaction_unstable_broadcast`.
+	client_mock.trigger_import_stream(block_1_header).await;
+
+	// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
+
+	// TODO: Improve testability by extending the `transaction_unstable_broadcast` with
+	// a middleware trait that intercepts the transaction status for testing.
+	let mut num_retries = 12;
+	while num_retries > 0 && pool.status().ready != 1 {
+		tokio::time::sleep(Duration::from_secs(5)).await;
+		num_retries -= 1;
+	}
+	assert_eq!(1, pool.status().ready);
+	assert_eq!(uxt.encode().len(), pool.status().ready_bytes);
+
+	// Import block 2 with the transaction included.
+	let block_2_header = api.push_block(2, vec![uxt.clone()], true);
+	let block_2 = block_2_header.hash();
+
+	// Announce block 2 to the pool.
+	let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None };
+	pool.maintain(event).await;
+
+	assert_eq!(0, pool.status().ready);
+
+	// Stop call can still be made.
+	let _: () = tx_api
+		.call("transaction_unstable_stop", rpc_params![&operation_id])
+		.await
+		.unwrap();
+}
+
+#[tokio::test]
+async fn tx_broadcast_invalid_tx() {
+	let (_, pool, _, tx_api, mut exec_recv) = setup_api();
+
+	// Invalid parameters.
+	let err = tx_api
+		.call::<_, serde_json::Value>("transaction_unstable_broadcast", [1u8])
+		.await
+		.unwrap_err();
+	assert_matches!(err,
+		Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params"
+	);
+
+	assert_eq!(0, pool.status().ready);
+
+	// Invalid transaction that cannot be decoded. The broadcast silently exits.
+	let xt = "0xdeadbeef";
+	let operation_id: String =
+		tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
+
+	assert_eq!(0, pool.status().ready);
+
+	// Await the broadcast future to exit.
+	// Without this we'd be subject to races, where we try to call the stop before the tx is
+	// dropped.
+	exec_recv.recv().await.unwrap();
+
+	// The broadcast future was dropped, and the operation is no longer active.
+	// When the operation is not active, either from the tx being finalized or a
+	// terminal error; the stop method should return an error.
+	let err = tx_api
+		.call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id])
+		.await
+		.unwrap_err();
+	assert_matches!(err,
+		Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
+	);
+}
+
+#[tokio::test]
+async fn tx_invalid_stop() {
+	let (_, _, _, tx_api, _) = setup_api();
+
+	// Make an invalid stop call.
+	let err = tx_api
+		.call::<_, serde_json::Value>("transaction_unstable_stop", ["invalid_operation_id"])
+		.await
+		.unwrap_err();
+	assert_matches!(err,
+		Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
+	);
+}
diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs
index b2cfa36c9c99f7bd3db5072e8adaabd1c6fed962..17889b3bad2a55794cc191514106fe02274512b6 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs
@@ -29,21 +29,18 @@ use crate::{
 	},
 	SubscriptionTaskExecutor,
 };
+use codec::Decode;
+use futures::{StreamExt, TryFutureExt};
 use jsonrpsee::{core::async_trait, types::error::ErrorObject, PendingSubscriptionSink};
+use sc_rpc::utils::pipe_from_stream;
 use sc_transaction_pool_api::{
 	error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
 	TransactionStatus,
 };
-use std::sync::Arc;
-
-use sc_rpc::utils::pipe_from_stream;
-use sp_api::ProvideRuntimeApi;
 use sp_blockchain::HeaderBackend;
 use sp_core::Bytes;
 use sp_runtime::traits::Block as BlockT;
-
-use codec::Decode;
-use futures::{StreamExt, TryFutureExt};
+use std::sync::Arc;
 
 /// An API for transaction RPC calls.
 pub struct Transaction<Pool, Client> {
@@ -82,7 +79,7 @@ where
 	Pool: TransactionPool + Sync + Send + 'static,
 	Pool::Hash: Unpin,
 	<Pool::Block as BlockT>::Hash: Unpin,
-	Client: HeaderBackend<Pool::Block> + ProvideRuntimeApi<Pool::Block> + Send + Sync + 'static,
+	Client: HeaderBackend<Pool::Block> + Send + Sync + 'static,
 {
 	fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) {
 		let client = self.client.clone();
diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs
new file mode 100644
index 0000000000000000000000000000000000000000..92c838261874a816335b6d7e82c7df7667e2c235
--- /dev/null
+++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs
@@ -0,0 +1,251 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! API implementation for broadcasting transactions.
+
+use crate::{transaction::api::TransactionBroadcastApiServer, SubscriptionTaskExecutor};
+use codec::Decode;
+use futures::{FutureExt, Stream, StreamExt};
+use futures_util::stream::AbortHandle;
+use jsonrpsee::core::{async_trait, RpcResult};
+use parking_lot::RwLock;
+use rand::{distributions::Alphanumeric, Rng};
+use sc_client_api::BlockchainEvents;
+use sc_transaction_pool_api::{
+	error::IntoPoolError, TransactionFor, TransactionPool, TransactionSource,
+};
+use sp_blockchain::HeaderBackend;
+use sp_core::Bytes;
+use sp_runtime::traits::Block as BlockT;
+use std::{collections::HashMap, sync::Arc};
+
+use super::error::ErrorBroadcast;
+
+/// An API for transaction RPC calls.
+pub struct TransactionBroadcast<Pool, Client> {
+	/// Substrate client.
+	client: Arc<Client>,
+	/// Transactions pool.
+	pool: Arc<Pool>,
+	/// Executor to spawn subscriptions.
+	executor: SubscriptionTaskExecutor,
+	/// The brodcast operation IDs.
+	broadcast_ids: Arc<RwLock<HashMap<String, BroadcastState>>>,
+}
+
+/// The state of a broadcast operation.
+struct BroadcastState {
+	/// Handle to abort the running future that broadcasts the transaction.
+	handle: AbortHandle,
+}
+
+impl<Pool, Client> TransactionBroadcast<Pool, Client> {
+	/// Creates a new [`TransactionBroadcast`].
+	pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
+		TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() }
+	}
+
+	/// Generate an unique operation ID for the `transaction_broadcast` RPC method.
+	pub fn generate_unique_id(&self) -> String {
+		let generate_operation_id = || {
+			// The length of the operation ID.
+			const OPERATION_ID_LEN: usize = 16;
+
+			rand::thread_rng()
+				.sample_iter(Alphanumeric)
+				.take(OPERATION_ID_LEN)
+				.map(char::from)
+				.collect::<String>()
+		};
+
+		let mut id = generate_operation_id();
+
+		let broadcast_ids = self.broadcast_ids.read();
+
+		while broadcast_ids.contains_key(&id) {
+			id = generate_operation_id();
+		}
+
+		id
+	}
+}
+
+/// Currently we treat all RPC transactions as externals.
+///
+/// Possibly in the future we could allow opt-in for special treatment
+/// of such transactions, so that the block authors can inject
+/// some unique transactions via RPC and have them included in the pool.
+const TX_SOURCE: TransactionSource = TransactionSource::External;
+
+#[async_trait]
+impl<Pool, Client> TransactionBroadcastApiServer for TransactionBroadcast<Pool, Client>
+where
+	Pool: TransactionPool + Sync + Send + 'static,
+	Pool::Error: IntoPoolError,
+	<Pool::Block as BlockT>::Hash: Unpin,
+	Client: HeaderBackend<Pool::Block> + BlockchainEvents<Pool::Block> + Send + Sync + 'static,
+{
+	fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>> {
+		let pool = self.pool.clone();
+
+		// The unique ID of this operation.
+		let id = self.generate_unique_id();
+
+		let mut best_block_import_stream =
+			Box::pin(self.client.import_notification_stream().filter_map(
+				|notification| async move { notification.is_new_best.then_some(notification.hash) },
+			));
+
+		let broadcast_transaction_fut = async move {
+			// There is nothing we could do with an extrinsic of invalid format.
+			let Ok(decoded_extrinsic) = TransactionFor::<Pool>::decode(&mut &bytes[..]) else {
+				return;
+			};
+
+			// Flag to determine if the we should broadcast the transaction again.
+			let mut is_done = false;
+
+			while !is_done {
+				// Wait for the last block to become available.
+				let Some(best_block_hash) =
+					last_stream_element(&mut best_block_import_stream).await
+				else {
+					return;
+				};
+
+				let mut stream = match pool
+					.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone())
+					.await
+				{
+					Ok(stream) => stream,
+					// The transaction was not included to the pool.
+					Err(e) => {
+						let Ok(pool_err) = e.into_pool_error() else { return };
+
+						if pool_err.is_retriable() {
+							// Try to resubmit the transaction at a later block for
+							// recoverable errors.
+							continue
+						} else {
+							return;
+						}
+					},
+				};
+
+				while let Some(event) = stream.next().await {
+					// Check if the transaction could be submitted again
+					// at a later time.
+					if event.is_retriable() {
+						break;
+					}
+
+					// Stop if this is the final event of the transaction stream
+					// and the event is not retriable.
+					if event.is_final() {
+						is_done = true;
+						break;
+					}
+				}
+			}
+		};
+
+		// Convert the future into an abortable future, for easily terminating it from the
+		// `transaction_stop` method.
+		let (fut, handle) = futures::future::abortable(broadcast_transaction_fut);
+		let broadcast_ids = self.broadcast_ids.clone();
+		let drop_id = id.clone();
+		// The future expected by the executor must be `Future<Output = ()>` instead of
+		// `Future<Output = Result<(), Aborted>>`.
+		let fut = fut.map(move |_| {
+			// Remove the entry from the broadcast IDs map.
+			broadcast_ids.write().remove(&drop_id);
+		});
+
+		// Keep track of this entry and the abortable handle.
+		{
+			let mut broadcast_ids = self.broadcast_ids.write();
+			broadcast_ids.insert(id.clone(), BroadcastState { handle });
+		}
+
+		sc_rpc::utils::spawn_subscription_task(&self.executor, fut);
+
+		Ok(Some(id))
+	}
+
+	fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast> {
+		let mut broadcast_ids = self.broadcast_ids.write();
+
+		let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else {
+			return Err(ErrorBroadcast::InvalidOperationID)
+		};
+
+		broadcast_state.handle.abort();
+
+		Ok(())
+	}
+}
+
+/// Returns the last element of the providided stream, or `None` if the stream is closed.
+async fn last_stream_element<S>(stream: &mut S) -> Option<S::Item>
+where
+	S: Stream + Unpin,
+{
+	let Some(mut element) = stream.next().await else { return None };
+
+	// We are effectively polling the stream for the last available item at this time.
+	// The `now_or_never` returns `None` if the stream is `Pending`.
+	//
+	// If the stream contains `Hash0x1 Hash0x2 Hash0x3 Hash0x4`, we want only `Hash0x4`.
+	while let Some(next) = stream.next().now_or_never() {
+		let Some(next) = next else {
+			// Nothing to do if the stream terminated.
+			return None
+		};
+		element = next;
+	}
+
+	Some(element)
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use tokio_stream::wrappers::ReceiverStream;
+
+	#[tokio::test]
+	async fn check_last_stream_element() {
+		let (tx, rx) = tokio::sync::mpsc::channel(16);
+
+		let mut stream = ReceiverStream::new(rx);
+		// Check the stream with one element queued.
+		tx.send(1).await.unwrap();
+		assert_eq!(last_stream_element(&mut stream).await, Some(1));
+
+		// Check the stream with multiple elements.
+		tx.send(1).await.unwrap();
+		tx.send(2).await.unwrap();
+		tx.send(3).await.unwrap();
+		assert_eq!(last_stream_element(&mut stream).await, Some(3));
+
+		// Drop the stream with some elements
+		tx.send(1).await.unwrap();
+		tx.send(2).await.unwrap();
+		drop(tx);
+		assert_eq!(last_stream_element(&mut stream).await, None);
+	}
+}