diff --git a/Cargo.lock b/Cargo.lock
index 2c938ec17bd000d56a3965d777fdc8eb65cbcc35..12e642bc9d06e038b06463740ed46f3ae78e7836 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -20959,7 +20959,7 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-automata 0.4.9",
+ "regex-automata 0.4.8",
  "regex-syntax 0.8.5",
 ]
 
@@ -20980,9 +20980,9 @@ checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69"
 
 [[package]]
 name = "regex-automata"
-version = "0.4.9"
+version = "0.4.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
+checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -23277,6 +23277,7 @@ dependencies = [
  "futures",
  "futures-util",
  "hex",
+ "itertools 0.11.0",
  "jsonrpsee",
  "log",
  "parity-scale-codec",
diff --git a/prdoc/pr_5997.prdoc b/prdoc/pr_5997.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..6bac36a44586b8d55d8ae26738ab36100312088b
--- /dev/null
+++ b/prdoc/pr_5997.prdoc
@@ -0,0 +1,18 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Implement archive_unstable_storageDiff method
+
+doc:
+  - audience: Node Dev
+    description: |
+      This PR implements the `archive_unstable_storageDiff` rpc-v2 method.
+      Developers can use this method to fetch the storage differences
+      between two blocks. This is useful for oracles and archive nodes.
+      For more details see: https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/archive_unstable_storageDiff.md.
+
+crates:
+  - name: sc-rpc-spec-v2
+    bump: major
+  - name: sc-service
+    bump: patch
diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml
index daa805912fb958a19714e9d22b0dd911f002d26f..b304bc905925e26d4ad2631f00d8e4dbf9ae36f2 100644
--- a/substrate/client/rpc-spec-v2/Cargo.toml
+++ b/substrate/client/rpc-spec-v2/Cargo.toml
@@ -42,6 +42,7 @@ log = { workspace = true, default-features = true }
 futures-util = { workspace = true }
 rand = { workspace = true, default-features = true }
 schnellru = { workspace = true }
+itertools = { workspace = true }
 
 [dev-dependencies]
 async-trait = { workspace = true }
diff --git a/substrate/client/rpc-spec-v2/src/archive/api.rs b/substrate/client/rpc-spec-v2/src/archive/api.rs
index b197383040005de0b585c684a91c034ebc5e80ec..dcfeaecb147bb54c5fb351ded5a509447921c6b4 100644
--- a/substrate/client/rpc-spec-v2/src/archive/api.rs
+++ b/substrate/client/rpc-spec-v2/src/archive/api.rs
@@ -19,7 +19,10 @@
 //! API trait of the archive methods.
 
 use crate::{
-	common::events::{ArchiveStorageResult, PaginatedStorageQuery},
+	common::events::{
+		ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
+		PaginatedStorageQuery,
+	},
 	MethodResult,
 };
 use jsonrpsee::{core::RpcResult, proc_macros::rpc};
@@ -104,4 +107,21 @@ pub trait ArchiveApi<Hash> {
 		items: Vec<PaginatedStorageQuery<String>>,
 		child_trie: Option<String>,
 	) -> RpcResult<ArchiveStorageResult>;
+
+	/// Returns the storage difference between two blocks.
+	///
+	/// # Unstable
+	///
+	/// This method is unstable and can change in minor or patch releases.
+	#[subscription(
+		name = "archive_unstable_storageDiff" => "archive_unstable_storageDiffEvent",
+		unsubscribe = "archive_unstable_storageDiff_stopStorageDiff",
+		item = ArchiveStorageDiffEvent,
+	)]
+	fn archive_unstable_storage_diff(
+		&self,
+		hash: Hash,
+		items: Vec<ArchiveStorageDiffItem<String>>,
+		previous_hash: Option<Hash>,
+	);
 }
diff --git a/substrate/client/rpc-spec-v2/src/archive/archive.rs b/substrate/client/rpc-spec-v2/src/archive/archive.rs
index dd6c566a76ed475f41acf189dc5e6734e68f58bc..55054d91d85d1fd56ff9cf77d2138c674aa0a096 100644
--- a/substrate/client/rpc-spec-v2/src/archive/archive.rs
+++ b/substrate/client/rpc-spec-v2/src/archive/archive.rs
@@ -19,17 +19,29 @@
 //! API implementation for `archive`.
 
 use crate::{
-	archive::{error::Error as ArchiveError, ArchiveApiServer},
-	common::events::{ArchiveStorageResult, PaginatedStorageQuery},
-	hex_string, MethodResult,
+	archive::{
+		archive_storage::{ArchiveStorage, ArchiveStorageDiff},
+		error::Error as ArchiveError,
+		ArchiveApiServer,
+	},
+	common::events::{
+		ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
+		PaginatedStorageQuery,
+	},
+	hex_string, MethodResult, SubscriptionTaskExecutor,
 };
 
 use codec::Encode;
-use jsonrpsee::core::{async_trait, RpcResult};
+use futures::FutureExt;
+use jsonrpsee::{
+	core::{async_trait, RpcResult},
+	PendingSubscriptionSink,
+};
 use sc_client_api::{
 	Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
 	StorageProvider,
 };
+use sc_rpc::utils::Subscription;
 use sp_api::{CallApiAt, CallContext};
 use sp_blockchain::{
 	Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
@@ -41,7 +53,9 @@ use sp_runtime::{
 };
 use std::{collections::HashSet, marker::PhantomData, sync::Arc};
 
-use super::archive_storage::ArchiveStorage;
+use tokio::sync::mpsc;
+
+pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";
 
 /// The configuration of [`Archive`].
 pub struct ArchiveConfig {
@@ -64,6 +78,12 @@ const MAX_DESCENDANT_RESPONSES: usize = 5;
 /// `MAX_DESCENDANT_RESPONSES`.
 const MAX_QUERIED_ITEMS: usize = 8;
 
+/// The buffer capacity for each storage query.
+///
+/// This is small because the underlying JSON-RPC server has
+/// its own buffer capacity per connection as well.
+const STORAGE_QUERY_BUF: usize = 16;
+
 impl Default for ArchiveConfig {
 	fn default() -> Self {
 		Self {
@@ -79,6 +99,8 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
 	client: Arc<Client>,
 	/// Backend of the chain.
 	backend: Arc<BE>,
+	/// Executor to spawn subscriptions.
+	executor: SubscriptionTaskExecutor,
 	/// The hexadecimal encoded hash of the genesis block.
 	genesis_hash: String,
 	/// The maximum number of items the `archive_storage` can return for a descendant query before
@@ -96,12 +118,14 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
 		client: Arc<Client>,
 		backend: Arc<BE>,
 		genesis_hash: GenesisHash,
+		executor: SubscriptionTaskExecutor,
 		config: ArchiveConfig,
 	) -> Self {
 		let genesis_hash = hex_string(&genesis_hash.as_ref());
 		Self {
 			client,
 			backend,
+			executor,
 			genesis_hash,
 			storage_max_descendant_responses: config.max_descendant_responses,
 			storage_max_queried_items: config.max_queried_items,
@@ -278,4 +302,59 @@ where
 
 		Ok(storage_client.handle_query(hash, items, child_trie))
 	}
+
+	fn archive_unstable_storage_diff(
+		&self,
+		pending: PendingSubscriptionSink,
+		hash: Block::Hash,
+		items: Vec<ArchiveStorageDiffItem<String>>,
+		previous_hash: Option<Block::Hash>,
+	) {
+		let storage_client = ArchiveStorageDiff::new(self.client.clone());
+		let client = self.client.clone();
+
+		log::trace!(target: LOG_TARGET, "Storage diff subscription started");
+
+		let fut = async move {
+			let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };
+
+			let previous_hash = if let Some(previous_hash) = previous_hash {
+				previous_hash
+			} else {
+				let Ok(Some(current_header)) = client.header(hash) else {
+					let message = format!("Block header is not present: {hash}");
+					let _ = sink.send(&ArchiveStorageDiffEvent::err(message)).await;
+					return
+				};
+				*current_header.parent_hash()
+			};
+
+			let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
+			let storage_fut =
+				storage_client.handle_trie_queries(hash, items, previous_hash, tx.clone());
+
+			// We don't care about the return value of this join:
+			// - process_events might encounter an error (if the client disconnected)
+			// - storage_fut might encounter an error while processing the trie queries and
+			//   the error is propagated via the sink.
+			let _ = futures::future::join(storage_fut, process_events(&mut rx, &mut sink)).await;
+		};
+
+		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
+	}
+}
+
+/// Sends all the events to the sink.
+async fn process_events(rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>, sink: &mut Subscription) {
+	while let Some(event) = rx.recv().await {
+		if event.is_done() {
+			log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
+		} else if event.is_err() {
+			log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
+		}
+
+		if sink.send(&event).await.is_err() {
+			return
+		}
+	}
 }
diff --git a/substrate/client/rpc-spec-v2/src/archive/archive_storage.rs b/substrate/client/rpc-spec-v2/src/archive/archive_storage.rs
index 26e7c299de418e499863c796c6566e912ac28f06..5a3920882f00e6ea78907a49564a752970c6211f 100644
--- a/substrate/client/rpc-spec-v2/src/archive/archive_storage.rs
+++ b/substrate/client/rpc-spec-v2/src/archive/archive_storage.rs
@@ -18,15 +18,28 @@
 
 //! Implementation of the `archive_storage` method.
 
-use std::sync::Arc;
+use std::{
+	collections::{hash_map::Entry, HashMap},
+	sync::Arc,
+};
 
+use itertools::Itertools;
 use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
 use sp_runtime::traits::Block as BlockT;
 
-use crate::common::{
-	events::{ArchiveStorageResult, PaginatedStorageQuery, StorageQueryType},
-	storage::{IterQueryType, QueryIter, Storage},
+use super::error::Error as ArchiveError;
+use crate::{
+	archive::archive::LOG_TARGET,
+	common::{
+		events::{
+			ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageDiffOperationType,
+			ArchiveStorageDiffResult, ArchiveStorageDiffType, ArchiveStorageResult,
+			PaginatedStorageQuery, StorageQueryType, StorageResult,
+		},
+		storage::{IterQueryType, QueryIter, Storage},
+	},
 };
+use tokio::sync::mpsc;
 
 /// Generates the events of the `archive_storage` method.
 pub struct ArchiveStorage<Client, Block, BE> {
@@ -127,3 +140,811 @@ where
 		ArchiveStorageResult::ok(storage_results, discarded_items)
 	}
 }
+
+/// Parse a hex-encoded string parameter as raw bytes.
+///
+/// If the parsing fails, returns an error propagated to the RPC method.
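+///
+/// As an illustrative example, `"0x0102"` parses to `[1, 2]`, an empty string parses to
+/// an empty vector, and a non-hex string is rejected with `ArchiveError::InvalidParam`.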
+pub fn parse_hex_param(param: String) -> Result<Vec<u8>, ArchiveError> {
+	// Methods can accept empty parameters.
+	if param.is_empty() {
+		return Ok(Default::default())
+	}
+
+	array_bytes::hex2bytes(&param).map_err(|_| ArchiveError::InvalidParam(param))
+}
+
+#[derive(Debug, PartialEq, Clone)]
+pub struct DiffDetails {
+	key: StorageKey,
+	return_type: ArchiveStorageDiffType,
+	child_trie_key: Option<ChildInfo>,
+	child_trie_key_string: Option<String>,
+}
+
+/// The type of storage query.
+#[derive(Debug, PartialEq, Clone, Copy)]
+enum FetchStorageType {
+	/// Only fetch the value.
+	Value,
+	/// Only fetch the hash.
+	Hash,
+	/// Fetch both the value and the hash.
+	Both,
+}
+
+/// The return value of the `fetch_storage` method.
+#[derive(Debug, PartialEq, Clone)]
+enum FetchedStorage {
+	/// Storage value under a key.
+	Value(StorageResult),
+	/// Storage hash under a key.
+	Hash(StorageResult),
+	/// Both storage value and hash under a key.
+	Both { value: StorageResult, hash: StorageResult },
+}
+
+pub struct ArchiveStorageDiff<Client, Block, BE> {
+	client: Storage<Client, Block, BE>,
+}
+
+impl<Client, Block, BE> ArchiveStorageDiff<Client, Block, BE> {
+	pub fn new(client: Arc<Client>) -> Self {
+		Self { client: Storage::new(client) }
+	}
+}
+
+impl<Client, Block, BE> ArchiveStorageDiff<Client, Block, BE>
+where
+	Block: BlockT + 'static,
+	BE: Backend<Block> + 'static,
+	Client: StorageProvider<Block, BE> + Send + Sync + 'static,
+{
+	/// Fetch the storage from the given key.
+	fn fetch_storage(
+		&self,
+		hash: Block::Hash,
+		key: StorageKey,
+		maybe_child_trie: Option<ChildInfo>,
+		ty: FetchStorageType,
+	) -> Result<Option<FetchedStorage>, String> {
+		match ty {
+			FetchStorageType::Value => {
+				let result = self.client.query_value(hash, &key, maybe_child_trie.as_ref())?;
+
+				Ok(result.map(FetchedStorage::Value))
+			},
+
+			FetchStorageType::Hash => {
+				let result = self.client.query_hash(hash, &key, maybe_child_trie.as_ref())?;
+
+				Ok(result.map(FetchedStorage::Hash))
+			},
+
+			FetchStorageType::Both => {
+				let Some(value) = self.client.query_value(hash, &key, maybe_child_trie.as_ref())?
+				else {
+					return Ok(None);
+				};
+
+				let Some(hash) = self.client.query_hash(hash, &key, maybe_child_trie.as_ref())?
+				else {
+					return Ok(None);
+				};
+
+				Ok(Some(FetchedStorage::Both { value, hash }))
+			},
+		}
+	}
+
+	/// Check if the key belongs to the provided query items.
+	///
+	/// A key belongs to the query items when:
+	/// - a key in the query items is a prefix of the provided key, or
+	/// - the query items are empty.
+	///
+	/// Returns an optional `FetchStorageType` based on the query items.
+	/// If the key does not belong to the query items, returns `None`.
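+	///
+	/// As an illustrative example: with the query items `(":a", Value)` and `(":ab", Hash)`,
+	/// the key `":abc"` matches both prefixes and yields `Some(FetchStorageType::Both)`,
+	/// the key `":ad"` matches only `":a"` and yields `Some(FetchStorageType::Value)`,
+	/// and the key `":b"` matches neither and yields `None`.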
+	fn belongs_to_query(key: &StorageKey, items: &[DiffDetails]) -> Option<FetchStorageType> {
+		// The user has requested all keys; by default this falls back to fetching the value.
+		if items.is_empty() {
+			return Some(FetchStorageType::Value)
+		}
+
+		let mut value = false;
+		let mut hash = false;
+
+		for item in items {
+			if key.as_ref().starts_with(&item.key.as_ref()) {
+				match item.return_type {
+					ArchiveStorageDiffType::Value => value = true,
+					ArchiveStorageDiffType::Hash => hash = true,
+				}
+			}
+		}
+
+		match (value, hash) {
+			(true, true) => Some(FetchStorageType::Both),
+			(true, false) => Some(FetchStorageType::Value),
+			(false, true) => Some(FetchStorageType::Hash),
+			(false, false) => None,
+		}
+	}
+
+	/// Send the provided result to the `tx` sender.
+	///
+	/// Returns `false` if the sender has been closed.
+	fn send_result(
+		tx: &mpsc::Sender<ArchiveStorageDiffEvent>,
+		result: FetchedStorage,
+		operation_type: ArchiveStorageDiffOperationType,
+		child_trie_key: Option<String>,
+	) -> bool {
+		let items = match result {
+			FetchedStorage::Value(storage_result) | FetchedStorage::Hash(storage_result) =>
+				vec![storage_result],
+			FetchedStorage::Both { value, hash } => vec![value, hash],
+		};
+
+		for item in items {
+			let res = ArchiveStorageDiffEvent::StorageDiff(ArchiveStorageDiffResult {
+				key: item.key,
+				result: item.result,
+				operation_type,
+				child_trie_key: child_trie_key.clone(),
+			});
+			if tx.blocking_send(res).is_err() {
+				return false
+			}
+		}
+
+		true
+	}
+
+	fn handle_trie_queries_inner(
+		&self,
+		hash: Block::Hash,
+		previous_hash: Block::Hash,
+		items: Vec<DiffDetails>,
+		tx: &mpsc::Sender<ArchiveStorageDiffEvent>,
+	) -> Result<(), String> {
+		// Parse the child trie key as `ChildInfo` and `String`.
+		let maybe_child_trie = items.first().and_then(|item| item.child_trie_key.clone());
+		let maybe_child_trie_str =
+			items.first().and_then(|item| item.child_trie_key_string.clone());
+
+		// Iterate over the current block and the previous block
+		// at the same time to compare the keys. This approach effectively
+		// leverages backpressure to keep memory consumption bounded.
+		let keys_iter = self.client.raw_keys_iter(hash, maybe_child_trie.clone())?;
+		let previous_keys_iter =
+			self.client.raw_keys_iter(previous_hash, maybe_child_trie.clone())?;
+
+		let mut diff_iter = lexicographic_diff(keys_iter, previous_keys_iter);
+
+		while let Some(item) = diff_iter.next() {
+			let (operation_type, key) = match item {
+				Diff::Added(key) => (ArchiveStorageDiffOperationType::Added, key),
+				Diff::Deleted(key) => (ArchiveStorageDiffOperationType::Deleted, key),
+				Diff::Equal(key) => (ArchiveStorageDiffOperationType::Modified, key),
+			};
+
+			let Some(fetch_type) = Self::belongs_to_query(&key, &items) else {
+				// The key does not belong to the query items.
+				continue;
+			};
+
+			let maybe_result = match operation_type {
+				ArchiveStorageDiffOperationType::Added =>
+					self.fetch_storage(hash, key.clone(), maybe_child_trie.clone(), fetch_type)?,
+				ArchiveStorageDiffOperationType::Deleted => self.fetch_storage(
+					previous_hash,
+					key.clone(),
+					maybe_child_trie.clone(),
+					fetch_type,
+				)?,
+				ArchiveStorageDiffOperationType::Modified => {
+					let Some(storage_result) = self.fetch_storage(
+						hash,
+						key.clone(),
+						maybe_child_trie.clone(),
+						fetch_type,
+					)?
+					else {
+						continue
+					};
+
+					let Some(previous_storage_result) = self.fetch_storage(
+						previous_hash,
+						key.clone(),
+						maybe_child_trie.clone(),
+						fetch_type,
+					)?
+					else {
+						continue
+					};
+
+					// For modified records we need to check the actual storage values.
+					if storage_result == previous_storage_result {
+						continue
+					}
+
+					Some(storage_result)
+				},
+			};
+
+			if let Some(storage_result) = maybe_result {
+				if !Self::send_result(
+					&tx,
+					storage_result,
+					operation_type,
+					maybe_child_trie_str.clone(),
+				) {
+					return Ok(())
+				}
+			}
+		}
+
+		Ok(())
+	}
+
+	/// This method iterates over the keys of the main trie or a child trie and fetches the
+	/// storage values or hashes for the requested keys. The results are sent to the provided
+	/// `tx` sender to leverage the backpressure mechanism.
+	pub async fn handle_trie_queries(
+		&self,
+		hash: Block::Hash,
+		items: Vec<ArchiveStorageDiffItem<String>>,
+		previous_hash: Block::Hash,
+		tx: mpsc::Sender<ArchiveStorageDiffEvent>,
+	) -> Result<(), tokio::task::JoinError> {
+		let this = ArchiveStorageDiff { client: self.client.clone() };
+
+		tokio::task::spawn_blocking(move || {
+			// Deduplicate the items.
+			let mut trie_items = match deduplicate_storage_diff_items(items) {
+				Ok(items) => items,
+				Err(error) => {
+					let _ = tx.blocking_send(ArchiveStorageDiffEvent::err(error.to_string()));
+					return
+				},
+			};
+			// Default to using the main storage trie if no items are provided.
+			if trie_items.is_empty() {
+				trie_items.push(Vec::new());
+			}
+			log::trace!(target: LOG_TARGET, "Storage diff deduplicated items: {:?}", trie_items);
+
+			for items in trie_items {
+				log::trace!(
+					target: LOG_TARGET,
+					"handle_trie_queries: hash={:?}, previous_hash={:?}, items={:?}",
+					hash,
+					previous_hash,
+					items
+				);
+
+				let result = this.handle_trie_queries_inner(hash, previous_hash, items, &tx);
+
+				if let Err(error) = result {
+					log::trace!(
+						target: LOG_TARGET,
+						"handle_trie_queries: sending error={:?}",
+						error,
+					);
+
+					let _ = tx.blocking_send(ArchiveStorageDiffEvent::err(error));
+
+					return
+				} else {
+					log::trace!(
+						target: LOG_TARGET,
+						"handle_trie_queries: sending storage diff done",
+					);
+				}
+			}
+
+			let _ = tx.blocking_send(ArchiveStorageDiffEvent::StorageDiffDone);
+		})
+		.await?;
+
+		Ok(())
+	}
+}
+
+/// The result of the `lexicographic_diff` method.
+#[derive(Debug, PartialEq)]
+enum Diff<T> {
+	Added(T),
+	Deleted(T),
+	Equal(T),
+}
+
+/// Compare two iterators lexicographically and return the differences.
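+///
+/// Both iterators are expected to yield items in ascending order. As an illustrative
+/// example, for `left = [a, c, d]` and `right = [b, c]` the resulting sequence is
+/// `Added(a), Deleted(b), Equal(c), Added(d)`.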
+fn lexicographic_diff<T, LeftIter, RightIter>(
+	mut left: LeftIter,
+	mut right: RightIter,
+) -> impl Iterator<Item = Diff<T>>
+where
+	T: Ord,
+	LeftIter: Iterator<Item = T>,
+	RightIter: Iterator<Item = T>,
+{
+	let mut a = left.next();
+	let mut b = right.next();
+
+	core::iter::from_fn(move || match (a.take(), b.take()) {
+		(Some(a_value), Some(b_value)) =>
+			if a_value < b_value {
+				b = Some(b_value);
+				a = left.next();
+
+				Some(Diff::Added(a_value))
+			} else if a_value > b_value {
+				a = Some(a_value);
+				b = right.next();
+
+				Some(Diff::Deleted(b_value))
+			} else {
+				a = left.next();
+				b = right.next();
+
+				Some(Diff::Equal(a_value))
+			},
+		(Some(a_value), None) => {
+			a = left.next();
+			Some(Diff::Added(a_value))
+		},
+		(None, Some(b_value)) => {
+			b = right.next();
+			Some(Diff::Deleted(b_value))
+		},
+		(None, None) => None,
+	})
+}
+
+/// Deduplicate the provided items and group them into lists of `DiffDetails`.
+///
+/// Each inner list corresponds to a single child trie or the main trie.
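+///
+/// As an illustrative example: the items `(":a", Value)` and `(":ab", Value)` collapse
+/// to the shorter prefix `":a"`, while `(":a", Value)` and `(":a", Hash)` are both kept
+/// because they have different return types.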
+fn deduplicate_storage_diff_items(
+	items: Vec<ArchiveStorageDiffItem<String>>,
+) -> Result<Vec<Vec<DiffDetails>>, ArchiveError> {
+	let mut deduplicated: HashMap<Option<ChildInfo>, Vec<DiffDetails>> = HashMap::new();
+
+	for diff_item in items {
+		// Ensure the provided hex keys are valid before deduplication.
+		let key = StorageKey(parse_hex_param(diff_item.key)?);
+		let child_trie_key_string = diff_item.child_trie_key.clone();
+		let child_trie_key = diff_item
+			.child_trie_key
+			.map(|child_trie_key| parse_hex_param(child_trie_key))
+			.transpose()?
+			.map(ChildInfo::new_default_from_vec);
+
+		let diff_item = DiffDetails {
+			key,
+			return_type: diff_item.return_type,
+			child_trie_key: child_trie_key.clone(),
+			child_trie_key_string,
+		};
+
+		match deduplicated.entry(child_trie_key.clone()) {
+			Entry::Occupied(mut entry) => {
+				let mut should_insert = true;
+
+				for existing in entry.get() {
+					// This points to a different return type.
+					if existing.return_type != diff_item.return_type {
+						continue
+					}
+					// Keys and return types are identical.
+					if existing.key == diff_item.key {
+						should_insert = false;
+						break
+					}
+
+					// The following two conditions ensure that we keep the shortest key.
+
+					// The existing key is a prefix of the current (longer) key; keep the existing one.
+					if diff_item.key.as_ref().starts_with(&existing.key.as_ref()) {
+						should_insert = false;
+						break
+					}
+
+					// The current key is a prefix of the existing (longer) key.
+					// We need to keep the current key and remove the existing one.
+					if existing.key.as_ref().starts_with(&diff_item.key.as_ref()) {
+						let to_remove = existing.clone();
+						entry.get_mut().retain(|item| item != &to_remove);
+						break;
+					}
+				}
+
+				if should_insert {
+					entry.get_mut().push(diff_item);
+				}
+			},
+			Entry::Vacant(entry) => {
+				entry.insert(vec![diff_item]);
+			},
+		}
+	}
+
+	Ok(deduplicated
+		.into_iter()
+		.sorted_by_key(|(child_trie_key, _)| child_trie_key.clone())
+		.map(|(_, values)| values)
+		.collect())
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn dedup_empty() {
+		let items = vec![];
+		let result = deduplicate_storage_diff_items(items).unwrap();
+		assert!(result.is_empty());
+	}
+
+	#[test]
+	fn dedup_single() {
+		let items = vec![ArchiveStorageDiffItem {
+			key: "0x01".into(),
+			return_type: ArchiveStorageDiffType::Value,
+			child_trie_key: None,
+		}];
+		let result = deduplicate_storage_diff_items(items).unwrap();
+		assert_eq!(result.len(), 1);
+		assert_eq!(result[0].len(), 1);
+
+		let expected = DiffDetails {
+			key: StorageKey(vec![1]),
+			return_type: ArchiveStorageDiffType::Value,
+			child_trie_key: None,
+			child_trie_key_string: None,
+		};
+		assert_eq!(result[0][0], expected);
+	}
+
+	#[test]
+	fn dedup_with_different_keys() {
+		let items = vec![
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+			},
+			ArchiveStorageDiffItem {
+				key: "0x02".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+			},
+		];
+		let result = deduplicate_storage_diff_items(items).unwrap();
+		assert_eq!(result.len(), 1);
+		assert_eq!(result[0].len(), 2);
+
+		let expected = vec![
+			DiffDetails {
+				key: StorageKey(vec![1]),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+				child_trie_key_string: None,
+			},
+			DiffDetails {
+				key: StorageKey(vec![2]),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+				child_trie_key_string: None,
+			},
+		];
+		assert_eq!(result[0], expected);
+	}
+
+	#[test]
+	fn dedup_with_same_keys() {
+		// Identical keys.
+		let items = vec![
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+			},
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+			},
+		];
+		let result = deduplicate_storage_diff_items(items).unwrap();
+		assert_eq!(result.len(), 1);
+		assert_eq!(result[0].len(), 1);
+
+		let expected = vec![DiffDetails {
+			key: StorageKey(vec![1]),
+			return_type: ArchiveStorageDiffType::Value,
+			child_trie_key: None,
+			child_trie_key_string: None,
+		}];
+		assert_eq!(result[0], expected);
+	}
+
+	#[test]
+	fn dedup_with_same_prefix() {
+		// Keys sharing the same prefix.
+		let items = vec![
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+			},
+			ArchiveStorageDiffItem {
+				key: "0x01ff".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+			},
+		];
+		let result = deduplicate_storage_diff_items(items).unwrap();
+		assert_eq!(result.len(), 1);
+		assert_eq!(result[0].len(), 1);
+
+		let expected = vec![DiffDetails {
+			key: StorageKey(vec![1]),
+			return_type: ArchiveStorageDiffType::Value,
+			child_trie_key: None,
+			child_trie_key_string: None,
+		}];
+		assert_eq!(result[0], expected);
+	}
+
+	#[test]
+	fn dedup_with_different_return_types() {
+		let items = vec![
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+			},
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Hash,
+				child_trie_key: None,
+			},
+		];
+		let result = deduplicate_storage_diff_items(items).unwrap();
+		assert_eq!(result.len(), 1);
+		assert_eq!(result[0].len(), 2);
+
+		let expected = vec![
+			DiffDetails {
+				key: StorageKey(vec![1]),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+				child_trie_key_string: None,
+			},
+			DiffDetails {
+				key: StorageKey(vec![1]),
+				return_type: ArchiveStorageDiffType::Hash,
+				child_trie_key: None,
+				child_trie_key_string: None,
+			},
+		];
+		assert_eq!(result[0], expected);
+	}
+
+	#[test]
+	fn dedup_with_different_child_tries() {
+		let items = vec![
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: Some("0x01".into()),
+			},
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: Some("0x02".into()),
+			},
+		];
+		let result = deduplicate_storage_diff_items(items).unwrap();
+		assert_eq!(result.len(), 2);
+		assert_eq!(result[0].len(), 1);
+		assert_eq!(result[1].len(), 1);
+
+		let expected = vec![
+			vec![DiffDetails {
+				key: StorageKey(vec![1]),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: Some(ChildInfo::new_default_from_vec(vec![1])),
+				child_trie_key_string: Some("0x01".into()),
+			}],
+			vec![DiffDetails {
+				key: StorageKey(vec![1]),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: Some(ChildInfo::new_default_from_vec(vec![2])),
+				child_trie_key_string: Some("0x02".into()),
+			}],
+		];
+		assert_eq!(result, expected);
+	}
+
+	#[test]
+	fn dedup_with_same_child_tries() {
+		let items = vec![
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: Some("0x01".into()),
+			},
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: Some("0x01".into()),
+			},
+		];
+		let result = deduplicate_storage_diff_items(items).unwrap();
+		assert_eq!(result.len(), 1);
+		assert_eq!(result[0].len(), 1);
+
+		let expected = vec![DiffDetails {
+			key: StorageKey(vec![1]),
+			return_type: ArchiveStorageDiffType::Value,
+			child_trie_key: Some(ChildInfo::new_default_from_vec(vec![1])),
+			child_trie_key_string: Some("0x01".into()),
+		}];
+		assert_eq!(result[0], expected);
+	}
+
+	#[test]
+	fn dedup_with_shorter_key_reverse_order() {
+		let items = vec![
+			ArchiveStorageDiffItem {
+				key: "0x01ff".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+			},
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+			},
+		];
+		let result = deduplicate_storage_diff_items(items).unwrap();
+		assert_eq!(result.len(), 1);
+		assert_eq!(result[0].len(), 1);
+
+		let expected = vec![DiffDetails {
+			key: StorageKey(vec![1]),
+			return_type: ArchiveStorageDiffType::Value,
+			child_trie_key: None,
+			child_trie_key_string: None,
+		}];
+		assert_eq!(result[0], expected);
+	}
+
+	#[test]
+	fn dedup_multiple_child_tries() {
+		let items = vec![
+			ArchiveStorageDiffItem {
+				key: "0x02".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+			},
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: Some("0x01".into()),
+			},
+			ArchiveStorageDiffItem {
+				key: "0x02".into(),
+				return_type: ArchiveStorageDiffType::Hash,
+				child_trie_key: Some("0x01".into()),
+			},
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: Some("0x02".into()),
+			},
+			ArchiveStorageDiffItem {
+				key: "0x01".into(),
+				return_type: ArchiveStorageDiffType::Hash,
+				child_trie_key: Some("0x02".into()),
+			},
+			ArchiveStorageDiffItem {
+				key: "0x01ff".into(),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: Some("0x02".into()),
+			},
+		];
+
+		let result = deduplicate_storage_diff_items(items).unwrap();
+
+		let expected = vec![
+			vec![DiffDetails {
+				key: StorageKey(vec![2]),
+				return_type: ArchiveStorageDiffType::Value,
+				child_trie_key: None,
+				child_trie_key_string: None,
+			}],
+			vec![
+				DiffDetails {
+					key: StorageKey(vec![1]),
+					return_type: ArchiveStorageDiffType::Value,
+					child_trie_key: Some(ChildInfo::new_default_from_vec(vec![1])),
+					child_trie_key_string: Some("0x01".into()),
+				},
+				DiffDetails {
+					key: StorageKey(vec![2]),
+					return_type: ArchiveStorageDiffType::Hash,
+					child_trie_key: Some(ChildInfo::new_default_from_vec(vec![1])),
+					child_trie_key_string: Some("0x01".into()),
+				},
+			],
+			vec![
+				DiffDetails {
+					key: StorageKey(vec![1]),
+					return_type: ArchiveStorageDiffType::Value,
+					child_trie_key: Some(ChildInfo::new_default_from_vec(vec![2])),
+					child_trie_key_string: Some("0x02".into()),
+				},
+				DiffDetails {
+					key: StorageKey(vec![1]),
+					return_type: ArchiveStorageDiffType::Hash,
+					child_trie_key: Some(ChildInfo::new_default_from_vec(vec![2])),
+					child_trie_key_string: Some("0x02".into()),
+				},
+			],
+		];
+
+		assert_eq!(result, expected);
+	}
+
+	#[test]
+	fn test_lexicographic_diff() {
+		let left = vec![1, 2, 3, 4, 5];
+		let right = vec![2, 3, 4, 5, 6];
+
+		let diff = lexicographic_diff(left.into_iter(), right.into_iter()).collect::<Vec<_>>();
+		let expected = vec![
+			Diff::Added(1),
+			Diff::Equal(2),
+			Diff::Equal(3),
+			Diff::Equal(4),
+			Diff::Equal(5),
+			Diff::Deleted(6),
+		];
+		assert_eq!(diff, expected);
+	}
+
+	#[test]
+	fn test_lexicographic_diff_one_side_empty() {
+		let left = vec![];
+		let right = vec![1, 2, 3, 4, 5, 6];
+
+		let diff = lexicographic_diff(left.into_iter(), right.into_iter()).collect::<Vec<_>>();
+		let expected = vec![
+			Diff::Deleted(1),
+			Diff::Deleted(2),
+			Diff::Deleted(3),
+			Diff::Deleted(4),
+			Diff::Deleted(5),
+			Diff::Deleted(6),
+		];
+		assert_eq!(diff, expected);
+
+		let left = vec![1, 2, 3, 4, 5, 6];
+		let right = vec![];
+
+		let diff = lexicographic_diff(left.into_iter(), right.into_iter()).collect::<Vec<_>>();
+		let expected = vec![
+			Diff::Added(1),
+			Diff::Added(2),
+			Diff::Added(3),
+			Diff::Added(4),
+			Diff::Added(5),
+			Diff::Added(6),
+		];
+		assert_eq!(diff, expected);
+	}
+}
diff --git a/substrate/client/rpc-spec-v2/src/archive/tests.rs b/substrate/client/rpc-spec-v2/src/archive/tests.rs
index 078016f5b3e210311c80c26fc6afaf587a1780dd..994c5d28bd617f68d8b8853ad4d06a2447e0c04b 100644
--- a/substrate/client/rpc-spec-v2/src/archive/tests.rs
+++ b/substrate/client/rpc-spec-v2/src/archive/tests.rs
@@ -18,8 +18,9 @@
 
 use crate::{
 	common::events::{
-		ArchiveStorageMethodOk, ArchiveStorageResult, PaginatedStorageQuery, StorageQueryType,
-		StorageResultType,
+		ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageDiffOperationType,
+		ArchiveStorageDiffResult, ArchiveStorageDiffType, ArchiveStorageMethodOk,
+		ArchiveStorageResult, PaginatedStorageQuery, StorageQueryType, StorageResultType,
 	},
 	hex_string, MethodResult,
 };
@@ -32,10 +33,13 @@ use super::{
 use assert_matches::assert_matches;
 use codec::{Decode, Encode};
 use jsonrpsee::{
-	core::EmptyServerParams as EmptyParams, rpc_params, MethodsError as Error, RpcModule,
+	core::{server::Subscription as RpcSubscription, EmptyServerParams as EmptyParams},
+	rpc_params, MethodsError as Error, RpcModule,
 };
+
 use sc_block_builder::BlockBuilderBuilder;
 use sc_client_api::ChildInfo;
+use sc_rpc::testing::TokioTestExecutor;
 use sp_blockchain::HeaderBackend;
 use sp_consensus::BlockOrigin;
 use sp_core::{Blake2Hasher, Hasher};
@@ -78,6 +82,7 @@ fn setup_api(
 		client.clone(),
 		backend,
 		CHAIN_GENESIS,
+		Arc::new(TokioTestExecutor::default()),
 		ArchiveConfig { max_descendant_responses, max_queried_items },
 	)
 	.into_rpc();
@@ -85,6 +90,15 @@ fn setup_api(
 	(client, api)
 }
 
+async fn get_next_event<T: serde::de::DeserializeOwned>(sub: &mut RpcSubscription) -> T {
+	let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next())
+		.await
+		.unwrap()
+		.unwrap()
+		.unwrap();
+	event
+}
+
 #[tokio::test]
 async fn archive_genesis() {
 	let (_client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT);
@@ -838,3 +852,260 @@ async fn archive_storage_discarded_items() {
 		_ => panic!("Unexpected result"),
 	};
 }
+
+#[tokio::test]
+async fn archive_storage_diff_main_trie() {
+	let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT);
+
+	let mut builder = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(client.chain_info().genesis_hash)
+		.with_parent_block_number(0)
+		.build()
+		.unwrap();
+	builder.push_storage_change(b":A".to_vec(), Some(b"B".to_vec())).unwrap();
+	builder.push_storage_change(b":AA".to_vec(), Some(b"BB".to_vec())).unwrap();
+	let prev_block = builder.build().unwrap().block;
+	let prev_hash = format!("{:?}", prev_block.header.hash());
+	client.import(BlockOrigin::Own, prev_block.clone()).await.unwrap();
+
+	let mut builder = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(prev_block.hash())
+		.with_parent_block_number(1)
+		.build()
+		.unwrap();
+	builder.push_storage_change(b":A".to_vec(), Some(b"11".to_vec())).unwrap();
+	builder.push_storage_change(b":AA".to_vec(), Some(b"22".to_vec())).unwrap();
+	builder.push_storage_change(b":AAA".to_vec(), Some(b"222".to_vec())).unwrap();
+	let block = builder.build().unwrap().block;
+	let block_hash = format!("{:?}", block.header.hash());
+	client.import(BlockOrigin::Own, block.clone()).await.unwrap();
+
+	// Search for items in the main trie:
+	// - values of keys under ":A"
+	// - hashes of keys under ":AA"
+	let items = vec![
+		ArchiveStorageDiffItem::<String> {
+			key: hex_string(b":A"),
+			return_type: ArchiveStorageDiffType::Value,
+			child_trie_key: None,
+		},
+		ArchiveStorageDiffItem::<String> {
+			key: hex_string(b":AA"),
+			return_type: ArchiveStorageDiffType::Hash,
+			child_trie_key: None,
+		},
+	];
+	let mut sub = api
+		.subscribe_unbounded(
+			"archive_unstable_storageDiff",
+			rpc_params![&block_hash, items.clone(), &prev_hash],
+		)
+		.await
+		.unwrap();
+
+	let event = get_next_event::<ArchiveStorageDiffEvent>(&mut sub).await;
+	assert_eq!(
+		ArchiveStorageDiffEvent::StorageDiff(ArchiveStorageDiffResult {
+			key: hex_string(b":A"),
+			result: StorageResultType::Value(hex_string(b"11")),
+			operation_type: ArchiveStorageDiffOperationType::Modified,
+			child_trie_key: None,
+		}),
+		event,
+	);
+
+	let event = get_next_event::<ArchiveStorageDiffEvent>(&mut sub).await;
+	assert_eq!(
+		ArchiveStorageDiffEvent::StorageDiff(ArchiveStorageDiffResult {
+			key: hex_string(b":AA"),
+			result: StorageResultType::Value(hex_string(b"22")),
+			operation_type: ArchiveStorageDiffOperationType::Modified,
+			child_trie_key: None,
+		}),
+		event,
+	);
+
+	let event = get_next_event::<ArchiveStorageDiffEvent>(&mut sub).await;
+	assert_eq!(
+		ArchiveStorageDiffEvent::StorageDiff(ArchiveStorageDiffResult {
+			key: hex_string(b":AA"),
+			result: StorageResultType::Hash(format!("{:?}", Blake2Hasher::hash(b"22"))),
+			operation_type: ArchiveStorageDiffOperationType::Modified,
+			child_trie_key: None,
+		}),
+		event,
+	);
+
+	// Added key.
+	let event = get_next_event::<ArchiveStorageDiffEvent>(&mut sub).await;
+	assert_eq!(
+		ArchiveStorageDiffEvent::StorageDiff(ArchiveStorageDiffResult {
+			key: hex_string(b":AAA"),
+			result: StorageResultType::Value(hex_string(b"222")),
+			operation_type: ArchiveStorageDiffOperationType::Added,
+			child_trie_key: None,
+		}),
+		event,
+	);
+
+	let event = get_next_event::<ArchiveStorageDiffEvent>(&mut sub).await;
+	assert_eq!(
+		ArchiveStorageDiffEvent::StorageDiff(ArchiveStorageDiffResult {
+			key: hex_string(b":AAA"),
+			result: StorageResultType::Hash(format!("{:?}", Blake2Hasher::hash(b"222"))),
+			operation_type: ArchiveStorageDiffOperationType::Added,
+			child_trie_key: None,
+		}),
+		event,
+	);
+
+	let event = get_next_event::<ArchiveStorageDiffEvent>(&mut sub).await;
+	assert_eq!(ArchiveStorageDiffEvent::StorageDiffDone, event);
+}
+
+#[tokio::test]
+async fn archive_storage_diff_no_changes() {
+	let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT);
+
+	// Build 2 identical blocks.
+	let mut builder = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(client.chain_info().genesis_hash)
+		.with_parent_block_number(0)
+		.build()
+		.unwrap();
+	builder.push_storage_change(b":A".to_vec(), Some(b"B".to_vec())).unwrap();
+	builder.push_storage_change(b":AA".to_vec(), Some(b"BB".to_vec())).unwrap();
+	builder.push_storage_change(b":B".to_vec(), Some(b"CC".to_vec())).unwrap();
+	builder.push_storage_change(b":BA".to_vec(), Some(b"CC".to_vec())).unwrap();
+	let prev_block = builder.build().unwrap().block;
+	let prev_hash = format!("{:?}", prev_block.header.hash());
+	client.import(BlockOrigin::Own, prev_block.clone()).await.unwrap();
+
+	let mut builder = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(prev_block.hash())
+		.with_parent_block_number(1)
+		.build()
+		.unwrap();
+	builder.push_storage_change(b":A".to_vec(), Some(b"B".to_vec())).unwrap();
+	builder.push_storage_change(b":AA".to_vec(), Some(b"BB".to_vec())).unwrap();
+	let block = builder.build().unwrap().block;
+	let block_hash = format!("{:?}", block.header.hash());
+	client.import(BlockOrigin::Own, block.clone()).await.unwrap();
+
+	// Search for items in the main trie with keys prefixed with ":A".
+	let items = vec![ArchiveStorageDiffItem::<String> {
+		key: hex_string(b":A"),
+		return_type: ArchiveStorageDiffType::Value,
+		child_trie_key: None,
+	}];
+	let mut sub = api
+		.subscribe_unbounded(
+			"archive_unstable_storageDiff",
+			rpc_params![&block_hash, items.clone(), &prev_hash],
+		)
+		.await
+		.unwrap();
+
+	let event = get_next_event::<ArchiveStorageDiffEvent>(&mut sub).await;
+	assert_eq!(ArchiveStorageDiffEvent::StorageDiffDone, event);
+}
+
+#[tokio::test]
+async fn archive_storage_diff_deleted_changes() {
+	let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT);
+
+	// Blocks are imported as forks.
+	let mut builder = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(client.chain_info().genesis_hash)
+		.with_parent_block_number(0)
+		.build()
+		.unwrap();
+	builder.push_storage_change(b":A".to_vec(), Some(b"B".to_vec())).unwrap();
+	builder.push_storage_change(b":AA".to_vec(), Some(b"BB".to_vec())).unwrap();
+	builder.push_storage_change(b":B".to_vec(), Some(b"CC".to_vec())).unwrap();
+	builder.push_storage_change(b":BA".to_vec(), Some(b"CC".to_vec())).unwrap();
+	let prev_block = builder.build().unwrap().block;
+	let prev_hash = format!("{:?}", prev_block.header.hash());
+	client.import(BlockOrigin::Own, prev_block.clone()).await.unwrap();
+
+	let mut builder = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(client.chain_info().genesis_hash)
+		.with_parent_block_number(0)
+		.build()
+		.unwrap();
+	builder
+		.push_transfer(Transfer {
+			from: AccountKeyring::Alice.into(),
+			to: AccountKeyring::Ferdie.into(),
+			amount: 41,
+			nonce: 0,
+		})
+		.unwrap();
+	builder.push_storage_change(b":A".to_vec(), Some(b"B".to_vec())).unwrap();
+	let block = builder.build().unwrap().block;
+	let block_hash = format!("{:?}", block.header.hash());
+	client.import(BlockOrigin::Own, block.clone()).await.unwrap();
+
+	// Search for items in the main trie with keys prefixed with ":A".
+	let items = vec![ArchiveStorageDiffItem::<String> {
+		key: hex_string(b":A"),
+		return_type: ArchiveStorageDiffType::Value,
+		child_trie_key: None,
+	}];
+
+	let mut sub = api
+		.subscribe_unbounded(
+			"archive_unstable_storageDiff",
+			rpc_params![&block_hash, items.clone(), &prev_hash],
+		)
+		.await
+		.unwrap();
+
+	let event = get_next_event::<ArchiveStorageDiffEvent>(&mut sub).await;
+	assert_eq!(
+		ArchiveStorageDiffEvent::StorageDiff(ArchiveStorageDiffResult {
+			key: hex_string(b":AA"),
+			result: StorageResultType::Value(hex_string(b"BB")),
+			operation_type: ArchiveStorageDiffOperationType::Deleted,
+			child_trie_key: None,
+		}),
+		event,
+	);
+
+	let event = get_next_event::<ArchiveStorageDiffEvent>(&mut sub).await;
+	assert_eq!(ArchiveStorageDiffEvent::StorageDiffDone, event);
+}
+
+#[tokio::test]
+async fn archive_storage_diff_invalid_params() {
+	let invalid_hash = hex_string(&INVALID_HASH);
+	let (_, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT);
+
+	// Invalid shape for parameters.
+	let items: Vec<ArchiveStorageDiffItem<String>> = Vec::new();
+	let err = api
+		.subscribe_unbounded(
+			"archive_unstable_storageDiff",
+			rpc_params!["123", items.clone(), &invalid_hash],
+		)
+		.await
+		.unwrap_err();
+	assert_matches!(err,
+		Error::JsonRpc(ref err) if err.code() == crate::chain_head::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params"
+	);
+
+	// The shape is right, but the block hash is invalid.
+	let items: Vec<ArchiveStorageDiffItem<String>> = Vec::new();
+	let mut sub = api
+		.subscribe_unbounded(
+			"archive_unstable_storageDiff",
+			rpc_params![&invalid_hash, items.clone(), &invalid_hash],
+		)
+		.await
+		.unwrap();
+
+	let event = get_next_event::<ArchiveStorageDiffEvent>(&mut sub).await;
+	assert_matches!(event,
+		ArchiveStorageDiffEvent::StorageDiffError(ref err) if err.error.contains("Header was not found")
+	);
+}
diff --git a/substrate/client/rpc-spec-v2/src/common/events.rs b/substrate/client/rpc-spec-v2/src/common/events.rs
index b1627d74c844e539dd6b71383008a083aeaa5de0..198a60bf4cac530158ca6b62344f0bad34363d2e 100644
--- a/substrate/client/rpc-spec-v2/src/common/events.rs
+++ b/substrate/client/rpc-spec-v2/src/common/events.rs
@@ -81,7 +81,7 @@ pub struct StorageResult {
 }
 
 /// The type of the storage query.
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub enum StorageResultType {
 	/// Fetch the value of the provided key.
@@ -136,17 +136,221 @@ pub struct ArchiveStorageMethodOk {
 }
 
 /// The error of a storage call.
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ArchiveStorageMethodErr {
 	/// Reported error.
 	pub error: String,
 }
 
+/// The type of the archive storage difference query.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum ArchiveStorageDiffType {
+	/// The result is provided as the value of the key.
+	Value,
+	/// The result is provided as the hash of the value of the key.
+	Hash,
+}
+
+/// The storage item to query.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ArchiveStorageDiffItem<Key> {
+	/// The provided key.
+	pub key: Key,
+	/// The type of the storage query.
+	pub return_type: ArchiveStorageDiffType,
+	/// The child trie key if provided.
+	#[serde(skip_serializing_if = "Option::is_none")]
+	#[serde(default)]
+	pub child_trie_key: Option<Key>,
+}
+
+/// The result of a storage difference call.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ArchiveStorageDiffMethodResult {
+	/// Reported results.
+	pub result: Vec<ArchiveStorageDiffResult>,
+}
+
+/// The operation type of a storage difference result.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum ArchiveStorageDiffOperationType {
+	/// The key is added.
+	Added,
+	/// The key is modified.
+	Modified,
+	/// The key is removed.
+	Deleted,
+}
+
+/// The result of an individual storage difference key.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ArchiveStorageDiffResult {
+	/// The hex-encoded key of the result.
+	pub key: String,
+	/// The result of the query.
+	#[serde(flatten)]
+	pub result: StorageResultType,
+	/// The operation type.
+	#[serde(rename = "type")]
+	pub operation_type: ArchiveStorageDiffOperationType,
+	/// The child trie key if provided.
+	#[serde(skip_serializing_if = "Option::is_none")]
+	#[serde(default)]
+	pub child_trie_key: Option<String>,
+}
+
+/// The event generated by the `archive_storageDiff` method.
+///
+/// The `archive_storageDiff` method can generate the following events:
+///  - `storageDiff` event - generated when an `ArchiveStorageDiffResult` is produced.
+///  - `storageDiffError` event - generated when an error is produced.
+///  - `storageDiffDone` event - generated when the `archive_storageDiff` method has completed.
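+///
+/// As an illustrative sketch of the wire format, these events serialize roughly as
+/// `{"event":"storageDiff","key":"0x..","value":"0x..","type":"added"}`,
+/// `{"event":"storageDiffError","error":".."}` and `{"event":"storageDiffDone"}`.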
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+#[serde(tag = "event")]
+pub enum ArchiveStorageDiffEvent {
+	/// The `storageDiff` event.
+	StorageDiff(ArchiveStorageDiffResult),
+	/// The `storageDiffError` event.
+	StorageDiffError(ArchiveStorageMethodErr),
+	/// The `storageDiffDone` event.
+	StorageDiffDone,
+}
+
+impl ArchiveStorageDiffEvent {
+	/// Create a new `ArchiveStorageDiffEvent::StorageDiffError` event.
+	pub fn err(error: String) -> Self {
+		Self::StorageDiffError(ArchiveStorageMethodErr { error })
+	}
+
+	/// Checks if the event is a `StorageDiffDone` event.
+	pub fn is_done(&self) -> bool {
+		matches!(self, Self::StorageDiffDone)
+	}
+
+	/// Checks if the event is a `StorageDiffError` event.
+	pub fn is_err(&self) -> bool {
+		matches!(self, Self::StorageDiffError(_))
+	}
+}
+
 #[cfg(test)]
 mod tests {
 	use super::*;
 
+	#[test]
+	fn archive_diff_input() {
+		// Item with Value.
+		let item = ArchiveStorageDiffItem {
+			key: "0x1",
+			return_type: ArchiveStorageDiffType::Value,
+			child_trie_key: None,
+		};
+		// Encode
+		let ser = serde_json::to_string(&item).unwrap();
+		let exp = r#"{"key":"0x1","returnType":"value"}"#;
+		assert_eq!(ser, exp);
+		// Decode
+		let dec: ArchiveStorageDiffItem<&str> = serde_json::from_str(exp).unwrap();
+		assert_eq!(dec, item);
+
+		// Item with Hash.
+		let item = ArchiveStorageDiffItem {
+			key: "0x1",
+			return_type: ArchiveStorageDiffType::Hash,
+			child_trie_key: None,
+		};
+		// Encode
+		let ser = serde_json::to_string(&item).unwrap();
+		let exp = r#"{"key":"0x1","returnType":"hash"}"#;
+		assert_eq!(ser, exp);
+		// Decode
+		let dec: ArchiveStorageDiffItem<&str> = serde_json::from_str(exp).unwrap();
+		assert_eq!(dec, item);
+
+		// Item with Value and child trie key.
+		let item = ArchiveStorageDiffItem {
+			key: "0x1",
+			return_type: ArchiveStorageDiffType::Value,
+			child_trie_key: Some("0x2"),
+		};
+		// Encode
+		let ser = serde_json::to_string(&item).unwrap();
+		let exp = r#"{"key":"0x1","returnType":"value","childTrieKey":"0x2"}"#;
+		assert_eq!(ser, exp);
+		// Decode
+		let dec: ArchiveStorageDiffItem<&str> = serde_json::from_str(exp).unwrap();
+		assert_eq!(dec, item);
+
+		// Item with Hash and child trie key.
+		let item = ArchiveStorageDiffItem {
+			key: "0x1",
+			return_type: ArchiveStorageDiffType::Hash,
+			child_trie_key: Some("0x2"),
+		};
+		// Encode
+		let ser = serde_json::to_string(&item).unwrap();
+		let exp = r#"{"key":"0x1","returnType":"hash","childTrieKey":"0x2"}"#;
+		assert_eq!(ser, exp);
+		// Decode
+		let dec: ArchiveStorageDiffItem<&str> = serde_json::from_str(exp).unwrap();
+		assert_eq!(dec, item);
+	}
+
+	#[test]
+	fn archive_diff_output() {
+		// Item with Value.
+		let item = ArchiveStorageDiffResult {
+			key: "0x1".into(),
+			result: StorageResultType::Value("res".into()),
+			operation_type: ArchiveStorageDiffOperationType::Added,
+			child_trie_key: None,
+		};
+		// Encode
+		let ser = serde_json::to_string(&item).unwrap();
+		let exp = r#"{"key":"0x1","value":"res","type":"added"}"#;
+		assert_eq!(ser, exp);
+		// Decode
+		let dec: ArchiveStorageDiffResult = serde_json::from_str(exp).unwrap();
+		assert_eq!(dec, item);
+
+		// Item with Hash.
+		let item = ArchiveStorageDiffResult {
+			key: "0x1".into(),
+			result: StorageResultType::Hash("res".into()),
+			operation_type: ArchiveStorageDiffOperationType::Modified,
+			child_trie_key: None,
+		};
+		// Encode
+		let ser = serde_json::to_string(&item).unwrap();
+		let exp = r#"{"key":"0x1","hash":"res","type":"modified"}"#;
+		assert_eq!(ser, exp);
+		// Decode
+		let dec: ArchiveStorageDiffResult = serde_json::from_str(exp).unwrap();
+		assert_eq!(dec, item);
+
+		// Item with Hash, child trie key and the deleted operation type.
+		let item = ArchiveStorageDiffResult {
+			key: "0x1".into(),
+			result: StorageResultType::Hash("res".into()),
+			operation_type: ArchiveStorageDiffOperationType::Deleted,
+			child_trie_key: Some("0x2".into()),
+		};
+		// Encode
+		let ser = serde_json::to_string(&item).unwrap();
+		let exp = r#"{"key":"0x1","hash":"res","type":"deleted","childTrieKey":"0x2"}"#;
+		assert_eq!(ser, exp);
+		// Decode
+		let dec: ArchiveStorageDiffResult = serde_json::from_str(exp).unwrap();
+		assert_eq!(dec, item);
+	}
+
 	#[test]
 	fn storage_result() {
 		// Item with Value.
diff --git a/substrate/client/rpc-spec-v2/src/common/storage.rs b/substrate/client/rpc-spec-v2/src/common/storage.rs
index 2e24a8da8ca8451e93b91d0aa1e220a1829df7e4..673e20b2bc780c664135e4b502cecad753069f07 100644
--- a/substrate/client/rpc-spec-v2/src/common/storage.rs
+++ b/substrate/client/rpc-spec-v2/src/common/storage.rs
@@ -248,4 +248,19 @@ where
 		});
 		Ok((ret, maybe_next_query))
 	}
+
+	/// Returns a raw iterator over the storage keys of the main trie or the provided child trie.
+	pub fn raw_keys_iter(
+		&self,
+		hash: Block::Hash,
+		child_key: Option<ChildInfo>,
+	) -> Result<impl Iterator<Item = StorageKey>, String> {
+		let keys_iter = if let Some(child_key) = child_key {
+			self.client.child_storage_keys(hash, child_key, None, None)
+		} else {
+			self.client.storage_keys(hash, None, None)
+		};
+
+		keys_iter.map_err(|err| err.to_string())
+	}
 }
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index ac9371a8941b35f22ba87c0d9b1a02e66d1c8c99..027a444012af5b12dcbea7a56f732ffd4255549e 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -755,6 +755,7 @@ where
 			client.clone(),
 			backend.clone(),
 			genesis_hash,
+			task_executor.clone(),
 			// Defaults to sensible limits for the `Archive`.
 			sc_rpc_spec_v2::archive::ArchiveConfig::default(),
 		)