From 14b5acab867a36a0333ba7a691eb58b59c86c3e8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bastian=20K=C3=B6cher?= <bkchr@users.noreply.github.com>
Date: Wed, 14 Apr 2021 19:56:22 +0200
Subject: [PATCH] Introduce a "dynamic" block size limit for proposing (#8588)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Introduce a "dynamic" block size limit for proposing

This adds support for using a dynamic block size limit per call to
`propose`. This is required for Cumulus/Parachains to always use stay in
the limits of the maximum allowed PoV size.

As described in the docs, the block limit is only checked in the process
of pushing transactions. As we normally do some other operations in
`on_finalize`, it can happen that the block size still grows when there
is some proof being collected (as we do for parachains). This means,
that the given block limit needs to be rather conservative on the actual
value and should not be the upper limit.

* Update client/basic-authorship/src/basic_authorship.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* More future proof encoded size updating

* Use `ProofRecorderInner`

* Update client/basic-authorship/src/basic_authorship.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/basic-authorship/src/basic_authorship.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/basic-authorship/src/basic_authorship.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/consensus/slots/src/lib.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/consensus/slots/src/slots.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/basic-authorship/src/basic_authorship.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/basic-authorship/src/basic_authorship.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update client/basic-authorship/src/basic_authorship.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

Co-authored-by: Andronik Ordian <write@reusable.software>
Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
---
 substrate/bin/node/bench/src/construct.rs     |   1 +
 substrate/bin/node/cli/src/service.rs         |   1 +
 .../basic-authorship/src/basic_authorship.rs  | 181 +++++++++++++++---
 substrate/client/basic-authorship/src/lib.rs  |   3 +-
 substrate/client/block-builder/src/lib.rs     |  19 ++
 substrate/client/consensus/aura/src/lib.rs    |   2 +
 substrate/client/consensus/babe/src/tests.rs  |   1 +
 .../consensus/manual-seal/src/seal_block.rs   |   1 +
 substrate/client/consensus/pow/src/lib.rs     |   1 +
 substrate/client/consensus/slots/src/lib.rs   |   4 +-
 substrate/client/consensus/slots/src/slots.rs |   7 +
 substrate/client/db/src/bench.rs              |  29 +--
 .../api/proc-macro/src/impl_runtime_apis.rs   |  12 +-
 .../proc-macro/src/mock_impl_runtime_apis.rs  |   4 +
 substrate/primitives/api/src/lib.rs           |   5 +-
 .../consensus/common/src/evaluation.rs        |   8 -
 .../primitives/consensus/common/src/lib.rs    |   8 +
 .../state-machine/src/proving_backend.rs      | 127 ++++++++++--
 substrate/test-utils/runtime/src/lib.rs       |   2 +-
 19 files changed, 336 insertions(+), 80 deletions(-)

diff --git a/substrate/bin/node/bench/src/construct.rs b/substrate/bin/node/bench/src/construct.rs
index 8469ec62893..65246623171 100644
--- a/substrate/bin/node/bench/src/construct.rs
+++ b/substrate/bin/node/bench/src/construct.rs
@@ -171,6 +171,7 @@ impl core::Benchmark for ConstructionBenchmark {
 				inherent_data_providers.create_inherent_data().expect("Create inherent data failed"),
 				Default::default(),
 				std::time::Duration::from_secs(20),
+				None,
 			),
 		).map(|r| r.block).expect("Proposing failed");
 
diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs
index ce0ffb2cecc..5fa7aa00df5 100644
--- a/substrate/bin/node/cli/src/service.rs
+++ b/substrate/bin/node/cli/src/service.rs
@@ -679,6 +679,7 @@ mod tests {
 						inherent_data,
 						digest,
 						std::time::Duration::from_secs(1),
+						None,
 					).await
 				}).expect("Error making test block").block;
 
diff --git a/substrate/client/basic-authorship/src/basic_authorship.rs b/substrate/client/basic-authorship/src/basic_authorship.rs
index 910abfad5ae..c8277d3b5d3 100644
--- a/substrate/client/basic-authorship/src/basic_authorship.rs
+++ b/substrate/client/basic-authorship/src/basic_authorship.rs
@@ -22,7 +22,7 @@
 
 use std::{pin::Pin, time, sync::Arc};
 use sc_client_api::backend;
-use codec::Decode;
+use codec::{Decode, Encode};
 use sp_consensus::{evaluation, Proposal, ProofRecording, DisableProofRecording, EnableProofRecording};
 use sp_core::traits::SpawnNamed;
 use sp_inherents::InherentData;
@@ -42,14 +42,14 @@ use std::marker::PhantomData;
 use prometheus_endpoint::Registry as PrometheusRegistry;
 use sc_proposer_metrics::MetricsLink as PrometheusMetrics;
 
-/// Default maximum block size in bytes used by [`Proposer`].
+/// Default block size limit in bytes used by [`Proposer`].
 ///
-/// Can be overwritten by [`ProposerFactory::set_maximum_block_size`].
+/// Can be overwritten by [`ProposerFactory::set_block_size_limit`].
 ///
 /// Be aware that there is also an upper packet size on what the networking code
 /// will accept. If the block doesn't fit in such a package, it can not be
 /// transferred to other nodes.
-pub const DEFAULT_MAX_BLOCK_SIZE: usize = 4 * 1024 * 1024 + 512;
+pub const DEFAULT_BLOCK_SIZE_LIMIT: usize = 4 * 1024 * 1024 + 512;
 
 /// Proposer factory.
 pub struct ProposerFactory<A, B, C, PR> {
@@ -60,8 +60,14 @@ pub struct ProposerFactory<A, B, C, PR> {
 	transaction_pool: Arc<A>,
 	/// Prometheus Link,
 	metrics: PrometheusMetrics,
-	max_block_size: usize,
+	/// The default block size limit.
+	///
+	/// If no `block_size_limit` is passed to [`Proposer::propose`], this block size limit will be
+	/// used.
+	default_block_size_limit: usize,
 	telemetry: Option<TelemetryHandle>,
+	/// When estimating the block size, should the proof be included?
+	include_proof_in_block_size_estimation: bool,
 	/// phantom member to pin the `Backend`/`ProofRecording` type.
 	_phantom: PhantomData<(B, PR)>,
 }
@@ -81,9 +87,10 @@ impl<A, B, C> ProposerFactory<A, B, C, DisableProofRecording> {
 			spawn_handle: Box::new(spawn_handle),
 			transaction_pool,
 			metrics: PrometheusMetrics::new(prometheus),
-			max_block_size: DEFAULT_MAX_BLOCK_SIZE,
+			default_block_size_limit: DEFAULT_BLOCK_SIZE_LIMIT,
 			telemetry,
 			client,
+			include_proof_in_block_size_estimation: false,
 			_phantom: PhantomData,
 		}
 	}
@@ -93,6 +100,9 @@ impl<A, B, C> ProposerFactory<A, B, C, EnableProofRecording> {
 	/// Create a new proposer factory with proof recording enabled.
 	///
 	/// Each proposer created by this instance will record a proof while building a block.
+	///
+	/// This will also include the proof into the estimation of the block size. This can be disabled
+	/// by calling [`ProposerFactory::disable_proof_in_block_size_estimation`].
 	pub fn with_proof_recording(
 		spawn_handle: impl SpawnNamed + 'static,
 		client: Arc<C>,
@@ -101,24 +111,32 @@ impl<A, B, C> ProposerFactory<A, B, C, EnableProofRecording> {
 		telemetry: Option<TelemetryHandle>,
 	) -> Self {
 		ProposerFactory {
-			spawn_handle: Box::new(spawn_handle),
 			client,
+			spawn_handle: Box::new(spawn_handle),
 			transaction_pool,
 			metrics: PrometheusMetrics::new(prometheus),
-			max_block_size: DEFAULT_MAX_BLOCK_SIZE,
+			default_block_size_limit: DEFAULT_BLOCK_SIZE_LIMIT,
 			telemetry,
+			include_proof_in_block_size_estimation: true,
 			_phantom: PhantomData,
 		}
 	}
+
+	/// Disable the proof inclusion when estimating the block size.
+	pub fn disable_proof_in_block_size_estimation(&mut self) {
+		self.include_proof_in_block_size_estimation = false;
+	}
 }
 
 impl<A, B, C, PR> ProposerFactory<A, B, C, PR> {
-	/// Set the maximum block size in bytes.
+	/// Set the default block size limit in bytes.
+	///
+	/// The default value for the block size limit is:
+	/// [`DEFAULT_BLOCK_SIZE_LIMIT`].
 	///
-	/// The default value for the maximum block size is:
-	/// [`DEFAULT_MAX_BLOCK_SIZE`].
-	pub fn set_maximum_block_size(&mut self, size: usize) {
-		self.max_block_size = size;
+	/// If there is no block size limit passed to [`Proposer::propose`], this value will be used.
+	pub fn set_default_block_size_limit(&mut self, limit: usize) {
+		self.default_block_size_limit = limit;
 	}
 }
 
@@ -152,9 +170,10 @@ impl<B, Block, C, A, PR> ProposerFactory<A, B, C, PR>
 			transaction_pool: self.transaction_pool.clone(),
 			now,
 			metrics: self.metrics.clone(),
-			max_block_size: self.max_block_size,
+			default_block_size_limit: self.default_block_size_limit,
 			telemetry: self.telemetry.clone(),
 			_phantom: PhantomData,
+			include_proof_in_block_size_estimation: self.include_proof_in_block_size_estimation,
 		};
 
 		proposer
@@ -195,7 +214,8 @@ pub struct Proposer<B, Block: BlockT, C, A: TransactionPool, PR> {
 	transaction_pool: Arc<A>,
 	now: Box<dyn Fn() -> time::Instant + Send + Sync>,
 	metrics: PrometheusMetrics,
-	max_block_size: usize,
+	default_block_size_limit: usize,
+	include_proof_in_block_size_estimation: bool,
 	telemetry: Option<TelemetryHandle>,
 	_phantom: PhantomData<(B, PR)>,
 }
@@ -225,6 +245,7 @@ impl<A, B, Block, C, PR> sp_consensus::Proposer<Block> for
 		inherent_data: InherentData,
 		inherent_digests: DigestFor<Block>,
 		max_duration: time::Duration,
+		block_size_limit: Option<usize>,
 	) -> Self::Proposal {
 		let (tx, rx) = oneshot::channel();
 		let spawn_handle = self.spawn_handle.clone();
@@ -236,6 +257,7 @@ impl<A, B, Block, C, PR> sp_consensus::Proposer<Block> for
 				inherent_data,
 				inherent_digests,
 				deadline,
+				block_size_limit,
 			).await;
 			if tx.send(res).is_err() {
 				trace!("Could not send block production result to proposer!");
@@ -264,6 +286,7 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 		inherent_data: InherentData,
 		inherent_digests: DigestFor<Block>,
 		deadline: time::Instant,
+		block_size_limit: Option<usize>,
 	) -> Result<Proposal<Block, backend::TransactionFor<B, Block>, PR::Proof>, sp_blockchain::Error> {
 		/// If the block is full we will attempt to push at most
 		/// this number of transactions before quitting for real.
@@ -297,7 +320,9 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 		let mut unqueue_invalid = Vec::new();
 
 		let mut t1 = self.transaction_pool.ready_at(self.parent_number).fuse();
-		let mut t2 = futures_timer::Delay::new(deadline.saturating_duration_since((self.now)()) / 8).fuse();
+		let mut t2 = futures_timer::Delay::new(
+			deadline.saturating_duration_since((self.now)()) / 8,
+		).fuse();
 
 		let pending_iterator = select! {
 			res = t1 => res,
@@ -311,8 +336,13 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 			},
 		};
 
+		let block_size_limit = block_size_limit.unwrap_or(self.default_block_size_limit);
+
 		debug!("Attempting to push transactions from the pool.");
 		debug!("Pool status: {:?}", self.transaction_pool.status());
+		let mut transaction_pushed = false;
+		let mut hit_block_size_limit = false;
+
 		for pending_tx in pending_iterator {
 			if (self.now)() > deadline {
 				debug!(
@@ -324,9 +354,30 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 
 			let pending_tx_data = pending_tx.data().clone();
 			let pending_tx_hash = pending_tx.hash().clone();
+
+			let block_size = block_builder.estimate_block_size(
+				self.include_proof_in_block_size_estimation,
+			);
+			if block_size + pending_tx_data.encoded_size() > block_size_limit {
+				if skipped < MAX_SKIPPED_TRANSACTIONS {
+					skipped += 1;
+					debug!(
+						"Transaction would overflow the block size limit, \
+						 but will try {} more transactions before quitting.",
+						MAX_SKIPPED_TRANSACTIONS - skipped,
+					);
+					continue;
+				} else {
+					debug!("Reached block size limit, proceeding with proposing.");
+					hit_block_size_limit = true;
+					break;
+				}
+			}
+
 			trace!("[{:?}] Pushing to the block.", pending_tx_hash);
 			match sc_block_builder::BlockBuilder::push(&mut block_builder, pending_tx_data) {
 				Ok(()) => {
+					transaction_pushed = true;
 					debug!("[{:?}] Pushed to the block.", pending_tx_hash);
 				}
 				Err(ApplyExtrinsicFailed(Validity(e)))
@@ -356,6 +407,13 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 			}
 		}
 
+		if hit_block_size_limit && !transaction_pushed {
+			warn!(
+				"Hit block size limit of `{}` without including any transaction!",
+				block_size_limit,
+			);
+		}
+
 		self.transaction_pool.remove_invalid(&unqueue_invalid);
 
 		let (block, storage_changes, proof) = block_builder.build()?.into_inner();
@@ -367,7 +425,8 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 			}
 		);
 
-		info!("🎁 Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
+		info!(
+			"🎁 Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
 			block.header().number(),
 			<Block as BlockT>::Hash::from(block.header().hash()),
 			block.header().parent_hash(),
@@ -394,7 +453,6 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 			&block,
 			&self.parent_hash,
 			self.parent_number,
-			self.max_block_size,
 		) {
 			error!("Failed to evaluate authored block: {:?}", err);
 		}
@@ -421,6 +479,7 @@ mod tests {
 	use sp_runtime::traits::NumberFor;
 	use sc_client_api::Backend;
 	use futures::executor::block_on;
+	use sp_consensus::Environment;
 
 	const SOURCE: TransactionSource = TransactionSource::External;
 
@@ -494,7 +553,7 @@ mod tests {
 		// when
 		let deadline = time::Duration::from_secs(3);
 		let block = block_on(
-			proposer.propose(Default::default(), Default::default(), deadline)
+			proposer.propose(Default::default(), Default::default(), deadline, None)
 		).map(|r| r.block).unwrap();
 
 		// then
@@ -540,7 +599,7 @@ mod tests {
 
 		let deadline = time::Duration::from_secs(1);
 		block_on(
-			proposer.propose(Default::default(), Default::default(), deadline)
+			proposer.propose(Default::default(), Default::default(), deadline, None)
 		).map(|r| r.block).unwrap();
 	}
 
@@ -587,7 +646,7 @@ mod tests {
 
 		let deadline = time::Duration::from_secs(9);
 		let proposal = block_on(
-			proposer.propose(Default::default(), Default::default(), deadline),
+			proposer.propose(Default::default(), Default::default(), deadline, None),
 		).unwrap();
 
 		assert_eq!(proposal.block.extrinsics().len(), 1);
@@ -669,7 +728,7 @@ mod tests {
 			// when
 			let deadline = time::Duration::from_secs(9);
 			let block = block_on(
-				proposer.propose(Default::default(), Default::default(), deadline)
+				proposer.propose(Default::default(), Default::default(), deadline, None)
 			).map(|r| r.block).unwrap();
 
 			// then
@@ -704,4 +763,82 @@ mod tests {
 		let block = propose_block(&client, 1, 2, 5);
 		block_on(client.import(BlockOrigin::Own, block)).unwrap();
 	}
+
+	#[test]
+	fn should_cease_building_block_when_block_limit_is_reached() {
+		let client = Arc::new(substrate_test_runtime_client::new());
+		let spawner = sp_core::testing::TaskExecutor::new();
+		let txpool = BasicPool::new_full(
+			Default::default(),
+			true.into(),
+			None,
+			spawner.clone(),
+			client.clone(),
+		);
+		let genesis_header = client.header(&BlockId::Number(0u64))
+			.expect("header get error")
+			.expect("there should be header");
+
+		let extrinsics_num = 4;
+		let extrinsics = (0..extrinsics_num)
+			.map(|v| Extrinsic::IncludeData(vec![v as u8; 10]))
+			.collect::<Vec<_>>();
+
+		let block_limit = genesis_header.encoded_size()
+			+ extrinsics.iter().take(extrinsics_num - 1).map(Encode::encoded_size).sum::<usize>()
+			+ Vec::<Extrinsic>::new().encoded_size();
+
+		block_on(
+			txpool.submit_at(&BlockId::number(0), SOURCE, extrinsics)
+		).unwrap();
+
+		block_on(txpool.maintain(chain_event(genesis_header.clone())));
+
+		let mut proposer_factory = ProposerFactory::new(
+			spawner.clone(),
+			client.clone(),
+			txpool.clone(),
+			None,
+			None,
+		);
+
+		let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();
+
+		// Give it enough time
+		let deadline = time::Duration::from_secs(300);
+		let block = block_on(
+			proposer.propose(Default::default(), Default::default(), deadline, Some(block_limit))
+		).map(|r| r.block).unwrap();
+
+		// Based on the block limit, one transaction shouldn't be included.
+		assert_eq!(block.extrinsics().len(), extrinsics_num - 1);
+
+		let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();
+
+		let block = block_on(
+			proposer.propose(Default::default(), Default::default(), deadline, None,
+		)).map(|r| r.block).unwrap();
+
+		// Without a block limit we should include all of them
+		assert_eq!(block.extrinsics().len(), extrinsics_num);
+
+		let mut proposer_factory = ProposerFactory::with_proof_recording(
+			spawner.clone(),
+			client.clone(),
+			txpool.clone(),
+			None,
+			None,
+		);
+
+		let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();
+
+		// Give it enough time
+		let block = block_on(
+			proposer.propose(Default::default(), Default::default(), deadline, Some(block_limit))
+		).map(|r| r.block).unwrap();
+
+		// The block limit didn't changed, but we now include the proof in the estimation of the
+		// block size and thus, one less transaction should fit into the limit.
+		assert_eq!(block.extrinsics().len(), extrinsics_num - 2);
+	}
 }
diff --git a/substrate/client/basic-authorship/src/lib.rs b/substrate/client/basic-authorship/src/lib.rs
index acaf85db763..133b833cddd 100644
--- a/substrate/client/basic-authorship/src/lib.rs
+++ b/substrate/client/basic-authorship/src/lib.rs
@@ -62,6 +62,7 @@
 //! 	Default::default(),
 //! 	Default::default(),
 //! 	Duration::from_secs(2),
+//! 	None,
 //! );
 //!
 //! // We wait until the proposition is performed.
@@ -72,4 +73,4 @@
 
 mod basic_authorship;
 
-pub use crate::basic_authorship::{ProposerFactory, Proposer, DEFAULT_MAX_BLOCK_SIZE};
+pub use crate::basic_authorship::{ProposerFactory, Proposer, DEFAULT_BLOCK_SIZE_LIMIT};
diff --git a/substrate/client/block-builder/src/lib.rs b/substrate/client/block-builder/src/lib.rs
index 4893072a713..7d391f8fb85 100644
--- a/substrate/client/block-builder/src/lib.rs
+++ b/substrate/client/block-builder/src/lib.rs
@@ -135,6 +135,8 @@ pub struct BlockBuilder<'a, Block: BlockT, A: ProvideRuntimeApi<Block>, B> {
 	block_id: BlockId<Block>,
 	parent_hash: Block::Hash,
 	backend: &'a B,
+	/// The estimated size of the block header.
+	estimated_header_size: usize,
 }
 
 impl<'a, Block, A, B> BlockBuilder<'a, Block, A, B>
@@ -165,6 +167,8 @@ where
 			inherent_digests,
 		);
 
+		let estimated_header_size = header.encoded_size();
+
 		let mut api = api.runtime_api();
 
 		if record_proof.yes() {
@@ -183,6 +187,7 @@ where
 			api,
 			block_id,
 			backend,
+			estimated_header_size,
 		})
 	}
 
@@ -270,6 +275,20 @@ where
 			))
 		}).map_err(|e| Error::Application(Box::new(e)))
 	}
+
+	/// Estimate the size of the block in the current state.
+	///
+	/// If `include_proof` is `true`, the estimated size of the storage proof will be added
+	/// to the estimation.
+	pub fn estimate_block_size(&self, include_proof: bool) -> usize {
+		let size = self.estimated_header_size + self.extrinsics.encoded_size();
+
+		if include_proof {
+			size + self.api.proof_recorder().map(|pr| pr.estimate_encoded_size()).unwrap_or(0)
+		} else {
+			size
+		}
+	}
 }
 
 #[cfg(test)]
diff --git a/substrate/client/consensus/aura/src/lib.rs b/substrate/client/consensus/aura/src/lib.rs
index 77dac0f7544..3c72f359f8f 100644
--- a/substrate/client/consensus/aura/src/lib.rs
+++ b/substrate/client/consensus/aura/src/lib.rs
@@ -629,6 +629,7 @@ mod tests {
 			_: InherentData,
 			digests: DigestFor<TestBlock>,
 			_: Duration,
+			_: Option<usize>,
 		) -> Self::Proposal {
 			let r = self.1.new_block(digests).unwrap().build().map_err(|e| e.into());
 
@@ -887,6 +888,7 @@ mod tests {
 				ends_at: Instant::now() + Duration::from_secs(100),
 				inherent_data: InherentData::new(),
 				duration: Duration::from_millis(1000),
+				block_size_limit: None,
 			},
 		)).unwrap();
 
diff --git a/substrate/client/consensus/babe/src/tests.rs b/substrate/client/consensus/babe/src/tests.rs
index 839d38b94a9..9949da61da5 100644
--- a/substrate/client/consensus/babe/src/tests.rs
+++ b/substrate/client/consensus/babe/src/tests.rs
@@ -182,6 +182,7 @@ impl Proposer<TestBlock> for DummyProposer {
 		_: InherentData,
 		pre_digests: DigestFor<TestBlock>,
 		_: Duration,
+		_: Option<usize>,
 	) -> Self::Proposal {
 		self.propose_with(pre_digests)
 	}
diff --git a/substrate/client/consensus/manual-seal/src/seal_block.rs b/substrate/client/consensus/manual-seal/src/seal_block.rs
index 23a560cebd5..b21630f0377 100644
--- a/substrate/client/consensus/manual-seal/src/seal_block.rs
+++ b/substrate/client/consensus/manual-seal/src/seal_block.rs
@@ -127,6 +127,7 @@ pub async fn seal_block<B, BI, SC, C, E, P>(
 			id.clone(),
 			digest,
 			Duration::from_secs(MAX_PROPOSAL_DURATION),
+			None,
 		).map_err(|err| Error::StringError(format!("{:?}", err))).await?;
 
 		if proposal.block.extrinsics().len() == inherents_len && !create_empty {
diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs
index ea2e30afdc4..bcbc2009321 100644
--- a/substrate/client/consensus/pow/src/lib.rs
+++ b/substrate/client/consensus/pow/src/lib.rs
@@ -669,6 +669,7 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
 				inherent_data,
 				inherent_digest,
 				build_time.clone(),
+				None,
 			).await {
 				Ok(x) => x,
 				Err(err) => {
diff --git a/substrate/client/consensus/slots/src/lib.rs b/substrate/client/consensus/slots/src/lib.rs
index c1f13fea1f9..5157f381e6f 100644
--- a/substrate/client/consensus/slots/src/lib.rs
+++ b/substrate/client/consensus/slots/src/lib.rs
@@ -313,6 +313,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
 				logs,
 			},
 			proposing_remaining_duration.mul_f32(0.98),
+			None,
 		).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)));
 
 		let proposal = match futures::future::select(proposing, proposing_remaining).await {
@@ -535,7 +536,7 @@ pub enum Error<T> where T: Debug {
 	SlotDurationInvalid(SlotDuration<T>),
 }
 
-/// A slot duration. Create with `get_or_compute`.
+/// A slot duration. Create with [`get_or_compute`](Self::get_or_compute).
 // The internal member should stay private here to maintain invariants of
 // `get_or_compute`.
 #[derive(Clone, Copy, Debug, Encode, Decode, Hash, PartialOrd, Ord, PartialEq, Eq)]
@@ -793,6 +794,7 @@ mod test {
 			timestamp: Default::default(),
 			inherent_data: Default::default(),
 			ends_at: Instant::now(),
+			block_size_limit: None,
 		}
 	}
 
diff --git a/substrate/client/consensus/slots/src/slots.rs b/substrate/client/consensus/slots/src/slots.rs
index d7ed1eda64c..4057a6d0d15 100644
--- a/substrate/client/consensus/slots/src/slots.rs
+++ b/substrate/client/consensus/slots/src/slots.rs
@@ -58,6 +58,10 @@ pub struct SlotInfo {
 	pub inherent_data: InherentData,
 	/// Slot duration.
 	pub duration: Duration,
+	/// Some potential block size limit for the block to be authored at this slot.
+	///
+	/// For more information see [`Proposer::propose`](sp_consensus::Proposer::propose).
+	pub block_size_limit: Option<usize>,
 }
 
 impl SlotInfo {
@@ -69,12 +73,14 @@ impl SlotInfo {
 		timestamp: sp_timestamp::Timestamp,
 		inherent_data: InherentData,
 		duration: Duration,
+		block_size_limit: Option<usize>,
 	) -> Self {
 		Self {
 			slot,
 			timestamp,
 			inherent_data,
 			duration,
+			block_size_limit,
 			ends_at: Instant::now() + time_until_next(timestamp.as_duration(), duration),
 		}
 	}
@@ -147,6 +153,7 @@ impl<SC: SlotCompatible> Slots<SC> {
 					timestamp,
 					inherent_data,
 					self.slot_duration,
+					None,
 				))
 			}
 		}
diff --git a/substrate/client/db/src/bench.rs b/substrate/client/db/src/bench.rs
index 2704676207b..a2501891b31 100644
--- a/substrate/client/db/src/bench.rs
+++ b/substrate/client/db/src/bench.rs
@@ -23,7 +23,7 @@ use std::cell::{Cell, RefCell};
 use std::collections::HashMap;
 
 use hash_db::{Prefix, Hasher};
-use sp_trie::{MemoryDB, prefixed_key, StorageProof};
+use sp_trie::{MemoryDB, prefixed_key};
 use sp_core::{
 	storage::{ChildInfo, TrackedStorageKey},
 	hexdisplay::HexDisplay
@@ -34,7 +34,6 @@ use sp_state_machine::{
 	DBValue, backend::Backend as StateBackend, StorageCollection, ChildStorageCollection, ProofRecorder,
 };
 use kvdb::{KeyValueDB, DBTransaction};
-use codec::Encode;
 use crate::storage_cache::{CachingState, SharedCache, new_shared_cache};
 
 type DbState<B> = sp_state_machine::TrieBackend<
@@ -45,7 +44,7 @@ type State<B> = CachingState<DbState<B>, B>;
 
 struct StorageDb<Block: BlockT> {
 	db: Arc<dyn KeyValueDB>,
-	proof_recorder: Option<ProofRecorder<HashFor<Block>>>,
+	proof_recorder: Option<ProofRecorder<Block::Hash>>,
 	_block: std::marker::PhantomData<Block>,
 }
 
@@ -53,12 +52,12 @@ impl<Block: BlockT> sp_state_machine::Storage<HashFor<Block>> for StorageDb<Bloc
 	fn get(&self, key: &Block::Hash, prefix: Prefix) -> Result<Option<DBValue>, String> {
 		let prefixed_key = prefixed_key::<HashFor<Block>>(key, prefix);
 		if let Some(recorder) = &self.proof_recorder {
-			if let Some(v) = recorder.read().get(&key) {
+			if let Some(v) = recorder.get(&key) {
 				return Ok(v.clone());
 			}
 			let backend_value = self.db.get(0, &prefixed_key)
 				.map_err(|e| format!("Database backend error: {:?}", e))?;
-			recorder.write().insert(key.clone(), backend_value.clone());
+			recorder.record(key.clone(), backend_value.clone());
 			Ok(backend_value)
 		} else {
 			self.db.get(0, &prefixed_key)
@@ -117,7 +116,7 @@ pub struct BenchmarkingState<B: BlockT> {
 	child_key_tracker: RefCell<HashMap<Vec<u8>, HashMap<Vec<u8>, KeyTracker>>>,
 	read_write_tracker: RefCell<ReadWriteTracker>,
 	whitelist: RefCell<Vec<TrackedStorageKey>>,
-	proof_recorder: Option<ProofRecorder<HashFor<B>>>,
+	proof_recorder: Option<ProofRecorder<B::Hash>>,
 }
 
 impl<B: BlockT> BenchmarkingState<B> {
@@ -164,12 +163,10 @@ impl<B: BlockT> BenchmarkingState<B> {
 		*self.state.borrow_mut() = None;
 		let db = match self.db.take() {
 			Some(db) => db,
-			None => Arc::new(::kvdb_memorydb::create(1)),
+			None => Arc::new(kvdb_memorydb::create(1)),
 		};
 		self.db.set(Some(db.clone()));
-		if let Some(recorder) = &self.proof_recorder {
-			recorder.write().clear();
-		}
+		self.proof_recorder.as_ref().map(|r| r.reset());
 		let storage_db = Arc::new(StorageDb::<B> {
 			db,
 			proof_recorder: self.proof_recorder.clone(),
@@ -429,7 +426,8 @@ impl<B: BlockT> StateBackend<HashFor<B>> for BenchmarkingState<B> {
 		None
 	}
 
-	fn commit(&self,
+	fn commit(
+		&self,
 		storage_root: <HashFor<B> as Hasher>::Out,
 		mut transaction: Self::Transaction,
 		main_storage_changes: StorageCollection,
@@ -518,14 +516,7 @@ impl<B: BlockT> StateBackend<HashFor<B>> for BenchmarkingState<B> {
 	}
 
 	fn proof_size(&self) -> Option<u32> {
-		self.proof_recorder.as_ref().map(|recorder| {
-			let proof = StorageProof::new(recorder
-				.read()
-				.iter()
-				.filter_map(|(_k, v)| v.as_ref().map(|v| v.to_vec()))
-				.collect());
-			proof.encoded_size() as u32
-		})
+		self.proof_recorder.as_ref().map(|recorder| recorder.estimate_encoded_size() as u32)
 	}
 }
 
diff --git a/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs b/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs
index 2be8545a81d..642da2c465e 100644
--- a/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs
+++ b/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs
@@ -282,16 +282,14 @@ fn generate_runtime_api_base_structures() -> Result<TokenStream> {
 				self.recorder = Some(Default::default());
 			}
 
+			fn proof_recorder(&self) -> Option<#crate_::ProofRecorder<Block>> {
+				self.recorder.clone()
+			}
+
 			fn extract_proof(&mut self) -> Option<#crate_::StorageProof> {
 				self.recorder
 					.take()
-					.map(|recorder| {
-						let trie_nodes = recorder.read()
-							.iter()
-							.filter_map(|(_k, v)| v.as_ref().map(|v| v.to_vec()))
-							.collect();
-						#crate_::StorageProof::new(trie_nodes)
-					})
+					.map(|recorder| recorder.to_storage_proof())
 			}
 
 			fn into_storage_changes(
diff --git a/substrate/primitives/api/proc-macro/src/mock_impl_runtime_apis.rs b/substrate/primitives/api/proc-macro/src/mock_impl_runtime_apis.rs
index 62a03a59baa..383cd4f635e 100644
--- a/substrate/primitives/api/proc-macro/src/mock_impl_runtime_apis.rs
+++ b/substrate/primitives/api/proc-macro/src/mock_impl_runtime_apis.rs
@@ -102,6 +102,10 @@ fn implement_common_api_traits(
 				unimplemented!("`extract_proof` not implemented for runtime api mocks")
 			}
 
+			fn proof_recorder(&self) -> Option<#crate_::ProofRecorder<#block_type>> {
+				unimplemented!("`proof_recorder` not implemented for runtime api mocks")
+			}
+
 			fn into_storage_changes(
 				&self,
 				_: &Self::StateBackend,
diff --git a/substrate/primitives/api/src/lib.rs b/substrate/primitives/api/src/lib.rs
index afb9af343ba..155bb899a2e 100644
--- a/substrate/primitives/api/src/lib.rs
+++ b/substrate/primitives/api/src/lib.rs
@@ -362,7 +362,7 @@ pub use sp_api_proc_macro::mock_impl_runtime_apis;
 
 /// A type that records all accessed trie nodes and generates a proof out of it.
 #[cfg(feature = "std")]
-pub type ProofRecorder<B> = sp_state_machine::ProofRecorder<HashFor<B>>;
+pub type ProofRecorder<B> = sp_state_machine::ProofRecorder<<B as BlockT>::Hash>;
 
 /// A type that is used as cache for the storage transactions.
 #[cfg(feature = "std")]
@@ -471,6 +471,9 @@ pub trait ApiExt<Block: BlockT> {
 	/// If `record_proof` was not called before, this will return `None`.
 	fn extract_proof(&mut self) -> Option<StorageProof>;
 
+	/// Returns the current active proof recorder.
+	fn proof_recorder(&self) -> Option<ProofRecorder<Block>>;
+
 	/// Convert the api object into the storage changes that were done while executing runtime
 	/// api functions.
 	///
diff --git a/substrate/primitives/consensus/common/src/evaluation.rs b/substrate/primitives/consensus/common/src/evaluation.rs
index be930fa4a00..c18c8b127f9 100644
--- a/substrate/primitives/consensus/common/src/evaluation.rs
+++ b/substrate/primitives/consensus/common/src/evaluation.rs
@@ -39,9 +39,6 @@ pub enum Error {
 	/// Proposal had wrong number.
 	#[error("Proposal had wrong number. Expected {expected:?}, got {got:?}")]
 	WrongNumber { expected: BlockNumber, got: BlockNumber },
-	/// Proposal exceeded the maximum size.
-	#[error("Proposal size {block_size} exceeds maximum allowed size of {max_block_size}.")]
-	ProposalTooLarge { block_size: usize, max_block_size: usize },
 }
 
 /// Attempt to evaluate a substrate block as a node block, returning error
@@ -50,17 +47,12 @@ pub fn evaluate_initial<Block: BlockT>(
 	proposal: &Block,
 	parent_hash: &<Block as BlockT>::Hash,
 	parent_number: <<Block as BlockT>::Header as HeaderT>::Number,
-	max_block_size: usize,
 ) -> Result<()> {
 
 	let encoded = Encode::encode(proposal);
 	let proposal = Block::decode(&mut &encoded[..])
 		.map_err(|e| Error::BadProposalFormat(e))?;
 
-	if encoded.len() > max_block_size {
-		return Err(Error::ProposalTooLarge { max_block_size, block_size: encoded.len() })
-	}
-
 	if *parent_hash != *proposal.header().parent_hash() {
 		return Err(Error::WrongParentHash {
 			expected: format!("{:?}", *parent_hash),
diff --git a/substrate/primitives/consensus/common/src/lib.rs b/substrate/primitives/consensus/common/src/lib.rs
index 27a43dbe022..642b6b12e7d 100644
--- a/substrate/primitives/consensus/common/src/lib.rs
+++ b/substrate/primitives/consensus/common/src/lib.rs
@@ -196,6 +196,13 @@ pub trait Proposer<B: BlockT> {
 	/// a maximum duration for building this proposal is given. If building the proposal takes
 	/// longer than this maximum, the proposal will be very likely discarded.
 	///
+	/// If `block_size_limit` is given, the proposer should push transactions until the block size
+	/// limit is hit. Depending on the `finalize_block` implementation of the runtime, it probably
+	/// incorporates other operations (that are happening after the block limit is hit). So,
+	/// when the block size estimation also includes a proof that is recorded alongside the block
+	/// production, the proof can still grow. This means that the `block_size_limit` should not be
+	/// the hard limit of what is actually allowed.
+	///
 	/// # Return
 	///
 	/// Returns a future that resolves to a [`Proposal`] or to [`Error`].
@@ -204,6 +211,7 @@ pub trait Proposer<B: BlockT> {
 		inherent_data: InherentData,
 		inherent_digests: DigestFor<B>,
 		max_duration: Duration,
+		block_size_limit: Option<usize>,
 	) -> Self::Proposal;
 }
 
diff --git a/substrate/primitives/state-machine/src/proving_backend.rs b/substrate/primitives/state-machine/src/proving_backend.rs
index 6b87aa12eb1..28672659fa1 100644
--- a/substrate/primitives/state-machine/src/proving_backend.rs
+++ b/substrate/primitives/state-machine/src/proving_backend.rs
@@ -17,9 +17,9 @@
 
 //! Proving state machine backend.
 
-use std::{sync::Arc, collections::HashMap};
+use std::{sync::Arc, collections::{HashMap, hash_map::Entry}};
 use parking_lot::RwLock;
-use codec::{Decode, Codec};
+use codec::{Decode, Codec, Encode};
 use log::debug;
 use hash_db::{Hasher, HashDB, EMPTY_PREFIX, Prefix};
 use sp_trie::{
@@ -109,9 +109,69 @@ impl<'a, S, H> ProvingBackendRecorder<'a, S, H>
 	}
 }
 
-/// Global proof recorder, act as a layer over a hash db for recording queried
-/// data.
-pub type ProofRecorder<H> = Arc<RwLock<HashMap<<H as Hasher>::Out, Option<DBValue>>>>;
+#[derive(Default)]
+struct ProofRecorderInner<Hash> {
+	/// All the records that we have stored so far.
+	records: HashMap<Hash, Option<DBValue>>,
+	/// The encoded size of all recorded values.
+	encoded_size: usize,
+}
+
+/// Global proof recorder, act as a layer over a hash db for recording queried data.
+#[derive(Clone, Default)]
+pub struct ProofRecorder<Hash> {
+	inner: Arc<RwLock<ProofRecorderInner<Hash>>>,
+}
+
+impl<Hash: std::hash::Hash + Eq> ProofRecorder<Hash> {
+	/// Record the given `key` => `val` combination.
+	pub fn record(&self, key: Hash, val: Option<DBValue>) {
+		let mut inner = self.inner.write();
+		let encoded_size = if let Entry::Vacant(entry) = inner.records.entry(key) {
+			let encoded_size = val.as_ref().map(Encode::encoded_size).unwrap_or(0);
+
+			entry.insert(val);
+			encoded_size
+		} else {
+			0
+		};
+
+		inner.encoded_size += encoded_size;
+	}
+
+	/// Returns the value at the given `key`.
+	pub fn get(&self, key: &Hash) -> Option<Option<DBValue>> {
+		self.inner.read().records.get(key).cloned()
+	}
+
+	/// Returns the estimated encoded size of the proof.
+	///
+	/// The estimation is maybe bigger (by in maximum 4 bytes), but never smaller than the actual
+	/// encoded proof.
+	pub fn estimate_encoded_size(&self) -> usize {
+		let inner = self.inner.read();
+		inner.encoded_size
+			+ codec::Compact(inner.records.len() as u32).encoded_size()
+	}
+
+	/// Convert into a [`StorageProof`].
+	pub fn to_storage_proof(&self) -> StorageProof {
+		let trie_nodes = self.inner.read()
+			.records
+			.iter()
+			.filter_map(|(_k, v)| v.as_ref().map(|v| v.to_vec()))
+			.collect();
+
+		StorageProof::new(trie_nodes)
+	}
+
+	/// Reset the internal state.
+	pub fn reset(&self) {
+		let mut inner = self.inner.write();
+		inner.records.clear();
+		inner.encoded_size = 0;
+	}
+}
 
 /// Patricia trie-based backend which also tracks all touched storage trie values.
 /// These can be sent to remote node and used as a proof of execution.
@@ -122,7 +182,7 @@ pub struct ProvingBackend<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> (
 /// Trie backend storage with its proof recorder.
 pub struct ProofRecorderBackend<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> {
 	backend: &'a S,
-	proof_recorder: ProofRecorder<H>,
+	proof_recorder: ProofRecorder<H::Out>,
 }
 
 impl<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> ProvingBackend<'a, S, H>
@@ -137,7 +197,7 @@ impl<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> ProvingBackend<'a, S, H>
 	/// Create new proving backend with the given recorder.
 	pub fn new_with_recorder(
 		backend: &'a TrieBackend<S, H>,
-		proof_recorder: ProofRecorder<H>,
+		proof_recorder: ProofRecorder<H::Out>,
 	) -> Self {
 		let essence = backend.essence();
 		let root = essence.root().clone();
@@ -150,12 +210,7 @@ impl<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> ProvingBackend<'a, S, H>
 
 	/// Extracting the gathered unordered proof.
 	pub fn extract_proof(&self) -> StorageProof {
-		let trie_nodes = self.0.essence().backend_storage().proof_recorder
-			.read()
-			.iter()
-			.filter_map(|(_k, v)| v.as_ref().map(|v| v.to_vec()))
-			.collect();
-		StorageProof::new(trie_nodes)
+		self.0.essence().backend_storage().proof_recorder.to_storage_proof()
 	}
 }
 
@@ -165,11 +220,12 @@ impl<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> TrieBackendStorage<H>
 	type Overlay = S::Overlay;
 
 	fn get(&self, key: &H::Out, prefix: Prefix) -> Result<Option<DBValue>, String> {
-		if let Some(v) = self.proof_recorder.read().get(key) {
-			return Ok(v.clone());
+		if let Some(v) = self.proof_recorder.get(key) {
+			return Ok(v);
 		}
-		let backend_value =  self.backend.get(key, prefix)?;
-		self.proof_recorder.write().insert(key.clone(), backend_value.clone());
+
+		let backend_value = self.backend.get(key, prefix)?;
+		self.proof_recorder.record(key.clone(), backend_value.clone());
 		Ok(backend_value)
 	}
 }
@@ -343,8 +399,8 @@ mod tests {
 		assert_eq!(trie_backend.storage(b"key").unwrap(), proving_backend.storage(b"key").unwrap());
 		assert_eq!(trie_backend.pairs(), proving_backend.pairs());
 
-		let (trie_root, mut trie_mdb) = trie_backend.storage_root(::std::iter::empty());
-		let (proving_root, mut proving_mdb) = proving_backend.storage_root(::std::iter::empty());
+		let (trie_root, mut trie_mdb) = trie_backend.storage_root(std::iter::empty());
+		let (proving_root, mut proving_mdb) = proving_backend.storage_root(std::iter::empty());
 		assert_eq!(trie_root, proving_root);
 		assert_eq!(trie_mdb.drain(), proving_mdb.drain());
 	}
@@ -405,7 +461,7 @@ mod tests {
 		));
 
 		let trie = in_memory.as_trie_backend().unwrap();
-		let trie_root = trie.storage_root(::std::iter::empty()).0;
+		let trie_root = trie.storage_root(std::iter::empty()).0;
 		assert_eq!(in_memory_root, trie_root);
 		(0..64).for_each(|i| assert_eq!(
 			trie.storage(&[i]).unwrap().unwrap(),
@@ -440,4 +496,35 @@ mod tests {
 			vec![64]
 		);
 	}
+
+	#[test]
+	fn storage_proof_encoded_size_estimation_works() {
+		let trie_backend = test_trie();
+		let backend = test_proving(&trie_backend);
+
+		let check_estimation = |backend: &ProvingBackend<'_, PrefixedMemoryDB<BlakeTwo256>, BlakeTwo256>| {
+			let storage_proof = backend.extract_proof();
+			let estimation = backend.0.essence()
+				.backend_storage()
+				.proof_recorder
+				.estimate_encoded_size();
+
+			assert_eq!(storage_proof.encoded_size(), estimation);
+		};
+
+		assert_eq!(backend.storage(b"key").unwrap(), Some(b"value".to_vec()));
+		check_estimation(&backend);
+
+		assert_eq!(backend.storage(b"value1").unwrap(), Some(vec![42]));
+		check_estimation(&backend);
+
+		assert_eq!(backend.storage(b"value2").unwrap(), Some(vec![24]));
+		check_estimation(&backend);
+
+		assert!(backend.storage(b"doesnotexist").unwrap().is_none());
+		check_estimation(&backend);
+
+		assert!(backend.storage(b"doesnotexist2").unwrap().is_none());
+		check_estimation(&backend);
+	}
 }
diff --git a/substrate/test-utils/runtime/src/lib.rs b/substrate/test-utils/runtime/src/lib.rs
index 837b3715c81..150bc403732 100644
--- a/substrate/test-utils/runtime/src/lib.rs
+++ b/substrate/test-utils/runtime/src/lib.rs
@@ -190,7 +190,7 @@ impl BlindCheckable for Extrinsic {
 					Err(InvalidTransaction::BadProof.into())
 				}
 			},
-			Extrinsic::IncludeData(_) => Err(InvalidTransaction::BadProof.into()),
+			Extrinsic::IncludeData(v) => Ok(Extrinsic::IncludeData(v)),
 			Extrinsic::StorageChange(key, value) => Ok(Extrinsic::StorageChange(key, value)),
 			Extrinsic::ChangesTrieConfigUpdate(new_config) =>
 				Ok(Extrinsic::ChangesTrieConfigUpdate(new_config)),
-- 
GitLab