From 9a650c46fdf52809bd9b5d687a4e1f77748521d4 Mon Sep 17 00:00:00 2001
From: Sebastian Kunert <skunert49@gmail.com>
Date: Thu, 30 Nov 2023 15:56:34 +0100
Subject: [PATCH] PoV Reclaim (Clawback) Node Side (#1462)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR provides the infrastructure for the pov-reclaim mechanism
discussed in #209. The goal is to expose the current proof size to the
runtime so it can be used to reclaim storage weight.

## New Host Function
- A new host function is provided
[here](https://github.com/skunert/polkadot-sdk/blob/5b317fda3be205f4136f10d4490387ccd4f9765d/cumulus/primitives/pov-reclaim/src/lib.rs#L23).
It returns the current proof size to the runtime. If recording is not
enabled, it returns `u64::MAX` (`PROOF_RECORDING_DISABLED`).
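
A minimal runtime-side sketch (the host function and
`PROOF_RECORDING_DISABLED` are introduced by this PR; the helper below is
purely illustrative):

```rust
use cumulus_primitives_proof_size_hostfunction::{
	storage_proof_size, PROOF_RECORDING_DISABLED,
};

/// Illustrative helper: `None` means proof recording is disabled.
fn current_proof_size() -> Option<u64> {
	match storage_proof_size::storage_proof_size() {
		PROOF_RECORDING_DISABLED => None,
		size => Some(size),
	}
}
```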

## Implementation Overview
- Implement an option to enable proof recording during block import in
the client. This is currently enabled for `polkadot-parachain`,
`parachain-template` and the cumulus test node.
- Make the proof recorder ready for no-std. It was previously only
enabled for std environments, but we need to record the proof size in
`validate_block` too.
- Provide a recorder implementation that only records the size of
incoming nodes and does not store the nodes themselves.
- Fix benchmarks that were broken by async backing changes.
- Provide a new externalities extension that is registered by default if
proof recording is enabled (see the sketch after this list).
- I think we should discuss the naming: pov-reclaim was more intuitive
to me, but we could also go with clawback, as in the issue.
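
For reference, a condensed sketch of the import-side wiring, mirroring the
`substrate/client/service/src/client/client.rs` change further down in this
patch:

```rust
// When import proof recording is enabled, register the proof recorder as a
// `ProofSizeExt` extension before executing the block, so the new host
// function can read the current proof size.
if self.config.enable_import_proof_recording {
	runtime_api.record_proof();
	let recorder = runtime_api
		.proof_recorder()
		.expect("Proof recording is enabled in the line above; qed.");
	runtime_api.register_extension(ProofSizeExt::new(recorder));
}
```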

## Impact of proof recording during import
With proof recording: 6.3058 Kelem/s
Without proof recording: 6.3427 Kelem/s

The measured impact on import performance is quite low on my machine
using the block import benchmark. With proof recording I am seeing a
performance hit of 0.585% ((6.3427 − 6.3058) / 6.3058 ≈ 0.585%).

---------

Co-authored-by: command-bot <>
Co-authored-by: Davide Galassi <davxy@datawok.net>
Co-authored-by: Bastian Köcher <git@kchr.de>
---
 Cargo.lock                                    |  19 ++
 Cargo.toml                                    |   1 +
 cumulus/client/service/Cargo.toml             |   1 +
 cumulus/client/service/src/lib.rs             |   2 +
 cumulus/pallets/parachain-system/Cargo.toml   |   3 +
 .../src/validate_block/mod.rs                 |   4 +
 .../src/validate_block/trie_recorder.rs       | 286 ++++++++++++++++++
 .../proof-size-hostfunction/Cargo.toml        |  21 ++
 .../proof-size-hostfunction/src/lib.rs        | 107 +++++++
 cumulus/test/client/Cargo.toml                |   1 +
 cumulus/test/client/src/lib.rs                |   3 +-
 cumulus/test/service/benches/block_import.rs  |  92 +++---
 .../service/benches/block_import_glutton.rs   |  37 ++-
 .../test/service/benches/validate_block.rs    |  15 +-
 cumulus/test/service/src/bench_utils.rs       |   7 +-
 cumulus/test/service/src/lib.rs               |  54 ++--
 cumulus/test/service/src/main.rs              |   3 +-
 .../client/api/src/execution_extensions.rs    |   5 +-
 substrate/client/block-builder/Cargo.toml     |   1 +
 substrate/client/block-builder/src/lib.rs     |   5 +
 substrate/client/service/src/builder.rs       |  26 +-
 substrate/client/service/src/client/client.rs |  15 +-
 substrate/client/service/src/lib.rs           |   7 +-
 .../procedural/src/construct_runtime/mod.rs   |   2 +-
 .../api/proc-macro/src/decl_runtime_apis.rs   |   2 +-
 .../api/proc-macro/src/impl_runtime_apis.rs   |   2 +-
 .../runtime-interface/proc-macro/Cargo.toml   |   1 +
 .../proc-macro/src/runtime_interface/mod.rs   |   6 +
 .../state-machine/src/trie_backend.rs         | 150 +++++----
 .../state-machine/src/trie_backend_essence.rs | 154 +++++-----
 substrate/primitives/trie/Cargo.toml          |   2 +
 substrate/primitives/trie/src/lib.rs          |  26 ++
 .../trie/src/proof_size_extension.rs          |  39 +++
 substrate/primitives/trie/src/recorder.rs     |  41 ++-
 34 files changed, 899 insertions(+), 241 deletions(-)
 create mode 100644 cumulus/pallets/parachain-system/src/validate_block/trie_recorder.rs
 create mode 100644 cumulus/primitives/proof-size-hostfunction/Cargo.toml
 create mode 100644 cumulus/primitives/proof-size-hostfunction/src/lib.rs
 create mode 100644 substrate/primitives/trie/src/proof_size_extension.rs

diff --git a/Cargo.lock b/Cargo.lock
index de4471783ab..e7caa46cad8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3495,6 +3495,7 @@ dependencies = [
  "cumulus-client-network",
  "cumulus-client-pov-recovery",
  "cumulus-primitives-core",
+ "cumulus-primitives-proof-size-hostfunction",
  "cumulus-relay-chain-inprocess-interface",
  "cumulus-relay-chain-interface",
  "cumulus-relay-chain-minimal-node",
@@ -3564,6 +3565,7 @@ dependencies = [
  "cumulus-pallet-parachain-system-proc-macro",
  "cumulus-primitives-core",
  "cumulus-primitives-parachain-inherent",
+ "cumulus-primitives-proof-size-hostfunction",
  "cumulus-test-client",
  "cumulus-test-relay-sproof-builder",
  "environmental",
@@ -3595,6 +3597,7 @@ dependencies = [
  "sp-version",
  "staging-xcm",
  "trie-db",
+ "trie-standardmap",
 ]
 
 [[package]]
@@ -3743,6 +3746,18 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "cumulus-primitives-proof-size-hostfunction"
+version = "0.1.0"
+dependencies = [
+ "sp-core",
+ "sp-externalities 0.19.0",
+ "sp-io",
+ "sp-runtime-interface 17.0.0",
+ "sp-state-machine",
+ "sp-trie",
+]
+
 [[package]]
 name = "cumulus-primitives-timestamp"
 version = "0.1.0"
@@ -3903,6 +3918,7 @@ version = "0.1.0"
 dependencies = [
  "cumulus-primitives-core",
  "cumulus-primitives-parachain-inherent",
+ "cumulus-primitives-proof-size-hostfunction",
  "cumulus-test-relay-sproof-builder",
  "cumulus-test-runtime",
  "cumulus-test-service",
@@ -14766,6 +14782,7 @@ dependencies = [
  "sp-inherents",
  "sp-runtime",
  "sp-state-machine",
+ "sp-trie",
  "substrate-test-runtime-client",
 ]
 
@@ -17612,6 +17629,7 @@ name = "sp-runtime-interface-proc-macro"
 version = "11.0.0"
 dependencies = [
  "Inflector",
+ "expander 2.0.0",
  "proc-macro-crate",
  "proc-macro2",
  "quote",
@@ -17864,6 +17882,7 @@ dependencies = [
  "scale-info",
  "schnellru",
  "sp-core",
+ "sp-externalities 0.19.0",
  "sp-runtime",
  "sp-std 8.0.0",
  "thiserror",
diff --git a/Cargo.toml b/Cargo.toml
index a295aca819c..b585ceb5b44 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -98,6 +98,7 @@ members = [
 	"cumulus/primitives/aura",
 	"cumulus/primitives/core",
 	"cumulus/primitives/parachain-inherent",
+	"cumulus/primitives/proof-size-hostfunction",
 	"cumulus/primitives/timestamp",
 	"cumulus/primitives/utility",
 	"cumulus/test/client",
diff --git a/cumulus/client/service/Cargo.toml b/cumulus/client/service/Cargo.toml
index f80c65128d5..cc2f22e6565 100644
--- a/cumulus/client/service/Cargo.toml
+++ b/cumulus/client/service/Cargo.toml
@@ -38,6 +38,7 @@ cumulus-client-consensus-common = { path = "../consensus/common" }
 cumulus-client-pov-recovery = { path = "../pov-recovery" }
 cumulus-client-network = { path = "../network" }
 cumulus-primitives-core = { path = "../../primitives/core" }
+cumulus-primitives-proof-size-hostfunction = { path = "../../primitives/proof-size-hostfunction" }
 cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
 cumulus-relay-chain-inprocess-interface = { path = "../relay-chain-inprocess-interface" }
 cumulus-relay-chain-minimal-node = { path = "../relay-chain-minimal-node" }
diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs
index f8ebca11c8c..950e59aff24 100644
--- a/cumulus/client/service/src/lib.rs
+++ b/cumulus/client/service/src/lib.rs
@@ -52,6 +52,8 @@ use sp_core::{traits::SpawnNamed, Decode};
 use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header};
 use std::{sync::Arc, time::Duration};
 
+pub use cumulus_primitives_proof_size_hostfunction::storage_proof_size;
+
 // Given the sporadic nature of the explicit recovery operation and the
 // possibility to retry infinite times this value is more than enough.
 // In practice here we expect no more than one queued messages.
diff --git a/cumulus/pallets/parachain-system/Cargo.toml b/cumulus/pallets/parachain-system/Cargo.toml
index 5600c95a2a6..cf6af4b4786 100644
--- a/cumulus/pallets/parachain-system/Cargo.toml
+++ b/cumulus/pallets/parachain-system/Cargo.toml
@@ -39,11 +39,13 @@ xcm = { package = "staging-xcm", path = "../../../polkadot/xcm", default-feature
 cumulus-pallet-parachain-system-proc-macro = { path = "proc-macro", default-features = false }
 cumulus-primitives-core = { path = "../../primitives/core", default-features = false }
 cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inherent", default-features = false }
+cumulus-primitives-proof-size-hostfunction = { path = "../../primitives/proof-size-hostfunction", default-features = false }
 
 [dev-dependencies]
 assert_matches = "1.5"
 hex-literal = "0.4.1"
 lazy_static = "1.4"
+trie-standardmap = "0.16.0"
 rand = "0.8.5"
 futures = "0.3.28"
 
@@ -65,6 +67,7 @@ std = [
 	"cumulus-pallet-parachain-system-proc-macro/std",
 	"cumulus-primitives-core/std",
 	"cumulus-primitives-parachain-inherent/std",
+	"cumulus-primitives-proof-size-hostfunction/std",
 	"environmental/std",
 	"frame-benchmarking/std",
 	"frame-support/std",
diff --git a/cumulus/pallets/parachain-system/src/validate_block/mod.rs b/cumulus/pallets/parachain-system/src/validate_block/mod.rs
index db149401638..763a4cffd77 100644
--- a/cumulus/pallets/parachain-system/src/validate_block/mod.rs
+++ b/cumulus/pallets/parachain-system/src/validate_block/mod.rs
@@ -26,6 +26,10 @@ mod tests;
 #[doc(hidden)]
 mod trie_cache;
 
+#[cfg(any(test, not(feature = "std")))]
+#[doc(hidden)]
+mod trie_recorder;
+
 #[cfg(not(feature = "std"))]
 #[doc(hidden)]
 pub use bytes;
diff --git a/cumulus/pallets/parachain-system/src/validate_block/trie_recorder.rs b/cumulus/pallets/parachain-system/src/validate_block/trie_recorder.rs
new file mode 100644
index 00000000000..e73aef70aa4
--- /dev/null
+++ b/cumulus/pallets/parachain-system/src/validate_block/trie_recorder.rs
@@ -0,0 +1,286 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Cumulus.
+
+// Cumulus is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Cumulus is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
+
+//! Provide a specialized trie-recorder and provider for use in validate-block.
+//!
+//! This file defines two main structs, [`SizeOnlyRecorder`] and
+//! [`SizeOnlyRecorderProvider`]. They are used to track the current
+//! proof-size without actually recording the accessed nodes themselves.
+
+use codec::Encode;
+
+use sp_std::{
+	cell::{RefCell, RefMut},
+	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
+	rc::Rc,
+};
+use sp_trie::{NodeCodec, ProofSizeProvider, StorageProof};
+use trie_db::{Hasher, RecordedForKey, TrieAccess};
+
+/// A trie recorder that only keeps track of the proof size.
+///
+/// The internal size counting logic should align
+/// with [`sp_trie::recorder::Recorder`].
+pub(crate) struct SizeOnlyRecorder<'a, H: Hasher> {
+	seen_nodes: RefMut<'a, BTreeSet<H::Out>>,
+	encoded_size: RefMut<'a, usize>,
+	recorded_keys: RefMut<'a, BTreeMap<Rc<[u8]>, RecordedForKey>>,
+}
+
+impl<'a, H: trie_db::Hasher> trie_db::TrieRecorder<H::Out> for SizeOnlyRecorder<'a, H> {
+	fn record(&mut self, access: TrieAccess<'_, H::Out>) {
+		let mut encoded_size_update = 0;
+		match access {
+			TrieAccess::NodeOwned { hash, node_owned } =>
+				if self.seen_nodes.insert(hash) {
+					let node = node_owned.to_encoded::<NodeCodec<H>>();
+					encoded_size_update += node.encoded_size();
+				},
+			TrieAccess::EncodedNode { hash, encoded_node } =>
+				if self.seen_nodes.insert(hash) {
+					encoded_size_update += encoded_node.encoded_size();
+				},
+			TrieAccess::Value { hash, value, full_key } => {
+				if self.seen_nodes.insert(hash) {
+					encoded_size_update += value.encoded_size();
+				}
+				self.recorded_keys
+					.entry(full_key.into())
+					.and_modify(|e| *e = RecordedForKey::Value)
+					.or_insert_with(|| RecordedForKey::Value);
+			},
+			TrieAccess::Hash { full_key } => {
+				self.recorded_keys
+					.entry(full_key.into())
+					.or_insert_with(|| RecordedForKey::Hash);
+			},
+			TrieAccess::NonExisting { full_key } => {
+				self.recorded_keys
+					.entry(full_key.into())
+					.and_modify(|e| *e = RecordedForKey::Value)
+					.or_insert_with(|| RecordedForKey::Value);
+			},
+			TrieAccess::InlineValue { full_key } => {
+				self.recorded_keys
+					.entry(full_key.into())
+					.and_modify(|e| *e = RecordedForKey::Value)
+					.or_insert_with(|| RecordedForKey::Value);
+			},
+		};
+
+		*self.encoded_size += encoded_size_update;
+	}
+
+	fn trie_nodes_recorded_for_key(&self, key: &[u8]) -> RecordedForKey {
+		self.recorded_keys.get(key).copied().unwrap_or(RecordedForKey::None)
+	}
+}
+
+#[derive(Clone)]
+pub(crate) struct SizeOnlyRecorderProvider<H: Hasher> {
+	seen_nodes: Rc<RefCell<BTreeSet<H::Out>>>,
+	encoded_size: Rc<RefCell<usize>>,
+	recorded_keys: Rc<RefCell<BTreeMap<Rc<[u8]>, RecordedForKey>>>,
+}
+
+impl<H: Hasher> SizeOnlyRecorderProvider<H> {
+	pub fn new() -> Self {
+		Self {
+			seen_nodes: Default::default(),
+			encoded_size: Default::default(),
+			recorded_keys: Default::default(),
+		}
+	}
+}
+
+impl<H: trie_db::Hasher> sp_trie::TrieRecorderProvider<H> for SizeOnlyRecorderProvider<H> {
+	type Recorder<'a> = SizeOnlyRecorder<'a, H> where H: 'a;
+
+	fn drain_storage_proof(self) -> Option<StorageProof> {
+		None
+	}
+
+	fn as_trie_recorder(&self, _storage_root: H::Out) -> Self::Recorder<'_> {
+		SizeOnlyRecorder {
+			encoded_size: self.encoded_size.borrow_mut(),
+			seen_nodes: self.seen_nodes.borrow_mut(),
+			recorded_keys: self.recorded_keys.borrow_mut(),
+		}
+	}
+}
+
+impl<H: trie_db::Hasher> ProofSizeProvider for SizeOnlyRecorderProvider<H> {
+	fn estimate_encoded_size(&self) -> usize {
+		*self.encoded_size.borrow()
+	}
+}
+
+// This is safe here since we are single-threaded in WASM
+unsafe impl<H: Hasher> Send for SizeOnlyRecorderProvider<H> {}
+unsafe impl<H: Hasher> Sync for SizeOnlyRecorderProvider<H> {}
+
+#[cfg(test)]
+mod tests {
+	use rand::Rng;
+	use sp_trie::{
+		cache::{CacheSize, SharedTrieCache},
+		MemoryDB, ProofSizeProvider, TrieRecorderProvider,
+	};
+	use trie_db::{Trie, TrieDBBuilder, TrieDBMutBuilder, TrieHash, TrieMut, TrieRecorder};
+	use trie_standardmap::{Alphabet, StandardMap, ValueMode};
+
+	use super::*;
+
+	type Recorder = sp_trie::recorder::Recorder<sp_core::Blake2Hasher>;
+
+	fn create_trie() -> (
+		sp_trie::MemoryDB<sp_core::Blake2Hasher>,
+		TrieHash<sp_trie::LayoutV1<sp_core::Blake2Hasher>>,
+		Vec<(Vec<u8>, Vec<u8>)>,
+	) {
+		let mut db = MemoryDB::default();
+		let mut root = Default::default();
+
+		let mut seed = Default::default();
+		let test_data: Vec<(Vec<u8>, Vec<u8>)> = StandardMap {
+			alphabet: Alphabet::Low,
+			min_key: 16,
+			journal_key: 0,
+			value_mode: ValueMode::Random,
+			count: 1000,
+		}
+		.make_with(&mut seed)
+		.into_iter()
+		.map(|(k, v)| {
+			// Double the length so we end up with some values of 2 bytes and some of 64
+			let v = [v.clone(), v].concat();
+			(k, v)
+		})
+		.collect();
+
+		// Fill database with values
+		{
+			let mut trie = TrieDBMutBuilder::<sp_trie::LayoutV1<sp_core::Blake2Hasher>>::new(
+				&mut db, &mut root,
+			)
+			.build();
+			for (k, v) in &test_data {
+				trie.insert(k, v).expect("Inserts data");
+			}
+		}
+
+		(db, root, test_data)
+	}
+
+	#[test]
+	fn recorder_equivalence_cache() {
+		let (db, root, test_data) = create_trie();
+
+		let mut rng = rand::thread_rng();
+		for _ in 1..10 {
+			let reference_recorder = Recorder::default();
+			let recorder_for_test: SizeOnlyRecorderProvider<sp_core::Blake2Hasher> =
+				SizeOnlyRecorderProvider::new();
+			let reference_cache: SharedTrieCache<sp_core::Blake2Hasher> =
+				SharedTrieCache::new(CacheSize::new(1024 * 5));
+			let cache_for_test: SharedTrieCache<sp_core::Blake2Hasher> =
+				SharedTrieCache::new(CacheSize::new(1024 * 5));
+			{
+				let local_cache = cache_for_test.local_cache();
+				let mut trie_cache_for_reference = local_cache.as_trie_db_cache(root);
+				let mut reference_trie_recorder = reference_recorder.as_trie_recorder(root);
+				let reference_trie =
+					TrieDBBuilder::<sp_trie::LayoutV1<sp_core::Blake2Hasher>>::new(&db, &root)
+						.with_recorder(&mut reference_trie_recorder)
+						.with_cache(&mut trie_cache_for_reference)
+						.build();
+
+				let local_cache_for_test = reference_cache.local_cache();
+				let mut trie_cache_for_test = local_cache_for_test.as_trie_db_cache(root);
+				let mut trie_recorder_under_test = recorder_for_test.as_trie_recorder(root);
+				let test_trie =
+					TrieDBBuilder::<sp_trie::LayoutV1<sp_core::Blake2Hasher>>::new(&db, &root)
+						.with_recorder(&mut trie_recorder_under_test)
+						.with_cache(&mut trie_cache_for_test)
+						.build();
+
+				// Access random values from the test data
+				for _ in 0..100 {
+					let index: usize = rng.gen_range(0..test_data.len());
+					test_trie.get(&test_data[index].0).unwrap().unwrap();
+					reference_trie.get(&test_data[index].0).unwrap().unwrap();
+				}
+
+				// Check that we have the same nodes recorded for both recorders
+				for (key, _) in test_data.iter() {
+					let reference = reference_trie_recorder.trie_nodes_recorded_for_key(key);
+					let test_value = trie_recorder_under_test.trie_nodes_recorded_for_key(key);
+					assert_eq!(format!("{:?}", reference), format!("{:?}", test_value));
+				}
+			}
+
+			// Check that we have the same size recorded for both recorders
+			assert_eq!(
+				reference_recorder.estimate_encoded_size(),
+				recorder_for_test.estimate_encoded_size()
+			);
+		}
+	}
+
+	#[test]
+	fn recorder_equivalence_no_cache() {
+		let (db, root, test_data) = create_trie();
+
+		let mut rng = rand::thread_rng();
+		for _ in 1..10 {
+			let reference_recorder = Recorder::default();
+			let recorder_for_test: SizeOnlyRecorderProvider<sp_core::Blake2Hasher> =
+				SizeOnlyRecorderProvider::new();
+			{
+				let mut reference_trie_recorder = reference_recorder.as_trie_recorder(root);
+				let reference_trie =
+					TrieDBBuilder::<sp_trie::LayoutV1<sp_core::Blake2Hasher>>::new(&db, &root)
+						.with_recorder(&mut reference_trie_recorder)
+						.build();
+
+				let mut trie_recorder_under_test = recorder_for_test.as_trie_recorder(root);
+				let test_trie =
+					TrieDBBuilder::<sp_trie::LayoutV1<sp_core::Blake2Hasher>>::new(&db, &root)
+						.with_recorder(&mut trie_recorder_under_test)
+						.build();
+
+				for _ in 0..200 {
+					let index: usize = rng.gen_range(0..test_data.len());
+					test_trie.get(&test_data[index].0).unwrap().unwrap();
+					reference_trie.get(&test_data[index].0).unwrap().unwrap();
+				}
+
+				// Check that we have the same nodes recorded for both recorders
+				for (key, _) in test_data.iter() {
+					let reference = reference_trie_recorder.trie_nodes_recorded_for_key(key);
+					let test_value = trie_recorder_under_test.trie_nodes_recorded_for_key(key);
+					assert_eq!(format!("{:?}", reference), format!("{:?}", test_value));
+				}
+			}
+
+			// Check that we have the same size recorded for both recorders
+			assert_eq!(
+				reference_recorder.estimate_encoded_size(),
+				recorder_for_test.estimate_encoded_size()
+			);
+		}
+	}
+}
diff --git a/cumulus/primitives/proof-size-hostfunction/Cargo.toml b/cumulus/primitives/proof-size-hostfunction/Cargo.toml
new file mode 100644
index 00000000000..83dad428d00
--- /dev/null
+++ b/cumulus/primitives/proof-size-hostfunction/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "cumulus-primitives-proof-size-hostfunction"
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+description = "Hostfunction exposing storage proof size to the runtime."
+license = "Apache-2.0"
+
+[dependencies]
+sp-runtime-interface = { path = "../../../substrate/primitives/runtime-interface", default-features = false }
+sp-externalities = { path = "../../../substrate/primitives/externalities", default-features = false }
+sp-trie = { path = "../../../substrate/primitives/trie", default-features = false }
+
+[dev-dependencies]
+sp-state-machine = { path = "../../../substrate/primitives/state-machine" }
+sp-core = { path = "../../../substrate/primitives/core" }
+sp-io = { path = "../../../substrate/primitives/io" }
+
+[features]
+default = [ "std" ]
+std = [ "sp-externalities/std", "sp-runtime-interface/std", "sp-trie/std" ]
diff --git a/cumulus/primitives/proof-size-hostfunction/src/lib.rs b/cumulus/primitives/proof-size-hostfunction/src/lib.rs
new file mode 100644
index 00000000000..6da6235e585
--- /dev/null
+++ b/cumulus/primitives/proof-size-hostfunction/src/lib.rs
@@ -0,0 +1,107 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Cumulus.
+
+// Cumulus is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Cumulus is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Tools for reclaiming PoV weight in parachain runtimes.
+
+#![cfg_attr(not(feature = "std"), no_std)]
+
+use sp_externalities::ExternalitiesExt;
+
+use sp_runtime_interface::runtime_interface;
+
+#[cfg(feature = "std")]
+use sp_trie::proof_size_extension::ProofSizeExt;
+
+pub const PROOF_RECORDING_DISABLED: u64 = u64::MAX;
+
+/// Interface that provides access to the current storage proof size.
+///
+/// Should return the current storage proof size if [`ProofSizeExt`] is registered. Otherwise,
+/// it needs to return `u64::MAX`.
+#[runtime_interface]
+pub trait StorageProofSize {
+	/// Returns the current storage proof size.
+	fn storage_proof_size(&mut self) -> u64 {
+		self.extension::<ProofSizeExt>().map_or(u64::MAX, |e| e.storage_proof_size())
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use sp_core::Blake2Hasher;
+	use sp_state_machine::TestExternalities;
+	use sp_trie::{
+		proof_size_extension::ProofSizeExt, recorder::Recorder, LayoutV1, PrefixedMemoryDB,
+		TrieDBMutBuilder, TrieMut,
+	};
+
+	use crate::{storage_proof_size, PROOF_RECORDING_DISABLED};
+
+	const TEST_DATA: &[(&[u8], &[u8])] = &[(b"key1", &[1; 64]), (b"key2", &[2; 64])];
+
+	type TestLayout = LayoutV1<sp_core::Blake2Hasher>;
+
+	fn get_prepared_test_externalities() -> (TestExternalities<Blake2Hasher>, Recorder<Blake2Hasher>)
+	{
+		let mut db = PrefixedMemoryDB::default();
+		let mut root = Default::default();
+
+		{
+			let mut trie = TrieDBMutBuilder::<TestLayout>::new(&mut db, &mut root).build();
+			for (k, v) in TEST_DATA {
+				trie.insert(k, v).expect("Inserts data");
+			}
+		}
+
+		let recorder: sp_trie::recorder::Recorder<Blake2Hasher> = Default::default();
+		let trie_backend = sp_state_machine::TrieBackendBuilder::new(db, root)
+			.with_recorder(recorder.clone())
+			.build();
+
+		let mut ext: TestExternalities<Blake2Hasher> = TestExternalities::default();
+		ext.backend = trie_backend;
+		(ext, recorder)
+	}
+
+	#[test]
+	fn host_function_returns_size_from_recorder() {
+		let (mut ext, recorder) = get_prepared_test_externalities();
+		ext.register_extension(ProofSizeExt::new(recorder));
+
+		ext.execute_with(|| {
+			assert_eq!(storage_proof_size::storage_proof_size(), 0);
+			sp_io::storage::get(b"key1");
+			assert_eq!(storage_proof_size::storage_proof_size(), 175);
+			sp_io::storage::get(b"key2");
+			assert_eq!(storage_proof_size::storage_proof_size(), 275);
+			sp_io::storage::get(b"key2");
+			assert_eq!(storage_proof_size::storage_proof_size(), 275);
+		});
+	}
+
+	#[test]
+	fn host_function_returns_max_without_extension() {
+		let (mut ext, _) = get_prepared_test_externalities();
+
+		ext.execute_with(|| {
+			assert_eq!(storage_proof_size::storage_proof_size(), PROOF_RECORDING_DISABLED);
+			sp_io::storage::get(b"key1");
+			assert_eq!(storage_proof_size::storage_proof_size(), PROOF_RECORDING_DISABLED);
+			sp_io::storage::get(b"key2");
+			assert_eq!(storage_proof_size::storage_proof_size(), PROOF_RECORDING_DISABLED);
+		});
+	}
+}
diff --git a/cumulus/test/client/Cargo.toml b/cumulus/test/client/Cargo.toml
index b760b796ec9..6ab873d320c 100644
--- a/cumulus/test/client/Cargo.toml
+++ b/cumulus/test/client/Cargo.toml
@@ -36,6 +36,7 @@ cumulus-test-runtime = { path = "../runtime" }
 cumulus-test-service = { path = "../service" }
 cumulus-test-relay-sproof-builder = { path = "../relay-sproof-builder" }
 cumulus-primitives-core = { path = "../../primitives/core" }
+cumulus-primitives-proof-size-hostfunction = { path = "../../primitives/proof-size-hostfunction" }
 cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inherent" }
 
 [features]
diff --git a/cumulus/test/client/src/lib.rs b/cumulus/test/client/src/lib.rs
index a3c79158f49..df63f683de6 100644
--- a/cumulus/test/client/src/lib.rs
+++ b/cumulus/test/client/src/lib.rs
@@ -44,7 +44,8 @@ mod local_executor {
 	pub struct LocalExecutor;
 
 	impl sc_executor::NativeExecutionDispatch for LocalExecutor {
-		type ExtendHostFunctions = ();
+		type ExtendHostFunctions =
+			cumulus_primitives_proof_size_hostfunction::storage_proof_size::HostFunctions;
 
 		fn dispatch(method: &str, data: &[u8]) -> Option<Vec<u8>> {
 			cumulus_test_runtime::api::dispatch(method, data)
diff --git a/cumulus/test/service/benches/block_import.rs b/cumulus/test/service/benches/block_import.rs
index 254e03b9263..9d6485d74c5 100644
--- a/cumulus/test/service/benches/block_import.rs
+++ b/cumulus/test/service/benches/block_import.rs
@@ -24,7 +24,7 @@ use core::time::Duration;
 use cumulus_primitives_core::ParaId;
 
 use sp_api::{Core, ProvideRuntimeApi};
-use sp_keyring::Sr25519Keyring::Alice;
+use sp_keyring::Sr25519Keyring::{Alice, Bob};
 
 use cumulus_test_service::bench_utils as utils;
 
@@ -32,51 +32,69 @@ fn benchmark_block_import(c: &mut Criterion) {
 	sp_tracing::try_init_simple();
 
 	let runtime = tokio::runtime::Runtime::new().expect("creating tokio runtime doesn't fail; qed");
-	let para_id = ParaId::from(100);
+
+	let para_id = ParaId::from(cumulus_test_runtime::PARACHAIN_ID);
 	let tokio_handle = runtime.handle();
 
 	// Create enough accounts to fill the block with transactions.
 	// Each account should only be included in one transfer.
 	let (src_accounts, dst_accounts, account_ids) = utils::create_benchmark_accounts();
 
-	let alice = runtime.block_on(
-		cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Alice)
+	for bench_parameters in &[(true, Alice), (false, Bob)] {
+		let node = runtime.block_on(
+			cumulus_test_service::TestNodeBuilder::new(
+				para_id,
+				tokio_handle.clone(),
+				bench_parameters.1,
+			)
 			// Preload all accounts with funds for the transfers
-			.endowed_accounts(account_ids)
+			.endowed_accounts(account_ids.clone())
+			.import_proof_recording(bench_parameters.0)
 			.build(),
-	);
-
-	let client = alice.client;
-
-	let (max_transfer_count, extrinsics) =
-		utils::create_benchmarking_transfer_extrinsics(&client, &src_accounts, &dst_accounts);
-
-	let parent_hash = client.usage_info().chain.best_hash;
-	let mut block_builder = BlockBuilderBuilder::new(&*client)
-		.on_parent_block(parent_hash)
-		.fetch_parent_block_number(&*client)
-		.unwrap()
-		.build()
-		.unwrap();
-	for extrinsic in extrinsics {
-		block_builder.push(extrinsic).unwrap();
-	}
-	let benchmark_block = block_builder.build().unwrap();
-
-	let mut group = c.benchmark_group("Block import");
-	group.sample_size(20);
-	group.measurement_time(Duration::from_secs(120));
-	group.throughput(Throughput::Elements(max_transfer_count as u64));
-
-	group.bench_function(format!("(transfers = {}) block import", max_transfer_count), |b| {
-		b.iter_batched(
-			|| benchmark_block.block.clone(),
-			|block| {
-				client.runtime_api().execute_block(parent_hash, block).unwrap();
+		);
+
+		let client = node.client;
+		let backend = node.backend;
+
+		let (max_transfer_count, extrinsics) =
+			utils::create_benchmarking_transfer_extrinsics(&client, &src_accounts, &dst_accounts);
+
+		let parent_hash = client.usage_info().chain.best_hash;
+		let mut block_builder = BlockBuilderBuilder::new(&*client)
+			.on_parent_block(parent_hash)
+			.fetch_parent_block_number(&*client)
+			.unwrap()
+			.build()
+			.unwrap();
+		for extrinsic in extrinsics {
+			block_builder.push(extrinsic).unwrap();
+		}
+		let benchmark_block = block_builder.build().unwrap();
+
+		let mut group = c.benchmark_group("Block import");
+		group.sample_size(20);
+		group.measurement_time(Duration::from_secs(120));
+		group.throughput(Throughput::Elements(max_transfer_count as u64));
+
+		group.bench_function(
+			format!(
+				"(transfers = {max_transfer_count}, proof_recording = {}) block import",
+				bench_parameters.0
+			),
+			|b| {
+				b.iter_batched(
+					|| {
+						backend.reset_trie_cache();
+						benchmark_block.block.clone()
+					},
+					|block| {
+						client.runtime_api().execute_block(parent_hash, block).unwrap();
+					},
+					BatchSize::SmallInput,
+				)
 			},
-			BatchSize::SmallInput,
-		)
-	});
+		);
+	}
 }
 
 criterion_group!(benches, benchmark_block_import);
diff --git a/cumulus/test/service/benches/block_import_glutton.rs b/cumulus/test/service/benches/block_import_glutton.rs
index aeaf0722e72..6295fd68286 100644
--- a/cumulus/test/service/benches/block_import_glutton.rs
+++ b/cumulus/test/service/benches/block_import_glutton.rs
@@ -27,7 +27,7 @@ use core::time::Duration;
 use cumulus_primitives_core::ParaId;
 
 use sc_block_builder::BlockBuilderBuilder;
-use sp_keyring::Sr25519Keyring::Alice;
+use sp_keyring::Sr25519Keyring::{Alice, Bob, Charlie, Ferdie};
 
 use cumulus_test_service::bench_utils as utils;
 
@@ -38,17 +38,29 @@ fn benchmark_block_import(c: &mut Criterion) {
 	let para_id = ParaId::from(100);
 	let tokio_handle = runtime.handle();
 
-	let alice = runtime.block_on(
-		cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Alice).build(),
-	);
-	let client = alice.client;
+	let mut initialize_glutton_pallet = true;
+	for (compute_ratio, storage_ratio, proof_on_import, keyring_identity) in &[
+		(One::one(), Zero::zero(), true, Alice),
+		(One::one(), One::one(), true, Bob),
+		(One::one(), Zero::zero(), false, Charlie),
+		(One::one(), One::one(), false, Ferdie),
+	] {
+		let node = runtime.block_on(
+			cumulus_test_service::TestNodeBuilder::new(
+				para_id,
+				tokio_handle.clone(),
+				*keyring_identity,
+			)
+			.import_proof_recording(*proof_on_import)
+			.build(),
+		);
+		let client = node.client;
+		let backend = node.backend;
 
-	let mut group = c.benchmark_group("Block import");
-	group.sample_size(20);
-	group.measurement_time(Duration::from_secs(120));
+		let mut group = c.benchmark_group("Block import");
+		group.sample_size(20);
+		group.measurement_time(Duration::from_secs(120));
 
-	let mut initialize_glutton_pallet = true;
-	for (compute_ratio, storage_ratio) in &[(One::one(), Zero::zero()), (One::one(), One::one())] {
 		let block = utils::set_glutton_parameters(
 			&client,
 			initialize_glutton_pallet,
@@ -82,7 +94,10 @@ fn benchmark_block_import(c: &mut Criterion) {
 			),
 			|b| {
 				b.iter_batched(
-					|| benchmark_block.block.clone(),
+					|| {
+						backend.reset_trie_cache();
+						benchmark_block.block.clone()
+					},
 					|block| {
 						client.runtime_api().execute_block(parent_hash, block).unwrap();
 					},
diff --git a/cumulus/test/service/benches/validate_block.rs b/cumulus/test/service/benches/validate_block.rs
index 11a7c4376d4..a614863803e 100644
--- a/cumulus/test/service/benches/validate_block.rs
+++ b/cumulus/test/service/benches/validate_block.rs
@@ -18,7 +18,9 @@
 use codec::{Decode, Encode};
 use core::time::Duration;
 use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
-use cumulus_primitives_core::{relay_chain::AccountId, PersistedValidationData, ValidationParams};
+use cumulus_primitives_core::{
+	relay_chain::AccountId, ParaId, PersistedValidationData, ValidationParams,
+};
 use cumulus_test_client::{
 	generate_extrinsic_with_pair, BuildParachainBlockData, InitBlockBuilder, TestClientBuilder,
 	ValidationResult,
@@ -83,6 +85,7 @@ fn benchmark_block_validation(c: &mut Criterion) {
 	// Each account should only be included in one transfer.
 	let (src_accounts, dst_accounts, account_ids) = utils::create_benchmark_accounts();
 
+	let para_id = ParaId::from(cumulus_test_runtime::PARACHAIN_ID);
 	let mut test_client_builder = TestClientBuilder::with_default_backend();
 	let genesis_init = test_client_builder.genesis_init_mut();
 	*genesis_init = cumulus_test_client::GenesisParameters { endowed_accounts: account_ids };
@@ -98,7 +101,14 @@ fn benchmark_block_validation(c: &mut Criterion) {
 		..Default::default()
 	};
 
-	let mut block_builder = client.init_block_builder(Some(validation_data), Default::default());
+	let sproof_builder = RelayStateSproofBuilder {
+		included_para_head: Some(parent_header.clone().encode().into()),
+		para_id,
+		..Default::default()
+	};
+
+	let mut block_builder =
+		client.init_block_builder(Some(validation_data), sproof_builder.clone());
 	for extrinsic in extrinsics {
 		block_builder.push(extrinsic).unwrap();
 	}
@@ -108,7 +118,6 @@ fn benchmark_block_validation(c: &mut Criterion) {
 	let proof_size_in_kb = parachain_block.storage_proof().encode().len() as f64 / 1024f64;
 	let runtime = utils::get_wasm_module();
 
-	let sproof_builder: RelayStateSproofBuilder = Default::default();
 	let (relay_parent_storage_root, _) = sproof_builder.into_state_root_and_proof();
 	let encoded_params = ValidationParams {
 		block_data: cumulus_test_client::BlockData(parachain_block.encode()),
diff --git a/cumulus/test/service/src/bench_utils.rs b/cumulus/test/service/src/bench_utils.rs
index 82142f21695..1894835caec 100644
--- a/cumulus/test/service/src/bench_utils.rs
+++ b/cumulus/test/service/src/bench_utils.rs
@@ -81,8 +81,13 @@ pub fn extrinsic_set_time(client: &TestClient) -> OpaqueExtrinsic {
 pub fn extrinsic_set_validation_data(
 	parent_header: cumulus_test_runtime::Header,
 ) -> OpaqueExtrinsic {
-	let sproof_builder = RelayStateSproofBuilder { para_id: 100.into(), ..Default::default() };
 	let parent_head = HeadData(parent_header.encode());
+	let sproof_builder = RelayStateSproofBuilder {
+		para_id: cumulus_test_runtime::PARACHAIN_ID.into(),
+		included_para_head: parent_head.clone().into(),
+		..Default::default()
+	};
+
 	let (relay_parent_storage_root, relay_chain_state) = sproof_builder.into_state_root_and_proof();
 	let data = ParachainInherentData {
 		validation_data: PersistedValidationData {
diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs
index fb858ce0b71..627d060d8a0 100644
--- a/cumulus/test/service/src/lib.rs
+++ b/cumulus/test/service/src/lib.rs
@@ -187,6 +187,7 @@ impl RecoveryHandle for FailingRecoveryHandle {
 /// be able to perform chain operations.
 pub fn new_partial(
 	config: &mut Configuration,
+	enable_import_proof_record: bool,
 ) -> Result<
 	PartialComponents<
 		Client,
@@ -214,7 +215,12 @@ pub fn new_partial(
 		sc_executor::NativeElseWasmExecutor::<RuntimeExecutor>::new_with_wasm_executor(wasm);
 
 	let (client, backend, keystore_container, task_manager) =
-		sc_service::new_full_parts::<Block, RuntimeApi, _>(config, None, executor)?;
+		sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
+			config,
+			None,
+			executor,
+			enable_import_proof_record,
+		)?;
 	let client = Arc::new(client);
 
 	let block_import =
@@ -309,19 +315,21 @@ pub async fn start_node_impl<RB>(
 	rpc_ext_builder: RB,
 	consensus: Consensus,
 	collator_options: CollatorOptions,
+	proof_recording_during_import: bool,
 ) -> sc_service::error::Result<(
 	TaskManager,
 	Arc<Client>,
 	Arc<NetworkService<Block, H256>>,
 	RpcHandlers,
 	TransactionPool,
+	Arc<Backend>,
 )>
 where
 	RB: Fn(Arc<Client>) -> Result<jsonrpsee::RpcModule<()>, sc_service::Error> + Send + 'static,
 {
 	let mut parachain_config = prepare_node_config(parachain_config);
 
-	let params = new_partial(&mut parachain_config)?;
+	let params = new_partial(&mut parachain_config, proof_recording_during_import)?;
 
 	let transaction_pool = params.transaction_pool.clone();
 	let mut task_manager = params.task_manager;
@@ -477,7 +485,7 @@ where
 
 	start_network.start_network();
 
-	Ok((task_manager, client, network, rpc_handlers, transaction_pool))
+	Ok((task_manager, client, network, rpc_handlers, transaction_pool, backend))
 }
 
 /// A Cumulus test node instance used for testing.
@@ -495,6 +503,8 @@ pub struct TestNode {
 	pub rpc_handlers: RpcHandlers,
 	/// Node's transaction pool
 	pub transaction_pool: TransactionPool,
+	/// Node's backend
+	pub backend: Arc<Backend>,
 }
 
 #[allow(missing_docs)]
@@ -520,6 +530,7 @@ pub struct TestNodeBuilder {
 	consensus: Consensus,
 	relay_chain_mode: RelayChainMode,
 	endowed_accounts: Vec<AccountId>,
+	record_proof_during_import: bool,
 }
 
 impl TestNodeBuilder {
@@ -544,6 +555,7 @@ impl TestNodeBuilder {
 			consensus: Consensus::RelayChain,
 			endowed_accounts: Default::default(),
 			relay_chain_mode: RelayChainMode::Embedded,
+			record_proof_during_import: true,
 		}
 	}
 
@@ -656,6 +668,12 @@ impl TestNodeBuilder {
 		self
 	}
 
+	/// Record proofs during import.
+	pub fn import_proof_recording(mut self, should_record_proof: bool) -> TestNodeBuilder {
+		self.record_proof_during_import = should_record_proof;
+		self
+	}
+
 	/// Build the [`TestNode`].
 	pub async fn build(self) -> TestNode {
 		let parachain_config = node_config(
@@ -684,24 +702,26 @@ impl TestNodeBuilder {
 			format!("{} (relay chain)", relay_chain_config.network.node_name);
 
 		let multiaddr = parachain_config.network.listen_addresses[0].clone();
-		let (task_manager, client, network, rpc_handlers, transaction_pool) = start_node_impl(
-			parachain_config,
-			self.collator_key,
-			relay_chain_config,
-			self.para_id,
-			self.wrap_announce_block,
-			false,
-			|_| Ok(jsonrpsee::RpcModule::new(())),
-			self.consensus,
-			collator_options,
-		)
-		.await
-		.expect("could not create Cumulus test service");
+		let (task_manager, client, network, rpc_handlers, transaction_pool, backend) =
+			start_node_impl(
+				parachain_config,
+				self.collator_key,
+				relay_chain_config,
+				self.para_id,
+				self.wrap_announce_block,
+				false,
+				|_| Ok(jsonrpsee::RpcModule::new(())),
+				self.consensus,
+				collator_options,
+				self.record_proof_during_import,
+			)
+			.await
+			.expect("could not create Cumulus test service");
 
 		let peer_id = network.local_peer_id();
 		let addr = MultiaddrWithPeerId { multiaddr, peer_id };
 
-		TestNode { task_manager, client, network, addr, rpc_handlers, transaction_pool }
+		TestNode { task_manager, client, network, addr, rpc_handlers, transaction_pool, backend }
 	}
 }
 
diff --git a/cumulus/test/service/src/main.rs b/cumulus/test/service/src/main.rs
index 16b68796bd3..55a0f12d671 100644
--- a/cumulus/test/service/src/main.rs
+++ b/cumulus/test/service/src/main.rs
@@ -128,7 +128,7 @@ fn main() -> Result<(), sc_cli::Error> {
 				})
 				.unwrap_or(cumulus_test_service::Consensus::RelayChain);
 
-			let (mut task_manager, _, _, _, _) = tokio_runtime
+			let (mut task_manager, _, _, _, _, _) = tokio_runtime
 				.block_on(cumulus_test_service::start_node_impl(
 					config,
 					collator_key,
@@ -139,6 +139,7 @@ fn main() -> Result<(), sc_cli::Error> {
 					|_| Ok(jsonrpsee::RpcModule::new(())),
 					consensus,
 					collator_options,
+					true,
 				))
 				.expect("could not create Cumulus test service");
 
diff --git a/substrate/client/api/src/execution_extensions.rs b/substrate/client/api/src/execution_extensions.rs
index 6f927105df0..26d3ae73f69 100644
--- a/substrate/client/api/src/execution_extensions.rs
+++ b/substrate/client/api/src/execution_extensions.rs
@@ -91,7 +91,6 @@ impl<Block: BlockT, Ext: Default + Extension> ExtensionsFactory<Block>
 ///
 /// This crate aggregates extensions available for the offchain calls
 /// and is responsible for producing a correct `Extensions` object.
-/// for each call, based on required `Capabilities`.
 pub struct ExecutionExtensions<Block: BlockT> {
 	extensions_factory: RwLock<Box<dyn ExtensionsFactory<Block>>>,
 	read_runtime_version: Arc<dyn ReadRuntimeVersion>,
@@ -116,8 +115,7 @@ impl<Block: BlockT> ExecutionExtensions<Block> {
 		*self.extensions_factory.write() = Box::new(maker);
 	}
 
-	/// Based on the execution context and capabilities it produces
-	/// the extensions object to support desired set of APIs.
+	/// Produces default extensions based on the input parameters.
 	pub fn extensions(
 		&self,
 		block_hash: Block::Hash,
@@ -127,7 +125,6 @@ impl<Block: BlockT> ExecutionExtensions<Block> {
 			self.extensions_factory.read().extensions_for(block_hash, block_number);
 
 		extensions.register(ReadRuntimeVersionExt::new(self.read_runtime_version.clone()));
-
 		extensions
 	}
 }
diff --git a/substrate/client/block-builder/Cargo.toml b/substrate/client/block-builder/Cargo.toml
index 2492c4101b2..852ee84f89b 100644
--- a/substrate/client/block-builder/Cargo.toml
+++ b/substrate/client/block-builder/Cargo.toml
@@ -20,6 +20,7 @@ sp-api = { path = "../../primitives/api" }
 sp-block-builder = { path = "../../primitives/block-builder" }
 sp-blockchain = { path = "../../primitives/blockchain" }
 sp-core = { path = "../../primitives/core" }
+sp-trie = { path = "../../primitives/trie" }
 sp-inherents = { path = "../../primitives/inherents" }
 sp-runtime = { path = "../../primitives/runtime" }
 
diff --git a/substrate/client/block-builder/src/lib.rs b/substrate/client/block-builder/src/lib.rs
index f62b941fdb1..258e39d962b 100644
--- a/substrate/client/block-builder/src/lib.rs
+++ b/substrate/client/block-builder/src/lib.rs
@@ -42,6 +42,7 @@ use sp_runtime::{
 use std::marker::PhantomData;
 
 pub use sp_block_builder::BlockBuilder as BlockBuilderApi;
+use sp_trie::proof_size_extension::ProofSizeExt;
 
 /// A builder for creating an instance of [`BlockBuilder`].
 pub struct BlockBuilderBuilder<'a, B, C> {
@@ -235,6 +236,10 @@ where
 
 		if record_proof {
 			api.record_proof();
+			let recorder = api
+				.proof_recorder()
+				.expect("Proof recording is enabled in the line above; qed.");
+			api.register_extension(ProofSizeExt::new(recorder));
 		}
 
 		api.set_call_context(CallContext::Onchain);
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index d078f44f198..1a3a679c519 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -130,10 +130,11 @@ where
 }
 
 /// Create the initial parts of a full node with the default genesis block builder.
-pub fn new_full_parts<TBl, TRtApi, TExec>(
+pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
 	config: &Configuration,
 	telemetry: Option<TelemetryHandle>,
 	executor: TExec,
+	enable_import_proof_recording: bool,
 ) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
 where
 	TBl: BlockT,
@@ -148,7 +149,26 @@ where
 		executor.clone(),
 	)?;
 
-	new_full_parts_with_genesis_builder(config, telemetry, executor, backend, genesis_block_builder)
+	new_full_parts_with_genesis_builder(
+		config,
+		telemetry,
+		executor,
+		backend,
+		genesis_block_builder,
+		enable_import_proof_recording,
+	)
+}
+/// Create the initial parts of a full node with the default genesis block builder.
+pub fn new_full_parts<TBl, TRtApi, TExec>(
+	config: &Configuration,
+	telemetry: Option<TelemetryHandle>,
+	executor: TExec,
+) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
+where
+	TBl: BlockT,
+	TExec: CodeExecutor + RuntimeVersionOf + Clone,
+{
+	new_full_parts_record_import(config, telemetry, executor, false)
 }
 
 /// Create the initial parts of a full node.
@@ -158,6 +178,7 @@ pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBloc
 	executor: TExec,
 	backend: Arc<TFullBackend<TBl>>,
 	genesis_block_builder: TBuildGenesisBlock,
+	enable_import_proof_recording: bool,
 ) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
 where
 	TBl: BlockT,
@@ -225,6 +246,7 @@ where
 					SyncMode::LightState { .. } | SyncMode::Warp { .. }
 				),
 				wasm_runtime_substitutes,
+				enable_import_proof_recording,
 			},
 		)?;
 
diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs
index 9d51aae55b2..aa9c1b80a29 100644
--- a/substrate/client/service/src/client/client.rs
+++ b/substrate/client/service/src/client/client.rs
@@ -77,7 +77,7 @@ use sp_state_machine::{
 	ChildStorageCollection, KeyValueStates, KeyValueStorageLevel, StorageCollection,
 	MAX_NESTED_TRIE_DEPTH,
 };
-use sp_trie::{CompactProof, MerkleValue, StorageProof};
+use sp_trie::{proof_size_extension::ProofSizeExt, CompactProof, MerkleValue, StorageProof};
 use std::{
 	collections::{HashMap, HashSet},
 	marker::PhantomData,
@@ -184,7 +184,7 @@ where
 	)
 }
 
-/// Relevant client configuration items relevant for the client.
+/// Client configuration items.
 #[derive(Debug, Clone)]
 pub struct ClientConfig<Block: BlockT> {
 	/// Enable the offchain worker db.
@@ -198,6 +198,8 @@ pub struct ClientConfig<Block: BlockT> {
 	/// Map of WASM runtime substitute starting at the child of the given block until the runtime
 	/// version doesn't match anymore.
 	pub wasm_runtime_substitutes: HashMap<NumberFor<Block>, Vec<u8>>,
+	/// Enable recording of storage proofs during block import
+	pub enable_import_proof_recording: bool,
 }
 
 impl<Block: BlockT> Default for ClientConfig<Block> {
@@ -208,6 +210,7 @@ impl<Block: BlockT> Default for ClientConfig<Block> {
 			wasm_runtime_overrides: None,
 			no_genesis: false,
 			wasm_runtime_substitutes: HashMap::new(),
+			enable_import_proof_recording: false,
 		}
 	}
 }
@@ -858,6 +861,14 @@ where
 
 				runtime_api.set_call_context(CallContext::Onchain);
 
+				if self.config.enable_import_proof_recording {
+					runtime_api.record_proof();
+					let recorder = runtime_api
+						.proof_recorder()
+						.expect("Proof recording is enabled in the line above; qed.");
+					runtime_api.register_extension(ProofSizeExt::new(recorder));
+				}
+
 				runtime_api.execute_block(
 					*parent_hash,
 					Block::new(import_block.header.clone(), body.clone()),
diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs
index ff9eb982b86..0c7e138ce90 100644
--- a/substrate/client/service/src/lib.rs
+++ b/substrate/client/service/src/lib.rs
@@ -53,9 +53,10 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
 pub use self::{
 	builder::{
 		build_network, new_client, new_db_backend, new_full_client, new_full_parts,
-		new_full_parts_with_genesis_builder, new_native_or_wasm_executor, new_wasm_executor,
-		spawn_tasks, BuildNetworkParams, KeystoreContainer, NetworkStarter, SpawnTasksParams,
-		TFullBackend, TFullCallExecutor, TFullClient,
+		new_full_parts_record_import, new_full_parts_with_genesis_builder,
+		new_native_or_wasm_executor, new_wasm_executor, spawn_tasks, BuildNetworkParams,
+		KeystoreContainer, NetworkStarter, SpawnTasksParams, TFullBackend, TFullCallExecutor,
+		TFullClient,
 	},
 	client::{ClientConfig, LocalCallExecutor},
 	error::Error,
diff --git a/substrate/frame/support/procedural/src/construct_runtime/mod.rs b/substrate/frame/support/procedural/src/construct_runtime/mod.rs
index d1d6040d33a..010143574ed 100644
--- a/substrate/frame/support/procedural/src/construct_runtime/mod.rs
+++ b/substrate/frame/support/procedural/src/construct_runtime/mod.rs
@@ -253,7 +253,7 @@ pub fn construct_runtime(input: TokenStream) -> TokenStream {
 	let res = res.unwrap_or_else(|e| e.to_compile_error());
 
 	let res = expander::Expander::new("construct_runtime")
-		.dry(std::env::var("FRAME_EXPAND").is_err())
+		.dry(std::env::var("EXPAND_MACROS").is_err())
 		.verbose(true)
 		.write_to_out_dir(res)
 		.expect("Does not fail because of IO in OUT_DIR; qed");
diff --git a/substrate/primitives/api/proc-macro/src/decl_runtime_apis.rs b/substrate/primitives/api/proc-macro/src/decl_runtime_apis.rs
index 370735819f9..2b1e65ec885 100644
--- a/substrate/primitives/api/proc-macro/src/decl_runtime_apis.rs
+++ b/substrate/primitives/api/proc-macro/src/decl_runtime_apis.rs
@@ -729,7 +729,7 @@ fn decl_runtime_apis_impl_inner(api_decls: &[ItemTrait]) -> Result<TokenStream>
 	};
 
 	let decl = expander::Expander::new("decl_runtime_apis")
-		.dry(std::env::var("SP_API_EXPAND").is_err())
+		.dry(std::env::var("EXPAND_MACROS").is_err())
 		.verbose(true)
 		.write_to_out_dir(decl)
 		.expect("Does not fail because of IO in OUT_DIR; qed");
diff --git a/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs b/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs
index e97291bc58a..fd81fdb624c 100644
--- a/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs
+++ b/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs
@@ -846,7 +846,7 @@ fn impl_runtime_apis_impl_inner(api_impls: &[ItemImpl]) -> Result<TokenStream> {
 	);
 
 	let impl_ = expander::Expander::new("impl_runtime_apis")
-		.dry(std::env::var("SP_API_EXPAND").is_err())
+		.dry(std::env::var("EXPAND_MACROS").is_err())
 		.verbose(true)
 		.write_to_out_dir(impl_)
 		.expect("Does not fail because of IO in OUT_DIR; qed");
diff --git a/substrate/primitives/runtime-interface/proc-macro/Cargo.toml b/substrate/primitives/runtime-interface/proc-macro/Cargo.toml
index fbc49785ae9..11236de91ce 100644
--- a/substrate/primitives/runtime-interface/proc-macro/Cargo.toml
+++ b/substrate/primitives/runtime-interface/proc-macro/Cargo.toml
@@ -20,4 +20,5 @@ Inflector = "0.11.4"
 proc-macro-crate = "1.1.3"
 proc-macro2 = "1.0.56"
 quote = "1.0.28"
+expander = "2.0.0"
 syn = { version = "2.0.38", features = ["full", "visit", "fold", "extra-traits"] }
diff --git a/substrate/primitives/runtime-interface/proc-macro/src/runtime_interface/mod.rs b/substrate/primitives/runtime-interface/proc-macro/src/runtime_interface/mod.rs
index 008d69b3210..d0cc9e7b96b 100644
--- a/substrate/primitives/runtime-interface/proc-macro/src/runtime_interface/mod.rs
+++ b/substrate/primitives/runtime-interface/proc-macro/src/runtime_interface/mod.rs
@@ -68,5 +68,11 @@ pub fn runtime_interface_impl(
 		}
 	};
 
+	let res = expander::Expander::new("runtime_interface")
+		.dry(std::env::var("EXPAND_MACROS").is_err())
+		.verbose(true)
+		.write_to_out_dir(res)
+		.expect("Does not fail because of IO in OUT_DIR; qed");
+
 	Ok(res)
 }
diff --git a/substrate/primitives/state-machine/src/trie_backend.rs b/substrate/primitives/state-machine/src/trie_backend.rs
index 7b337b5fd54..7496463e642 100644
--- a/substrate/primitives/state-machine/src/trie_backend.rs
+++ b/substrate/primitives/state-machine/src/trie_backend.rs
@@ -33,12 +33,12 @@ use sp_core::storage::{ChildInfo, StateVersion};
 #[cfg(feature = "std")]
 use sp_trie::{
 	cache::{LocalTrieCache, TrieCache},
-	recorder::Recorder,
-	MemoryDB, StorageProof,
+	MemoryDB,
 };
 #[cfg(not(feature = "std"))]
 use sp_trie::{Error, NodeCodec};
-use sp_trie::{MerkleValue, PrefixedMemoryDB};
+use sp_trie::{MerkleValue, PrefixedMemoryDB, StorageProof, TrieRecorderProvider};
+
 use trie_db::TrieCache as TrieCacheT;
 #[cfg(not(feature = "std"))]
 use trie_db::{node::NodeOwned, CachedValue};
@@ -112,8 +112,6 @@ pub struct UnimplementedCacheProvider<H> {
 	// Not strictly necessary, but the H bound allows to use this as a drop-in
 	// replacement for the `LocalTrieCache` in no-std contexts.
 	_phantom: core::marker::PhantomData<H>,
-	// Statically prevents construction.
-	_infallible: core::convert::Infallible,
 }
 
 #[cfg(not(feature = "std"))]
@@ -156,52 +154,83 @@ impl<H: Hasher> TrieCacheProvider<H> for UnimplementedCacheProvider<H> {
 	}
 }
 
+/// Recorder provider that allows construction of a [`TrieBackend`] and satisfies the requirements,
+/// but can never be instantiated.
+#[cfg(not(feature = "std"))]
+pub struct UnimplementedRecorderProvider<H> {
+	// Not strictly necessary, but the H bound allows to use this as a drop-in
+	// replacement for the [`sp_trie::recorder::Recorder`] in no-std contexts.
+	_phantom: core::marker::PhantomData<H>,
+}
+
+#[cfg(not(feature = "std"))]
+impl<H: Hasher> trie_db::TrieRecorder<H::Out> for UnimplementedRecorderProvider<H> {
+	fn record<'a>(&mut self, _access: trie_db::TrieAccess<'a, H::Out>) {
+		unimplemented!()
+	}
+
+	fn trie_nodes_recorded_for_key(&self, _key: &[u8]) -> trie_db::RecordedForKey {
+		unimplemented!()
+	}
+}
+
+#[cfg(not(feature = "std"))]
+impl<H: Hasher> TrieRecorderProvider<H> for UnimplementedRecorderProvider<H> {
+	type Recorder<'a> = UnimplementedRecorderProvider<H> where H: 'a;
+
+	fn drain_storage_proof(self) -> Option<StorageProof> {
+		unimplemented!()
+	}
+
+	fn as_trie_recorder(&self, _storage_root: H::Out) -> Self::Recorder<'_> {
+		unimplemented!()
+	}
+}
+
 #[cfg(feature = "std")]
 type DefaultCache<H> = LocalTrieCache<H>;
 
 #[cfg(not(feature = "std"))]
 type DefaultCache<H> = UnimplementedCacheProvider<H>;
 
+#[cfg(feature = "std")]
+type DefaultRecorder<H> = sp_trie::recorder::Recorder<H>;
+
+#[cfg(not(feature = "std"))]
+type DefaultRecorder<H> = UnimplementedRecorderProvider<H>;
+
 /// Builder for creating a [`TrieBackend`].
-pub struct TrieBackendBuilder<S: TrieBackendStorage<H>, H: Hasher, C = DefaultCache<H>> {
+pub struct TrieBackendBuilder<
+	S: TrieBackendStorage<H>,
+	H: Hasher,
+	C = DefaultCache<H>,
+	R = DefaultRecorder<H>,
+> {
 	storage: S,
 	root: H::Out,
-	#[cfg(feature = "std")]
-	recorder: Option<Recorder<H>>,
+	recorder: Option<R>,
 	cache: Option<C>,
 }
 
-impl<S, H> TrieBackendBuilder<S, H, DefaultCache<H>>
+impl<S, H> TrieBackendBuilder<S, H>
 where
 	S: TrieBackendStorage<H>,
 	H: Hasher,
 {
 	/// Create a new builder instance.
 	pub fn new(storage: S, root: H::Out) -> Self {
-		Self {
-			storage,
-			root,
-			#[cfg(feature = "std")]
-			recorder: None,
-			cache: None,
-		}
+		Self { storage, root, recorder: None, cache: None }
 	}
 }
 
-impl<S, H, C> TrieBackendBuilder<S, H, C>
+impl<S, H, C, R> TrieBackendBuilder<S, H, C, R>
 where
 	S: TrieBackendStorage<H>,
 	H: Hasher,
 {
 	/// Create a new builder instance.
 	pub fn new_with_cache(storage: S, root: H::Out, cache: C) -> Self {
-		Self {
-			storage,
-			root,
-			#[cfg(feature = "std")]
-			recorder: None,
-			cache: Some(cache),
-		}
+		Self { storage, root, recorder: None, cache: Some(cache) }
 	}
 	/// Wrap the given [`TrieBackend`].
 	///
@@ -210,53 +239,47 @@ where
 	/// backend.
 	///
 	/// The backend storage and the cache will be taken from `other`.
-	pub fn wrap(other: &TrieBackend<S, H, C>) -> TrieBackendBuilder<&S, H, &C> {
+	pub fn wrap(other: &TrieBackend<S, H, C, R>) -> TrieBackendBuilder<&S, H, &C, R> {
 		TrieBackendBuilder {
 			storage: other.essence.backend_storage(),
 			root: *other.essence.root(),
-			#[cfg(feature = "std")]
 			recorder: None,
 			cache: other.essence.trie_node_cache.as_ref(),
 		}
 	}
 
 	/// Use the given optional `recorder` for the to be configured [`TrieBackend`].
-	#[cfg(feature = "std")]
-	pub fn with_optional_recorder(self, recorder: Option<Recorder<H>>) -> Self {
+	pub fn with_optional_recorder(self, recorder: Option<R>) -> Self {
 		Self { recorder, ..self }
 	}
 
 	/// Use the given `recorder` for the to be configured [`TrieBackend`].
-	#[cfg(feature = "std")]
-	pub fn with_recorder(self, recorder: Recorder<H>) -> Self {
+	pub fn with_recorder(self, recorder: R) -> Self {
 		Self { recorder: Some(recorder), ..self }
 	}
 
 	/// Use the given optional `cache` for the to be configured [`TrieBackend`].
-	pub fn with_optional_cache<LC>(self, cache: Option<LC>) -> TrieBackendBuilder<S, H, LC> {
+	pub fn with_optional_cache<LC>(self, cache: Option<LC>) -> TrieBackendBuilder<S, H, LC, R> {
 		TrieBackendBuilder {
 			cache,
 			root: self.root,
 			storage: self.storage,
-			#[cfg(feature = "std")]
 			recorder: self.recorder,
 		}
 	}
 
 	/// Use the given `cache` for the [`TrieBackend`] to be configured.
-	pub fn with_cache<LC>(self, cache: LC) -> TrieBackendBuilder<S, H, LC> {
+	pub fn with_cache<LC>(self, cache: LC) -> TrieBackendBuilder<S, H, LC, R> {
 		TrieBackendBuilder {
 			cache: Some(cache),
 			root: self.root,
 			storage: self.storage,
-			#[cfg(feature = "std")]
 			recorder: self.recorder,
 		}
 	}
 
 	/// Build the configured [`TrieBackend`].
-	#[cfg(feature = "std")]
-	pub fn build(self) -> TrieBackend<S, H, C> {
+	pub fn build(self) -> TrieBackend<S, H, C, R> {
 		TrieBackend {
 			essence: TrieBackendEssence::new_with_cache_and_recorder(
 				self.storage,
@@ -267,27 +290,18 @@ where
 			next_storage_key_cache: Default::default(),
 		}
 	}
-
-	/// Build the configured [`TrieBackend`].
-	#[cfg(not(feature = "std"))]
-	pub fn build(self) -> TrieBackend<S, H, C> {
-		TrieBackend {
-			essence: TrieBackendEssence::new_with_cache(self.storage, self.root, self.cache),
-			next_storage_key_cache: Default::default(),
-		}
-	}
 }
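
With the recorder now a regular type parameter, the std flow is unchanged apart from the removed `#[cfg]` gates. A usage sketch (std; `storage` is assumed to implement `TrieBackendStorage` with a matching `root`):

    use sp_trie::recorder::Recorder;

    // Sketch: record all trie accesses while reading, then extract the proof.
    let recorder = Recorder::<sp_core::Blake2Hasher>::default();
    let backend = TrieBackendBuilder::new(storage, root)
        .with_recorder(recorder)
        .build();
    // ... perform reads through `backend` ...
    // `extract_proof` consumes the backend and returns `None` if no recorder
    // was set or the provider cannot produce a proof.
    let proof = backend.extract_proof().expect("recorder was set");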
 
 /// A cached iterator.
-struct CachedIter<S, H, C>
+struct CachedIter<S, H, C, R>
 where
 	H: Hasher,
 {
 	last_key: sp_std::vec::Vec<u8>,
-	iter: RawIter<S, H, C>,
+	iter: RawIter<S, H, C, R>,
 }
 
-impl<S, H, C> Default for CachedIter<S, H, C>
+impl<S, H, C, R> Default for CachedIter<S, H, C, R>
 where
 	H: Hasher,
 {
@@ -313,23 +327,32 @@ fn access_cache<T, R>(cell: &CacheCell<T>, callback: impl FnOnce(&mut T) -> R) -
 }
 
 /// Patricia trie-based backend. Transaction type is an overlay of changes to commit.
-pub struct TrieBackend<S: TrieBackendStorage<H>, H: Hasher, C = DefaultCache<H>> {
-	pub(crate) essence: TrieBackendEssence<S, H, C>,
-	next_storage_key_cache: CacheCell<Option<CachedIter<S, H, C>>>,
+pub struct TrieBackend<
+	S: TrieBackendStorage<H>,
+	H: Hasher,
+	C = DefaultCache<H>,
+	R = DefaultRecorder<H>,
+> {
+	pub(crate) essence: TrieBackendEssence<S, H, C, R>,
+	next_storage_key_cache: CacheCell<Option<CachedIter<S, H, C, R>>>,
 }
 
-impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H> + Send + Sync>
-	TrieBackend<S, H, C>
+impl<
+		S: TrieBackendStorage<H>,
+		H: Hasher,
+		C: TrieCacheProvider<H> + Send + Sync,
+		R: TrieRecorderProvider<H> + Send + Sync,
+	> TrieBackend<S, H, C, R>
 where
 	H::Out: Codec,
 {
 	#[cfg(test)]
-	pub(crate) fn from_essence(essence: TrieBackendEssence<S, H, C>) -> Self {
+	pub(crate) fn from_essence(essence: TrieBackendEssence<S, H, C, R>) -> Self {
 		Self { essence, next_storage_key_cache: Default::default() }
 	}
 
 	/// Get backend essence reference.
-	pub fn essence(&self) -> &TrieBackendEssence<S, H, C> {
+	pub fn essence(&self) -> &TrieBackendEssence<S, H, C, R> {
 		&self.essence
 	}
 
@@ -361,28 +384,31 @@ where
 	/// Extract the [`StorageProof`].
 	///
 	/// This returns `Some` only when a recorder was set and it is able to produce a proof.
-	#[cfg(feature = "std")]
 	pub fn extract_proof(mut self) -> Option<StorageProof> {
-		self.essence.recorder.take().map(|r| r.drain_storage_proof())
+		self.essence.recorder.take().and_then(|r| r.drain_storage_proof())
 	}
 }
 
-impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H>> sp_std::fmt::Debug
-	for TrieBackend<S, H, C>
+impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H>, R: TrieRecorderProvider<H>>
+	sp_std::fmt::Debug for TrieBackend<S, H, C, R>
 {
 	fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result {
 		write!(f, "TrieBackend")
 	}
 }
 
-impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H> + Send + Sync> Backend<H>
-	for TrieBackend<S, H, C>
+impl<
+		S: TrieBackendStorage<H>,
+		H: Hasher,
+		C: TrieCacheProvider<H> + Send + Sync,
+		R: TrieRecorderProvider<H> + Send + Sync,
+	> Backend<H> for TrieBackend<S, H, C, R>
 where
 	H::Out: Ord + Codec,
 {
 	type Error = crate::DefaultError;
 	type TrieBackendStorage = S;
-	type RawIter = crate::trie_backend_essence::RawIter<S, H, C>;
+	type RawIter = crate::trie_backend_essence::RawIter<S, H, C, R>;
 
 	fn storage_hash(&self, key: &[u8]) -> Result<Option<H::Out>, Self::Error> {
 		self.essence.storage_hash(key)
diff --git a/substrate/primitives/state-machine/src/trie_backend_essence.rs b/substrate/primitives/state-machine/src/trie_backend_essence.rs
index ad7aeab899c..3f789111dee 100644
--- a/substrate/primitives/state-machine/src/trie_backend_essence.rs
+++ b/substrate/primitives/state-machine/src/trie_backend_essence.rs
@@ -28,19 +28,19 @@ use hash_db::{self, AsHashDB, HashDB, HashDBRef, Hasher, Prefix};
 #[cfg(feature = "std")]
 use parking_lot::RwLock;
 use sp_core::storage::{ChildInfo, ChildType, StateVersion};
-use sp_std::{boxed::Box, marker::PhantomData, vec::Vec};
 #[cfg(feature = "std")]
-use sp_trie::recorder::Recorder;
+use sp_std::sync::Arc;
+use sp_std::{boxed::Box, marker::PhantomData, vec::Vec};
 use sp_trie::{
 	child_delta_trie_root, delta_trie_root, empty_child_trie_root,
 	read_child_trie_first_descedant_value, read_child_trie_hash, read_child_trie_value,
 	read_trie_first_descedant_value, read_trie_value,
 	trie_types::{TrieDBBuilder, TrieError},
 	DBValue, KeySpacedDB, MerkleValue, NodeCodec, PrefixedMemoryDB, Trie, TrieCache,
-	TrieDBRawIterator, TrieRecorder,
+	TrieDBRawIterator, TrieRecorder, TrieRecorderProvider,
 };
 #[cfg(feature = "std")]
-use std::{collections::HashMap, sync::Arc};
+use std::collections::HashMap;
 // In this module, we only use layout for read operation and empty root,
 // where V1 and V0 are equivalent.
 use sp_trie::LayoutV1 as Layout;
@@ -83,7 +83,7 @@ enum IterState {
 }
 
 /// A raw iterator over the storage.
-pub struct RawIter<S, H, C>
+pub struct RawIter<S, H, C, R>
 where
 	H: Hasher,
 {
@@ -93,25 +93,26 @@ where
 	child_info: Option<ChildInfo>,
 	trie_iter: TrieDBRawIterator<Layout<H>>,
 	state: IterState,
-	_phantom: PhantomData<(S, C)>,
+	_phantom: PhantomData<(S, C, R)>,
 }
 
-impl<S, H, C> RawIter<S, H, C>
+impl<S, H, C, R> RawIter<S, H, C, R>
 where
 	H: Hasher,
 	S: TrieBackendStorage<H>,
 	H::Out: Codec + Ord,
 	C: TrieCacheProvider<H> + Send + Sync,
+	R: TrieRecorderProvider<H> + Send + Sync,
 {
 	#[inline]
-	fn prepare<R>(
+	fn prepare<RE>(
 		&mut self,
-		backend: &TrieBackendEssence<S, H, C>,
+		backend: &TrieBackendEssence<S, H, C, R>,
 		callback: impl FnOnce(
 			&sp_trie::TrieDB<Layout<H>>,
 			&mut TrieDBRawIterator<Layout<H>>,
-		) -> Option<core::result::Result<R, Box<TrieError<<H as Hasher>::Out>>>>,
-	) -> Option<Result<R>> {
+		) -> Option<core::result::Result<RE, Box<TrieError<<H as Hasher>::Out>>>>,
+	) -> Option<Result<RE>> {
 		if !matches!(self.state, IterState::Pending) {
 			return None
 		}
@@ -139,7 +140,7 @@ where
 	}
 }
 
-impl<S, H, C> Default for RawIter<S, H, C>
+impl<S, H, C, R> Default for RawIter<S, H, C, R>
 where
 	H: Hasher,
 {
@@ -156,14 +157,15 @@ where
 	}
 }
 
-impl<S, H, C> StorageIterator<H> for RawIter<S, H, C>
+impl<S, H, C, R> StorageIterator<H> for RawIter<S, H, C, R>
 where
 	H: Hasher,
 	S: TrieBackendStorage<H>,
 	H::Out: Codec + Ord,
 	C: TrieCacheProvider<H> + Send + Sync,
+	R: TrieRecorderProvider<H> + Send + Sync,
 {
-	type Backend = crate::TrieBackend<S, H, C>;
+	type Backend = crate::TrieBackend<S, H, C, R>;
 	type Error = crate::DefaultError;
 
 	#[inline]
@@ -204,18 +206,17 @@ where
 }
 
 /// Patricia trie-based pairs storage essence.
-pub struct TrieBackendEssence<S: TrieBackendStorage<H>, H: Hasher, C> {
+pub struct TrieBackendEssence<S: TrieBackendStorage<H>, H: Hasher, C, R> {
 	storage: S,
 	root: H::Out,
 	empty: H::Out,
 	#[cfg(feature = "std")]
 	pub(crate) cache: Arc<RwLock<Cache<H::Out>>>,
 	pub(crate) trie_node_cache: Option<C>,
-	#[cfg(feature = "std")]
-	pub(crate) recorder: Option<Recorder<H>>,
+	pub(crate) recorder: Option<R>,
 }
 
-impl<S: TrieBackendStorage<H>, H: Hasher, C> TrieBackendEssence<S, H, C> {
+impl<S: TrieBackendStorage<H>, H: Hasher, C, R> TrieBackendEssence<S, H, C, R> {
 	/// Create new trie-based backend.
 	pub fn new(storage: S, root: H::Out) -> Self {
 		Self::new_with_cache(storage, root, None)
@@ -230,23 +231,22 @@ impl<S: TrieBackendStorage<H>, H: Hasher, C> TrieBackendEssence<S, H, C> {
 			#[cfg(feature = "std")]
 			cache: Arc::new(RwLock::new(Cache::new())),
 			trie_node_cache: cache,
-			#[cfg(feature = "std")]
 			recorder: None,
 		}
 	}
 
 	/// Create new trie-based backend.
-	#[cfg(feature = "std")]
 	pub fn new_with_cache_and_recorder(
 		storage: S,
 		root: H::Out,
 		cache: Option<C>,
-		recorder: Option<Recorder<H>>,
+		recorder: Option<R>,
 	) -> Self {
 		TrieBackendEssence {
 			storage,
 			root,
 			empty: H::hash(&[0u8]),
+			#[cfg(feature = "std")]
 			cache: Arc::new(RwLock::new(Cache::new())),
 			trie_node_cache: cache,
 			recorder,
@@ -289,37 +289,31 @@ impl<S: TrieBackendStorage<H>, H: Hasher, C> TrieBackendEssence<S, H, C> {
 	}
 }
 
-impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H>> TrieBackendEssence<S, H, C> {
+impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H>, R: TrieRecorderProvider<H>>
+	TrieBackendEssence<S, H, C, R>
+{
 	/// Call the given closure passing it the recorder and the cache.
 	///
 	/// If the given `storage_root` is `None`, `self.root` will be used.
 	#[inline]
-	fn with_recorder_and_cache<R>(
+	fn with_recorder_and_cache<RE>(
 		&self,
 		storage_root: Option<H::Out>,
 		callback: impl FnOnce(
 			Option<&mut dyn TrieRecorder<H::Out>>,
 			Option<&mut dyn TrieCache<NodeCodec<H>>>,
-		) -> R,
-	) -> R {
+		) -> RE,
+	) -> RE {
 		let storage_root = storage_root.unwrap_or_else(|| self.root);
 		let mut cache = self.trie_node_cache.as_ref().map(|c| c.as_trie_db_cache(storage_root));
 		let cache = cache.as_mut().map(|c| c as _);
 
-		#[cfg(feature = "std")]
-		{
-			let mut recorder = self.recorder.as_ref().map(|r| r.as_trie_recorder(storage_root));
-			let recorder = match recorder.as_mut() {
-				Some(recorder) => Some(recorder as &mut dyn TrieRecorder<H::Out>),
-				None => None,
-			};
-			callback(recorder, cache)
-		}
-
-		#[cfg(not(feature = "std"))]
-		{
-			callback(None, cache)
-		}
+		let mut recorder = self.recorder.as_ref().map(|r| r.as_trie_recorder(storage_root));
+		let recorder = match recorder.as_mut() {
+			Some(recorder) => Some(recorder as &mut dyn TrieRecorder<H::Out>),
+			None => None,
+		};
+		callback(recorder, cache)
 	}
 
 	/// Call the given closure passing it the recorder and the cache.
@@ -329,15 +323,14 @@ impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H>> TrieBackendEs
 	/// the new storage root. This is required to register the changes in the cache
 	/// for the correct storage root. The given `storage_root` corresponds to the root of the "old"
 	/// trie. If the value is not given, `self.root` is used.
-	#[cfg(feature = "std")]
-	fn with_recorder_and_cache_for_storage_root<R>(
+	fn with_recorder_and_cache_for_storage_root<RE>(
 		&self,
 		storage_root: Option<H::Out>,
 		callback: impl FnOnce(
 			Option<&mut dyn TrieRecorder<H::Out>>,
 			Option<&mut dyn TrieCache<NodeCodec<H>>>,
-		) -> (Option<H::Out>, R),
-	) -> R {
+		) -> (Option<H::Out>, RE),
+	) -> RE {
 		let storage_root = storage_root.unwrap_or_else(|| self.root);
 		let mut recorder = self.recorder.as_ref().map(|r| r.as_trie_recorder(storage_root));
 		let recorder = match recorder.as_mut() {
@@ -361,46 +354,26 @@ impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H>> TrieBackendEs
 
 		result
 	}
-
-	#[cfg(not(feature = "std"))]
-	fn with_recorder_and_cache_for_storage_root<R>(
-		&self,
-		_storage_root: Option<H::Out>,
-		callback: impl FnOnce(
-			Option<&mut dyn TrieRecorder<H::Out>>,
-			Option<&mut dyn TrieCache<NodeCodec<H>>>,
-		) -> (Option<H::Out>, R),
-	) -> R {
-		if let Some(local_cache) = self.trie_node_cache.as_ref() {
-			let mut cache = local_cache.as_trie_db_mut_cache();
-
-			let (new_root, r) = callback(None, Some(&mut cache));
-
-			if let Some(new_root) = new_root {
-				local_cache.merge(cache, new_root);
-			}
-
-			r
-		} else {
-			callback(None, None).1
-		}
-	}
 }
 
-impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H> + Send + Sync>
-	TrieBackendEssence<S, H, C>
+impl<
+		S: TrieBackendStorage<H>,
+		H: Hasher,
+		C: TrieCacheProvider<H> + Send + Sync,
+		R: TrieRecorderProvider<H> + Send + Sync,
+	> TrieBackendEssence<S, H, C, R>
 where
 	H::Out: Codec + Ord,
 {
 	/// Calls the given closure with a [`TrieDB`] constructed for the given
 	/// storage root and (optionally) child trie.
 	#[inline]
-	fn with_trie_db<R>(
+	fn with_trie_db<RE>(
 		&self,
 		root: H::Out,
 		child_info: Option<&ChildInfo>,
-		callback: impl FnOnce(&sp_trie::TrieDB<Layout<H>>) -> R,
-	) -> R {
+		callback: impl FnOnce(&sp_trie::TrieDB<Layout<H>>) -> RE,
+	) -> RE {
 		let backend = self as &dyn HashDBRef<H, Vec<u8>>;
 		let db = child_info
 			.as_ref()
@@ -609,7 +582,7 @@ where
 	}
 
 	/// Create a raw iterator over the storage.
-	pub fn raw_iter(&self, args: IterArgs) -> Result<RawIter<S, H, C>> {
+	pub fn raw_iter(&self, args: IterArgs) -> Result<RawIter<S, H, C, R>> {
 		let root = if let Some(child_info) = args.child_info.as_ref() {
 			let root = match self.child_root(&child_info)? {
 				Some(root) => root,
@@ -831,19 +804,28 @@ where
 	}
 }
 
-impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H> + Send + Sync>
-	AsHashDB<H, DBValue> for TrieBackendEssence<S, H, C>
+impl<
+		S: TrieBackendStorage<H>,
+		H: Hasher,
+		C: TrieCacheProvider<H> + Send + Sync,
+		R: TrieRecorderProvider<H> + Send + Sync,
+	> AsHashDB<H, DBValue> for TrieBackendEssence<S, H, C, R>
 {
 	fn as_hash_db<'b>(&'b self) -> &'b (dyn HashDB<H, DBValue> + 'b) {
 		self
 	}
+
 	fn as_hash_db_mut<'b>(&'b mut self) -> &'b mut (dyn HashDB<H, DBValue> + 'b) {
 		self
 	}
 }
 
-impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H> + Send + Sync> HashDB<H, DBValue>
-	for TrieBackendEssence<S, H, C>
+impl<
+		S: TrieBackendStorage<H>,
+		H: Hasher,
+		C: TrieCacheProvider<H> + Send + Sync,
+		R: TrieRecorderProvider<H> + Send + Sync,
+	> HashDB<H, DBValue> for TrieBackendEssence<S, H, C, R>
 {
 	fn get(&self, key: &H::Out, prefix: Prefix) -> Option<DBValue> {
 		if *key == self.empty {
@@ -875,8 +857,12 @@ impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H> + Send + Sync>
 	}
 }
 
-impl<S: TrieBackendStorage<H>, H: Hasher, C: TrieCacheProvider<H> + Send + Sync>
-	HashDBRef<H, DBValue> for TrieBackendEssence<S, H, C>
+impl<
+		S: TrieBackendStorage<H>,
+		H: Hasher,
+		C: TrieCacheProvider<H> + Send + Sync,
+		R: TrieRecorderProvider<H> + Send + Sync,
+	> HashDBRef<H, DBValue> for TrieBackendEssence<S, H, C, R>
 {
 	fn get(&self, key: &H::Out, prefix: Prefix) -> Option<DBValue> {
 		HashDB::get(self, key, prefix)
@@ -928,7 +914,10 @@ mod test {
 				.expect("insert failed");
 		};
 
-		let essence_1 = TrieBackendEssence::<_, _, LocalTrieCache<_>>::new(mdb, root_1);
+		let essence_1 =
+			TrieBackendEssence::<_, _, LocalTrieCache<_>, sp_trie::recorder::Recorder<_>>::new(
+				mdb, root_1,
+			);
 		let mdb = essence_1.backend_storage().clone();
 		let essence_1 = TrieBackend::from_essence(essence_1);
 
@@ -938,7 +927,10 @@ mod test {
 		assert_eq!(essence_1.next_storage_key(b"5"), Ok(Some(b"6".to_vec())));
 		assert_eq!(essence_1.next_storage_key(b"6"), Ok(None));
 
-		let essence_2 = TrieBackendEssence::<_, _, LocalTrieCache<_>>::new(mdb, root_2);
+		let essence_2 =
+			TrieBackendEssence::<_, _, LocalTrieCache<_>, sp_trie::recorder::Recorder<_>>::new(
+				mdb, root_2,
+			);
 
 		assert_eq!(essence_2.next_child_storage_key(child_info, b"2"), Ok(Some(b"3".to_vec())));
 		assert_eq!(essence_2.next_child_storage_key(child_info, b"3"), Ok(Some(b"4".to_vec())));
diff --git a/substrate/primitives/trie/Cargo.toml b/substrate/primitives/trie/Cargo.toml
index 0822d84a76e..931ccce2044 100644
--- a/substrate/primitives/trie/Cargo.toml
+++ b/substrate/primitives/trie/Cargo.toml
@@ -34,6 +34,7 @@ trie-db = { version = "0.28.0", default-features = false }
 trie-root = { version = "0.18.0", default-features = false }
 sp-core = { path = "../core", default-features = false}
 sp-std = { path = "../std", default-features = false}
+sp-externalities = { path = "../externalities", default-features = false }
 schnellru = { version = "0.2.1", optional = true }
 
 [dev-dependencies]
@@ -58,6 +59,7 @@ std = [
 	"scale-info/std",
 	"schnellru",
 	"sp-core/std",
+	"sp-externalities/std",
 	"sp-runtime/std",
 	"sp-std/std",
 	"thiserror",
diff --git a/substrate/primitives/trie/src/lib.rs b/substrate/primitives/trie/src/lib.rs
index 1a1ed670454..fd1320b3fbc 100644
--- a/substrate/primitives/trie/src/lib.rs
+++ b/substrate/primitives/trie/src/lib.rs
@@ -30,6 +30,9 @@ mod storage_proof;
 mod trie_codec;
 mod trie_stream;
 
+#[cfg(feature = "std")]
+pub mod proof_size_extension;
+
 /// Our `NodeCodec`-specific error.
 pub use error::Error;
 /// Various re-exports from the `hash-db` crate.
@@ -146,6 +149,29 @@ where
 	}
 }
 
+/// Type that is able to provide a [`trie_db::TrieRecorder`].
+///
+/// Types implementing this trait can be used to maintain recorded state
+/// across operations on different [`trie_db::TrieDB`] instances.
+pub trait TrieRecorderProvider<H: Hasher> {
+	/// Recorder type that is going to be returned by implementors of this trait.
+	type Recorder<'a>: trie_db::TrieRecorder<H::Out> + 'a
+	where
+		Self: 'a;
+
+	/// Create a [`StorageProof`] derived from the internal state, if one can be produced.
+	fn drain_storage_proof(self) -> Option<StorageProof>;
+
+	/// Provide a recorder implementing [`trie_db::TrieRecorder`].
+	fn as_trie_recorder(&self, storage_root: H::Out) -> Self::Recorder<'_>;
+}
+
+/// Type that is able to provide a proof size estimation.
+pub trait ProofSizeProvider {
+	/// Returns the storage proof size.
+	fn estimate_encoded_size(&self) -> usize;
+}
+
 /// TrieDB error over `TrieConfiguration` trait.
 pub type TrieError<L> = trie_db::TrieError<TrieHash<L>, CError<L>>;
 /// Reexport from `hash_db`, with genericity set for `Hasher` trait.
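
To make the new trait concrete: a provider may keep only a running size instead of the nodes themselves, which is the idea behind the size-only recorder this patch introduces for `validate_block`. A condensed, illustrative sketch (the names and the reduced `TrieAccess` handling are simplifications of the real `trie_recorder.rs` implementation):

    use codec::Encode;
    use hash_db::Hasher;
    use sp_std::{cell::RefCell, collections::btree_set::BTreeSet};
    use sp_trie::{NodeCodec, StorageProof, TrieRecorderProvider};

    /// Records only the encoded size of newly seen trie nodes.
    pub struct SizeOnlyProvider<H: Hasher> {
        seen_nodes: RefCell<BTreeSet<H::Out>>,
        encoded_size: RefCell<usize>,
    }

    pub struct SizeOnlyRecorder<'a, H: Hasher> {
        seen_nodes: core::cell::RefMut<'a, BTreeSet<H::Out>>,
        encoded_size: core::cell::RefMut<'a, usize>,
    }

    impl<'a, H: Hasher> trie_db::TrieRecorder<H::Out> for SizeOnlyRecorder<'a, H> {
        fn record<'b>(&mut self, access: trie_db::TrieAccess<'b, H::Out>) {
            // Deduplicate by node hash, then accumulate the encoded size only.
            match access {
                trie_db::TrieAccess::NodeOwned { hash, node_owned } =>
                    if self.seen_nodes.insert(hash) {
                        let node = node_owned.to_encoded::<NodeCodec<H>>();
                        *self.encoded_size += node.encoded_size();
                    },
                trie_db::TrieAccess::EncodedNode { hash, encoded_node } =>
                    if self.seen_nodes.insert(hash) {
                        *self.encoded_size += encoded_node.encoded_size();
                    },
                // Value accesses are handled analogously in the real implementation.
                _ => {},
            }
        }

        fn trie_nodes_recorded_for_key(&self, _key: &[u8]) -> trie_db::RecordedForKey {
            // Simplified: the real provider also tracks per-key accesses.
            trie_db::RecordedForKey::None
        }
    }

    impl<H: Hasher> TrieRecorderProvider<H> for SizeOnlyProvider<H> {
        type Recorder<'a> = SizeOnlyRecorder<'a, H> where H: 'a;

        // No nodes are stored, so no proof can be produced. This is why the
        // trait returns an `Option` and `extract_proof` uses `and_then`.
        fn drain_storage_proof(self) -> Option<StorageProof> {
            None
        }

        fn as_trie_recorder(&self, _storage_root: H::Out) -> Self::Recorder<'_> {
            SizeOnlyRecorder {
                seen_nodes: self.seen_nodes.borrow_mut(),
                encoded_size: self.encoded_size.borrow_mut(),
            }
        }
    }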
diff --git a/substrate/primitives/trie/src/proof_size_extension.rs b/substrate/primitives/trie/src/proof_size_extension.rs
new file mode 100644
index 00000000000..c97f334494a
--- /dev/null
+++ b/substrate/primitives/trie/src/proof_size_extension.rs
@@ -0,0 +1,39 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// 	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Externalities extension that provides access to the current proof size
+//! of the underlying recorder.
+
+use crate::ProofSizeProvider;
+
+sp_externalities::decl_extension! {
+	/// Externalities extension that exposes the current storage proof size
+	/// of the underlying [`ProofSizeProvider`].
+	pub struct ProofSizeExt(Box<dyn ProofSizeProvider + 'static + Sync + Send>);
+}
+
+impl ProofSizeExt {
+	/// Creates a new instance of [`ProofSizeExt`].
+	pub fn new<T: ProofSizeProvider + Sync + Send + 'static>(recorder: T) -> Self {
+		ProofSizeExt(Box::new(recorder))
+	}
+
+	/// Returns the storage proof size.
+	pub fn storage_proof_size(&self) -> u64 {
+		self.0.estimate_encoded_size() as _
+	}
+}
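
Node-side code registers this extension whenever proof recording is enabled, giving the new host function something to query. An illustrative sketch:

    use sp_trie::{proof_size_extension::ProofSizeExt, recorder::Recorder};

    // Sketch: the recorder is cloned into the extension, so the size estimate
    // stays shared with the backend that performs the recording.
    let recorder = Recorder::<sp_core::Blake2Hasher>::default();
    let ext = ProofSizeExt::new(recorder.clone());
    assert_eq!(ext.storage_proof_size(), 0); // nothing recorded yet

    let mut extensions = sp_externalities::Extensions::new();
    extensions.register(ext);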
diff --git a/substrate/primitives/trie/src/recorder.rs b/substrate/primitives/trie/src/recorder.rs
index 154cee3f37d..b236f281bb1 100644
--- a/substrate/primitives/trie/src/recorder.rs
+++ b/substrate/primitives/trie/src/recorder.rs
@@ -23,7 +23,7 @@
 use crate::{NodeCodec, StorageProof};
 use codec::Encode;
 use hash_db::Hasher;
-use parking_lot::Mutex;
+use parking_lot::{Mutex, MutexGuard};
 use std::{
 	collections::{HashMap, HashSet},
 	marker::PhantomData,
@@ -80,7 +80,9 @@ impl<H> Default for RecorderInner<H> {
 
 /// The trie recorder.
 ///
-/// It can be used to record accesses to the trie and then to convert them into a [`StorageProof`].
+/// It owns the recorded data and is used to transform that data into a [`StorageProof`]
+/// and to provide transaction support. The `as_trie_recorder` method returns a
+/// [`trie_db::TrieDB`]-compatible recorder that implements the actual recording logic.
 pub struct Recorder<H: Hasher> {
 	inner: Arc<Mutex<RecorderInner<H::Out>>>,
 	/// The estimated encoded size of the storage proof this recorder will produce.
@@ -112,11 +114,8 @@ impl<H: Hasher> Recorder<H> {
 	///
 	/// NOTE: This locks a mutex that stays locked until the return value is dropped.
 	#[inline]
-	pub fn as_trie_recorder(
-		&self,
-		storage_root: H::Out,
-	) -> impl trie_db::TrieRecorder<H::Out> + '_ {
-		TrieRecorder::<H, _> {
+	pub fn as_trie_recorder(&self, storage_root: H::Out) -> TrieRecorder<'_, H> {
+		TrieRecorder::<H> {
 			inner: self.inner.lock(),
 			storage_root,
 			encoded_size_estimation: self.encoded_size_estimation.clone(),
@@ -231,15 +230,33 @@ impl<H: Hasher> Recorder<H> {
 	}
 }
 
+impl<H: Hasher> crate::ProofSizeProvider for Recorder<H> {
+	fn estimate_encoded_size(&self) -> usize {
+		Recorder::estimate_encoded_size(self)
+	}
+}
+
 /// The [`TrieRecorder`](trie_db::TrieRecorder) implementation.
-struct TrieRecorder<H: Hasher, I> {
-	inner: I,
+pub struct TrieRecorder<'a, H: Hasher> {
+	inner: MutexGuard<'a, RecorderInner<H::Out>>,
 	storage_root: H::Out,
 	encoded_size_estimation: Arc<AtomicUsize>,
 	_phantom: PhantomData<H>,
 }
 
-impl<H: Hasher, I: DerefMut<Target = RecorderInner<H::Out>>> TrieRecorder<H, I> {
+impl<H: Hasher> crate::TrieRecorderProvider<H> for Recorder<H> {
+	type Recorder<'a> = TrieRecorder<'a, H> where H: 'a;
+
+	fn drain_storage_proof(self) -> Option<StorageProof> {
+		Some(Recorder::drain_storage_proof(self))
+	}
+
+	fn as_trie_recorder(&self, storage_root: H::Out) -> Self::Recorder<'_> {
+		Recorder::as_trie_recorder(self, storage_root)
+	}
+}
+
+impl<'a, H: Hasher> TrieRecorder<'a, H> {
 	/// Update the recorded keys entry for the given `full_key`.
 	fn update_recorded_keys(&mut self, full_key: &[u8], access: RecordedForKey) {
 		let inner = self.inner.deref_mut();
@@ -283,9 +300,7 @@ impl<H: Hasher, I: DerefMut<Target = RecorderInner<H::Out>>> TrieRecorder<H, I>
 	}
 }
 
-impl<H: Hasher, I: DerefMut<Target = RecorderInner<H::Out>>> trie_db::TrieRecorder<H::Out>
-	for TrieRecorder<H, I>
-{
+impl<'a, H: Hasher> trie_db::TrieRecorder<H::Out> for TrieRecorder<'a, H> {
 	fn record(&mut self, access: TrieAccess<H::Out>) {
 		let mut encoded_size_update = 0;
 
-- 
GitLab