diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock
index a65ca56cee3395e05280a899f1656f7005bc9c2b..a5fa0aafcfd4d5ab52e9e2d43dab248a61b7e65e 100644
--- a/substrate/Cargo.lock
+++ b/substrate/Cargo.lock
@@ -6691,7 +6691,6 @@ dependencies = [
  "sp-api",
  "sp-block-builder",
  "sp-blockchain",
- "sp-consensus",
  "sp-core",
  "sp-inherents",
  "sp-runtime",
diff --git a/substrate/bin/node/bench/src/construct.rs b/substrate/bin/node/bench/src/construct.rs
index a8a02f19c306eeee2c5d0ccab85b77bbb3fb3aa5..b64ffec641c229b3e06e3eabc3dd7cfc7b747c20 100644
--- a/substrate/bin/node/bench/src/construct.rs
+++ b/substrate/bin/node/bench/src/construct.rs
@@ -48,7 +48,7 @@ use sp_transaction_pool::{
 	TransactionStatusStreamFor,
 	TxHash,
 };
-use sp_consensus::{Environment, Proposer, RecordProof};
+use sp_consensus::{Environment, Proposer};
 
 use crate::{
 	common::SizeType,
@@ -170,7 +170,6 @@ impl core::Benchmark for ConstructionBenchmark {
 				inherent_data_providers.create_inherent_data().expect("Create inherent data failed"),
 				Default::default(),
 				std::time::Duration::from_secs(20),
-				RecordProof::Yes,
 			),
 		).map(|r| r.block).expect("Proposing failed");
 
diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs
index 312a0226fc3d9ffaa23b8ebcad26ab7f874da8d6..b6cad3c52de76f70698d8fe4409a6f49de29e3d8 100644
--- a/substrate/bin/node/cli/src/service.rs
+++ b/substrate/bin/node/cli/src/service.rs
@@ -478,7 +478,6 @@ mod tests {
 	use sc_consensus_epochs::descendent_query;
 	use sp_consensus::{
 		Environment, Proposer, BlockImportParams, BlockOrigin, ForkChoiceStrategy, BlockImport,
-		RecordProof,
 	};
 	use node_primitives::{Block, DigestItem, Signature};
 	use node_runtime::{BalancesCall, Call, UncheckedExtrinsic, Address};
@@ -611,7 +610,6 @@ mod tests {
 						inherent_data,
 						digest,
 						std::time::Duration::from_secs(1),
-						RecordProof::Yes,
 					).await
 				}).expect("Error making test block").block;
 
diff --git a/substrate/client/basic-authorship/README.md b/substrate/client/basic-authorship/README.md
index 1a20593c09eaa159a81ddd05c89abe77b7c1b57f..d29ce258e51341e771c551d0e29e2f7b1caa406f 100644
--- a/substrate/client/basic-authorship/README.md
+++ b/substrate/client/basic-authorship/README.md
@@ -20,7 +20,6 @@ let future = proposer.propose(
 	Default::default(),
 	Default::default(),
 	Duration::from_secs(2),
-	RecordProof::Yes,
 );
 
 // We wait until the proposition is performed.
@@ -29,4 +28,4 @@ println!("Generated block: {:?}", block.block);
 ```
 
 
-License: GPL-3.0-or-later WITH Classpath-exception-2.0
\ No newline at end of file
+License: GPL-3.0-or-later WITH Classpath-exception-2.0
diff --git a/substrate/client/basic-authorship/src/basic_authorship.rs b/substrate/client/basic-authorship/src/basic_authorship.rs
index 067695e5a84da52da95405aa258d3afb3c7fa46b..0c5bb7abefa5bba128ec195e710fa5b505040285 100644
--- a/substrate/client/basic-authorship/src/basic_authorship.rs
+++ b/substrate/client/basic-authorship/src/basic_authorship.rs
@@ -23,7 +23,7 @@
 use std::{pin::Pin, time, sync::Arc};
 use sc_client_api::backend;
 use codec::Decode;
-use sp_consensus::{evaluation, Proposal, RecordProof};
+use sp_consensus::{evaluation, Proposal, ProofRecording, DisableProofRecording, EnableProofRecording};
 use sp_core::traits::SpawnNamed;
 use sp_inherents::InherentData;
 use log::{error, info, debug, trace, warn};
@@ -52,7 +52,7 @@ use sc_proposer_metrics::MetricsLink as PrometheusMetrics;
 pub const DEFAULT_MAX_BLOCK_SIZE: usize = 4 * 1024 * 1024 + 512;
 
 /// Proposer factory.
-pub struct ProposerFactory<A, B, C> {
+pub struct ProposerFactory<A, B, C, PR> {
 	spawn_handle: Box<dyn SpawnNamed>,
 	/// The client instance.
 	client: Arc<C>,
@@ -60,12 +60,15 @@ pub struct ProposerFactory<A, B, C> {
 	transaction_pool: Arc<A>,
 	/// Prometheus Link,
 	metrics: PrometheusMetrics,
-	/// phantom member to pin the `Backend` type.
-	_phantom: PhantomData<B>,
+	/// phantom member to pin the `Backend`/`ProofRecording` type.
+	_phantom: PhantomData<(B, PR)>,
 	max_block_size: usize,
 }
 
-impl<A, B, C> ProposerFactory<A, B, C> {
+impl<A, B, C> ProposerFactory<A, B, C, DisableProofRecording> {
+	/// Create a new proposer factory.
+	///
+	/// Proof recording will be disabled when using proposers built by this instance to build blocks.
 	pub fn new(
 		spawn_handle: impl SpawnNamed + 'static,
 		client: Arc<C>,
@@ -81,7 +84,30 @@ impl<A, B, C> ProposerFactory<A, B, C> {
 			max_block_size: DEFAULT_MAX_BLOCK_SIZE,
 		}
 	}
+}
+
+impl<A, B, C> ProposerFactory<A, B, C, EnableProofRecording> {
+	/// Create a new proposer factory with proof recording enabled.
+	///
+	/// Each proposer created by this instance will record a proof while building a block.
+	pub fn with_proof_recording(
+		spawn_handle: impl SpawnNamed + 'static,
+		client: Arc<C>,
+		transaction_pool: Arc<A>,
+		prometheus: Option<&PrometheusRegistry>,
+	) -> Self {
+		ProposerFactory {
+			spawn_handle: Box::new(spawn_handle),
+			client,
+			transaction_pool,
+			metrics: PrometheusMetrics::new(prometheus),
+			_phantom: PhantomData,
+			max_block_size: DEFAULT_MAX_BLOCK_SIZE,
+		}
+	}
+}
 
+impl<A, B, C, PR> ProposerFactory<A, B, C, PR> {
 	/// Set the maximum block size in bytes.
 	///
 	/// The default value for the maximum block size is:
@@ -91,7 +117,7 @@ impl<A, B, C> ProposerFactory<A, B, C> {
 	}
 }
 
-impl<B, Block, C, A> ProposerFactory<A, B, C>
+impl<B, Block, C, A, PR> ProposerFactory<A, B, C, PR>
 	where
 		A: TransactionPool<Block = Block> + 'static,
 		B: backend::Backend<Block> + Send + Sync + 'static,
@@ -101,18 +127,18 @@ impl<B, Block, C, A> ProposerFactory<A, B, C>
 		C::Api: ApiExt<Block, StateBackend = backend::StateBackendFor<B, Block>>
 			+ BlockBuilderApi<Block>,
 {
-	pub fn init_with_now(
+	fn init_with_now(
 		&mut self,
 		parent_header: &<Block as BlockT>::Header,
 		now: Box<dyn Fn() -> time::Instant + Send + Sync>,
-	) -> Proposer<B, Block, C, A> {
+	) -> Proposer<B, Block, C, A, PR> {
 		let parent_hash = parent_header.hash();
 
 		let id = BlockId::hash(parent_hash);
 
 		info!("🙌 Starting consensus session on top of parent {:?}", parent_hash);
 
-		let proposer = Proposer {
+		let proposer = Proposer::<_, _, _, _, PR> {
 			spawn_handle: self.spawn_handle.clone(),
 			client: self.client.clone(),
 			parent_hash,
@@ -129,8 +155,8 @@ impl<B, Block, C, A> ProposerFactory<A, B, C>
 	}
 }
 
-impl<A, B, Block, C> sp_consensus::Environment<Block> for
-	ProposerFactory<A, B, C>
+impl<A, B, Block, C, PR> sp_consensus::Environment<Block> for
+	ProposerFactory<A, B, C, PR>
 		where
 			A: TransactionPool<Block = Block> + 'static,
 			B: backend::Backend<Block> + Send + Sync + 'static,
@@ -139,9 +165,10 @@ impl<A, B, Block, C> sp_consensus::Environment<Block> for
 				+ Send + Sync + 'static,
 			C::Api: ApiExt<Block, StateBackend = backend::StateBackendFor<B, Block>>
 				+ BlockBuilderApi<Block>,
+			PR: ProofRecording,
 {
 	type CreateProposer = future::Ready<Result<Self::Proposer, Self::Error>>;
-	type Proposer = Proposer<B, Block, C, A>;
+	type Proposer = Proposer<B, Block, C, A, PR>;
 	type Error = sp_blockchain::Error;
 
 	fn init(
@@ -153,7 +180,7 @@ impl<A, B, Block, C> sp_consensus::Environment<Block> for
 }
 
 /// The proposer logic.
-pub struct Proposer<B, Block: BlockT, C, A: TransactionPool> {
+pub struct Proposer<B, Block: BlockT, C, A: TransactionPool, PR> {
 	spawn_handle: Box<dyn SpawnNamed>,
 	client: Arc<C>,
 	parent_hash: <Block as BlockT>::Hash,
@@ -162,12 +189,12 @@ pub struct Proposer<B, Block: BlockT, C, A: TransactionPool> {
 	transaction_pool: Arc<A>,
 	now: Box<dyn Fn() -> time::Instant + Send + Sync>,
 	metrics: PrometheusMetrics,
-	_phantom: PhantomData<B>,
+	_phantom: PhantomData<(B, PR)>,
 	max_block_size: usize,
 }
 
-impl<A, B, Block, C> sp_consensus::Proposer<Block> for
-	Proposer<B, Block, C, A>
+impl<A, B, Block, C, PR> sp_consensus::Proposer<Block> for
+	Proposer<B, Block, C, A, PR>
 		where
 			A: TransactionPool<Block = Block> + 'static,
 			B: backend::Backend<Block> + Send + Sync + 'static,
@@ -176,19 +203,21 @@ impl<A, B, Block, C> sp_consensus::Proposer<Block> for
 				+ Send + Sync + 'static,
 			C::Api: ApiExt<Block, StateBackend = backend::StateBackendFor<B, Block>>
 				+ BlockBuilderApi<Block>,
+			PR: ProofRecording,
 {
 	type Transaction = backend::TransactionFor<B, Block>;
 	type Proposal = Pin<Box<dyn Future<
-		Output = Result<Proposal<Block, Self::Transaction>, Self::Error>
+		Output = Result<Proposal<Block, Self::Transaction, PR::Proof>, Self::Error>
 	> + Send>>;
 	type Error = sp_blockchain::Error;
+	type ProofRecording = PR;
+	type Proof = PR::Proof;
 
 	fn propose(
 		self,
 		inherent_data: InherentData,
 		inherent_digests: DigestFor<Block>,
 		max_duration: time::Duration,
-		record_proof: RecordProof,
 	) -> Self::Proposal {
 		let (tx, rx) = oneshot::channel();
 		let spawn_handle = self.spawn_handle.clone();
@@ -200,7 +229,6 @@ impl<A, B, Block, C> sp_consensus::Proposer<Block> for
 				inherent_data,
 				inherent_digests,
 				deadline,
-				record_proof,
 			).await;
 			if tx.send(res).is_err() {
 				trace!("Could not send block production result to proposer!");
@@ -213,7 +241,7 @@ impl<A, B, Block, C> sp_consensus::Proposer<Block> for
 	}
 }
 
-impl<A, B, Block, C> Proposer<B, Block, C, A>
+impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 	where
 		A: TransactionPool<Block = Block>,
 		B: backend::Backend<Block> + Send + Sync + 'static,
@@ -222,14 +250,14 @@ impl<A, B, Block, C> Proposer<B, Block, C, A>
 			+ Send + Sync + 'static,
 		C::Api: ApiExt<Block, StateBackend = backend::StateBackendFor<B, Block>>
 			+ BlockBuilderApi<Block>,
+		PR: ProofRecording,
 {
 	async fn propose_with(
 		self,
 		inherent_data: InherentData,
 		inherent_digests: DigestFor<Block>,
 		deadline: time::Instant,
-		record_proof: RecordProof,
-	) -> Result<Proposal<Block, backend::TransactionFor<B, Block>>, sp_blockchain::Error> {
+	) -> Result<Proposal<Block, backend::TransactionFor<B, Block>, PR::Proof>, sp_blockchain::Error> {
 		/// If the block is full we will attempt to push at most
 		/// this number of transactions before quitting for real.
 		/// It allows us to increase block utilization.
@@ -238,7 +266,7 @@ impl<A, B, Block, C> Proposer<B, Block, C, A>
 		let mut block_builder = self.client.new_block_at(
 			&self.parent_id,
 			inherent_digests,
-			record_proof,
+			PR::ENABLED,
 		)?;
 
 		for inherent in block_builder.create_inherents(inherent_data)? {
@@ -361,6 +389,8 @@ impl<A, B, Block, C> Proposer<B, Block, C, A>
 			error!("Failed to evaluate authored block: {:?}", err);
 		}
 
+		let proof = PR::into_proof(proof)
+			.map_err(|e| sp_blockchain::Error::Application(Box::new(e)))?;
 		Ok(Proposal { block, proof, storage_changes })
 	}
 }
@@ -452,7 +482,7 @@ mod tests {
 		// when
 		let deadline = time::Duration::from_secs(3);
 		let block = futures::executor::block_on(
-			proposer.propose(Default::default(), Default::default(), deadline, RecordProof::No)
+			proposer.propose(Default::default(), Default::default(), deadline)
 		).map(|r| r.block).unwrap();
 
 		// then
@@ -497,7 +527,7 @@ mod tests {
 
 		let deadline = time::Duration::from_secs(1);
 		futures::executor::block_on(
-			proposer.propose(Default::default(), Default::default(), deadline, RecordProof::No)
+			proposer.propose(Default::default(), Default::default(), deadline)
 		).map(|r| r.block).unwrap();
 	}
 
@@ -543,7 +573,7 @@ mod tests {
 
 		let deadline = time::Duration::from_secs(9);
 		let proposal = futures::executor::block_on(
-			proposer.propose(Default::default(), Default::default(), deadline, RecordProof::No),
+			proposer.propose(Default::default(), Default::default(), deadline),
 		).unwrap();
 
 		assert_eq!(proposal.block.extrinsics().len(), 1);
@@ -624,7 +654,7 @@ mod tests {
 			// when
 			let deadline = time::Duration::from_secs(9);
 			let block = futures::executor::block_on(
-				proposer.propose(Default::default(), Default::default(), deadline, RecordProof::No)
+				proposer.propose(Default::default(), Default::default(), deadline)
 			).map(|r| r.block).unwrap();
 
 			// then
diff --git a/substrate/client/basic-authorship/src/lib.rs b/substrate/client/basic-authorship/src/lib.rs
index 224dccd36b5339bff698aab58e7b3a8a59d8a9d5..ccf73cc93f197997838ae813780e24d14d1d2cba 100644
--- a/substrate/client/basic-authorship/src/lib.rs
+++ b/substrate/client/basic-authorship/src/lib.rs
@@ -22,7 +22,7 @@
 //!
 //! ```
 //! # use sc_basic_authorship::ProposerFactory;
-//! # use sp_consensus::{Environment, Proposer, RecordProof};
+//! # use sp_consensus::{Environment, Proposer};
 //! # use sp_runtime::generic::BlockId;
 //! # use std::{sync::Arc, time::Duration};
 //! # use substrate_test_runtime_client::{
@@ -61,7 +61,6 @@
 //! 	Default::default(),
 //! 	Default::default(),
 //! 	Duration::from_secs(2),
-//! 	RecordProof::Yes,
 //! );
 //!
 //! // We wait until the proposition is performed.
diff --git a/substrate/client/block-builder/Cargo.toml b/substrate/client/block-builder/Cargo.toml
index dda5edde36db5b43cfe6b08962d84bd6587ec17e..1019e2411c68c51ed1c3a6908e85086ca9777907 100644
--- a/substrate/client/block-builder/Cargo.toml
+++ b/substrate/client/block-builder/Cargo.toml
@@ -17,7 +17,6 @@ targets = ["x86_64-unknown-linux-gnu"]
 sp-state-machine = { version = "0.9.0", path = "../../primitives/state-machine" }
 sp-runtime = { version = "3.0.0", path = "../../primitives/runtime" }
 sp-api = { version = "3.0.0", path = "../../primitives/api" }
-sp-consensus = { version = "0.9.0", path = "../../primitives/consensus/common" }
 sp-blockchain = { version = "3.0.0", path = "../../primitives/blockchain" }
 sp-core = { version = "3.0.0", path = "../../primitives/core" }
 sp-block-builder = { version = "3.0.0", path = "../../primitives/block-builder" }
diff --git a/substrate/client/block-builder/src/lib.rs b/substrate/client/block-builder/src/lib.rs
index 5f700da8914a3eba548c567664b881a93f3e99a2..4893072a713771d66c0465055645b5e227ca2e8a 100644
--- a/substrate/client/block-builder/src/lib.rs
+++ b/substrate/client/block-builder/src/lib.rs
@@ -37,12 +37,48 @@ use sp_core::ExecutionContext;
 use sp_api::{
 	Core, ApiExt, ApiRef, ProvideRuntimeApi, StorageChanges, StorageProof, TransactionOutcome,
 };
-use sp_consensus::RecordProof;
 
 pub use sp_block_builder::BlockBuilder as BlockBuilderApi;
 
 use sc_client_api::backend;
 
+/// Used as parameter to [`BlockBuilderProvider`] to express if proof recording should be enabled.
+///
+/// When `RecordProof::Yes` is given, all accessed trie nodes should be saved. These recorded
+/// trie nodes can be used by a third party to proof this proposal without having access to the
+/// full storage.
+#[derive(Copy, Clone, PartialEq)]
+pub enum RecordProof {
+	/// `Yes`, record a proof.
+	Yes,
+	/// `No`, don't record any proof.
+	No,
+}
+
+impl RecordProof {
+	/// Returns if `Self` == `Yes`.
+	pub fn yes(&self) -> bool {
+		matches!(self, Self::Yes)
+	}
+}
+
+/// Will return [`RecordProof::No`] as default value.
+impl Default for RecordProof {
+	fn default() -> Self {
+		Self::No
+	}
+}
+
+impl From<bool> for RecordProof {
+	fn from(val: bool) -> Self {
+		if val {
+			Self::Yes
+		} else {
+			Self::No
+		}
+	}
+}
+
 /// A block that was build by [`BlockBuilder`] plus some additional data.
 ///
 /// This additional data includes the `storage_changes`, these changes can be applied to the
diff --git a/substrate/client/consensus/aura/src/lib.rs b/substrate/client/consensus/aura/src/lib.rs
index 29c4a401551668a40a4eea4bdcdba0fd9884225f..746ee6597ea7db257fe617aa5e70e17b76c11e5e 100644
--- a/substrate/client/consensus/aura/src/lib.rs
+++ b/substrate/client/consensus/aura/src/lib.rs
@@ -179,7 +179,7 @@ pub fn start_aura<B, C, SC, E, I, P, SO, CAW, BS, Error>(
 		&inherent_data_providers,
 		slot_duration.slot_duration()
 	)?;
-	Ok(sc_consensus_slots::start_slot_worker::<_, _, _, _, _, AuraSlotCompatible, _>(
+	Ok(sc_consensus_slots::start_slot_worker::<_, _, _, _, _, AuraSlotCompatible, _, _>(
 		slot_duration,
 		select_chain,
 		worker,
@@ -877,7 +877,9 @@ pub fn import_queue<B, I, C, P, S, CAW>(
 #[cfg(test)]
 mod tests {
 	use super::*;
-	use sp_consensus::{NoNetwork as DummyOracle, Proposal, RecordProof, AlwaysCanAuthor};
+	use sp_consensus::{
+		NoNetwork as DummyOracle, Proposal, AlwaysCanAuthor, DisableProofRecording,
+	};
 	use sc_network_test::{Block as TestBlock, *};
 	use sp_runtime::traits::{Block as BlockT, DigestFor};
 	use sc_network::config::ProtocolConfig;
@@ -916,20 +918,21 @@ mod tests {
 			substrate_test_runtime_client::Backend,
 			TestBlock
 		>;
-		type Proposal = future::Ready<Result<Proposal<TestBlock, Self::Transaction>, Error>>;
+		type Proposal = future::Ready<Result<Proposal<TestBlock, Self::Transaction, ()>, Error>>;
+		type ProofRecording = DisableProofRecording;
+		type Proof = ();
 
 		fn propose(
 			self,
 			_: InherentData,
 			digests: DigestFor<TestBlock>,
 			_: Duration,
-			_: RecordProof,
 		) -> Self::Proposal {
 			let r = self.1.new_block(digests).unwrap().build().map_err(|e| e.into());
 
 			future::ready(r.map(|b| Proposal {
 				block: b.block,
-				proof: b.proof,
+				proof: (),
 				storage_changes: b.storage_changes,
 			}))
 		}
diff --git a/substrate/client/consensus/babe/src/tests.rs b/substrate/client/consensus/babe/src/tests.rs
index 9d03a3266d615f1efef23f022df1cf4dd6fe6f2d..a33a509ddc3da00c7206ffaf9de5a7b75c621369 100644
--- a/substrate/client/consensus/babe/src/tests.rs
+++ b/substrate/client/consensus/babe/src/tests.rs
@@ -32,11 +32,10 @@ use sp_consensus_babe::{AuthorityPair, Slot, AllowedSlots, make_transcript, make
 use sc_consensus_slots::BackoffAuthoringOnFinalizedHeadLagging;
 use sc_block_builder::{BlockBuilder, BlockBuilderProvider};
 use sp_consensus::{
-	NoNetwork as DummyOracle, Proposal, RecordProof, AlwaysCanAuthor,
+	NoNetwork as DummyOracle, Proposal, DisableProofRecording, AlwaysCanAuthor,
 	import_queue::{BoxBlockImport, BoxJustificationImport},
 };
-use sc_network_test::*;
-use sc_network_test::{Block as TestBlock, PeersClient};
+use sc_network_test::{Block as TestBlock, *};
 use sc_network::config::ProtocolConfig;
 use sp_runtime::{generic::DigestItem, traits::{Block as BlockT, DigestFor}};
 use sc_client_api::{BlockchainEvents, backend::TransactionFor};
@@ -44,8 +43,7 @@ use log::debug;
 use std::{time::Duration, cell::RefCell, task::Poll};
 use rand::RngCore;
 use rand_chacha::{
-	rand_core::SeedableRng,
-	ChaChaRng,
+	rand_core::SeedableRng, ChaChaRng,
 };
 use sc_keystore::LocalKeystore;
 use sp_application_crypto::key_types::BABE;
@@ -112,7 +110,8 @@ impl DummyProposer {
 			Result<
 				Proposal<
 					TestBlock,
-					sc_client_api::TransactionFor<substrate_test_runtime_client::Backend, TestBlock>
+					sc_client_api::TransactionFor<substrate_test_runtime_client::Backend, TestBlock>,
+					()
 				>,
 				Error
 			>
@@ -163,21 +162,22 @@ impl DummyProposer {
 		// mutate the block header according to the mutator.
 		(self.factory.mutator)(&mut block.header, Stage::PreSeal);
 
-		future::ready(Ok(Proposal { block, proof: None, storage_changes: Default::default() }))
+		future::ready(Ok(Proposal { block, proof: (), storage_changes: Default::default() }))
 	}
 }
 
 impl Proposer<TestBlock> for DummyProposer {
 	type Error = Error;
 	type Transaction = sc_client_api::TransactionFor<substrate_test_runtime_client::Backend, TestBlock>;
-	type Proposal = future::Ready<Result<Proposal<TestBlock, Self::Transaction>, Error>>;
+	type Proposal = future::Ready<Result<Proposal<TestBlock, Self::Transaction, ()>, Error>>;
+	type ProofRecording = DisableProofRecording;
+	type Proof = ();
 
 	fn propose(
 		mut self,
 		_: InherentData,
 		pre_digests: DigestFor<TestBlock>,
 		_: Duration,
-		_: RecordProof,
 	) -> Self::Proposal {
 		self.propose_with(pre_digests)
 	}
diff --git a/substrate/client/consensus/manual-seal/src/seal_block.rs b/substrate/client/consensus/manual-seal/src/seal_block.rs
index 59b99349bf9b2a6b6834d6d8f166be8738bfbd95..2176973f3a2984754348b49960f1239c607f22ac 100644
--- a/substrate/client/consensus/manual-seal/src/seal_block.rs
+++ b/substrate/client/consensus/manual-seal/src/seal_block.rs
@@ -123,8 +123,11 @@ pub async fn seal_block<B, BI, SC, C, E, P>(
 			Default::default()
 		};
 
-		let proposal = proposer.propose(id.clone(), digest, Duration::from_secs(MAX_PROPOSAL_DURATION), false.into())
-			.map_err(|err| Error::StringError(format!("{:?}", err))).await?;
+		let proposal = proposer.propose(
+			id.clone(),
+			digest,
+			Duration::from_secs(MAX_PROPOSAL_DURATION),
+		).map_err(|err| Error::StringError(format!("{:?}", err))).await?;
 
 		if proposal.block.extrinsics().len() == inherents_len && !create_empty {
 			return Err(Error::EmptyTransactionPool)
diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs
index 3c7f1a832d3cd98210a89e958cff52548b9b7c38..19f339cf10151972bdb8094657d8e5931db7ff65 100644
--- a/substrate/client/consensus/pow/src/lib.rs
+++ b/substrate/client/consensus/pow/src/lib.rs
@@ -52,8 +52,7 @@ use sp_consensus_pow::{Seal, TotalDifficulty, POW_ENGINE_ID};
 use sp_inherents::{InherentDataProviders, InherentData};
 use sp_consensus::{
 	BlockImportParams, BlockOrigin, ForkChoiceStrategy, SyncOracle, Environment, Proposer,
-	SelectChain, Error as ConsensusError, CanAuthorWith, RecordProof, BlockImport,
-	BlockCheckParams, ImportResult,
+	SelectChain, Error as ConsensusError, CanAuthorWith, BlockImport, BlockCheckParams, ImportResult,
 };
 use sp_consensus::import_queue::{
 	BoxBlockImport, BasicQueue, Verifier, BoxJustificationImport,
@@ -549,7 +548,10 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
 	timeout: Duration,
 	build_time: Duration,
 	can_author_with: CAW,
-) -> (Arc<Mutex<MiningWorker<Block, Algorithm, C>>>, impl Future<Output = ()>) where
+) -> (
+	Arc<Mutex<MiningWorker<Block, Algorithm, C, <E::Proposer as Proposer<Block>>::Proof>>>,
+	impl Future<Output = ()>,
+) where
 	Block: BlockT,
 	C: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + 'static,
 	S: SelectChain<Block> + 'static,
@@ -566,7 +568,7 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
 	}
 
 	let timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
-	let worker = Arc::new(Mutex::new(MiningWorker::<Block, Algorithm, C> {
+	let worker = Arc::new(Mutex::new(MiningWorker::<Block, Algorithm, C, _> {
 		build: None,
 		algorithm: algorithm.clone(),
 		block_import,
@@ -664,7 +666,6 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
 				inherent_data,
 				inherent_digest,
 				build_time.clone(),
-				RecordProof::No,
 			).await {
 				Ok(x) => x,
 				Err(err) => {
@@ -678,7 +679,7 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
 				},
 			};
 
-			let build = MiningBuild::<Block, Algorithm, C> {
+			let build = MiningBuild::<Block, Algorithm, C, _> {
 				metadata: MiningMetadata {
 					best_hash,
 					pre_hash: proposal.block.header().hash(),
diff --git a/substrate/client/consensus/pow/src/worker.rs b/substrate/client/consensus/pow/src/worker.rs
index c19c5524d9774cf84863845ed911c8a8e160d5c4..d64596e48cf1a64202dfb07f293647ef44aaa0bf 100644
--- a/substrate/client/consensus/pow/src/worker.rs
+++ b/substrate/client/consensus/pow/src/worker.rs
@@ -40,21 +40,31 @@ pub struct MiningMetadata<H, D> {
 }
 
 /// A build of mining, containing the metadata and the block proposal.
-pub struct MiningBuild<Block: BlockT, Algorithm: PowAlgorithm<Block>, C: sp_api::ProvideRuntimeApi<Block>> {
+pub struct MiningBuild<
+	Block: BlockT,
+	Algorithm: PowAlgorithm<Block>,
+	C: sp_api::ProvideRuntimeApi<Block>,
+	Proof
+> {
 	/// Mining metadata.
 	pub metadata: MiningMetadata<Block::Hash, Algorithm::Difficulty>,
 	/// Mining proposal.
-	pub proposal: Proposal<Block, sp_api::TransactionFor<C, Block>>,
+	pub proposal: Proposal<Block, sp_api::TransactionFor<C, Block>, Proof>,
 }
 
 /// Mining worker that exposes structs to query the current mining build and submit mined blocks.
-pub struct MiningWorker<Block: BlockT, Algorithm: PowAlgorithm<Block>, C: sp_api::ProvideRuntimeApi<Block>> {
-	pub(crate) build: Option<MiningBuild<Block, Algorithm, C>>,
+pub struct MiningWorker<
+	Block: BlockT,
+	Algorithm: PowAlgorithm<Block>,
+	C: sp_api::ProvideRuntimeApi<Block>,
+	Proof
+> {
+	pub(crate) build: Option<MiningBuild<Block, Algorithm, C, Proof>>,
 	pub(crate) algorithm: Algorithm,
 	pub(crate) block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
 }
 
-impl<Block, Algorithm, C> MiningWorker<Block, Algorithm, C> where
+impl<Block, Algorithm, C, Proof> MiningWorker<Block, Algorithm, C, Proof> where
 	Block: BlockT,
 	C: sp_api::ProvideRuntimeApi<Block>,
 	Algorithm: PowAlgorithm<Block>,
@@ -72,7 +82,7 @@ impl<Block, Algorithm, C> MiningWorker<Block, Algorithm, C> where
 
 	pub(crate) fn on_build(
 		&mut self,
-		build: MiningBuild<Block, Algorithm, C>,
+		build: MiningBuild<Block, Algorithm, C, Proof>,
 	) {
 		self.build = Some(build);
 	}
diff --git a/substrate/client/consensus/slots/src/lib.rs b/substrate/client/consensus/slots/src/lib.rs
index 1df8378d514f3f3a069675fa64f08cb12cecdb19..564d5c28c5839282d6b6ef74f9753ce507e31908 100644
--- a/substrate/client/consensus/slots/src/lib.rs
+++ b/substrate/client/consensus/slots/src/lib.rs
@@ -40,7 +40,7 @@ use log::{debug, error, info, warn};
 use parking_lot::Mutex;
 use sp_api::{ProvideRuntimeApi, ApiRef};
 use sp_arithmetic::traits::BaseArithmetic;
-use sp_consensus::{BlockImport, Proposer, SyncOracle, SelectChain, CanAuthorWith, SlotData, RecordProof};
+use sp_consensus::{BlockImport, Proposer, SyncOracle, SelectChain, CanAuthorWith, SlotData};
 use sp_consensus_slots::Slot;
 use sp_inherents::{InherentData, InherentDataProviders};
 use sp_runtime::{
@@ -57,20 +57,18 @@ pub type StorageChanges<Transaction, Block> =
 
 /// The result of [`SlotWorker::on_slot`].
 #[derive(Debug, Clone)]
-pub struct SlotResult<Block: BlockT> {
+pub struct SlotResult<Block: BlockT, Proof> {
 	/// The block that was built.
 	pub block: Block,
-	/// The optional storage proof that was calculated while building the block.
-	///
-	/// This needs to be enabled for the proposer to get this storage proof.
-	pub storage_proof: Option<sp_trie::StorageProof>,
+	/// The storage proof that was recorded while building the block.
+	pub storage_proof: Proof,
 }
 
 /// A worker that should be invoked at every new slot.
 ///
 /// The implementation should not make any assumptions of the slot being bound to the time or
 /// similar. The only valid assumption is that the slot number is always increasing.
-pub trait SlotWorker<B: BlockT> {
+pub trait SlotWorker<B: BlockT, Proof> {
 	/// Called when a new slot is triggered.
 	///
 	/// Returns a future that resolves to a [`SlotResult`] iff a block was successfully built in
@@ -79,7 +77,7 @@ pub trait SlotWorker<B: BlockT> {
 		&mut self,
 		chain_head: B::Header,
 		slot_info: SlotInfo,
-	) -> Pin<Box<dyn Future<Output = Option<SlotResult<B>>> + Send>>;
+	) -> Pin<Box<dyn Future<Output = Option<SlotResult<B, Proof>>> + Send>>;
 }
 
 /// A skeleton implementation for `SlotWorker` which tries to claim a slot at
@@ -206,7 +204,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
 		&mut self,
 		chain_head: B::Header,
 		slot_info: SlotInfo,
-	) -> Pin<Box<dyn Future<Output = Option<SlotResult<B>>> + Send>>
+	) -> Pin<Box<dyn Future<Output = Option<SlotResult<B, <Self::Proposer as Proposer<B>>::Proof>>> + Send>>
 	where
 		<Self::Proposer as Proposer<B>>::Proposal: Unpin + Send + 'static,
 	{
@@ -307,7 +305,6 @@ pub trait SimpleSlotWorker<B: BlockT> {
 				logs,
 			},
 			slot_remaining_duration,
-			RecordProof::No,
 		).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e))));
 
 		let proposal_work =
@@ -384,12 +381,13 @@ pub trait SimpleSlotWorker<B: BlockT> {
 	}
 }
 
-impl<B: BlockT, T: SimpleSlotWorker<B>> SlotWorker<B> for T {
+impl<B: BlockT, T: SimpleSlotWorker<B>> SlotWorker<B, <T::Proposer as Proposer<B>>::Proof> for T
+{
 	fn on_slot(
 		&mut self,
 		chain_head: B::Header,
 		slot_info: SlotInfo,
-	) -> Pin<Box<dyn Future<Output = Option<SlotResult<B>>> + Send>> {
+	) -> Pin<Box<dyn Future<Output = Option<SlotResult<B, <T::Proposer as Proposer<B>>::Proof>>> + Send>> {
 		SimpleSlotWorker::on_slot(self, chain_head, slot_info)
 	}
 }
@@ -407,7 +405,7 @@ pub trait SlotCompatible {
 ///
 /// Every time a new slot is triggered, `worker.on_slot` is called and the future it returns is
 /// polled until completion, unless we are major syncing.
-pub fn start_slot_worker<B, C, W, T, SO, SC, CAW>(
+pub fn start_slot_worker<B, C, W, T, SO, SC, CAW, Proof>(
 	slot_duration: SlotDuration<T>,
 	client: C,
 	mut worker: W,
@@ -419,7 +417,7 @@ pub fn start_slot_worker<B, C, W, T, SO, SC, CAW>(
 where
 	B: BlockT,
 	C: SelectChain<B>,
-	W: SlotWorker<B>,
+	W: SlotWorker<B, Proof>,
 	SO: SyncOracle + Send,
 	SC: SlotCompatible + Unpin,
 	T: SlotData + Clone,
diff --git a/substrate/client/finality-grandpa/rpc/src/lib.rs b/substrate/client/finality-grandpa/rpc/src/lib.rs
index 204bea4c18e2cc005b3aeda5515af787223a5d47..2e7354e5fda6852da3d3301f5933c627a5b6a2d1 100644
--- a/substrate/client/finality-grandpa/rpc/src/lib.rs
+++ b/substrate/client/finality-grandpa/rpc/src/lib.rs
@@ -193,13 +193,12 @@ mod tests {
 	use jsonrpc_core::{Notification, Output, types::Params};
 
 	use parity_scale_codec::{Encode, Decode};
-	use sc_block_builder::BlockBuilder;
+	use sc_block_builder::{BlockBuilder, RecordProof};
 	use sc_finality_grandpa::{
 		report, AuthorityId, GrandpaJustificationSender, GrandpaJustification,
 		FinalityProof,
 	};
 	use sp_blockchain::HeaderBackend;
-	use sp_consensus::RecordProof;
 	use sp_core::crypto::Public;
 	use sp_keyring::Ed25519Keyring;
 	use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
@@ -438,7 +437,7 @@ mod tests {
 			&*client,
 			client.info().best_hash,
 			client.info().best_number,
-			RecordProof::Yes,
+			RecordProof::No,
 			Default::default(),
 			&*backend,
 		).unwrap().build().unwrap();
diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs
index b1ff0678ee9a4b5db44d109be793ec21bb9c071a..263ff7b9c569638033c0cc1d71c33b095b0c2fb4 100644
--- a/substrate/client/service/src/client/client.rs
+++ b/substrate/client/service/src/client/client.rs
@@ -52,7 +52,7 @@ use sp_state_machine::{
 use sc_executor::RuntimeVersion;
 use sp_consensus::{
 	Error as ConsensusError, BlockStatus, BlockImportParams, BlockCheckParams,
-	ImportResult, BlockOrigin, ForkChoiceStrategy, RecordProof,
+	ImportResult, BlockOrigin, ForkChoiceStrategy,
 };
 use sp_blockchain::{
 	self as blockchain,
@@ -66,7 +66,7 @@ use sp_api::{
 	CallApiAt, ConstructRuntimeApi, Core as CoreApi, ApiExt, ApiRef, ProvideRuntimeApi,
 	CallApiAtParams,
 };
-use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider};
+use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider, RecordProof};
 use sc_client_api::{
 	backend::{
 		self, BlockImportOperation, PrunableStateChangesTrieStorage,
diff --git a/substrate/primitives/consensus/common/src/lib.rs b/substrate/primitives/consensus/common/src/lib.rs
index 43edf4f7776c24dcbce91022bc03d4504cf75d65..b3aceb45e180fa3c45571265a5873119412c6a1f 100644
--- a/substrate/primitives/consensus/common/src/lib.rs
+++ b/substrate/primitives/consensus/common/src/lib.rs
@@ -36,7 +36,7 @@ use sp_runtime::{
 	generic::BlockId, traits::{Block as BlockT, DigestFor, NumberFor, HashFor},
 };
 use futures::prelude::*;
-pub use sp_inherents::InherentData;
+use sp_state_machine::StorageProof;
 
 pub mod block_validation;
 pub mod offline_tracker;
@@ -55,6 +55,7 @@ pub use block_import::{
 pub use select_chain::SelectChain;
 pub use sp_state_machine::Backend as StateBackend;
 pub use import_queue::DefaultImportQueue;
+pub use sp_inherents::InherentData;
 
 /// Block status.
 #[derive(Debug, PartialEq, Eq)]
@@ -89,53 +90,81 @@ pub trait Environment<B: BlockT> {
 }
 
 /// A proposal that is created by a [`Proposer`].
-pub struct Proposal<Block: BlockT, Transaction> {
+pub struct Proposal<Block: BlockT, Transaction, Proof> {
 	/// The block that was build.
 	pub block: Block,
-	/// Optional proof that was recorded while building the block.
-	pub proof: Option<sp_state_machine::StorageProof>,
+	/// Proof that was recorded while building the block.
+	pub proof: Proof,
 	/// The storage changes while building this block.
 	pub storage_changes: sp_state_machine::StorageChanges<Transaction, HashFor<Block>, NumberFor<Block>>,
 }
 
-/// Used as parameter to [`Proposer`] to tell the requirement on recording a proof.
+/// Error that is returned when [`ProofRecording`] requested to record a proof,
+/// but no proof was recorded.
+#[derive(Debug, thiserror::Error)]
+#[error("Proof should be recorded, but no proof was provided.")]
+pub struct NoProofRecorded;
+
+/// A trait to express the state of proof recording on type system level.
 ///
-/// When `RecordProof::Yes` is given, all accessed trie nodes should be saved. These recorded
-/// trie nodes can be used by a third party to proof this proposal without having access to the
-/// full storage.
-#[derive(Copy, Clone, PartialEq)]
-pub enum RecordProof {
-	/// `Yes`, record a proof.
-	Yes,
-	/// `No`, don't record any proof.
-	No,
+/// This is used by [`Proposer`] to signal if proof recording is enabled. This can be used by
+/// downstream users of the [`Proposer`] trait to enforce that proof recording is activated when
+/// required. The only two implementations of this trait are [`DisableProofRecording`] and
+/// [`EnableProofRecording`].
+///
+/// This trait is sealed and can not be implemented outside of this crate!
+pub trait ProofRecording: Send + Sync + private::Sealed + 'static {
+	/// The proof type that will be used internally.
+	type Proof: Send + Sync + 'static;
+	/// Is proof recording enabled?
+	const ENABLED: bool;
+	/// Convert the given `storage_proof` into [`Self::Proof`].
+	///
+	/// Internally Substrate uses `Option<StorageProof>` to express the both states of proof
+	/// recording (for now) and as [`Self::Proof`] is some different type, we need to provide a
+	/// function to convert this value.
+	///
+	/// If the proof recording was requested, but `None` is given, this will return
+	/// `Err(NoProofRecorded)`.
+	fn into_proof(storage_proof: Option<StorageProof>) -> Result<Self::Proof, NoProofRecorded>;
 }
 
-impl RecordProof {
-	/// Returns if `Self` == `Yes`.
-	pub fn yes(&self) -> bool {
-		match self {
-			Self::Yes => true,
-			Self::No => false,
-		}
+/// Express that proof recording is disabled.
+///
+/// For more information see [`ProofRecording`].
+pub struct DisableProofRecording;
+
+impl ProofRecording for DisableProofRecording {
+	type Proof = ();
+	const ENABLED: bool = false;
+
+	fn into_proof(_: Option<StorageProof>) -> Result<Self::Proof, NoProofRecorded> {
+		Ok(())
 	}
 }
 
-/// Will return [`RecordProof::No`] as default value.
-impl Default for RecordProof {
-	fn default() -> Self {
-		Self::No
+/// Express that proof recording is enabled.
+///
+/// For more information see [`ProofRecording`].
+pub struct EnableProofRecording;
+
+impl ProofRecording for EnableProofRecording {
+	type Proof = sp_state_machine::StorageProof;
+	const ENABLED: bool = true;
+
+	fn into_proof(proof: Option<StorageProof>) -> Result<Self::Proof, NoProofRecorded> {
+		proof.ok_or_else(|| NoProofRecorded)
 	}
 }
 
-impl From<bool> for RecordProof {
-	fn from(val: bool) -> Self {
-		if val {
-			Self::Yes
-		} else {
-			Self::No
-		}
-	}
+/// Provides `Sealed` trait to prevent implementing trait [`ProofRecording`] outside of this crate.
+mod private {
+	/// Special trait that prevents the implementation of [`super::ProofRecording`] outside of this
+	/// crate.
+	pub trait Sealed {}
+
+	impl Sealed for super::DisableProofRecording {}
+	impl Sealed for super::EnableProofRecording {}
 }
 
 /// Logic for a proposer.
@@ -150,8 +179,16 @@ pub trait Proposer<B: BlockT> {
 	/// The transaction type used by the backend.
 	type Transaction: Default + Send + 'static;
 	/// Future that resolves to a committed proposal with an optional proof.
-	type Proposal: Future<Output = Result<Proposal<B, Self::Transaction>, Self::Error>> +
-		Send + Unpin + 'static;
+	type Proposal:
+		Future<Output = Result<Proposal<B, Self::Transaction, Self::Proof>, Self::Error>>
+		+ Send
+		+ Unpin
+		+ 'static;
+	/// The supported proof recording by the implementator of this trait. See [`ProofRecording`]
+	/// for more information.
+	type ProofRecording: self::ProofRecording<Proof = Self::Proof> + Send + Sync + 'static;
+	/// The proof type used by [`Self::ProofRecording`].
+	type Proof: Send + Sync + 'static;
 
 	/// Create a proposal.
 	///
@@ -167,7 +204,6 @@ pub trait Proposer<B: BlockT> {
 		inherent_data: InherentData,
 		inherent_digests: DigestFor<B>,
 		max_duration: Duration,
-		record_proof: RecordProof,
 	) -> Self::Proposal;
 }