diff --git a/bridges/bin/runtime-common/src/messages.rs b/bridges/bin/runtime-common/src/messages.rs
index 0fe9935dbdb6dfc776977ff8cfbad87d3eee9f6e..03801d5279d0a8453542336d8089f5c887a3d005 100644
--- a/bridges/bin/runtime-common/src/messages.rs
+++ b/bridges/bin/runtime-common/src/messages.rs
@@ -395,6 +395,7 @@ mod tests {
 	use codec::Encode;
 	use sp_core::H256;
 	use sp_runtime::traits::Header as _;
+	use sp_trie::accessed_nodes_tracker::Error as AccessedNodesTrackerError;
 
 	#[test]
 	fn verify_chain_message_rejects_message_with_too_large_declared_weight() {
@@ -541,7 +542,7 @@ mod tests {
 				target::verify_messages_proof::<OnThisChainBridge>(proof, 10)
 			},),
 			Err(VerificationError::HeaderChain(HeaderChainError::StorageProof(
-				StorageProofError::DuplicateNodesInProof
+				StorageProofError::StorageProof(sp_trie::StorageProofError::DuplicateNodes.into())
 			))),
 		);
 	}
@@ -553,7 +554,9 @@ mod tests {
 				proof.storage_proof.push(vec![42]);
 				target::verify_messages_proof::<OnThisChainBridge>(proof, 10)
 			},),
-			Err(VerificationError::StorageProof(StorageProofError::UnusedNodesInTheProof)),
+			Err(VerificationError::StorageProof(StorageProofError::AccessedNodesTracker(
+				AccessedNodesTrackerError::UnusedNodes.into()
+			))),
 		);
 	}
 
diff --git a/bridges/primitives/runtime/src/storage_proof.rs b/bridges/primitives/runtime/src/storage_proof.rs
index 1b706aa66c16fc73a21ce83f550bea8a8fe128e5..251ceec5a9ee7b18bce428a6e8128790e0542d29 100644
--- a/bridges/primitives/runtime/src/storage_proof.rs
+++ b/bridges/primitives/runtime/src/storage_proof.rs
@@ -21,15 +21,16 @@ use codec::{Decode, Encode};
 use frame_support::PalletError;
 use hash_db::{HashDB, Hasher, EMPTY_PREFIX};
 use scale_info::TypeInfo;
-use sp_std::{boxed::Box, collections::btree_set::BTreeSet, vec::Vec};
+use sp_std::{boxed::Box, vec::Vec};
+pub use sp_trie::RawStorageProof;
 use sp_trie::{
-	read_trie_value, LayoutV1, MemoryDB, Recorder, StorageProof, Trie, TrieConfiguration,
+	accessed_nodes_tracker::{AccessedNodesTracker, Error as AccessedNodesTrackerError},
+	read_trie_value,
+	recorder_ext::RecorderExt,
+	LayoutV1, MemoryDB, Recorder, StorageProof, StorageProofError, Trie, TrieConfiguration,
 	TrieDBBuilder, TrieError, TrieHash,
 };
 
-/// Raw storage proof type (just raw trie nodes).
-pub type RawStorageProof = Vec<Vec<u8>>;
-
 /// Storage proof size requirements.
 ///
 /// This is currently used by benchmarks when generating storage proofs.
@@ -51,10 +52,9 @@ pub struct StorageProofChecker<H>
 where
 	H: Hasher,
 {
-	proof_nodes_count: usize,
 	root: H::Out,
 	db: MemoryDB<H>,
-	recorder: Recorder<LayoutV1<H>>,
+	accessed_nodes_tracker: AccessedNodesTracker<H::Out>,
 }
 
 impl<H> StorageProofChecker<H>
@@ -65,52 +65,39 @@ where
 	///
 	/// This returns an error if the given proof is invalid with respect to the given root.
 	pub fn new(root: H::Out, proof: RawStorageProof) -> Result<Self, Error> {
-		// 1. we don't want extra items in the storage proof
-		// 2. `StorageProof` is storing all trie nodes in the `BTreeSet`
-		//
-		// => someone could simply add duplicate items to the proof and we won't be
-		// able to detect that by just using `StorageProof`
-		//
-		// => let's check it when we are converting our "raw proof" into `StorageProof`
-		let proof_nodes_count = proof.len();
-		let proof = StorageProof::new(proof);
-		if proof_nodes_count != proof.iter_nodes().count() {
-			return Err(Error::DuplicateNodesInProof)
-		}
+		let proof = StorageProof::new_with_duplicate_nodes_check(proof)
+			.map_err(|e| Error::StorageProof(e.into()))?;
+
+		let recorder = AccessedNodesTracker::new(proof.len());
 
 		let db = proof.into_memory_db();
 		if !db.contains(&root, EMPTY_PREFIX) {
 			return Err(Error::StorageRootMismatch)
 		}
 
-		let recorder = Recorder::default();
-		let checker = StorageProofChecker { proof_nodes_count, root, db, recorder };
-		Ok(checker)
+		Ok(StorageProofChecker { root, db, accessed_nodes_tracker: recorder })
 	}
 
 	/// Returns error if the proof has some nodes that are left intact by previous `read_value`
 	/// calls.
-	pub fn ensure_no_unused_nodes(mut self) -> Result<(), Error> {
-		let visited_nodes = self
-			.recorder
-			.drain()
-			.into_iter()
-			.map(|record| record.data)
-			.collect::<BTreeSet<_>>();
-		let visited_nodes_count = visited_nodes.len();
-		if self.proof_nodes_count == visited_nodes_count {
-			Ok(())
-		} else {
-			Err(Error::UnusedNodesInTheProof)
-		}
+	pub fn ensure_no_unused_nodes(self) -> Result<(), Error> {
+		self.accessed_nodes_tracker
+			.ensure_no_unused_nodes()
+			.map_err(|e| Error::AccessedNodesTracker(e.into()))
 	}
 
 	/// Reads a value from the available subset of storage. If the value cannot be read due to an
 	/// incomplete or otherwise invalid proof, this function returns an error.
 	pub fn read_value(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
 		// LayoutV1 or LayoutV0 is identical for proof that only read values.
-		read_trie_value::<LayoutV1<H>, _>(&self.db, &self.root, key, Some(&mut self.recorder), None)
-			.map_err(|_| Error::StorageValueUnavailable)
+		read_trie_value::<LayoutV1<H>, _>(
+			&self.db,
+			&self.root,
+			key,
+			Some(&mut self.accessed_nodes_tracker),
+			None,
+		)
+		.map_err(|_| Error::StorageValueUnavailable)
 	}
 
 	/// Reads and decodes a value from the available subset of storage. If the value cannot be read
@@ -145,10 +132,10 @@ where
 /// Storage proof related errors.
 #[derive(Encode, Decode, Clone, Eq, PartialEq, PalletError, Debug, TypeInfo)]
 pub enum Error {
-	/// Duplicate trie nodes are found in the proof.
-	DuplicateNodesInProof,
-	/// Unused trie nodes are found in the proof.
-	UnusedNodesInTheProof,
+	/// Error generated by the `AccessedNodesTrackerError`.
+	AccessedNodesTracker(StrippableError<AccessedNodesTrackerError>),
+	/// Error originating in the `storage_proof` module.
+	StorageProof(StrippableError<StorageProofError>),
 	/// Expected storage root is missing from the proof.
 	StorageRootMismatch,
 	/// Unable to reach expected storage value using provided trie nodes.
@@ -202,15 +189,7 @@ where
 		trie.get(&key)?;
 	}
 
-	// recorder may record the same trie node multiple times and we don't want duplicate nodes
-	// in our proofs => let's deduplicate it by collecting to the BTreeSet first
-	Ok(recorder
-		.drain()
-		.into_iter()
-		.map(|n| n.data.to_vec())
-		.collect::<BTreeSet<_>>()
-		.into_iter()
-		.collect())
+	Ok(recorder.into_raw_storage_proof())
 }
 
 #[cfg(test)]
@@ -243,30 +222,22 @@ pub mod tests {
 		);
 	}
 
-	#[test]
-	fn proof_with_duplicate_items_is_rejected() {
-		let (root, mut proof) = craft_valid_storage_proof();
-		proof.push(proof.first().unwrap().clone());
-
-		assert_eq!(
-			StorageProofChecker::<sp_core::Blake2Hasher>::new(root, proof).map(drop),
-			Err(Error::DuplicateNodesInProof),
-		);
-	}
-
 	#[test]
 	fn proof_with_unused_items_is_rejected() {
 		let (root, proof) = craft_valid_storage_proof();
 
 		let mut checker =
 			StorageProofChecker::<sp_core::Blake2Hasher>::new(root, proof.clone()).unwrap();
-		checker.read_value(b"key1").unwrap();
+		checker.read_value(b"key1").unwrap().unwrap();
 		checker.read_value(b"key2").unwrap();
 		checker.read_value(b"key4").unwrap();
 		checker.read_value(b"key22").unwrap();
 		assert_eq!(checker.ensure_no_unused_nodes(), Ok(()));
 
 		let checker = StorageProofChecker::<sp_core::Blake2Hasher>::new(root, proof).unwrap();
-		assert_eq!(checker.ensure_no_unused_nodes(), Err(Error::UnusedNodesInTheProof));
+		assert_eq!(
+			checker.ensure_no_unused_nodes(),
+			Err(Error::AccessedNodesTracker(AccessedNodesTrackerError::UnusedNodes.into()))
+		);
 	}
 }
diff --git a/substrate/frame/session/src/historical/mod.rs b/substrate/frame/session/src/historical/mod.rs
index b9cecea1a7f7144fb7f846548b827455dce0a1e5..618497e3d54db378717561beb47070204af99ab6 100644
--- a/substrate/frame/session/src/historical/mod.rs
+++ b/substrate/frame/session/src/historical/mod.rs
@@ -37,21 +37,22 @@ use sp_runtime::{
 };
 use sp_session::{MembershipProof, ValidatorCount};
 use sp_staking::SessionIndex;
-use sp_std::prelude::*;
+use sp_std::{fmt::Debug, prelude::*};
 use sp_trie::{
 	trie_types::{TrieDBBuilder, TrieDBMutBuilderV0},
-	LayoutV0, MemoryDB, Recorder, Trie, TrieMut, EMPTY_PREFIX,
+	LayoutV0, MemoryDB, Recorder, StorageProof, Trie, TrieMut, TrieRecorder,
 };
 
 use frame_support::{
 	print,
 	traits::{KeyOwnerProofSystem, ValidatorSet, ValidatorSetWithIdentification},
-	Parameter,
+	Parameter, LOG_TARGET,
 };
 
 use crate::{self as pallet_session, Pallet as Session};
 
 pub use pallet::*;
+use sp_trie::{accessed_nodes_tracker::AccessedNodesTracker, recorder_ext::RecorderExt};
 
 #[frame_support::pallet]
 pub mod pallet {
@@ -118,6 +119,16 @@ impl<T: Config> Pallet<T> {
 			}
 		})
 	}
+
+	fn full_id_validators() -> Vec<(T::ValidatorId, T::FullIdentification)> {
+		<Session<T>>::validators()
+			.into_iter()
+			.filter_map(|validator| {
+				T::FullIdentificationOf::convert(validator.clone())
+					.map(|full_id| (validator, full_id))
+			})
+			.collect::<Vec<_>>()
+	}
 }
 
 impl<T: Config> ValidatorSet<T::AccountId> for Pallet<T> {
@@ -264,35 +275,16 @@ impl<T: Config> ProvingTrie<T> {
 		Ok(ProvingTrie { db, root })
 	}
 
-	fn from_nodes(root: T::Hash, nodes: &[Vec<u8>]) -> Self {
-		use sp_trie::HashDBT;
-
-		let mut memory_db = MemoryDB::default();
-		for node in nodes {
-			HashDBT::insert(&mut memory_db, EMPTY_PREFIX, &node[..]);
-		}
-
-		ProvingTrie { db: memory_db, root }
+	fn from_proof(root: T::Hash, proof: StorageProof) -> Self {
+		ProvingTrie { db: proof.into_memory_db(), root }
 	}
 
 	/// Prove the full verification data for a given key and key ID.
 	pub fn prove(&self, key_id: KeyTypeId, key_data: &[u8]) -> Option<Vec<Vec<u8>>> {
 		let mut recorder = Recorder::<LayoutV0<T::Hashing>>::new();
-		{
-			let trie =
-				TrieDBBuilder::new(&self.db, &self.root).with_recorder(&mut recorder).build();
-			let val_idx = (key_id, key_data).using_encoded(|s| {
-				trie.get(s).ok()?.and_then(|raw| u32::decode(&mut &*raw).ok())
-			})?;
-
-			val_idx.using_encoded(|s| {
-				trie.get(s)
-					.ok()?
-					.and_then(|raw| <IdentificationTuple<T>>::decode(&mut &*raw).ok())
-			})?;
-		}
+		self.query(key_id, key_data, Some(&mut recorder));
 
-		Some(recorder.drain().into_iter().map(|r| r.data).collect())
+		Some(recorder.into_raw_storage_proof())
 	}
 
 	/// Access the underlying trie root.
@@ -300,10 +292,17 @@ impl<T: Config> ProvingTrie<T> {
 		&self.root
 	}
 
-	// Check a proof contained within the current memory-db. Returns `None` if the
-	// nodes within the current `MemoryDB` are insufficient to query the item.
-	fn query(&self, key_id: KeyTypeId, key_data: &[u8]) -> Option<IdentificationTuple<T>> {
-		let trie = TrieDBBuilder::new(&self.db, &self.root).build();
+	/// Search for a key inside the proof.
+	fn query(
+		&self,
+		key_id: KeyTypeId,
+		key_data: &[u8],
+		recorder: Option<&mut dyn TrieRecorder<T::Hash>>,
+	) -> Option<IdentificationTuple<T>> {
+		let trie = TrieDBBuilder::new(&self.db, &self.root)
+			.with_optional_recorder(recorder)
+			.build();
+
 		let val_idx = (key_id, key_data)
 			.using_encoded(|s| trie.get(s))
 			.ok()?
@@ -322,13 +321,7 @@ impl<T: Config, D: AsRef<[u8]>> KeyOwnerProofSystem<(KeyTypeId, D)> for Pallet<T
 
 	fn prove(key: (KeyTypeId, D)) -> Option<Self::Proof> {
 		let session = <Session<T>>::current_index();
-		let validators = <Session<T>>::validators()
-			.into_iter()
-			.filter_map(|validator| {
-				T::FullIdentificationOf::convert(validator.clone())
-					.map(|full_id| (validator, full_id))
-			})
-			.collect::<Vec<_>>();
+		let validators = Self::full_id_validators();
 
 		let count = validators.len() as ValidatorCount;
 
@@ -343,30 +336,35 @@ impl<T: Config, D: AsRef<[u8]>> KeyOwnerProofSystem<(KeyTypeId, D)> for Pallet<T
 	}
 
 	fn check_proof(key: (KeyTypeId, D), proof: Self::Proof) -> Option<IdentificationTuple<T>> {
-		let (id, data) = key;
-
-		if proof.session == <Session<T>>::current_index() {
-			<Session<T>>::key_owner(id, data.as_ref()).and_then(|owner| {
-				T::FullIdentificationOf::convert(owner.clone()).and_then(move |id| {
-					let count = <Session<T>>::validators().len() as ValidatorCount;
-
-					if count != proof.validator_count {
-						return None
-					}
+		fn print_error<E: Debug>(e: E) {
+			log::error!(
+				target: LOG_TARGET,
+				"Rejecting equivocation report because of key ownership proof error: {:?}", e
+			);
+		}
 
-					Some((owner, id))
-				})
-			})
+		let (id, data) = key;
+		let (root, count) = if proof.session == <Session<T>>::current_index() {
+			let validators = Self::full_id_validators();
+			let count = validators.len() as ValidatorCount;
+			let trie = ProvingTrie::<T>::generate_for(validators).ok()?;
+			(trie.root, count)
 		} else {
-			let (root, count) = <HistoricalSessions<T>>::get(&proof.session)?;
-
-			if count != proof.validator_count {
-				return None
-			}
+			<HistoricalSessions<T>>::get(&proof.session)?
+		};
 
-			let trie = ProvingTrie::<T>::from_nodes(root, &proof.trie_nodes);
-			trie.query(id, data.as_ref())
+		if count != proof.validator_count {
+			return None
 		}
+
+		let proof = StorageProof::new_with_duplicate_nodes_check(proof.trie_nodes)
+			.map_err(print_error)
+			.ok()?;
+		let mut accessed_nodes_tracker = AccessedNodesTracker::<T::Hash>::new(proof.len());
+		let trie = ProvingTrie::<T>::from_proof(root, proof);
+		let res = trie.query(id, data.as_ref(), Some(&mut accessed_nodes_tracker))?;
+		accessed_nodes_tracker.ensure_no_unused_nodes().map_err(print_error).ok()?;
+		Some(res)
 	}
 }
 
diff --git a/substrate/primitives/trie/src/accessed_nodes_tracker.rs b/substrate/primitives/trie/src/accessed_nodes_tracker.rs
new file mode 100644
index 0000000000000000000000000000000000000000..378e3c2812c06fe7530c391e06b390e8bec0c95f
--- /dev/null
+++ b/substrate/primitives/trie/src/accessed_nodes_tracker.rs
@@ -0,0 +1,119 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// 	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Helpers for checking for duplicate nodes.
+
+use alloc::collections::BTreeSet;
+use core::hash::Hash;
+use scale_info::TypeInfo;
+use sp_core::{Decode, Encode};
+use trie_db::{RecordedForKey, TrieAccess, TrieRecorder};
+
+/// Error associated with the `AccessedNodesTracker` module.
+#[derive(Encode, Decode, Clone, Eq, PartialEq, Debug, TypeInfo)]
+pub enum Error {
+	/// The proof contains unused nodes.
+	UnusedNodes,
+}
+
+/// Helper struct used to ensure that a storage proof doesn't contain duplicate or unused nodes.
+///
+/// The struct needs to be used as a `TrieRecorder` and `ensure_no_unused_nodes()` has to be called
+/// to actually perform the check.
+pub struct AccessedNodesTracker<H: Hash> {
+	proof_nodes_count: usize,
+	recorder: BTreeSet<H>,
+}
+
+impl<H: Hash> AccessedNodesTracker<H> {
+	/// Create a new instance of `RedundantNodesChecker`, starting from a `RawStorageProof`.
+	pub fn new(proof_nodes_count: usize) -> Self {
+		Self { proof_nodes_count, recorder: BTreeSet::new() }
+	}
+
+	/// Ensure that all the nodes in the proof have been accessed.
+	pub fn ensure_no_unused_nodes(self) -> Result<(), Error> {
+		if self.proof_nodes_count != self.recorder.len() {
+			return Err(Error::UnusedNodes)
+		}
+
+		Ok(())
+	}
+}
+
+impl<H: Hash + Ord> TrieRecorder<H> for AccessedNodesTracker<H> {
+	fn record(&mut self, access: TrieAccess<H>) {
+		match access {
+			TrieAccess::NodeOwned { hash, .. } |
+			TrieAccess::EncodedNode { hash, .. } |
+			TrieAccess::Value { hash, .. } => {
+				self.recorder.insert(hash);
+			},
+			_ => {},
+		}
+	}
+
+	fn trie_nodes_recorded_for_key(&self, _key: &[u8]) -> RecordedForKey {
+		RecordedForKey::None
+	}
+}
+
+#[cfg(test)]
+pub mod tests {
+	use super::*;
+	use crate::{tests::create_storage_proof, StorageProof};
+	use hash_db::Hasher;
+	use trie_db::{Trie, TrieDBBuilder};
+
+	type Hash = <sp_core::Blake2Hasher as Hasher>::Out;
+	type Layout = crate::LayoutV1<sp_core::Blake2Hasher>;
+
+	const TEST_DATA: &[(&[u8], &[u8])] =
+		&[(b"key1", &[1; 64]), (b"key2", &[2; 64]), (b"key3", &[3; 64])];
+
+	#[test]
+	fn proof_with_unused_nodes_is_rejected() {
+		let (raw_proof, root) = create_storage_proof::<Layout>(TEST_DATA);
+		let proof = StorageProof::new(raw_proof.clone());
+		let proof_nodes_count = proof.len();
+
+		let mut accessed_nodes_tracker = AccessedNodesTracker::<Hash>::new(proof_nodes_count);
+		{
+			let db = proof.clone().into_memory_db();
+			let trie = TrieDBBuilder::<Layout>::new(&db, &root)
+				.with_recorder(&mut accessed_nodes_tracker)
+				.build();
+
+			trie.get(b"key1").unwrap().unwrap();
+			trie.get(b"key2").unwrap().unwrap();
+			trie.get(b"key3").unwrap().unwrap();
+		}
+		assert_eq!(accessed_nodes_tracker.ensure_no_unused_nodes(), Ok(()));
+
+		let mut accessed_nodes_tracker = AccessedNodesTracker::<Hash>::new(proof_nodes_count);
+		{
+			let db = proof.into_memory_db();
+			let trie = TrieDBBuilder::<Layout>::new(&db, &root)
+				.with_recorder(&mut accessed_nodes_tracker)
+				.build();
+
+			trie.get(b"key1").unwrap().unwrap();
+			trie.get(b"key2").unwrap().unwrap();
+		}
+		assert_eq!(accessed_nodes_tracker.ensure_no_unused_nodes(), Err(Error::UnusedNodes));
+	}
+}
diff --git a/substrate/primitives/trie/src/lib.rs b/substrate/primitives/trie/src/lib.rs
index 0c14e3af196d062e08f86ee8b6a7fa370a2a28bd..ef6b6a5743c2bbc6f3606a5dd645ff0e47c8a360 100644
--- a/substrate/primitives/trie/src/lib.rs
+++ b/substrate/primitives/trie/src/lib.rs
@@ -21,6 +21,7 @@
 
 extern crate alloc;
 
+pub mod accessed_nodes_tracker;
 #[cfg(feature = "std")]
 pub mod cache;
 mod error;
@@ -28,6 +29,7 @@ mod node_codec;
 mod node_header;
 #[cfg(feature = "std")]
 pub mod recorder;
+pub mod recorder_ext;
 mod storage_proof;
 mod trie_codec;
 mod trie_stream;
@@ -46,7 +48,7 @@ use hash_db::{Hasher, Prefix};
 pub use memory_db::{prefixed_key, HashKey, KeyFunction, PrefixedKey};
 /// The Substrate format implementation of `NodeCodec`.
 pub use node_codec::NodeCodec;
-pub use storage_proof::{CompactProof, StorageProof};
+pub use storage_proof::{CompactProof, StorageProof, StorageProofError};
 /// Trie codec reexport, mainly child trie support
 /// for trie compact proof.
 pub use trie_codec::{decode_compact, encode_compact, Error as CompactProofError};
@@ -64,6 +66,9 @@ pub use trie_db::{proof::VerifyError, MerkleValue};
 /// The Substrate format implementation of `TrieStream`.
 pub use trie_stream::TrieStream;
 
+/// Raw storage proof type (just raw trie nodes).
+pub type RawStorageProof = Vec<Vec<u8>>;
+
 /// substrate trie layout
 pub struct LayoutV0<H>(PhantomData<H>);
 
@@ -616,6 +621,50 @@ mod tests {
 
 	type MemoryDBMeta<H> = memory_db::MemoryDB<H, memory_db::HashKey<H>, trie_db::DBValue>;
 
+	pub fn create_trie<L: TrieLayout>(
+		data: &[(&[u8], &[u8])],
+	) -> (MemoryDB<L::Hash>, trie_db::TrieHash<L>) {
+		let mut db = MemoryDB::default();
+		let mut root = Default::default();
+
+		{
+			let mut trie = trie_db::TrieDBMutBuilder::<L>::new(&mut db, &mut root).build();
+			for (k, v) in data {
+				trie.insert(k, v).expect("Inserts data");
+			}
+		}
+
+		let mut recorder = Recorder::<L>::new();
+		{
+			let trie = trie_db::TrieDBBuilder::<L>::new(&mut db, &mut root)
+				.with_recorder(&mut recorder)
+				.build();
+			for (k, _v) in data {
+				trie.get(k).unwrap();
+			}
+		}
+
+		(db, root)
+	}
+
+	pub fn create_storage_proof<L: TrieLayout>(
+		data: &[(&[u8], &[u8])],
+	) -> (RawStorageProof, trie_db::TrieHash<L>) {
+		let (db, root) = create_trie::<L>(data);
+
+		let mut recorder = Recorder::<L>::new();
+		{
+			let trie = trie_db::TrieDBBuilder::<L>::new(&db, &root)
+				.with_recorder(&mut recorder)
+				.build();
+			for (k, _v) in data {
+				trie.get(k).unwrap();
+			}
+		}
+
+		(recorder.drain().into_iter().map(|record| record.data).collect(), root)
+	}
+
 	fn hashed_null_node<T: TrieConfiguration>() -> TrieHash<T> {
 		<T::Codec as NodeCodecT>::hashed_null_node()
 	}
diff --git a/substrate/primitives/trie/src/recorder.rs b/substrate/primitives/trie/src/recorder.rs
index 22a22b33b370994d554415519b4ee42fd82ae891..2886577eddc60a3a902321f312acdeb9a5262965 100644
--- a/substrate/primitives/trie/src/recorder.rs
+++ b/substrate/primitives/trie/src/recorder.rs
@@ -145,7 +145,7 @@ impl<H: Hasher> Recorder<H> {
 
 	/// Convert the recording to a [`StorageProof`].
 	///
-	/// In contrast to [`Self::drain_storage_proof`] this doesn't consumes and doesn't clears the
+	/// In contrast to [`Self::drain_storage_proof`] this doesn't consume and doesn't clear the
 	/// recordings.
 	///
 	/// Returns the [`StorageProof`].
@@ -429,7 +429,8 @@ impl<'a, H: Hasher> trie_db::TrieRecorder<H::Out> for TrieRecorder<'a, H> {
 #[cfg(test)]
 mod tests {
 	use super::*;
-	use trie_db::{Trie, TrieDBBuilder, TrieDBMutBuilder, TrieHash, TrieMut, TrieRecorder};
+	use crate::tests::create_trie;
+	use trie_db::{Trie, TrieDBBuilder, TrieRecorder};
 
 	type MemoryDB = crate::MemoryDB<sp_core::Blake2Hasher>;
 	type Layout = crate::LayoutV1<sp_core::Blake2Hasher>;
@@ -438,23 +439,9 @@ mod tests {
 	const TEST_DATA: &[(&[u8], &[u8])] =
 		&[(b"key1", &[1; 64]), (b"key2", &[2; 64]), (b"key3", &[3; 64]), (b"key4", &[4; 64])];
 
-	fn create_trie() -> (MemoryDB, TrieHash<Layout>) {
-		let mut db = MemoryDB::default();
-		let mut root = Default::default();
-
-		{
-			let mut trie = TrieDBMutBuilder::<Layout>::new(&mut db, &mut root).build();
-			for (k, v) in TEST_DATA {
-				trie.insert(k, v).expect("Inserts data");
-			}
-		}
-
-		(db, root)
-	}
-
 	#[test]
 	fn recorder_works() {
-		let (db, root) = create_trie();
+		let (db, root) = create_trie::<Layout>(TEST_DATA);
 
 		let recorder = Recorder::default();
 
@@ -498,7 +485,7 @@ mod tests {
 
 	#[test]
 	fn recorder_transactions_rollback_work() {
-		let (db, root) = create_trie();
+		let (db, root) = create_trie::<Layout>(TEST_DATA);
 
 		let recorder = Recorder::default();
 		let mut stats = vec![RecorderStats::default()];
@@ -547,7 +534,7 @@ mod tests {
 
 	#[test]
 	fn recorder_transactions_commit_work() {
-		let (db, root) = create_trie();
+		let (db, root) = create_trie::<Layout>(TEST_DATA);
 
 		let recorder = Recorder::default();
 
@@ -586,7 +573,7 @@ mod tests {
 
 	#[test]
 	fn recorder_transactions_commit_and_rollback_work() {
-		let (db, root) = create_trie();
+		let (db, root) = create_trie::<Layout>(TEST_DATA);
 
 		let recorder = Recorder::default();
 
@@ -645,7 +632,7 @@ mod tests {
 	#[test]
 	fn recorder_transaction_accessed_keys_works() {
 		let key = TEST_DATA[0].0;
-		let (db, root) = create_trie();
+		let (db, root) = create_trie::<Layout>(TEST_DATA);
 
 		let recorder = Recorder::default();
 
diff --git a/substrate/primitives/trie/src/recorder_ext.rs b/substrate/primitives/trie/src/recorder_ext.rs
new file mode 100644
index 0000000000000000000000000000000000000000..866d5b72c5d64f4fb28178645ea9425158bf0ffc
--- /dev/null
+++ b/substrate/primitives/trie/src/recorder_ext.rs
@@ -0,0 +1,47 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// 	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Extension for the default recorder.
+
+use crate::RawStorageProof;
+use alloc::{collections::BTreeSet, vec::Vec};
+use trie_db::{Recorder, TrieLayout};
+
+/// Convenience extension for the `Recorder` struct.
+///
+/// Used to deduplicate some logic.
+pub trait RecorderExt<L: TrieLayout>
+where
+	Self: Sized,
+{
+	/// Convert the recorder into a `BTreeSet`.
+	fn into_set(self) -> BTreeSet<Vec<u8>>;
+
+	/// Convert the recorder into a `RawStorageProof`, avoiding duplicate nodes.
+	fn into_raw_storage_proof(self) -> RawStorageProof {
+		// The recorder may record the same trie node multiple times,
+		// and we don't want duplicate nodes in our proofs
+		// => let's deduplicate it by collecting to a BTreeSet first
+		self.into_set().into_iter().collect()
+	}
+}
+
+impl<L: TrieLayout> RecorderExt<L> for Recorder<L> {
+	fn into_set(mut self) -> BTreeSet<Vec<u8>> {
+		self.drain().into_iter().map(|record| record.data).collect::<BTreeSet<_>>()
+	}
+}
diff --git a/substrate/primitives/trie/src/storage_proof.rs b/substrate/primitives/trie/src/storage_proof.rs
index e46c49be19cb84af46f63003abcdb56ca421d9d0..a9f6298742f648953ecd662dd27fca2b63c6c29e 100644
--- a/substrate/primitives/trie/src/storage_proof.rs
+++ b/substrate/primitives/trie/src/storage_proof.rs
@@ -25,6 +25,13 @@ use scale_info::TypeInfo;
 // with `LayoutV0`.
 use crate::LayoutV1 as Layout;
 
+/// Error associated with the `storage_proof` module.
+#[derive(Encode, Decode, Clone, Eq, PartialEq, Debug, TypeInfo)]
+pub enum StorageProofError {
+	/// The proof contains duplicate nodes.
+	DuplicateNodes,
+}
+
 /// A proof that some set of key-value pairs are included in the storage trie. The proof contains
 /// the storage values so that the partial storage backend can be reconstructed by a verifier that
 /// does not already have access to the key-value pairs.
@@ -43,6 +50,22 @@ impl StorageProof {
 		StorageProof { trie_nodes: BTreeSet::from_iter(trie_nodes) }
 	}
 
+	/// Constructs a storage proof from a subset of encoded trie nodes in a storage backend.
+	///
+	/// Returns an error if the provided subset of encoded trie nodes contains duplicates.
+	pub fn new_with_duplicate_nodes_check(
+		trie_nodes: impl IntoIterator<Item = Vec<u8>>,
+	) -> Result<Self, StorageProofError> {
+		let mut trie_nodes_set = BTreeSet::new();
+		for node in trie_nodes {
+			if !trie_nodes_set.insert(node) {
+				return Err(StorageProofError::DuplicateNodes);
+			}
+		}
+
+		Ok(StorageProof { trie_nodes: trie_nodes_set })
+	}
+
 	/// Returns a new empty proof.
 	///
 	/// An empty proof is capable of only proving trivial statements (ie. that an empty set of
@@ -56,6 +79,11 @@ impl StorageProof {
 		self.trie_nodes.is_empty()
 	}
 
+	/// Returns the number of nodes in the proof.
+	pub fn len(&self) -> usize {
+		self.trie_nodes.len()
+	}
+
 	/// Convert into an iterator over encoded trie nodes in lexicographical order constructed
 	/// from the proof.
 	pub fn into_iter_nodes(self) -> impl Sized + DoubleEndedIterator<Item = Vec<u8>> {
@@ -198,3 +226,23 @@ impl CompactProof {
 		Ok((db, root))
 	}
 }
+
+#[cfg(test)]
+pub mod tests {
+	use super::*;
+	use crate::{tests::create_storage_proof, StorageProof};
+
+	type Layout = crate::LayoutV1<sp_core::Blake2Hasher>;
+
+	const TEST_DATA: &[(&[u8], &[u8])] =
+		&[(b"key1", &[1; 64]), (b"key2", &[2; 64]), (b"key3", &[3; 64]), (b"key11", &[4; 64])];
+
+	#[test]
+	fn proof_with_duplicate_nodes_is_rejected() {
+		let (raw_proof, _root) = create_storage_proof::<Layout>(TEST_DATA);
+		assert!(matches!(
+			StorageProof::new_with_duplicate_nodes_check(raw_proof),
+			Err(StorageProofError::DuplicateNodes)
+		));
+	}
+}