diff --git a/substrate/bin/node/bench/src/construct.rs b/substrate/bin/node/bench/src/construct.rs
index 8469ec62893b534978535cfa21d2a75208e7d2ee..652466231714849ea81b769a340efc383aaff008 100644
--- a/substrate/bin/node/bench/src/construct.rs
+++ b/substrate/bin/node/bench/src/construct.rs
@@ -171,6 +171,7 @@ impl core::Benchmark for ConstructionBenchmark {
 				inherent_data_providers.create_inherent_data().expect("Create inherent data failed"),
 				Default::default(),
 				std::time::Duration::from_secs(20),
+				None,
 			),
 		).map(|r| r.block).expect("Proposing failed");
 
diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs
index ce0ffb2cecc0af4b0cb0bc34128a43842c3a4072..5fa7aa00df5615f0247161fd41484cfb4d06672b 100644
--- a/substrate/bin/node/cli/src/service.rs
+++ b/substrate/bin/node/cli/src/service.rs
@@ -679,6 +679,7 @@ mod tests {
 						inherent_data,
 						digest,
 						std::time::Duration::from_secs(1),
+						None,
 					).await
 				}).expect("Error making test block").block;
 
diff --git a/substrate/client/basic-authorship/src/basic_authorship.rs b/substrate/client/basic-authorship/src/basic_authorship.rs
index 910abfad5ae1eb944369f3066df8f55047320c59..c8277d3b5d32c2985fb88830590ce0b3bc4d04c4 100644
--- a/substrate/client/basic-authorship/src/basic_authorship.rs
+++ b/substrate/client/basic-authorship/src/basic_authorship.rs
@@ -22,7 +22,7 @@
 
 use std::{pin::Pin, time, sync::Arc};
 use sc_client_api::backend;
-use codec::Decode;
+use codec::{Decode, Encode};
 use sp_consensus::{evaluation, Proposal, ProofRecording, DisableProofRecording, EnableProofRecording};
 use sp_core::traits::SpawnNamed;
 use sp_inherents::InherentData;
@@ -42,14 +42,14 @@ use std::marker::PhantomData;
 use prometheus_endpoint::Registry as PrometheusRegistry;
 use sc_proposer_metrics::MetricsLink as PrometheusMetrics;
 
-/// Default maximum block size in bytes used by [`Proposer`].
+/// Default block size limit in bytes used by [`Proposer`].
 ///
-/// Can be overwritten by [`ProposerFactory::set_maximum_block_size`].
+/// Can be overwritten by [`ProposerFactory::set_default_block_size_limit`].
 ///
 /// Be aware that there is also an upper packet size on what the networking code
 /// will accept. If the block doesn't fit in such a packet, it cannot be
 /// transferred to other nodes.
-pub const DEFAULT_MAX_BLOCK_SIZE: usize = 4 * 1024 * 1024 + 512;
+pub const DEFAULT_BLOCK_SIZE_LIMIT: usize = 4 * 1024 * 1024 + 512;
 
 /// Proposer factory.
 pub struct ProposerFactory<A, B, C, PR> {
@@ -60,8 +60,14 @@ pub struct ProposerFactory<A, B, C, PR> {
 	transaction_pool: Arc<A>,
 	/// Prometheus Link,
 	metrics: PrometheusMetrics,
-	max_block_size: usize,
+	/// The default block size limit.
+	///
+	/// If no `block_size_limit` is passed to [`Proposer::propose`], this block size limit will be
+	/// used.
+	default_block_size_limit: usize,
 	telemetry: Option<TelemetryHandle>,
+	/// Whether the proof should be included when estimating the block size.
+	include_proof_in_block_size_estimation: bool,
 	/// phantom member to pin the `Backend`/`ProofRecording` type.
 	_phantom: PhantomData<(B, PR)>,
 }
@@ -81,9 +87,10 @@ impl<A, B, C> ProposerFactory<A, B, C, DisableProofRecording> {
 			spawn_handle: Box::new(spawn_handle),
 			transaction_pool,
 			metrics: PrometheusMetrics::new(prometheus),
-			max_block_size: DEFAULT_MAX_BLOCK_SIZE,
+			default_block_size_limit: DEFAULT_BLOCK_SIZE_LIMIT,
 			telemetry,
 			client,
+			include_proof_in_block_size_estimation: false,
 			_phantom: PhantomData,
 		}
 	}
@@ -93,6 +100,9 @@ impl<A, B, C> ProposerFactory<A, B, C, EnableProofRecording> {
 	/// Create a new proposer factory with proof recording enabled.
 	///
 	/// Each proposer created by this instance will record a proof while building a block.
+	///
+	/// This will also include the proof in the block size estimation. It can be disabled by
+	/// calling [`ProposerFactory::disable_proof_in_block_size_estimation`].
 	pub fn with_proof_recording(
 		spawn_handle: impl SpawnNamed + 'static,
 		client: Arc<C>,
@@ -101,24 +111,32 @@ impl<A, B, C> ProposerFactory<A, B, C, EnableProofRecording> {
 		telemetry: Option<TelemetryHandle>,
 	) -> Self {
 		ProposerFactory {
-			spawn_handle: Box::new(spawn_handle),
 			client,
+			spawn_handle: Box::new(spawn_handle),
 			transaction_pool,
 			metrics: PrometheusMetrics::new(prometheus),
-			max_block_size: DEFAULT_MAX_BLOCK_SIZE,
+			default_block_size_limit: DEFAULT_BLOCK_SIZE_LIMIT,
 			telemetry,
+			include_proof_in_block_size_estimation: true,
 			_phantom: PhantomData,
 		}
 	}
+
+	/// Disable including the proof in the block size estimation.
+	pub fn disable_proof_in_block_size_estimation(&mut self) {
+		self.include_proof_in_block_size_estimation = false;
+	}
 }
 
 impl<A, B, C, PR> ProposerFactory<A, B, C, PR> {
-	/// Set the maximum block size in bytes.
+	/// Set the default block size limit in bytes.
+	///
+	/// The default value for the block size limit is [`DEFAULT_BLOCK_SIZE_LIMIT`].
 	///
-	/// The default value for the maximum block size is:
-	/// [`DEFAULT_MAX_BLOCK_SIZE`].
-	pub fn set_maximum_block_size(&mut self, size: usize) {
-		self.max_block_size = size;
+	/// If there is no block size limit passed to [`Proposer::propose`], this value will be used.
+	pub fn set_default_block_size_limit(&mut self, limit: usize) {
+		self.default_block_size_limit = limit;
 	}
 }
 
@@ -152,9 +170,10 @@ impl<B, Block, C, A, PR> ProposerFactory<A, B, C, PR>
 			transaction_pool: self.transaction_pool.clone(),
 			now,
 			metrics: self.metrics.clone(),
-			max_block_size: self.max_block_size,
+			default_block_size_limit: self.default_block_size_limit,
 			telemetry: self.telemetry.clone(),
 			_phantom: PhantomData,
+			include_proof_in_block_size_estimation: self.include_proof_in_block_size_estimation,
 		};
 
 		proposer
@@ -195,7 +214,8 @@ pub struct Proposer<B, Block: BlockT, C, A: TransactionPool, PR> {
 	transaction_pool: Arc<A>,
 	now: Box<dyn Fn() -> time::Instant + Send + Sync>,
 	metrics: PrometheusMetrics,
-	max_block_size: usize,
+	default_block_size_limit: usize,
+	include_proof_in_block_size_estimation: bool,
 	telemetry: Option<TelemetryHandle>,
 	_phantom: PhantomData<(B, PR)>,
 }
@@ -225,6 +245,7 @@ impl<A, B, Block, C, PR> sp_consensus::Proposer<Block> for
 		inherent_data: InherentData,
 		inherent_digests: DigestFor<Block>,
 		max_duration: time::Duration,
+		block_size_limit: Option<usize>,
 	) -> Self::Proposal {
 		let (tx, rx) = oneshot::channel();
 		let spawn_handle = self.spawn_handle.clone();
@@ -236,6 +257,7 @@ impl<A, B, Block, C, PR> sp_consensus::Proposer<Block> for
 				inherent_data,
 				inherent_digests,
 				deadline,
+				block_size_limit,
 			).await;
 			if tx.send(res).is_err() {
 				trace!("Could not send block production result to proposer!");
@@ -264,6 +286,7 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 		inherent_data: InherentData,
 		inherent_digests: DigestFor<Block>,
 		deadline: time::Instant,
+		block_size_limit: Option<usize>,
 	) -> Result<Proposal<Block, backend::TransactionFor<B, Block>, PR::Proof>, sp_blockchain::Error> {
 		/// If the block is full we will attempt to push at most
 		/// this number of transactions before quitting for real.
@@ -297,7 +320,9 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 		let mut unqueue_invalid = Vec::new();
 
 		let mut t1 = self.transaction_pool.ready_at(self.parent_number).fuse();
-		let mut t2 = futures_timer::Delay::new(deadline.saturating_duration_since((self.now)()) / 8).fuse();
+		let mut t2 = futures_timer::Delay::new(
+			deadline.saturating_duration_since((self.now)()) / 8,
+		).fuse();
 
 		let pending_iterator = select! {
 			res = t1 => res,
@@ -311,8 +336,13 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 			},
 		};
 
+		let block_size_limit = block_size_limit.unwrap_or(self.default_block_size_limit);
+
 		debug!("Attempting to push transactions from the pool.");
 		debug!("Pool status: {:?}", self.transaction_pool.status());
+		let mut transaction_pushed = false;
+		let mut hit_block_size_limit = false;
+
 		for pending_tx in pending_iterator {
 			if (self.now)() > deadline {
 				debug!(
@@ -324,9 +354,30 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 
 			let pending_tx_data = pending_tx.data().clone();
 			let pending_tx_hash = pending_tx.hash().clone();
+
+			let block_size = block_builder.estimate_block_size(
+				self.include_proof_in_block_size_estimation,
+			);
+			if block_size + pending_tx_data.encoded_size() > block_size_limit {
+				if skipped < MAX_SKIPPED_TRANSACTIONS {
+					skipped += 1;
+					debug!(
+						"Transaction would overflow the block size limit, \
+						 but will try {} more transactions before quitting.",
+						MAX_SKIPPED_TRANSACTIONS - skipped,
+					);
+					continue;
+				} else {
+					debug!("Reached block size limit, proceeding with proposing.");
+					hit_block_size_limit = true;
+					break;
+				}
+			}
+
 			trace!("[{:?}] Pushing to the block.", pending_tx_hash);
 			match sc_block_builder::BlockBuilder::push(&mut block_builder, pending_tx_data) {
 				Ok(()) => {
+					transaction_pushed = true;
 					debug!("[{:?}] Pushed to the block.", pending_tx_hash);
 				}
 				Err(ApplyExtrinsicFailed(Validity(e)))
@@ -356,6 +407,13 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 			}
 		}
 
+		if hit_block_size_limit && !transaction_pushed {
+			warn!(
+				"Hit block size limit of `{}` without including any transaction!",
+				block_size_limit,
+			);
+		}
+
 		self.transaction_pool.remove_invalid(&unqueue_invalid);
 
 		let (block, storage_changes, proof) = block_builder.build()?.into_inner();
@@ -367,7 +425,8 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 			}
 		);
 
-		info!("🎁 Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
+		info!(
+			"🎁 Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
 			block.header().number(),
 			<Block as BlockT>::Hash::from(block.header().hash()),
 			block.header().parent_hash(),
@@ -394,7 +453,6 @@ impl<A, B, Block, C, PR> Proposer<B, Block, C, A, PR>
 			&block,
 			&self.parent_hash,
 			self.parent_number,
-			self.max_block_size,
 		) {
 			error!("Failed to evaluate authored block: {:?}", err);
 		}
@@ -421,6 +479,7 @@ mod tests {
 	use sp_runtime::traits::NumberFor;
 	use sc_client_api::Backend;
 	use futures::executor::block_on;
+	use sp_consensus::Environment;
 
 	const SOURCE: TransactionSource = TransactionSource::External;
 
@@ -494,7 +553,7 @@ mod tests {
 		// when
 		let deadline = time::Duration::from_secs(3);
 		let block = block_on(
-			proposer.propose(Default::default(), Default::default(), deadline)
+			proposer.propose(Default::default(), Default::default(), deadline, None)
 		).map(|r| r.block).unwrap();
 
 		// then
@@ -540,7 +599,7 @@ mod tests {
 
 		let deadline = time::Duration::from_secs(1);
 		block_on(
-			proposer.propose(Default::default(), Default::default(), deadline)
+			proposer.propose(Default::default(), Default::default(), deadline, None)
 		).map(|r| r.block).unwrap();
 	}
 
@@ -587,7 +646,7 @@ mod tests {
 
 		let deadline = time::Duration::from_secs(9);
 		let proposal = block_on(
-			proposer.propose(Default::default(), Default::default(), deadline),
+			proposer.propose(Default::default(), Default::default(), deadline, None),
 		).unwrap();
 
 		assert_eq!(proposal.block.extrinsics().len(), 1);
@@ -669,7 +728,7 @@ mod tests {
 			// when
 			let deadline = time::Duration::from_secs(9);
 			let block = block_on(
-				proposer.propose(Default::default(), Default::default(), deadline)
+				proposer.propose(Default::default(), Default::default(), deadline, None)
 			).map(|r| r.block).unwrap();
 
 			// then
@@ -704,4 +763,82 @@ mod tests {
 		let block = propose_block(&client, 1, 2, 5);
 		block_on(client.import(BlockOrigin::Own, block)).unwrap();
 	}
+
+	#[test]
+	fn should_cease_building_block_when_block_limit_is_reached() {
+		let client = Arc::new(substrate_test_runtime_client::new());
+		let spawner = sp_core::testing::TaskExecutor::new();
+		let txpool = BasicPool::new_full(
+			Default::default(),
+			true.into(),
+			None,
+			spawner.clone(),
+			client.clone(),
+		);
+		let genesis_header = client.header(&BlockId::Number(0u64))
+			.expect("header get error")
+			.expect("there should be header");
+
+		let extrinsics_num = 4;
+		let extrinsics = (0..extrinsics_num)
+			.map(|v| Extrinsic::IncludeData(vec![v as u8; 10]))
+			.collect::<Vec<_>>();
+
+		let block_limit = genesis_header.encoded_size()
+			+ extrinsics.iter().take(extrinsics_num - 1).map(Encode::encoded_size).sum::<usize>()
+			+ Vec::<Extrinsic>::new().encoded_size();
+
+		block_on(
+			txpool.submit_at(&BlockId::number(0), SOURCE, extrinsics)
+		).unwrap();
+
+		block_on(txpool.maintain(chain_event(genesis_header.clone())));
+
+		let mut proposer_factory = ProposerFactory::new(
+			spawner.clone(),
+			client.clone(),
+			txpool.clone(),
+			None,
+			None,
+		);
+
+		let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();
+
+		// Give it enough time
+		let deadline = time::Duration::from_secs(300);
+		let block = block_on(
+			proposer.propose(Default::default(), Default::default(), deadline, Some(block_limit))
+		).map(|r| r.block).unwrap();
+
+		// Based on the block limit, one transaction shouldn't be included.
+		assert_eq!(block.extrinsics().len(), extrinsics_num - 1);
+
+		let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();
+
+		let block = block_on(
+			proposer.propose(Default::default(), Default::default(), deadline, None)
+		).map(|r| r.block).unwrap();
+
+		// Without a block limit, we should include all of them.
+		assert_eq!(block.extrinsics().len(), extrinsics_num);
+
+		let mut proposer_factory = ProposerFactory::with_proof_recording(
+			spawner.clone(),
+			client.clone(),
+			txpool.clone(),
+			None,
+			None,
+		);
+
+		let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();
+
+		// Propose with the same block limit, but now with proof recording enabled.
+		let block = block_on(
+			proposer.propose(Default::default(), Default::default(), deadline, Some(block_limit))
+		).map(|r| r.block).unwrap();
+
+		// The block limit didn't change, but the proof is now included in the block size
+		// estimation, so one less transaction should fit into the limit.
+		assert_eq!(block.extrinsics().len(), extrinsics_num - 2);
+	}
 }
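
For downstream users, the two new factory knobs compose as follows; a minimal
sketch, assuming `spawner`, `client` and `txpool` are set up as in the tests
above, and `DEFAULT_BLOCK_SIZE_LIMIT` re-exported from this crate:

    let mut proposer_factory = ProposerFactory::with_proof_recording(
        spawner.clone(),
        client.clone(),
        txpool.clone(),
        None, // no Prometheus registry
        None, // no telemetry
    );

    // Proof recording factories include the proof in the block size estimation
    // by default; opt out if the limit should only cover header + extrinsics.
    proposer_factory.disable_proof_in_block_size_estimation();

    // Fallback used whenever `propose` is called with `block_size_limit: None`.
    proposer_factory.set_default_block_size_limit(DEFAULT_BLOCK_SIZE_LIMIT / 2);
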
diff --git a/substrate/client/basic-authorship/src/lib.rs b/substrate/client/basic-authorship/src/lib.rs
index acaf85db7633611092b93ac30d98602d5c286e03..133b833cdddc82cd55638af105da3bc861adaf86 100644
--- a/substrate/client/basic-authorship/src/lib.rs
+++ b/substrate/client/basic-authorship/src/lib.rs
@@ -62,6 +62,7 @@
 //! 	Default::default(),
 //! 	Default::default(),
 //! 	Duration::from_secs(2),
+//! 	None, // use the default block size limit
 //! );
 //!
 //! // We wait until the proposition is performed.
@@ -72,4 +73,4 @@
 
 mod basic_authorship;
 
-pub use crate::basic_authorship::{ProposerFactory, Proposer, DEFAULT_MAX_BLOCK_SIZE};
+pub use crate::basic_authorship::{ProposerFactory, Proposer, DEFAULT_BLOCK_SIZE_LIMIT};
diff --git a/substrate/client/block-builder/src/lib.rs b/substrate/client/block-builder/src/lib.rs
index 4893072a713771d66c0465055645b5e227ca2e8a..7d391f8fb85b3f52eef54a3c1ab5eeb096700659 100644
--- a/substrate/client/block-builder/src/lib.rs
+++ b/substrate/client/block-builder/src/lib.rs
@@ -135,6 +135,8 @@ pub struct BlockBuilder<'a, Block: BlockT, A: ProvideRuntimeApi<Block>, B> {
 	block_id: BlockId<Block>,
 	parent_hash: Block::Hash,
 	backend: &'a B,
+	/// The estimated size of the block header.
+	estimated_header_size: usize,
 }
 
 impl<'a, Block, A, B> BlockBuilder<'a, Block, A, B>
@@ -165,6 +167,8 @@ where
 			inherent_digests,
 		);
 
+		let estimated_header_size = header.encoded_size();
+
 		let mut api = api.runtime_api();
 
 		if record_proof.yes() {
@@ -183,6 +187,7 @@ where
 			api,
 			block_id,
 			backend,
+			estimated_header_size,
 		})
 	}
 
@@ -270,6 +275,20 @@ where
 			))
 		}).map_err(|e| Error::Application(Box::new(e)))
 	}
+
+	/// Estimate the size of the block in the current state.
+	///
+	/// If `include_proof` is `true`, the estimated size of the storage proof will be added
+	/// to the estimation.
+	pub fn estimate_block_size(&self, include_proof: bool) -> usize {
+		let size = self.estimated_header_size + self.extrinsics.encoded_size();
+
+		if include_proof {
+			size + self.api.proof_recorder().map(|pr| pr.estimate_encoded_size()).unwrap_or(0)
+		} else {
+			size
+		}
+	}
 }
 
 #[cfg(test)]
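
To illustrate how a block author is expected to consume this estimation, a short
sketch in the style of the `basic-authorship` loop above; `block_builder`,
`include_proof`, `pending_tx_data` and `block_size_limit` are assumed from
context:

    use codec::Encode;

    // Estimate header + extrinsics (and, if requested, the recorded proof),
    // then check that the candidate extrinsic still fits under the limit.
    let block_size = block_builder.estimate_block_size(include_proof);
    if block_size + pending_tx_data.encoded_size() <= block_size_limit {
        block_builder.push(pending_tx_data)?;
    }
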
diff --git a/substrate/client/consensus/aura/src/lib.rs b/substrate/client/consensus/aura/src/lib.rs
index 77dac0f7544879e02586f661fbb291fb899126bc..3c72f359f8f15184451a807e93d4b4c6f6c9c8db 100644
--- a/substrate/client/consensus/aura/src/lib.rs
+++ b/substrate/client/consensus/aura/src/lib.rs
@@ -629,6 +629,7 @@ mod tests {
 			_: InherentData,
 			digests: DigestFor<TestBlock>,
 			_: Duration,
+			_: Option<usize>,
 		) -> Self::Proposal {
 			let r = self.1.new_block(digests).unwrap().build().map_err(|e| e.into());
 
@@ -887,6 +888,7 @@ mod tests {
 				ends_at: Instant::now() + Duration::from_secs(100),
 				inherent_data: InherentData::new(),
 				duration: Duration::from_millis(1000),
+				block_size_limit: None,
 			},
 		)).unwrap();
 
diff --git a/substrate/client/consensus/babe/src/tests.rs b/substrate/client/consensus/babe/src/tests.rs
index 839d38b94a933bce5503c2725ea7607487b252d6..9949da61da579cece53b8572f1437b828b8b96cb 100644
--- a/substrate/client/consensus/babe/src/tests.rs
+++ b/substrate/client/consensus/babe/src/tests.rs
@@ -182,6 +182,7 @@ impl Proposer<TestBlock> for DummyProposer {
 		_: InherentData,
 		pre_digests: DigestFor<TestBlock>,
 		_: Duration,
+		_: Option<usize>,
 	) -> Self::Proposal {
 		self.propose_with(pre_digests)
 	}
diff --git a/substrate/client/consensus/manual-seal/src/seal_block.rs b/substrate/client/consensus/manual-seal/src/seal_block.rs
index 23a560cebd54b316ca1f4bcccc5d16bd865cfdfd..b21630f0377e2682f472d048cee84b24b5b1973d 100644
--- a/substrate/client/consensus/manual-seal/src/seal_block.rs
+++ b/substrate/client/consensus/manual-seal/src/seal_block.rs
@@ -127,6 +127,7 @@ pub async fn seal_block<B, BI, SC, C, E, P>(
 			id.clone(),
 			digest,
 			Duration::from_secs(MAX_PROPOSAL_DURATION),
+			None,
 		).map_err(|err| Error::StringError(format!("{:?}", err))).await?;
 
 		if proposal.block.extrinsics().len() == inherents_len && !create_empty {
diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs
index ea2e30afdc4857b53c104e00f67568e05a04ff88..bcbc2009321b839f6ba2e8d5cf149f8eb53006e7 100644
--- a/substrate/client/consensus/pow/src/lib.rs
+++ b/substrate/client/consensus/pow/src/lib.rs
@@ -669,6 +669,7 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
 				inherent_data,
 				inherent_digest,
 				build_time.clone(),
+				None,
 			).await {
 				Ok(x) => x,
 				Err(err) => {
diff --git a/substrate/client/consensus/slots/src/lib.rs b/substrate/client/consensus/slots/src/lib.rs
index c1f13fea1f9effb5179a007e1474c1d5296024c0..5157f381e6f43847d35957e1aaee67fca1e4893c 100644
--- a/substrate/client/consensus/slots/src/lib.rs
+++ b/substrate/client/consensus/slots/src/lib.rs
@@ -313,6 +313,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
 				logs,
 			},
 			proposing_remaining_duration.mul_f32(0.98),
+			None,
 		).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)));
 
 		let proposal = match futures::future::select(proposing, proposing_remaining).await {
@@ -535,7 +536,7 @@ pub enum Error<T> where T: Debug {
 	SlotDurationInvalid(SlotDuration<T>),
 }
 
-/// A slot duration. Create with `get_or_compute`.
+/// A slot duration. Create with [`get_or_compute`](Self::get_or_compute).
 // The internal member should stay private here to maintain invariants of
 // `get_or_compute`.
 #[derive(Clone, Copy, Debug, Encode, Decode, Hash, PartialOrd, Ord, PartialEq, Eq)]
@@ -793,6 +794,7 @@ mod test {
 			timestamp: Default::default(),
 			inherent_data: Default::default(),
 			ends_at: Instant::now(),
+			block_size_limit: None,
 		}
 	}
 
diff --git a/substrate/client/consensus/slots/src/slots.rs b/substrate/client/consensus/slots/src/slots.rs
index d7ed1eda64c09919a80eb14457663c678dc50b8c..4057a6d0d15a9a4d04e9d1ef9e8395bc1e6ce4d7 100644
--- a/substrate/client/consensus/slots/src/slots.rs
+++ b/substrate/client/consensus/slots/src/slots.rs
@@ -58,6 +58,10 @@ pub struct SlotInfo {
 	pub inherent_data: InherentData,
 	/// Slot duration.
 	pub duration: Duration,
+	/// An optional block size limit for the block to be authored at this slot.
+	///
+	/// For more information, see [`Proposer::propose`](sp_consensus::Proposer::propose).
+	pub block_size_limit: Option<usize>,
 }
 
 impl SlotInfo {
@@ -69,12 +73,14 @@ impl SlotInfo {
 		timestamp: sp_timestamp::Timestamp,
 		inherent_data: InherentData,
 		duration: Duration,
+		block_size_limit: Option<usize>,
 	) -> Self {
 		Self {
 			slot,
 			timestamp,
 			inherent_data,
 			duration,
+			block_size_limit,
 			ends_at: Instant::now() + time_until_next(timestamp.as_duration(), duration),
 		}
 	}
@@ -147,6 +153,7 @@ impl<SC: SlotCompatible> Slots<SC> {
 					timestamp,
 					inherent_data,
 					self.slot_duration,
+					None,
 				))
 			}
 		}
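
Since the limit is plumbed through `SlotInfo`, slot-based consensus code can now
request a smaller block for an individual slot; an illustrative sketch, with
`slot`, `timestamp`, `inherent_data` and `slot_duration` assumed from context:

    let slot_info = SlotInfo::new(
        slot,
        timestamp,
        inherent_data,
        slot_duration,
        // Cap this slot's block; `None` keeps the proposer's default limit.
        Some(2 * 1024 * 1024),
    );
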
diff --git a/substrate/client/db/src/bench.rs b/substrate/client/db/src/bench.rs
index 2704676207b00798442ab0f08d19f3d24e416ea0..a2501891b31e33cb22ba7192e8d980f833ad0647 100644
--- a/substrate/client/db/src/bench.rs
+++ b/substrate/client/db/src/bench.rs
@@ -23,7 +23,7 @@ use std::cell::{Cell, RefCell};
 use std::collections::HashMap;
 
 use hash_db::{Prefix, Hasher};
-use sp_trie::{MemoryDB, prefixed_key, StorageProof};
+use sp_trie::{MemoryDB, prefixed_key};
 use sp_core::{
 	storage::{ChildInfo, TrackedStorageKey},
 	hexdisplay::HexDisplay
@@ -34,7 +34,6 @@ use sp_state_machine::{
 	DBValue, backend::Backend as StateBackend, StorageCollection, ChildStorageCollection, ProofRecorder,
 };
 use kvdb::{KeyValueDB, DBTransaction};
-use codec::Encode;
 use crate::storage_cache::{CachingState, SharedCache, new_shared_cache};
 
 type DbState<B> = sp_state_machine::TrieBackend<
@@ -45,7 +44,7 @@ type State<B> = CachingState<DbState<B>, B>;
 
 struct StorageDb<Block: BlockT> {
 	db: Arc<dyn KeyValueDB>,
-	proof_recorder: Option<ProofRecorder<HashFor<Block>>>,
+	proof_recorder: Option<ProofRecorder<Block::Hash>>,
 	_block: std::marker::PhantomData<Block>,
 }
 
@@ -53,12 +52,12 @@ impl<Block: BlockT> sp_state_machine::Storage<HashFor<Block>> for StorageDb<Bloc
 	fn get(&self, key: &Block::Hash, prefix: Prefix) -> Result<Option<DBValue>, String> {
 		let prefixed_key = prefixed_key::<HashFor<Block>>(key, prefix);
 		if let Some(recorder) = &self.proof_recorder {
-			if let Some(v) = recorder.read().get(&key) {
+			if let Some(v) = recorder.get(&key) {
 				return Ok(v.clone());
 			}
 			let backend_value = self.db.get(0, &prefixed_key)
 				.map_err(|e| format!("Database backend error: {:?}", e))?;
-			recorder.write().insert(key.clone(), backend_value.clone());
+			recorder.record(key.clone(), backend_value.clone());
 			Ok(backend_value)
 		} else {
 			self.db.get(0, &prefixed_key)
@@ -117,7 +116,7 @@ pub struct BenchmarkingState<B: BlockT> {
 	child_key_tracker: RefCell<HashMap<Vec<u8>, HashMap<Vec<u8>, KeyTracker>>>,
 	read_write_tracker: RefCell<ReadWriteTracker>,
 	whitelist: RefCell<Vec<TrackedStorageKey>>,
-	proof_recorder: Option<ProofRecorder<HashFor<B>>>,
+	proof_recorder: Option<ProofRecorder<B::Hash>>,
 }
 
 impl<B: BlockT> BenchmarkingState<B> {
@@ -164,12 +163,10 @@ impl<B: BlockT> BenchmarkingState<B> {
 		*self.state.borrow_mut() = None;
 		let db = match self.db.take() {
 			Some(db) => db,
-			None => Arc::new(::kvdb_memorydb::create(1)),
+			None => Arc::new(kvdb_memorydb::create(1)),
 		};
 		self.db.set(Some(db.clone()));
-		if let Some(recorder) = &self.proof_recorder {
-			recorder.write().clear();
-		}
+		self.proof_recorder.as_ref().map(|r| r.reset());
 		let storage_db = Arc::new(StorageDb::<B> {
 			db,
 			proof_recorder: self.proof_recorder.clone(),
@@ -429,7 +426,8 @@ impl<B: BlockT> StateBackend<HashFor<B>> for BenchmarkingState<B> {
 		None
 	}
 
-	fn commit(&self,
+	fn commit(
+		&self,
 		storage_root: <HashFor<B> as Hasher>::Out,
 		mut transaction: Self::Transaction,
 		main_storage_changes: StorageCollection,
@@ -518,14 +516,7 @@ impl<B: BlockT> StateBackend<HashFor<B>> for BenchmarkingState<B> {
 	}
 
 	fn proof_size(&self) -> Option<u32> {
-		self.proof_recorder.as_ref().map(|recorder| {
-			let proof = StorageProof::new(recorder
-				.read()
-				.iter()
-				.filter_map(|(_k, v)| v.as_ref().map(|v| v.to_vec()))
-				.collect());
-			proof.encoded_size() as u32
-		})
+		self.proof_recorder.as_ref().map(|recorder| recorder.estimate_encoded_size() as u32)
 	}
 }
 
diff --git a/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs b/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs
index 2be8545a81d1da030d746a44ed2c0996ad3afaaf..642da2c465e66e522d8c53091a1a89be379f7f36 100644
--- a/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs
+++ b/substrate/primitives/api/proc-macro/src/impl_runtime_apis.rs
@@ -282,16 +282,14 @@ fn generate_runtime_api_base_structures() -> Result<TokenStream> {
 				self.recorder = Some(Default::default());
 			}
 
+			fn proof_recorder(&self) -> Option<#crate_::ProofRecorder<Block>> {
+				self.recorder.clone()
+			}
+
 			fn extract_proof(&mut self) -> Option<#crate_::StorageProof> {
 				self.recorder
 					.take()
-					.map(|recorder| {
-						let trie_nodes = recorder.read()
-							.iter()
-							.filter_map(|(_k, v)| v.as_ref().map(|v| v.to_vec()))
-							.collect();
-						#crate_::StorageProof::new(trie_nodes)
-					})
+					.map(|recorder| recorder.to_storage_proof())
 			}
 
 			fn into_storage_changes(
diff --git a/substrate/primitives/api/proc-macro/src/mock_impl_runtime_apis.rs b/substrate/primitives/api/proc-macro/src/mock_impl_runtime_apis.rs
index 62a03a59baacdc49991f8088ed47f94cffc39c59..383cd4f635ea299c6cbb8eb136827ce2adb6b27a 100644
--- a/substrate/primitives/api/proc-macro/src/mock_impl_runtime_apis.rs
+++ b/substrate/primitives/api/proc-macro/src/mock_impl_runtime_apis.rs
@@ -102,6 +102,10 @@ fn implement_common_api_traits(
 				unimplemented!("`extract_proof` not implemented for runtime api mocks")
 			}
 
+			fn proof_recorder(&self) -> Option<#crate_::ProofRecorder<#block_type>> {
+				unimplemented!("`proof_recorder` not implemented for runtime api mocks")
+			}
+
 			fn into_storage_changes(
 				&self,
 				_: &Self::StateBackend,
diff --git a/substrate/primitives/api/src/lib.rs b/substrate/primitives/api/src/lib.rs
index afb9af343ba6ce713736fae192c7c0c61d078ac4..155bb899a2ed596317c6fad2b0aae71902792ca7 100644
--- a/substrate/primitives/api/src/lib.rs
+++ b/substrate/primitives/api/src/lib.rs
@@ -362,7 +362,7 @@ pub use sp_api_proc_macro::mock_impl_runtime_apis;
 
 /// A type that records all accessed trie nodes and generates a proof out of it.
 #[cfg(feature = "std")]
-pub type ProofRecorder<B> = sp_state_machine::ProofRecorder<HashFor<B>>;
+pub type ProofRecorder<B> = sp_state_machine::ProofRecorder<<B as BlockT>::Hash>;
 
 /// A type that is used as cache for the storage transactions.
 #[cfg(feature = "std")]
@@ -471,6 +471,9 @@ pub trait ApiExt<Block: BlockT> {
 	/// If `record_proof` was not called before, this will return `None`.
 	fn extract_proof(&mut self) -> Option<StorageProof>;
 
+	/// Returns the currently active proof recorder, if any.
+	fn proof_recorder(&self) -> Option<ProofRecorder<Block>>;
+
 	/// Convert the api object into the storage changes that were done while executing runtime
 	/// api functions.
 	///
diff --git a/substrate/primitives/consensus/common/src/evaluation.rs b/substrate/primitives/consensus/common/src/evaluation.rs
index be930fa4a00165ef052ef10ccac42049abc829e2..c18c8b127f991b2d6a1f0dfcbf18bfa865083a94 100644
--- a/substrate/primitives/consensus/common/src/evaluation.rs
+++ b/substrate/primitives/consensus/common/src/evaluation.rs
@@ -39,9 +39,6 @@ pub enum Error {
 	/// Proposal had wrong number.
 	#[error("Proposal had wrong number. Expected {expected:?}, got {got:?}")]
 	WrongNumber { expected: BlockNumber, got: BlockNumber },
-	/// Proposal exceeded the maximum size.
-	#[error("Proposal size {block_size} exceeds maximum allowed size of {max_block_size}.")]
-	ProposalTooLarge { block_size: usize, max_block_size: usize },
 }
 
 /// Attempt to evaluate a substrate block as a node block, returning error
@@ -50,17 +47,12 @@ pub fn evaluate_initial<Block: BlockT>(
 	proposal: &Block,
 	parent_hash: &<Block as BlockT>::Hash,
 	parent_number: <<Block as BlockT>::Header as HeaderT>::Number,
-	max_block_size: usize,
 ) -> Result<()> {
 
 	let encoded = Encode::encode(proposal);
 	let proposal = Block::decode(&mut &encoded[..])
 		.map_err(|e| Error::BadProposalFormat(e))?;
 
-	if encoded.len() > max_block_size {
-		return Err(Error::ProposalTooLarge { max_block_size, block_size: encoded.len() })
-	}
-
 	if *parent_hash != *proposal.header().parent_hash() {
 		return Err(Error::WrongParentHash {
 			expected: format!("{:?}", *parent_hash),
diff --git a/substrate/primitives/consensus/common/src/lib.rs b/substrate/primitives/consensus/common/src/lib.rs
index 27a43dbe022082cad9d1dd6f412780fb558e1bc3..642b6b12e7d6f2df39830f936850928ffb0f7998 100644
--- a/substrate/primitives/consensus/common/src/lib.rs
+++ b/substrate/primitives/consensus/common/src/lib.rs
@@ -196,6 +196,13 @@ pub trait Proposer<B: BlockT> {
 	/// a maximum duration for building this proposal is given. If building the proposal takes
 	/// longer than this maximum, the proposal will be very likely discarded.
 	///
+	/// If `block_size_limit` is given, the proposer should push transactions until the block size
+	/// limit is hit. Be aware that the runtime's `finalize_block` implementation may perform
+	/// further operations after the limit is hit, and when the block size estimation also includes
+	/// a proof that is recorded alongside the block production, the proof can still grow
+	/// afterwards. This means that `block_size_limit` should be treated as a soft limit rather
+	/// than a hard limit on the final block size.
+	///
 	/// # Return
 	///
 	/// Returns a future that resolves to a [`Proposal`] or to [`Error`].
@@ -204,6 +211,7 @@ pub trait Proposer<B: BlockT> {
 		inherent_data: InherentData,
 		inherent_digests: DigestFor<B>,
 		max_duration: Duration,
+		block_size_limit: Option<usize>,
 	) -> Self::Proposal;
 }
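
From the caller's side, the extended signature is used as below; a hedged sketch,
assuming an initialized `proposer` plus inherent data and digests from context:

    // `Some(limit)` caps the estimated block size. As documented above, this is
    // a soft limit once a recorded proof keeps growing in `finalize_block`.
    let proposal = proposer.propose(
        inherent_data,
        inherent_digests,
        std::time::Duration::from_secs(2),
        Some(3 * 1024 * 1024),
    ).await?;
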
 
diff --git a/substrate/primitives/state-machine/src/proving_backend.rs b/substrate/primitives/state-machine/src/proving_backend.rs
index 6b87aa12eb1af9f7b64a4f90c4d789b5a7ddc000..28672659fa10c5d8a03fcac136a6ea023f613cc4 100644
--- a/substrate/primitives/state-machine/src/proving_backend.rs
+++ b/substrate/primitives/state-machine/src/proving_backend.rs
@@ -17,9 +17,9 @@
 
 //! Proving state machine backend.
 
-use std::{sync::Arc, collections::HashMap};
+use std::{sync::Arc, collections::{HashMap, hash_map::Entry}};
 use parking_lot::RwLock;
-use codec::{Decode, Codec};
+use codec::{Decode, Codec, Encode};
 use log::debug;
 use hash_db::{Hasher, HashDB, EMPTY_PREFIX, Prefix};
 use sp_trie::{
@@ -109,9 +109,69 @@ impl<'a, S, H> ProvingBackendRecorder<'a, S, H>
 	}
 }
 
-/// Global proof recorder, act as a layer over a hash db for recording queried
-/// data.
-pub type ProofRecorder<H> = Arc<RwLock<HashMap<<H as Hasher>::Out, Option<DBValue>>>>;
+#[derive(Default)]
+struct ProofRecorderInner<Hash> {
+	/// All the records that we have stored so far.
+	records: HashMap<Hash, Option<DBValue>>,
+	/// The encoded size of all recorded values.
+	encoded_size: usize,
+}
+
+/// Global proof recorder, acts as a layer over a hash db for recording queried data.
+#[derive(Clone, Default)]
+pub struct ProofRecorder<Hash> {
+	inner: Arc<RwLock<ProofRecorderInner<Hash>>>,
+}
+
+impl<Hash: std::hash::Hash + Eq> ProofRecorder<Hash> {
+	/// Record the given `key` => `val` combination.
+	///
+	/// A key is only recorded once; recording the same key again neither changes the stored
+	/// value nor the tracked encoded size.
+	pub fn record(&self, key: Hash, val: Option<DBValue>) {
+		let mut inner = self.inner.write();
+		let encoded_size = if let Entry::Vacant(entry) = inner.records.entry(key) {
+			let encoded_size = val.as_ref().map(Encode::encoded_size).unwrap_or(0);
+
+			entry.insert(val);
+			encoded_size
+		} else {
+			0
+		};
+
+		inner.encoded_size += encoded_size;
+	}
+
+	/// Returns the value at the given `key`.
+	pub fn get(&self, key: &Hash) -> Option<Option<DBValue>> {
+		self.inner.read().records.get(key).cloned()
+	}
+
+	/// Returns the estimated encoded size of the proof.
+	///
+	/// The estimation may be bigger (by at most 4 bytes, coming from the compact length prefix),
+	/// but never smaller than the actual encoded proof.
+	pub fn estimate_encoded_size(&self) -> usize {
+		let inner = self.inner.read();
+		inner.encoded_size
+			+ codec::Compact(inner.records.len() as u32).encoded_size()
+	}
+
+	/// Convert into a [`StorageProof`].
+	pub fn to_storage_proof(&self) -> StorageProof {
+		let trie_nodes = self.inner.read()
+			.records
+			.iter()
+			.filter_map(|(_k, v)| v.as_ref().map(|v| v.to_vec()))
+			.collect();
+
+		StorageProof::new(trie_nodes)
+	}
+
+	/// Reset the internal state.
+	pub fn reset(&self) {
+		let mut inner = self.inner.write();
+		inner.records.clear();
+		inner.encoded_size = 0;
+	}
+}
 
 /// Patricia trie-based backend which also tracks all touched storage trie values.
 /// These can be sent to remote node and used as a proof of execution.
@@ -122,7 +182,7 @@ pub struct ProvingBackend<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> (
 /// Trie backend storage with its proof recorder.
 pub struct ProofRecorderBackend<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> {
 	backend: &'a S,
-	proof_recorder: ProofRecorder<H>,
+	proof_recorder: ProofRecorder<H::Out>,
 }
 
 impl<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> ProvingBackend<'a, S, H>
@@ -137,7 +197,7 @@ impl<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> ProvingBackend<'a, S, H>
 	/// Create new proving backend with the given recorder.
 	pub fn new_with_recorder(
 		backend: &'a TrieBackend<S, H>,
-		proof_recorder: ProofRecorder<H>,
+		proof_recorder: ProofRecorder<H::Out>,
 	) -> Self {
 		let essence = backend.essence();
 		let root = essence.root().clone();
@@ -150,12 +210,7 @@ impl<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> ProvingBackend<'a, S, H>
 
 	/// Extracting the gathered unordered proof.
 	pub fn extract_proof(&self) -> StorageProof {
-		let trie_nodes = self.0.essence().backend_storage().proof_recorder
-			.read()
-			.iter()
-			.filter_map(|(_k, v)| v.as_ref().map(|v| v.to_vec()))
-			.collect();
-		StorageProof::new(trie_nodes)
+		self.0.essence().backend_storage().proof_recorder.to_storage_proof()
 	}
 }
 
@@ -165,11 +220,12 @@ impl<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> TrieBackendStorage<H>
 	type Overlay = S::Overlay;
 
 	fn get(&self, key: &H::Out, prefix: Prefix) -> Result<Option<DBValue>, String> {
-		if let Some(v) = self.proof_recorder.read().get(key) {
-			return Ok(v.clone());
+		if let Some(v) = self.proof_recorder.get(key) {
+			return Ok(v);
 		}
-		let backend_value =  self.backend.get(key, prefix)?;
-		self.proof_recorder.write().insert(key.clone(), backend_value.clone());
+
+		let backend_value = self.backend.get(key, prefix)?;
+		self.proof_recorder.record(key.clone(), backend_value.clone());
 		Ok(backend_value)
 	}
 }
@@ -343,8 +399,8 @@ mod tests {
 		assert_eq!(trie_backend.storage(b"key").unwrap(), proving_backend.storage(b"key").unwrap());
 		assert_eq!(trie_backend.pairs(), proving_backend.pairs());
 
-		let (trie_root, mut trie_mdb) = trie_backend.storage_root(::std::iter::empty());
-		let (proving_root, mut proving_mdb) = proving_backend.storage_root(::std::iter::empty());
+		let (trie_root, mut trie_mdb) = trie_backend.storage_root(std::iter::empty());
+		let (proving_root, mut proving_mdb) = proving_backend.storage_root(std::iter::empty());
 		assert_eq!(trie_root, proving_root);
 		assert_eq!(trie_mdb.drain(), proving_mdb.drain());
 	}
@@ -405,7 +461,7 @@ mod tests {
 		));
 
 		let trie = in_memory.as_trie_backend().unwrap();
-		let trie_root = trie.storage_root(::std::iter::empty()).0;
+		let trie_root = trie.storage_root(std::iter::empty()).0;
 		assert_eq!(in_memory_root, trie_root);
 		(0..64).for_each(|i| assert_eq!(
 			trie.storage(&[i]).unwrap().unwrap(),
@@ -440,4 +496,35 @@ mod tests {
 			vec![64]
 		);
 	}
+
+	#[test]
+	fn storage_proof_encoded_size_estimation_works() {
+		let trie_backend = test_trie();
+		let backend = test_proving(&trie_backend);
+
+		let check_estimation = |backend: &ProvingBackend<'_, PrefixedMemoryDB<BlakeTwo256>, BlakeTwo256>| {
+			let storage_proof = backend.extract_proof();
+			let estimation = backend.0.essence()
+				.backend_storage()
+				.proof_recorder
+				.estimate_encoded_size();
+
+			assert_eq!(storage_proof.encoded_size(), estimation);
+		};
+
+		assert_eq!(backend.storage(b"key").unwrap(), Some(b"value".to_vec()));
+		check_estimation(&backend);
+
+		assert_eq!(backend.storage(b"value1").unwrap(), Some(vec![42]));
+		check_estimation(&backend);
+
+		assert_eq!(backend.storage(b"value2").unwrap(), Some(vec![24]));
+		check_estimation(&backend);
+
+		assert!(backend.storage(b"doesnotexist").unwrap().is_none());
+		check_estimation(&backend);
+
+		assert!(backend.storage(b"doesnotexist2").unwrap().is_none());
+		check_estimation(&backend);
+	}
 }
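
The reworked recorder can also be exercised on its own; a self-contained sketch,
using a fixed-size array as the hash type and arbitrary values:

    use codec::{Compact, Encode};
    use sp_state_machine::ProofRecorder;

    let recorder = ProofRecorder::<[u8; 32]>::default();

    // A key is only recorded once, so the tracked encoded size is bumped on the
    // first insertion only.
    recorder.record([1u8; 32], Some(vec![0u8; 10]));
    recorder.record([1u8; 32], Some(vec![0u8; 99])); // no-op: key already recorded

    // The estimate is never smaller than the real proof, but may overestimate
    // by a few bytes.
    let estimate = recorder.estimate_encoded_size();
    let proof = recorder.to_storage_proof();
    assert!(proof.encoded_size() <= estimate);

    // `reset` clears the records and the tracked size, e.g. between benchmark runs.
    recorder.reset();
    assert_eq!(recorder.estimate_encoded_size(), Compact(0u32).encoded_size());
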
diff --git a/substrate/test-utils/runtime/src/lib.rs b/substrate/test-utils/runtime/src/lib.rs
index 837b3715c819286e365c6fe83decdf5eea77b5db..150bc403732c73b2781fdf4b8ca6fd5738476770 100644
--- a/substrate/test-utils/runtime/src/lib.rs
+++ b/substrate/test-utils/runtime/src/lib.rs
@@ -190,7 +190,7 @@ impl BlindCheckable for Extrinsic {
 					Err(InvalidTransaction::BadProof.into())
 				}
 			},
-			Extrinsic::IncludeData(_) => Err(InvalidTransaction::BadProof.into()),
+			Extrinsic::IncludeData(v) => Ok(Extrinsic::IncludeData(v)),
 			Extrinsic::StorageChange(key, value) => Ok(Extrinsic::StorageChange(key, value)),
 			Extrinsic::ChangesTrieConfigUpdate(new_config) =>
 				Ok(Extrinsic::ChangesTrieConfigUpdate(new_config)),