diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock
index 49b3dd3cf957b447af04a6a5c5c71d949f46a0b0..04b90dfffba1e5bdddf6e99f48eb989922641ec2 100644
--- a/substrate/Cargo.lock
+++ b/substrate/Cargo.lock
@@ -8792,10 +8792,19 @@ dependencies = [
 name = "sc-rpc-spec-v2"
 version = "0.10.0-dev"
 dependencies = [
+ "futures",
  "hex",
  "jsonrpsee",
+ "parity-scale-codec",
  "sc-chain-spec",
+ "sc-transaction-pool-api",
+ "serde",
  "serde_json",
+ "sp-api",
+ "sp-blockchain",
+ "sp-core",
+ "sp-runtime",
+ "thiserror",
  "tokio",
 ]
 
@@ -8848,6 +8857,7 @@ dependencies = [
  "sc-offchain",
  "sc-rpc",
  "sc-rpc-server",
+ "sc-rpc-spec-v2",
  "sc-sysinfo",
  "sc-telemetry",
  "sc-tracing",
@@ -9068,6 +9078,7 @@ dependencies = [
  "futures",
  "log",
  "serde",
+ "serde_json",
  "sp-blockchain",
  "sp-runtime",
  "thiserror",
diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml
index 12dec7464e6d019ceb43d9cc8824ecd011bd33c9..885d415eb50d2ac30bee50340e3584f65caaddc7 100644
--- a/substrate/client/rpc-spec-v2/Cargo.toml
+++ b/substrate/client/rpc-spec-v2/Cargo.toml
@@ -16,7 +16,17 @@ targets = ["x86_64-unknown-linux-gnu"]
 jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
 # Internal chain structures for "chain_spec".
 sc-chain-spec = { version = "4.0.0-dev", path = "../chain-spec" }
+# Pool for submitting extrinsics required by "transaction"
+sc-transaction-pool-api = { version = "4.0.0-dev", path = "../transaction-pool/api" }
+sp-core = { version = "6.0.0", path = "../../primitives/core" }
+sp-runtime = { version = "6.0.0", path = "../../primitives/runtime" }
+sp-api = { version = "4.0.0-dev", path = "../../primitives/api" }
+sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" }
+codec = { package = "parity-scale-codec", version = "3.0.0" }
+thiserror = "1.0"
+serde = "1.0"
 hex = "0.4"
+futures = "0.3.21"
 
 [dev-dependencies]
 serde_json = "1.0"
diff --git a/substrate/client/rpc-spec-v2/src/lib.rs b/substrate/client/rpc-spec-v2/src/lib.rs
index 297fda13172d6c6b3395d6fa1048a214bdc677ab..f4b9d2f95bf97657116f956d15b0ea8e587762bb 100644
--- a/substrate/client/rpc-spec-v2/src/lib.rs
+++ b/substrate/client/rpc-spec-v2/src/lib.rs
@@ -24,3 +24,7 @@
 #![deny(unused_crate_dependencies)]
 
 pub mod chain_spec;
+pub mod transaction;
+
+/// Task executor that is being used by RPC subscriptions.
+pub type SubscriptionTaskExecutor = std::sync::Arc<dyn sp_core::traits::SpawnNamed>;
diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs
new file mode 100644
index 0000000000000000000000000000000000000000..2f0c799f1cc19d9254b3b4a3bfd900d5580d5611
--- /dev/null
+++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs
@@ -0,0 +1,37 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2022 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! API trait for transactions.
+
+use crate::transaction::event::TransactionEvent;
+use jsonrpsee::proc_macros::rpc;
+use sp_core::Bytes;
+
+#[rpc(client, server)]
+pub trait TransactionApi<Hash: Clone> {
+	/// Submit an extrinsic to watch.
+	///
+	/// See [`TransactionEvent`](crate::transaction::event::TransactionEvent) for details on
+	/// transaction life cycle.
+	#[subscription(
+		name = "transaction_unstable_submitAndWatch" => "transaction_unstable_submitExtrinsic",
+		unsubscribe = "transaction_unstable_unwatch",
+		item = TransactionEvent<Hash>,
+	)]
+	fn submit_and_watch(&self, bytes: Bytes);
+}
diff --git a/substrate/client/rpc-spec-v2/src/transaction/error.rs b/substrate/client/rpc-spec-v2/src/transaction/error.rs
new file mode 100644
index 0000000000000000000000000000000000000000..72a5959992f9e490e6f959ac1afe92a25693f19f
--- /dev/null
+++ b/substrate/client/rpc-spec-v2/src/transaction/error.rs
@@ -0,0 +1,100 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2022 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! Transaction RPC errors.
+//!
+//! Errors are interpreted as transaction events for subscriptions.
+
+use crate::transaction::event::{TransactionError, TransactionEvent};
+use sc_transaction_pool_api::error::Error as PoolError;
+use sp_runtime::transaction_validity::InvalidTransaction;
+
+/// Transaction RPC errors.
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+	/// Transaction pool error.
+	#[error("Transaction pool error: {}", .0)]
+	Pool(#[from] PoolError),
+	/// Verification error.
+	#[error("Extrinsic verification error: {}", .0)]
+	Verification(Box<dyn std::error::Error + Send + Sync>),
+}
+
+impl<Hash> From<Error> for TransactionEvent<Hash> {
+	fn from(e: Error) -> Self {
+		match e {
+			Error::Verification(e) => TransactionEvent::Invalid(TransactionError {
+				error: format!("Verification error: {}", e),
+			}),
+			Error::Pool(PoolError::InvalidTransaction(InvalidTransaction::Custom(e))) =>
+				TransactionEvent::Invalid(TransactionError {
+					error: format!("Invalid transaction with custom error: {}", e),
+				}),
+			Error::Pool(PoolError::InvalidTransaction(e)) => {
+				let msg: &str = e.into();
+				TransactionEvent::Invalid(TransactionError {
+					error: format!("Invalid transaction: {}", msg),
+				})
+			},
+			Error::Pool(PoolError::UnknownTransaction(e)) => {
+				let msg: &str = e.into();
+				TransactionEvent::Invalid(TransactionError {
+					error: format!("Unknown transaction validity: {}", msg),
+				})
+			},
+			Error::Pool(PoolError::TemporarilyBanned) =>
+				TransactionEvent::Invalid(TransactionError {
+					error: "Transaction is temporarily banned".into(),
+				}),
+			Error::Pool(PoolError::AlreadyImported(_)) =>
+				TransactionEvent::Invalid(TransactionError {
+					error: "Transaction is already imported".into(),
+				}),
+			Error::Pool(PoolError::TooLowPriority { old, new }) =>
+				TransactionEvent::Invalid(TransactionError {
+					error: format!(
+						"The priority of the transactin is too low (pool {} > current {})",
+						old, new
+					),
+				}),
+			Error::Pool(PoolError::CycleDetected) => TransactionEvent::Invalid(TransactionError {
+				error: "The transaction contains a cyclic dependency".into(),
+			}),
+			Error::Pool(PoolError::ImmediatelyDropped) =>
+				TransactionEvent::Invalid(TransactionError {
+					error: "The transaction could not enter the pool because of the limit".into(),
+				}),
+			Error::Pool(PoolError::Unactionable) => TransactionEvent::Invalid(TransactionError {
+				error: "Transaction cannot be propagated and the local node does not author blocks"
+					.into(),
+			}),
+			Error::Pool(PoolError::NoTagsProvided) => TransactionEvent::Invalid(TransactionError {
+				error: "Transaction does not provide any tags, so the pool cannot identify it"
+					.into(),
+			}),
+			Error::Pool(PoolError::InvalidBlockId(_)) =>
+				TransactionEvent::Invalid(TransactionError {
+					error: "The provided block ID is not valid".into(),
+				}),
+			Error::Pool(PoolError::RejectedFutureTransaction) =>
+				TransactionEvent::Invalid(TransactionError {
+					error: "The pool is not accepting future transactions".into(),
+				}),
+		}
+	}
+}
diff --git a/substrate/client/rpc-spec-v2/src/transaction/event.rs b/substrate/client/rpc-spec-v2/src/transaction/event.rs
new file mode 100644
index 0000000000000000000000000000000000000000..3c75eaff10fd4e6a9882880127a44791677dfa4c
--- /dev/null
+++ b/substrate/client/rpc-spec-v2/src/transaction/event.rs
@@ -0,0 +1,353 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2022 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! The transaction's event returned as json compatible object.
+
+use serde::{Deserialize, Serialize};
+
+/// The transaction was broadcasted to a number of peers.
+///
+/// # Note
+///
+/// The RPC does not guarantee that the peers have received the
+/// transaction.
+///
+/// When the number of peers is zero, the event guarantees that
+/// shutting down the local node will lead to the transaction
+/// not being included in the chain.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TransactionBroadcasted {
+	/// The number of peers the transaction was broadcasted to.
+	#[serde(with = "as_string")]
+	pub num_peers: usize,
+}
+
+/// The transaction was included in a block of the chain.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TransactionBlock<Hash> {
+	/// The hash of the block the transaction was included into.
+	pub hash: Hash,
+	/// The index (zero-based) of the transaction within the body of the block.
+	#[serde(with = "as_string")]
+	pub index: usize,
+}
+
+/// The transaction could not be processed due to an error.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TransactionError {
+	/// Reason of the error.
+	pub error: String,
+}
+
+/// The transaction was dropped because of exceeding limits.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TransactionDropped {
+	/// True if the transaction was broadcasted to other peers and
+	/// may still be included in the block.
+	pub broadcasted: bool,
+	/// Reason of the event.
+	pub error: String,
+}
+
+/// Possible transaction status events.
+///
+/// The status events can be grouped based on their kinds as:
+///
+/// 1. Runtime validated the transaction:
+/// 		- `Validated`
+///
+/// 2. Inside the `Ready` queue:
+/// 		- `Broadcast`
+///
+/// 3. Leaving the pool:
+/// 		- `BestChainBlockIncluded`
+/// 		- `Invalid`
+///
+/// 4. Block finalized:
+/// 		- `Finalized`
+///
+/// 5. At any time:
+/// 		- `Dropped`
+/// 		- `Error`
+///
+/// The subscription's stream is considered finished whenever the following events are
+/// received: `Finalized`, `Error`, `Invalid` or `Dropped`. However, the user is allowed
+/// to unsubscribe at any moment.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+// We need to manually specify the trait bounds for the `Hash` trait to ensure `into` and
+// `from` still work.
+#[serde(bound(
+	serialize = "Hash: Serialize + Clone",
+	deserialize = "Hash: Deserialize<'de> + Clone"
+))]
+#[serde(into = "TransactionEventIR<Hash>", from = "TransactionEventIR<Hash>")]
+pub enum TransactionEvent<Hash> {
+	/// The transaction was validated by the runtime.
+	Validated,
+	/// The transaction was broadcasted to a number of peers.
+	Broadcasted(TransactionBroadcasted),
+	/// The transaction was included in a best block of the chain.
+	///
+	/// # Note
+	///
+	/// This may contain `None` if the block is no longer a best
+	/// block of the chain.
+	BestChainBlockIncluded(Option<TransactionBlock<Hash>>),
+	/// The transaction was included in a finalized block.
+	Finalized(TransactionBlock<Hash>),
+	/// The transaction could not be processed due to an error.
+	Error(TransactionError),
+	/// The transaction is marked as invalid.
+	Invalid(TransactionError),
+	/// The client was not capable of keeping track of this transaction.
+	Dropped(TransactionDropped),
+}
+
+/// Intermediate representation (IR) for the transaction events
+/// that handles block events only.
+///
+/// The block events require a JSON compatible interpretation similar to:
+///
+/// ```json
+/// { event: "EVENT", block: { hash: "0xFF", index: 0 } }
+/// ```
+///
+/// This IR is introduced to circumvent that the block events need to
+/// be serialized/deserialized with "tag" and "content", while other
+/// events only require "tag".
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+#[serde(tag = "event", content = "block")]
+enum TransactionEventBlockIR<Hash> {
+	/// The transaction was included in the best block of the chain.
+	BestChainBlockIncluded(Option<TransactionBlock<Hash>>),
+	/// The transaction was included in a finalized block of the chain.
+	Finalized(TransactionBlock<Hash>),
+}
+
+/// Intermediate representation (IR) for the transaction events
+/// that handles non-block events only.
+///
+/// The non-block events require a JSON compatible interpretation similar to:
+///
+/// ```json
+/// { event: "EVENT", num_peers: 0 }
+/// ```
+///
+/// This IR is introduced to circumvent that the block events need to
+/// be serialized/deserialized with "tag" and "content", while other
+/// events only require "tag".
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+#[serde(tag = "event")]
+enum TransactionEventNonBlockIR {
+	Validated,
+	Broadcasted(TransactionBroadcasted),
+	Error(TransactionError),
+	Invalid(TransactionError),
+	Dropped(TransactionDropped),
+}
+
+/// Intermediate representation (IR) used for serialization/deserialization of the
+/// [`TransactionEvent`] in a JSON compatible format.
+///
+/// Serde cannot mix `#[serde(tag = "event")]` with `#[serde(tag = "event", content = "block")]`
+/// for specific enum variants. Therefore, this IR is introduced to circumvent this
+/// restriction, while exposing a simplified [`TransactionEvent`] for users of the
+/// rust ecosystem.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(bound(serialize = "Hash: Serialize", deserialize = "Hash: Deserialize<'de>"))]
+#[serde(rename_all = "camelCase")]
+#[serde(untagged)]
+enum TransactionEventIR<Hash> {
+	Block(TransactionEventBlockIR<Hash>),
+	NonBlock(TransactionEventNonBlockIR),
+}
+
+impl<Hash> From<TransactionEvent<Hash>> for TransactionEventIR<Hash> {
+	fn from(value: TransactionEvent<Hash>) -> Self {
+		match value {
+			TransactionEvent::Validated =>
+				TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Validated),
+			TransactionEvent::Broadcasted(event) =>
+				TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Broadcasted(event)),
+			TransactionEvent::BestChainBlockIncluded(event) =>
+				TransactionEventIR::Block(TransactionEventBlockIR::BestChainBlockIncluded(event)),
+			TransactionEvent::Finalized(event) =>
+				TransactionEventIR::Block(TransactionEventBlockIR::Finalized(event)),
+			TransactionEvent::Error(event) =>
+				TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Error(event)),
+			TransactionEvent::Invalid(event) =>
+				TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Invalid(event)),
+			TransactionEvent::Dropped(event) =>
+				TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Dropped(event)),
+		}
+	}
+}
+
+impl<Hash> From<TransactionEventIR<Hash>> for TransactionEvent<Hash> {
+	fn from(value: TransactionEventIR<Hash>) -> Self {
+		match value {
+			TransactionEventIR::NonBlock(status) => match status {
+				TransactionEventNonBlockIR::Validated => TransactionEvent::Validated,
+				TransactionEventNonBlockIR::Broadcasted(event) =>
+					TransactionEvent::Broadcasted(event),
+				TransactionEventNonBlockIR::Error(event) => TransactionEvent::Error(event),
+				TransactionEventNonBlockIR::Invalid(event) => TransactionEvent::Invalid(event),
+				TransactionEventNonBlockIR::Dropped(event) => TransactionEvent::Dropped(event),
+			},
+			TransactionEventIR::Block(block) => match block {
+				TransactionEventBlockIR::Finalized(event) => TransactionEvent::Finalized(event),
+				TransactionEventBlockIR::BestChainBlockIncluded(event) =>
+					TransactionEvent::BestChainBlockIncluded(event),
+			},
+		}
+	}
+}
+
+/// Serialize and deserialize helper as string.
+mod as_string {
+	use super::*;
+	use serde::{Deserializer, Serializer};
+
+	pub fn serialize<S: Serializer>(data: &usize, serializer: S) -> Result<S::Ok, S::Error> {
+		data.to_string().serialize(serializer)
+	}
+
+	pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<usize, D::Error> {
+		String::deserialize(deserializer)?
+			.parse()
+			.map_err(|e| serde::de::Error::custom(format!("Parsing failed: {}", e)))
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use sp_core::H256;
+
+	#[test]
+	fn validated_event() {
+		let event: TransactionEvent<()> = TransactionEvent::Validated;
+		let ser = serde_json::to_string(&event).unwrap();
+
+		let exp = r#"{"event":"validated"}"#;
+		assert_eq!(ser, exp);
+
+		let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
+		assert_eq!(event_dec, event);
+	}
+
+	#[test]
+	fn broadcasted_event() {
+		let event: TransactionEvent<()> =
+			TransactionEvent::Broadcasted(TransactionBroadcasted { num_peers: 2 });
+		let ser = serde_json::to_string(&event).unwrap();
+
+		let exp = r#"{"event":"broadcasted","numPeers":"2"}"#;
+		assert_eq!(ser, exp);
+
+		let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
+		assert_eq!(event_dec, event);
+	}
+
+	#[test]
+	fn best_chain_event() {
+		let event: TransactionEvent<()> = TransactionEvent::BestChainBlockIncluded(None);
+		let ser = serde_json::to_string(&event).unwrap();
+
+		let exp = r#"{"event":"bestChainBlockIncluded","block":null}"#;
+		assert_eq!(ser, exp);
+
+		let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
+		assert_eq!(event_dec, event);
+
+		let event: TransactionEvent<H256> =
+			TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
+				hash: H256::from_low_u64_be(1),
+				index: 2,
+			}));
+		let ser = serde_json::to_string(&event).unwrap();
+
+		let exp = r#"{"event":"bestChainBlockIncluded","block":{"hash":"0x0000000000000000000000000000000000000000000000000000000000000001","index":"2"}}"#;
+		assert_eq!(ser, exp);
+
+		let event_dec: TransactionEvent<H256> = serde_json::from_str(exp).unwrap();
+		assert_eq!(event_dec, event);
+	}
+
+	#[test]
+	fn finalized_event() {
+		let event: TransactionEvent<H256> = TransactionEvent::Finalized(TransactionBlock {
+			hash: H256::from_low_u64_be(1),
+			index: 10,
+		});
+		let ser = serde_json::to_string(&event).unwrap();
+
+		let exp = r#"{"event":"finalized","block":{"hash":"0x0000000000000000000000000000000000000000000000000000000000000001","index":"10"}}"#;
+		assert_eq!(ser, exp);
+
+		let event_dec: TransactionEvent<H256> = serde_json::from_str(exp).unwrap();
+		assert_eq!(event_dec, event);
+	}
+
+	#[test]
+	fn error_event() {
+		let event: TransactionEvent<()> =
+			TransactionEvent::Error(TransactionError { error: "abc".to_string() });
+		let ser = serde_json::to_string(&event).unwrap();
+
+		let exp = r#"{"event":"error","error":"abc"}"#;
+		assert_eq!(ser, exp);
+
+		let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
+		assert_eq!(event_dec, event);
+	}
+
+	#[test]
+	fn invalid_event() {
+		let event: TransactionEvent<()> =
+			TransactionEvent::Invalid(TransactionError { error: "abc".to_string() });
+		let ser = serde_json::to_string(&event).unwrap();
+
+		let exp = r#"{"event":"invalid","error":"abc"}"#;
+		assert_eq!(ser, exp);
+
+		let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
+		assert_eq!(event_dec, event);
+	}
+
+	#[test]
+	fn dropped_event() {
+		let event: TransactionEvent<()> = TransactionEvent::Dropped(TransactionDropped {
+			broadcasted: true,
+			error: "abc".to_string(),
+		});
+		let ser = serde_json::to_string(&event).unwrap();
+
+		let exp = r#"{"event":"dropped","broadcasted":true,"error":"abc"}"#;
+		assert_eq!(ser, exp);
+
+		let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
+		assert_eq!(event_dec, event);
+	}
+}
diff --git a/substrate/client/rpc-spec-v2/src/transaction/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..bb983894a428c00d7220d4ebb46c3d6048c4c949
--- /dev/null
+++ b/substrate/client/rpc-spec-v2/src/transaction/mod.rs
@@ -0,0 +1,38 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2022 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! Substrate transaction API.
+//!
+//! The transaction methods allow submitting a transaction and subscribing to
+//! its status updates generated by the chain.
+//!
+//! # Note
+//!
+//! Methods are prefixed by `transaction`.
+
+pub mod api;
+pub mod error;
+pub mod event;
+pub mod transaction;
+
+pub use api::TransactionApiServer;
+pub use event::{
+	TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError,
+	TransactionEvent,
+};
+pub use transaction::Transaction;
diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs
new file mode 100644
index 0000000000000000000000000000000000000000..e2cf736dff17a1deaf177184380d404d767f1939
--- /dev/null
+++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs
@@ -0,0 +1,208 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2022 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! API implementation for submitting transactions.
+
+use crate::{
+	transaction::{
+		api::TransactionApiServer,
+		error::Error,
+		event::{
+			TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError,
+			TransactionEvent,
+		},
+	},
+	SubscriptionTaskExecutor,
+};
+use jsonrpsee::{
+	core::async_trait,
+	types::{
+		error::{CallError, ErrorObject},
+		SubscriptionResult,
+	},
+	SubscriptionSink,
+};
+use sc_transaction_pool_api::{
+	error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
+	TransactionStatus,
+};
+use std::sync::Arc;
+
+use sp_api::ProvideRuntimeApi;
+use sp_blockchain::HeaderBackend;
+use sp_core::Bytes;
+use sp_runtime::{generic, traits::Block as BlockT};
+
+use codec::Decode;
+use futures::{FutureExt, StreamExt, TryFutureExt};
+
+/// An API for transaction RPC calls.
+pub struct Transaction<Pool, Client> {
+	/// Substrate client.
+	client: Arc<Client>,
+	/// Transactions pool.
+	pool: Arc<Pool>,
+	/// Executor to spawn subscriptions.
+	executor: SubscriptionTaskExecutor,
+}
+
+impl<Pool, Client> Transaction<Pool, Client> {
+	/// Creates a new [`Transaction`].
+	pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
+		Transaction { client, pool, executor }
+	}
+}
+
+/// Currently we treat all RPC transactions as externals.
+///
+/// Possibly in the future we could allow opt-in for special treatment
+/// of such transactions, so that the block authors can inject
+/// some unique transactions via RPC and have them included in the pool.
+const TX_SOURCE: TransactionSource = TransactionSource::External;
+
+/// Extrinsic has an invalid format.
+///
+/// # Note
+///
+/// This is similar to the old `author` API error code.
+const BAD_FORMAT: i32 = 1001;
+
+#[async_trait]
+impl<Pool, Client> TransactionApiServer<BlockHash<Pool>> for Transaction<Pool, Client>
+where
+	Pool: TransactionPool + Sync + Send + 'static,
+	Pool::Hash: Unpin,
+	<Pool::Block as BlockT>::Hash: Unpin,
+	Client: HeaderBackend<Pool::Block> + ProvideRuntimeApi<Pool::Block> + Send + Sync + 'static,
+{
+	fn submit_and_watch(&self, mut sink: SubscriptionSink, xt: Bytes) -> SubscriptionResult {
+		// This is the only place where the RPC server can return an error for this
+		// subscription. Other defects must be signaled as events to the sink.
+		let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
+			Ok(decoded_extrinsic) => decoded_extrinsic,
+			Err(e) => {
+				let err = CallError::Custom(ErrorObject::owned(
+					BAD_FORMAT,
+					format!("Extrinsic has invalid format: {}", e),
+					None::<()>,
+				));
+				let _ = sink.reject(err);
+				return Ok(())
+			},
+		};
+
+		let best_block_hash = self.client.info().best_hash;
+
+		let submit = self
+			.pool
+			.submit_and_watch(
+				&generic::BlockId::hash(best_block_hash),
+				TX_SOURCE,
+				decoded_extrinsic,
+			)
+			.map_err(|e| {
+				e.into_pool_error()
+					.map(Error::from)
+					.unwrap_or_else(|e| Error::Verification(Box::new(e)))
+			});
+
+		let fut = async move {
+			match submit.await {
+				Ok(stream) => {
+					let mut state = TransactionState::new();
+					let stream =
+						stream.filter_map(|event| async move { state.handle_event(event) });
+					sink.pipe_from_stream(stream.boxed()).await;
+				},
+				Err(err) => {
+					// We have not created an `Watcher` for the tx. Make sure the
+					// error is still propagated as an event.
+					let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into();
+					sink.pipe_from_stream(futures::stream::once(async { event }).boxed()).await;
+				},
+			};
+		};
+
+		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
+		Ok(())
+	}
+}
+
+/// The transaction's state that needs to be preserved between
+/// multiple events generated by the transaction-pool.
+///
+/// # Note
+///
+/// In the future, the RPC server can submit only the last event when multiple
+/// identical events happen in a row.
+#[derive(Clone, Copy)]
+struct TransactionState {
+	/// True if the transaction was previously broadcasted.
+	broadcasted: bool,
+}
+
+impl TransactionState {
+	/// Construct a new [`TransactionState`].
+	pub fn new() -> Self {
+		TransactionState { broadcasted: false }
+	}
+
+	/// Handle events generated by the transaction-pool and convert them
+	/// to the new API expected state.
+	#[inline]
+	pub fn handle_event<Hash: Clone, BlockHash: Clone>(
+		&mut self,
+		event: TransactionStatus<Hash, BlockHash>,
+	) -> Option<TransactionEvent<BlockHash>> {
+		match event {
+			TransactionStatus::Ready | TransactionStatus::Future =>
+				Some(TransactionEvent::<BlockHash>::Validated),
+			TransactionStatus::Broadcast(peers) => {
+				// Set the broadcasted flag once if we submitted the transaction to
+				// at least one peer.
+				self.broadcasted = self.broadcasted || !peers.is_empty();
+
+				Some(TransactionEvent::Broadcasted(TransactionBroadcasted {
+					num_peers: peers.len(),
+				}))
+			},
+			TransactionStatus::InBlock((hash, index)) =>
+				Some(TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
+					hash,
+					index,
+				}))),
+			TransactionStatus::Retracted(_) => Some(TransactionEvent::BestChainBlockIncluded(None)),
+			TransactionStatus::FinalityTimeout(_) =>
+				Some(TransactionEvent::Dropped(TransactionDropped {
+					broadcasted: self.broadcasted,
+					error: "Maximum number of finality watchers has been reached".into(),
+				})),
+			TransactionStatus::Finalized((hash, index)) =>
+				Some(TransactionEvent::Finalized(TransactionBlock { hash, index })),
+			TransactionStatus::Usurped(_) => Some(TransactionEvent::Invalid(TransactionError {
+				error: "Extrinsic was rendered invalid by another extrinsic".into(),
+			})),
+			TransactionStatus::Dropped => Some(TransactionEvent::Invalid(TransactionError {
+				error: "Extrinsic dropped from the pool due to exceeding limits".into(),
+			})),
+			TransactionStatus::Invalid => Some(TransactionEvent::Invalid(TransactionError {
+				error: "Extrinsic marked as invalid".into(),
+			})),
+		}
+	}
+}
diff --git a/substrate/client/service/Cargo.toml b/substrate/client/service/Cargo.toml
index 308da96fbbe7723aa391c0b9bacac9c2ca294d17..a0c8f21effec1421ff7a128db2d09ed2d5103041 100644
--- a/substrate/client/service/Cargo.toml
+++ b/substrate/client/service/Cargo.toml
@@ -69,6 +69,7 @@ sc-transaction-pool-api = { version = "4.0.0-dev", path = "../transaction-pool/a
 sp-transaction-storage-proof = { version = "4.0.0-dev", path = "../../primitives/transaction-storage-proof" }
 sc-rpc-server = { version = "4.0.0-dev", path = "../rpc-servers" }
 sc-rpc = { version = "4.0.0-dev", path = "../rpc" }
+sc-rpc-spec-v2 = { version = "0.10.0-dev", path = "../rpc-spec-v2" }
 sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
 sp-block-builder = { version = "4.0.0-dev", path = "../../primitives/block-builder" }
 sc-informant = { version = "0.10.0-dev", path = "../informant" }
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index 4301e17a8c31ec82c29b3546ce50729a1ef88dca..987198d4b7f4822ca83829a770fcd2ecc05980e4 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -57,6 +57,7 @@ use sc_rpc::{
 	system::SystemApiServer,
 	DenyUnsafe, SubscriptionTaskExecutor,
 };
+use sc_rpc_spec_v2::transaction::TransactionApiServer;
 use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
 use sc_transaction_pool_api::MaintainedTransactionPool;
 use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
@@ -673,6 +674,13 @@ where
 		(chain, state, child_state)
 	};
 
+	let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
+		client.clone(),
+		transaction_pool.clone(),
+		task_executor.clone(),
+	)
+	.into_rpc();
+
 	let author = sc_rpc::author::Author::new(
 		client.clone(),
 		transaction_pool,
@@ -690,6 +698,10 @@ where
 		rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
 	}
 
+	// Part of the RPC v2 spec.
+	rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
+
+	// Part of the old RPC spec.
 	rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
 	rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
 	rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
diff --git a/substrate/client/transaction-pool/api/Cargo.toml b/substrate/client/transaction-pool/api/Cargo.toml
index d34ffe512b023e9bee405605d81067daa95aee57..1ab0f32bc8bad848ec5644f90746fd94aaf075cb 100644
--- a/substrate/client/transaction-pool/api/Cargo.toml
+++ b/substrate/client/transaction-pool/api/Cargo.toml
@@ -15,3 +15,6 @@ serde = { version = "1.0.136", features = ["derive"] }
 thiserror = "1.0.30"
 sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" }
 sp-runtime = { version = "6.0.0", default-features = false, path = "../../../primitives/runtime" }
+
+[dev-dependencies]
+serde_json = "1.0"
diff --git a/substrate/client/transaction-pool/api/src/lib.rs b/substrate/client/transaction-pool/api/src/lib.rs
index 0ebb8f9d4cd9cdb05431882aa720475455a7e902..c0a94516ffc9759b5bcdba6abbd17672234f2dba 100644
--- a/substrate/client/transaction-pool/api/src/lib.rs
+++ b/substrate/client/transaction-pool/api/src/lib.rs
@@ -108,15 +108,18 @@ pub enum TransactionStatus<Hash, BlockHash> {
 	Ready,
 	/// The transaction has been broadcast to the given peers.
 	Broadcast(Vec<String>),
-	/// Transaction has been included in block with given hash.
-	InBlock(BlockHash),
+	/// Transaction has been included in block with given hash
+	/// at the given position.
+	#[serde(with = "v1_compatible")]
+	InBlock((BlockHash, TxIndex)),
 	/// The block this transaction was included in has been retracted.
 	Retracted(BlockHash),
 	/// Maximum number of finality watchers has been reached,
 	/// old watchers are being removed.
 	FinalityTimeout(BlockHash),
-	/// Transaction has been finalized by a finality-gadget, e.g GRANDPA
-	Finalized(BlockHash),
+	/// Transaction has been finalized by a finality-gadget, e.g GRANDPA.
+	#[serde(with = "v1_compatible")]
+	Finalized((BlockHash, TxIndex)),
 	/// Transaction has been replaced in the pool, by another transaction
 	/// that provides the same tags. (e.g. same (sender, nonce)).
 	Usurped(Hash),
@@ -143,6 +146,8 @@ pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsi
 pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
 /// Transaction type for a local pool.
 pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;
+/// Transaction's index within the block in which it was included.
+pub type TxIndex = usize;
 
 /// Typical future type used in transaction pool api.
 pub type PoolFuture<T, E> = std::pin::Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;
@@ -362,3 +367,52 @@ impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TP
 		})
 	}
 }
+
+/// Wrapper functions to keep the API backwards compatible over the wire for the old RPC spec.
+mod v1_compatible {
+	use serde::{Deserialize, Deserializer, Serialize, Serializer};
+
+	pub fn serialize<S, H>(data: &(H, usize), serializer: S) -> Result<S::Ok, S::Error>
+	where
+		S: Serializer,
+		H: Serialize,
+	{
+		let (hash, _) = data;
+		serde::Serialize::serialize(&hash, serializer)
+	}
+
+	pub fn deserialize<'de, D, H>(deserializer: D) -> Result<(H, usize), D::Error>
+	where
+		D: Deserializer<'de>,
+		H: Deserialize<'de>,
+	{
+		let hash: H = serde::Deserialize::deserialize(deserializer)?;
+		Ok((hash, 0))
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn tx_status_compatibility() {
+		let event: TransactionStatus<u8, u8> = TransactionStatus::InBlock((1, 2));
+		let ser = serde_json::to_string(&event).unwrap();
+
+		let exp = r#"{"inBlock":1}"#;
+		assert_eq!(ser, exp);
+
+		let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
+		assert_eq!(event_dec, TransactionStatus::InBlock((1, 0)));
+
+		let event: TransactionStatus<u8, u8> = TransactionStatus::Finalized((1, 2));
+		let ser = serde_json::to_string(&event).unwrap();
+
+		let exp = r#"{"finalized":1}"#;
+		assert_eq!(ser, exp);
+
+		let event_dec: TransactionStatus<u8, u8> = serde_json::from_str(exp).unwrap();
+		assert_eq!(event_dec, TransactionStatus::Finalized((1, 0)));
+	}
+}
diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs
index d4f42b32fdbb84c19630f740890011ea67fb1327..776749abf2d5deb4ad39d788a1680949395d7c23 100644
--- a/substrate/client/transaction-pool/src/graph/listener.rs
+++ b/substrate/client/transaction-pool/src/graph/listener.rs
@@ -104,13 +104,18 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
 	/// Transaction was pruned from the pool.
 	pub fn pruned(&mut self, block_hash: BlockHash<C>, tx: &H) {
 		debug!(target: "txpool", "[{:?}] Pruned at {:?}", tx, block_hash);
-		self.fire(tx, |s| s.in_block(block_hash));
-		self.finality_watchers.entry(block_hash).or_insert(vec![]).push(tx.clone());
+		// Get the transactions included in the given block hash.
+		let txs = self.finality_watchers.entry(block_hash).or_insert(vec![]);
+		txs.push(tx.clone());
+		// Current transaction is the last one included.
+		let tx_index = txs.len() - 1;
+
+		self.fire(tx, |watcher| watcher.in_block(block_hash, tx_index));
 
 		while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
 			if let Some((hash, txs)) = self.finality_watchers.pop_front() {
 				for tx in txs {
-					self.fire(&tx, |s| s.finality_timeout(hash));
+					self.fire(&tx, |watcher| watcher.finality_timeout(hash));
 				}
 			}
 		}
@@ -120,7 +125,7 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
 	pub fn retracted(&mut self, block_hash: BlockHash<C>) {
 		if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
 			for hash in hashes {
-				self.fire(&hash, |s| s.retracted(block_hash))
+				self.fire(&hash, |watcher| watcher.retracted(block_hash))
 			}
 		}
 	}
@@ -128,9 +133,9 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
 	/// Notify all watchers that transactions have been finalized
 	pub fn finalized(&mut self, block_hash: BlockHash<C>) {
 		if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
-			for hash in hashes {
+			for (tx_index, hash) in hashes.into_iter().enumerate() {
 				log::debug!(target: "txpool", "[{:?}] Sent finalization event (block {:?})", hash, block_hash);
-				self.fire(&hash, |s| s.finalized(block_hash))
+				self.fire(&hash, |watcher| watcher.finalized(block_hash, tx_index))
 			}
 		}
 	}
diff --git a/substrate/client/transaction-pool/src/graph/pool.rs b/substrate/client/transaction-pool/src/graph/pool.rs
index 19acbddbe784307b5cfbc0507abc9353274fc1d5..108ae791e37b388ac1acdc698ce98ebdf246c3e5 100644
--- a/substrate/client/transaction-pool/src/graph/pool.rs
+++ b/substrate/client/transaction-pool/src/graph/pool.rs
@@ -770,7 +770,7 @@ mod tests {
 			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
 			assert_eq!(
 				stream.next(),
-				Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())),
+				Some(TransactionStatus::InBlock((H256::from_low_u64_be(2).into(), 0))),
 			);
 		}
 
@@ -803,7 +803,7 @@ mod tests {
 			assert_eq!(stream.next(), Some(TransactionStatus::Ready));
 			assert_eq!(
 				stream.next(),
-				Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())),
+				Some(TransactionStatus::InBlock((H256::from_low_u64_be(2).into(), 0))),
 			);
 		}
 
diff --git a/substrate/client/transaction-pool/src/graph/watcher.rs b/substrate/client/transaction-pool/src/graph/watcher.rs
index 8cd78cfc78240009ec5af523c347df81c95fa112..0613300c8684b13d62a96445b0ef6b16baf0573e 100644
--- a/substrate/client/transaction-pool/src/graph/watcher.rs
+++ b/substrate/client/transaction-pool/src/graph/watcher.rs
@@ -84,13 +84,13 @@ impl<H: Clone, BH: Clone> Sender<H, BH> {
 	}
 
 	/// Extrinsic has been included in block with given hash.
-	pub fn in_block(&mut self, hash: BH) {
-		self.send(TransactionStatus::InBlock(hash));
+	pub fn in_block(&mut self, hash: BH, index: usize) {
+		self.send(TransactionStatus::InBlock((hash, index)));
 	}
 
 	/// Extrinsic has been finalized by a finality gadget.
-	pub fn finalized(&mut self, hash: BH) {
-		self.send(TransactionStatus::Finalized(hash));
+	pub fn finalized(&mut self, hash: BH, index: usize) {
+		self.send(TransactionStatus::Finalized((hash, index)));
 		self.is_finalized = true;
 	}
 
diff --git a/substrate/client/transaction-pool/tests/pool.rs b/substrate/client/transaction-pool/tests/pool.rs
index f04a27cf81e1d93941be714d8ae79691c0f019bd..be75523c1230fdbb7da63ea83c6129b0a80bcbc2 100644
--- a/substrate/client/transaction-pool/tests/pool.rs
+++ b/substrate/client/transaction-pool/tests/pool.rs
@@ -328,7 +328,7 @@ fn should_revalidate_across_many_blocks() {
 
 	block_on(
 		watcher1
-			.take_while(|s| future::ready(*s != TransactionStatus::InBlock(block_hash)))
+			.take_while(|s| future::ready(*s != TransactionStatus::InBlock((block_hash, 0))))
 			.collect::<Vec<_>>(),
 	);
 
@@ -398,24 +398,24 @@ fn should_push_watchers_during_maintenance() {
 		futures::executor::block_on_stream(watcher0).collect::<Vec<_>>(),
 		vec![
 			TransactionStatus::Ready,
-			TransactionStatus::InBlock(header_hash),
-			TransactionStatus::Finalized(header_hash)
+			TransactionStatus::InBlock((header_hash, 0)),
+			TransactionStatus::Finalized((header_hash, 0))
 		],
 	);
 	assert_eq!(
 		futures::executor::block_on_stream(watcher1).collect::<Vec<_>>(),
 		vec![
 			TransactionStatus::Ready,
-			TransactionStatus::InBlock(header_hash),
-			TransactionStatus::Finalized(header_hash)
+			TransactionStatus::InBlock((header_hash, 1)),
+			TransactionStatus::Finalized((header_hash, 1))
 		],
 	);
 	assert_eq!(
 		futures::executor::block_on_stream(watcher2).collect::<Vec<_>>(),
 		vec![
 			TransactionStatus::Ready,
-			TransactionStatus::InBlock(header_hash),
-			TransactionStatus::Finalized(header_hash)
+			TransactionStatus::InBlock((header_hash, 2)),
+			TransactionStatus::Finalized((header_hash, 2))
 		],
 	);
 }
@@ -450,8 +450,8 @@ fn finalization() {
 
 	let mut stream = futures::executor::block_on_stream(watcher);
 	assert_eq!(stream.next(), Some(TransactionStatus::Ready));
-	assert_eq!(stream.next(), Some(TransactionStatus::InBlock(header.hash())));
-	assert_eq!(stream.next(), Some(TransactionStatus::Finalized(header.hash())));
+	assert_eq!(stream.next(), Some(TransactionStatus::InBlock((header.hash(), 0))));
+	assert_eq!(stream.next(), Some(TransactionStatus::Finalized((header.hash(), 0))));
 	assert_eq!(stream.next(), None);
 }
 
@@ -573,30 +573,31 @@ fn fork_aware_finalization() {
 	for (canon_watcher, h) in canon_watchers {
 		let mut stream = futures::executor::block_on_stream(canon_watcher);
 		assert_eq!(stream.next(), Some(TransactionStatus::Ready));
-		assert_eq!(stream.next(), Some(TransactionStatus::InBlock(h)));
-		assert_eq!(stream.next(), Some(TransactionStatus::Finalized(h)));
+		assert_eq!(stream.next(), Some(TransactionStatus::InBlock((h, 0))));
+		assert_eq!(stream.next(), Some(TransactionStatus::Finalized((h, 0))));
 		assert_eq!(stream.next(), None);
 	}
 
 	{
 		let mut stream = futures::executor::block_on_stream(from_dave_watcher);
 		assert_eq!(stream.next(), Some(TransactionStatus::Ready));
-		assert_eq!(stream.next(), Some(TransactionStatus::InBlock(c2)));
+		assert_eq!(stream.next(), Some(TransactionStatus::InBlock((c2, 0))));
 		assert_eq!(stream.next(), Some(TransactionStatus::Retracted(c2)));
 		assert_eq!(stream.next(), Some(TransactionStatus::Ready));
-		assert_eq!(stream.next(), Some(TransactionStatus::InBlock(e1)));
-		assert_eq!(stream.next(), Some(TransactionStatus::Finalized(e1)));
+		assert_eq!(stream.next(), Some(TransactionStatus::InBlock((e1, 0))));
+		assert_eq!(stream.next(), Some(TransactionStatus::Finalized((e1, 0))));
 		assert_eq!(stream.next(), None);
 	}
 
 	{
 		let mut stream = futures::executor::block_on_stream(from_bob_watcher);
 		assert_eq!(stream.next(), Some(TransactionStatus::Ready));
-		assert_eq!(stream.next(), Some(TransactionStatus::InBlock(d2)));
+		assert_eq!(stream.next(), Some(TransactionStatus::InBlock((d2, 0))));
 		assert_eq!(stream.next(), Some(TransactionStatus::Retracted(d2)));
 		assert_eq!(stream.next(), Some(TransactionStatus::Ready));
-		assert_eq!(stream.next(), Some(TransactionStatus::InBlock(e1)));
-		assert_eq!(stream.next(), Some(TransactionStatus::Finalized(e1)));
+		// In block e1 we submitted: [dave, bob] xts in this order.
+		assert_eq!(stream.next(), Some(TransactionStatus::InBlock((e1, 1))));
+		assert_eq!(stream.next(), Some(TransactionStatus::Finalized((e1, 1))));
 		assert_eq!(stream.next(), None);
 	}
 }
@@ -646,10 +647,10 @@ fn prune_and_retract_tx_at_same_time() {
 	{
 		let mut stream = futures::executor::block_on_stream(watcher);
 		assert_eq!(stream.next(), Some(TransactionStatus::Ready));
-		assert_eq!(stream.next(), Some(TransactionStatus::InBlock(b1)));
+		assert_eq!(stream.next(), Some(TransactionStatus::InBlock((b1, 0))));
 		assert_eq!(stream.next(), Some(TransactionStatus::Retracted(b1)));
-		assert_eq!(stream.next(), Some(TransactionStatus::InBlock(b2)));
-		assert_eq!(stream.next(), Some(TransactionStatus::Finalized(b2)));
+		assert_eq!(stream.next(), Some(TransactionStatus::InBlock((b2, 0))));
+		assert_eq!(stream.next(), Some(TransactionStatus::Finalized((b2, 0))));
 		assert_eq!(stream.next(), None);
 	}
 }