From 9be655701a9665a60ec9db2c95a0feabb2e12a32 Mon Sep 17 00:00:00 2001
From: Sebastian Kunert <skunert49@gmail.com>
Date: Thu, 19 Jan 2023 16:13:16 +0100
Subject: [PATCH] Notification-based block pinning (#13157)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Worker

* Reorganize and unpin on notification drop

* Pin in state-db, pass block number

* Pin blocks in blockchain db

* Switch to reference counted LRU

* Disable pinning when we keep all blocks

* Fix pinning hint for state-db

* Remove pinning from backend layer

* Improve readability

* Add justifications to test

* Fix justification behaviour

* Remove debug prints

* Convert channels to tracing_unbounded

* Add comments to the test

* Documentation and Cleanup

* Move task start to client

* Simplify cache

* Improve test, remove unwanted log

* Add tracing logs, remove expect for block number

* Cleanup

* Add conversion method for unpin handle to FinalityNotification

* Revert unwanted changes

* Improve naming

* Make clippy happy

* Fix docs

Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>

* Use `NumberFor` instead of u64 in API

* Hand over weak reference to unpin worker task

* Unwanted

* &Hash -> Hash

* Remove number from interface, rename `_unpin_handle`, LOG_TARGET

* Move RwLock one layer up

* Apply code style suggestions

* Improve comments

* Replace lru crate by schnellru

* Only insert values for pinned items + better docs

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Improve comments, log target and test

Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>
Co-authored-by: Bastian Köcher <git@kchr.de>
---
 substrate/Cargo.lock                          |  52 +-
 substrate/client/api/src/backend.rs           |  22 +-
 substrate/client/api/src/client.rs            | 109 +++-
 substrate/client/api/src/in_mem.rs            |   6 +
 substrate/client/db/Cargo.toml                |   1 +
 substrate/client/db/src/lib.rs                | 531 ++++++++++++++++--
 .../client/db/src/pinned_blocks_cache.rs      | 231 ++++++++
 .../finality-grandpa/src/until_imported.rs    |  13 +-
 substrate/client/service/src/builder.rs       |   4 +-
 substrate/client/service/src/client/client.rs |  77 ++-
 substrate/test-utils/client/src/lib.rs        |   7 +-
 11 files changed, 954 insertions(+), 99 deletions(-)
 create mode 100644 substrate/client/db/src/pinned_blocks_cache.rs

diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock
index 44663827720..100e0d21520 100644
--- a/substrate/Cargo.lock
+++ b/substrate/Cargo.lock
@@ -137,6 +137,18 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "ahash"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf6ccdb167abbf410dcb915cabd428929d7f6a04980b54a11f26a39f1c7f7107"
+dependencies = [
+ "cfg-if",
+ "getrandom 0.2.8",
+ "once_cell",
+ "version_check",
+]
+
 [[package]]
 name = "aho-corasick"
 version = "0.7.20"
@@ -1836,7 +1848,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7c24f403d068ad0b359e577a77f92392118be3f3c927538f2bb544a5ecd828c6"
 dependencies = [
  "curve25519-dalek 3.2.0",
- "hashbrown",
+ "hashbrown 0.12.3",
  "hex",
  "rand_core 0.6.4",
  "sha2 0.9.9",
@@ -2806,9 +2818,15 @@ version = "0.12.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
 dependencies = [
- "ahash",
+ "ahash 0.7.6",
 ]
 
+[[package]]
+name = "hashbrown"
+version = "0.13.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
+
 [[package]]
 name = "heck"
 version = "0.4.0"
@@ -3122,7 +3140,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399"
 dependencies = [
  "autocfg",
- "hashbrown",
+ "hashbrown 0.12.3",
  "serde",
 ]
 
@@ -4172,7 +4190,7 @@ version = "0.8.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b6e8aaa3f231bb4bd57b84b2d5dc3ae7f350265df8aa96492e0bc394a1571909"
 dependencies = [
- "hashbrown",
+ "hashbrown 0.12.3",
 ]
 
 [[package]]
@@ -4310,7 +4328,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5e0c7cba9ce19ac7ffd2053ac9f49843bbd3f4318feedfd74e85c19d5fb0ba66"
 dependencies = [
  "hash-db",
- "hashbrown",
+ "hashbrown 0.12.3",
 ]
 
 [[package]]
@@ -5085,7 +5103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "21158b2c33aa6d4561f1c0a6ea283ca92bc54802a93b263e910746d679a7eb53"
 dependencies = [
  "crc32fast",
- "hashbrown",
+ "hashbrown 0.12.3",
  "indexmap",
  "memchr",
 ]
@@ -7991,6 +8009,7 @@ dependencies = [
  "rand 0.8.5",
  "sc-client-api",
  "sc-state-db",
+ "schnellru",
  "sp-arithmetic",
  "sp-blockchain",
  "sp-core",
@@ -8334,7 +8353,7 @@ dependencies = [
 name = "sc-finality-grandpa"
 version = "0.10.0-dev"
 dependencies = [
- "ahash",
+ "ahash 0.7.6",
  "array-bytes",
  "assert_matches",
  "async-trait",
@@ -8534,7 +8553,7 @@ dependencies = [
 name = "sc-network-gossip"
 version = "0.10.0-dev"
 dependencies = [
- "ahash",
+ "ahash 0.7.6",
  "futures",
  "futures-timer",
  "libp2p",
@@ -9132,6 +9151,17 @@ dependencies = [
  "windows-sys 0.36.1",
 ]
 
+[[package]]
+name = "schnellru"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "772575a524feeb803e5b0fcbc6dd9f367e579488197c94c6e4023aad2305774d"
+dependencies = [
+ "ahash 0.8.2",
+ "cfg-if",
+ "hashbrown 0.13.2",
+]
+
 [[package]]
 name = "schnorrkel"
 version = "0.9.1"
@@ -10275,11 +10305,11 @@ dependencies = [
 name = "sp-trie"
 version = "7.0.0"
 dependencies = [
- "ahash",
+ "ahash 0.7.6",
  "array-bytes",
  "criterion",
  "hash-db",
- "hashbrown",
+ "hashbrown 0.12.3",
  "lazy_static",
  "lru",
  "memory-db",
@@ -11279,7 +11309,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "004e1e8f92535694b4cb1444dc5a8073ecf0815e3357f729638b9f8fc4062908"
 dependencies = [
  "hash-db",
- "hashbrown",
+ "hashbrown 0.12.3",
  "log",
  "rustc-hex",
  "smallvec",
diff --git a/substrate/client/api/src/backend.rs b/substrate/client/api/src/backend.rs
index 21d213ffb15..4ef609bdd45 100644
--- a/substrate/client/api/src/backend.rs
+++ b/substrate/client/api/src/backend.rs
@@ -436,12 +436,24 @@ pub trait StorageProvider<Block: BlockT, B: Backend<Block>> {
 ///
 /// Manages the data layer.
 ///
-/// Note on state pruning: while an object from `state_at` is alive, the state
+/// # State Pruning
+///
+/// While an object from `state_at` is alive, the state
 /// should not be pruned. The backend should internally reference-count
 /// its state objects.
 ///
 /// The same applies for live `BlockImportOperation`s: while an import operation building on a
 /// parent `P` is alive, the state for `P` should not be pruned.
+///
+/// # Block Pruning
+///
+/// Users can pin blocks in memory by calling `pin_block`. When
+/// a block would be pruned, its value is kept in an in-memory cache
+/// until it is unpinned via `unpin_block`.
+///
+/// While a block is pinned, its state is also preserved.
+///
+/// The backend should internally reference count the number of pin / unpin calls.
 pub trait Backend<Block: BlockT>: AuxStore + Send + Sync {
 	/// Associated block insertion operation type.
 	type BlockImportOperation: BlockImportOperation<Block, State = Self::State>;
@@ -502,6 +514,14 @@ pub trait Backend<Block: BlockT>: AuxStore + Send + Sync {
 	/// Returns a handle to offchain storage.
 	fn offchain_storage(&self) -> Option<Self::OffchainStorage>;
 
+	/// Pin the block to keep body, justification and state available after pruning.
+	/// Pins are reference counted. Users need to make sure to perform
+	/// one call to [`Self::unpin_block`] per call to [`Self::pin_block`].
+	fn pin_block(&self, hash: Block::Hash) -> sp_blockchain::Result<()>;
+
+	/// Unpin the block to allow pruning.
+	fn unpin_block(&self, hash: Block::Hash);
+
 	/// Returns true if state for given block is available.
 	fn have_state_at(&self, hash: Block::Hash, _number: NumberFor<Block>) -> bool {
 		self.state_at(hash).is_ok()
diff --git a/substrate/client/api/src/client.rs b/substrate/client/api/src/client.rs
index 0d00257fa7b..8e7ceb68704 100644
--- a/substrate/client/api/src/client.rs
+++ b/substrate/client/api/src/client.rs
@@ -30,7 +30,7 @@ use std::{collections::HashSet, fmt, sync::Arc};
 use crate::{blockchain::Info, notifications::StorageEventStream, FinalizeSummary, ImportSummary};
 
 use sc_transaction_pool_api::ChainEvent;
-use sc_utils::mpsc::TracingUnboundedReceiver;
+use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
 use sp_blockchain;
 
 /// Type that implements `futures::Stream` of block import events.
@@ -264,6 +264,53 @@ impl fmt::Display for UsageInfo {
 	}
 }
 
+/// Once dropped, sends a message to the pinning worker to unpin the block in the backend.
+#[derive(Debug)]
+pub struct UnpinHandleInner<Block: BlockT> {
+	/// Hash of the block pinned by this handle
+	hash: Block::Hash,
+	unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+}
+
+impl<Block: BlockT> UnpinHandleInner<Block> {
+	/// Create a new [`UnpinHandleInner`]
+	pub fn new(
+		hash: Block::Hash,
+		unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+	) -> Self {
+		Self { hash, unpin_worker_sender }
+	}
+}
+
+impl<Block: BlockT> Drop for UnpinHandleInner<Block> {
+	fn drop(&mut self) {
+		if let Err(err) = self.unpin_worker_sender.unbounded_send(self.hash) {
+			log::debug!(target: "db", "Unable to unpin block with hash: {}, error: {:?}", self.hash, err);
+		};
+	}
+}
+
+/// Keeps a specific block pinned while the handle is alive.
+/// Once the last handle instance for a given block is dropped, the
+/// block is unpinned in the [`Backend`](crate::backend::Backend::unpin_block).
+#[derive(Debug, Clone)]
+pub struct UnpinHandle<Block: BlockT>(Arc<UnpinHandleInner<Block>>);
+
+impl<Block: BlockT> UnpinHandle<Block> {
+	/// Create a new [`UnpinHandle`]
+	pub fn new(
+		hash: Block::Hash,
+		unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+	) -> UnpinHandle<Block> {
+		UnpinHandle(Arc::new(UnpinHandleInner::new(hash, unpin_worker_sender)))
+	}
+
+	/// Hash of the block this handle is unpinning on drop
+	pub fn hash(&self) -> Block::Hash {
+		self.0.hash
+	}
+}
+
 /// Summary of an imported block
 #[derive(Clone, Debug)]
 pub struct BlockImportNotification<Block: BlockT> {
@@ -279,6 +326,36 @@ pub struct BlockImportNotification<Block: BlockT> {
 	///
 	/// If `None`, there was no re-org while importing.
 	pub tree_route: Option<Arc<sp_blockchain::TreeRoute<Block>>>,
+	/// Handle to unpin the block this notification is for
+	unpin_handle: UnpinHandle<Block>,
+}
+
+impl<Block: BlockT> BlockImportNotification<Block> {
+	/// Create new notification
+	pub fn new(
+		hash: Block::Hash,
+		origin: BlockOrigin,
+		header: Block::Header,
+		is_new_best: bool,
+		tree_route: Option<Arc<sp_blockchain::TreeRoute<Block>>>,
+		unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+	) -> Self {
+		Self {
+			hash,
+			origin,
+			header,
+			is_new_best,
+			tree_route,
+			unpin_handle: UnpinHandle::new(hash, unpin_worker_sender),
+		}
+	}
+
+	/// Consume this notification and extract the unpin handle.
+	///
+	/// Note: Only use this if you want to keep the block pinned in the backend.
+	pub fn into_unpin_handle(self) -> UnpinHandle<Block> {
+		self.unpin_handle
+	}
 }
 
 /// Summary of a finalized block.
@@ -294,6 +371,8 @@ pub struct FinalityNotification<Block: BlockT> {
 	pub tree_route: Arc<[Block::Hash]>,
 	/// Stale branches heads.
 	pub stale_heads: Arc<[Block::Hash]>,
+	/// Handle to unpin the block this notification is for
+	unpin_handle: UnpinHandle<Block>,
 }
 
 impl<B: BlockT> TryFrom<BlockImportNotification<B>> for ChainEvent<B> {
@@ -314,26 +393,44 @@ impl<B: BlockT> From<FinalityNotification<B>> for ChainEvent<B> {
 	}
 }
 
-impl<B: BlockT> From<FinalizeSummary<B>> for FinalityNotification<B> {
-	fn from(mut summary: FinalizeSummary<B>) -> Self {
+impl<Block: BlockT> FinalityNotification<Block> {
+	/// Create finality notification from finality summary.
+	pub fn from_summary(
+		mut summary: FinalizeSummary<Block>,
+		unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+	) -> FinalityNotification<Block> {
 		let hash = summary.finalized.pop().unwrap_or_default();
 		FinalityNotification {
 			hash,
 			header: summary.header,
 			tree_route: Arc::from(summary.finalized),
 			stale_heads: Arc::from(summary.stale_heads),
+			unpin_handle: UnpinHandle::new(hash, unpin_worker_sender),
 		}
 	}
+
+	/// Consume this notification and extract the unpin handle.
+	///
+	/// Note: Only use this if you want to keep the block pinned in the backend.
+	pub fn into_unpin_handle(self) -> UnpinHandle<Block> {
+		self.unpin_handle
+	}
 }
 
-impl<B: BlockT> From<ImportSummary<B>> for BlockImportNotification<B> {
-	fn from(summary: ImportSummary<B>) -> Self {
+impl<Block: BlockT> BlockImportNotification<Block> {
+	/// Create block import notification from import summary.
+	pub fn from_summary(
+		summary: ImportSummary<Block>,
+		unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+	) -> BlockImportNotification<Block> {
+		let hash = summary.hash;
 		BlockImportNotification {
-			hash: summary.hash,
+			hash,
 			origin: summary.origin,
 			header: summary.header,
 			is_new_best: summary.is_new_best,
 			tree_route: summary.tree_route.map(Arc::new),
+			unpin_handle: UnpinHandle::new(hash, unpin_worker_sender),
 		}
 	}
 }
diff --git a/substrate/client/api/src/in_mem.rs b/substrate/client/api/src/in_mem.rs
index 144aa352f55..5e82757e7d9 100644
--- a/substrate/client/api/src/in_mem.rs
+++ b/substrate/client/api/src/in_mem.rs
@@ -788,6 +788,12 @@ where
 	fn requires_full_sync(&self) -> bool {
 		false
 	}
+
+	fn pin_block(&self, _: <Block as BlockT>::Hash) -> blockchain::Result<()> {
+		Ok(())
+	}
+
+	fn unpin_block(&self, _: <Block as BlockT>::Hash) {}
 }
 
 impl<Block: BlockT> backend::LocalBackend<Block> for Backend<Block> where Block::Hash: Ord {}
diff --git a/substrate/client/db/Cargo.toml b/substrate/client/db/Cargo.toml
index 562a94f1900..2d7291a014d 100644
--- a/substrate/client/db/Cargo.toml
+++ b/substrate/client/db/Cargo.toml
@@ -26,6 +26,7 @@ parity-db = "0.4.2"
 parking_lot = "0.12.1"
 sc-client-api = { version = "4.0.0-dev", path = "../api" }
 sc-state-db = { version = "0.10.0-dev", path = "../state-db" }
+schnellru = "0.2.1"
 sp-arithmetic = { version = "6.0.0", path = "../../primitives/arithmetic" }
 sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" }
 sp-core = { version = "7.0.0", path = "../../primitives/core" }
diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs
index 99fa786e040..09ccfef1cc2 100644
--- a/substrate/client/db/src/lib.rs
+++ b/substrate/client/db/src/lib.rs
@@ -34,6 +34,7 @@ pub mod bench;
 
 mod children;
 mod parity_db;
+mod pinned_blocks_cache;
 mod record_stats_state;
 mod stats;
 #[cfg(any(feature = "rocksdb", test))]
@@ -51,6 +52,7 @@ use std::{
 };
 
 use crate::{
+	pinned_blocks_cache::PinnedBlocksCache,
 	record_stats_state::RecordStatsState,
 	stats::StateUsageStats,
 	utils::{meta_keys, read_db, read_meta, DatabaseType, Meta},
@@ -481,6 +483,7 @@ pub struct BlockchainDb<Block: BlockT> {
 	leaves: RwLock<LeafSet<Block::Hash, NumberFor<Block>>>,
 	header_metadata_cache: Arc<HeaderMetadataCache<Block>>,
 	header_cache: Mutex<LinkedHashMap<Block::Hash, Option<Block::Header>>>,
+	pinned_blocks_cache: Arc<RwLock<PinnedBlocksCache<Block>>>,
 }
 
 impl<Block: BlockT> BlockchainDb<Block> {
@@ -493,6 +496,7 @@ impl<Block: BlockT> BlockchainDb<Block> {
 			meta: Arc::new(RwLock::new(meta)),
 			header_metadata_cache: Arc::new(HeaderMetadataCache::default()),
 			header_cache: Default::default(),
+			pinned_blocks_cache: Arc::new(RwLock::new(PinnedBlocksCache::new())),
 		})
 	}
 
@@ -521,62 +525,83 @@ impl<Block: BlockT> BlockchainDb<Block> {
 		let mut meta = self.meta.write();
 		meta.block_gap = gap;
 	}
-}
 
-impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for BlockchainDb<Block> {
-	fn header(&self, hash: Block::Hash) -> ClientResult<Option<Block::Header>> {
-		let mut cache = self.header_cache.lock();
-		if let Some(result) = cache.get_refresh(&hash) {
-			return Ok(result.clone())
+	/// Empty the cache of pinned items.
+	fn clear_pinning_cache(&self) {
+		self.pinned_blocks_cache.write().clear();
+	}
+
+	/// Load a justification into the cache of pinned items.
+	/// Reference count of the item will not be increased. Use this
+	/// to load values for items into the cache which have already been pinned.
+	fn insert_justifications_if_pinned(&self, hash: Block::Hash, justification: Justification) {
+		let mut cache = self.pinned_blocks_cache.write();
+		if !cache.contains(hash) {
+			return
 		}
-		let header = utils::read_header(
-			&*self.db,
-			columns::KEY_LOOKUP,
-			columns::HEADER,
-			BlockId::<Block>::Hash(hash),
-		)?;
-		cache_header(&mut cache, hash, header.clone());
-		Ok(header)
+
+		let justifications = Justifications::from(justification);
+		cache.insert_justifications(hash, Some(justifications));
 	}
 
-	fn info(&self) -> sc_client_api::blockchain::Info<Block> {
-		let meta = self.meta.read();
-		sc_client_api::blockchain::Info {
-			best_hash: meta.best_hash,
-			best_number: meta.best_number,
-			genesis_hash: meta.genesis_hash,
-			finalized_hash: meta.finalized_hash,
-			finalized_number: meta.finalized_number,
-			finalized_state: meta.finalized_state,
-			number_leaves: self.leaves.read().count(),
-			block_gap: meta.block_gap,
+	/// Load a justification from the db into the cache of pinned items.
+	/// Reference count of the item will not be increased. Use this
+	/// to load values for items into the cache which have already been pinned.
+	fn insert_persisted_justifications_if_pinned(&self, hash: Block::Hash) -> ClientResult<()> {
+		let mut cache = self.pinned_blocks_cache.write();
+		if !cache.contains(hash) {
+			return Ok(())
 		}
+
+		let justifications = self.justifications_uncached(hash)?;
+		cache.insert_justifications(hash, justifications);
+		Ok(())
 	}
 
-	fn status(&self, hash: Block::Hash) -> ClientResult<sc_client_api::blockchain::BlockStatus> {
-		match self.header(hash)?.is_some() {
-			true => Ok(sc_client_api::blockchain::BlockStatus::InChain),
-			false => Ok(sc_client_api::blockchain::BlockStatus::Unknown),
+	/// Load a block body from the db into the cache of pinned items.
+	/// Reference count of the item will not be increased. Use this
+	/// to load values for items into the cache which have already been pinned.
+	fn insert_persisted_body_if_pinned(&self, hash: Block::Hash) -> ClientResult<()> {
+		let mut cache = self.pinned_blocks_cache.write();
+		if !cache.contains(hash) {
+			return Ok(())
 		}
+
+		let body = self.body_uncached(hash)?;
+		cache.insert_body(hash, body);
+		Ok(())
 	}
 
-	fn number(&self, hash: Block::Hash) -> ClientResult<Option<NumberFor<Block>>> {
-		Ok(self.header_metadata(hash).ok().map(|header_metadata| header_metadata.number))
+	/// Bump reference count for pinned item.
+	fn bump_ref(&self, hash: Block::Hash) {
+		self.pinned_blocks_cache.write().pin(hash);
 	}
 
-	fn hash(&self, number: NumberFor<Block>) -> ClientResult<Option<Block::Hash>> {
-		Ok(utils::read_header::<Block>(
+	/// Decrease reference count for pinned item and remove if reference count is 0.
+	fn unpin(&self, hash: Block::Hash) {
+		self.pinned_blocks_cache.write().unpin(hash);
+	}
+
+	fn justifications_uncached(&self, hash: Block::Hash) -> ClientResult<Option<Justifications>> {
+		match read_db(
 			&*self.db,
 			columns::KEY_LOOKUP,
-			columns::HEADER,
-			BlockId::Number(number),
-		)?
-		.map(|header| header.hash()))
+			columns::JUSTIFICATIONS,
+			BlockId::<Block>::Hash(hash),
+		)? {
+			Some(justifications) => match Decode::decode(&mut &justifications[..]) {
+				Ok(justifications) => Ok(Some(justifications)),
+				Err(err) =>
+					return Err(sp_blockchain::Error::Backend(format!(
+						"Error decoding justifications: {}",
+						err
+					))),
+			},
+			None => Ok(None),
+		}
 	}
-}
 
-impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<Block> {
-	fn body(&self, hash: Block::Hash) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
+	fn body_uncached(&self, hash: Block::Hash) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
 		if let Some(body) =
 			read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, BlockId::Hash::<Block>(hash))?
 		{
@@ -640,24 +665,77 @@ impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<B
 		}
 		Ok(None)
 	}
+}
 
-	fn justifications(&self, hash: Block::Hash) -> ClientResult<Option<Justifications>> {
-		match read_db(
+impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for BlockchainDb<Block> {
+	fn header(&self, hash: Block::Hash) -> ClientResult<Option<Block::Header>> {
+		let mut cache = self.header_cache.lock();
+		if let Some(result) = cache.get_refresh(&hash) {
+			return Ok(result.clone())
+		}
+		let header = utils::read_header(
 			&*self.db,
 			columns::KEY_LOOKUP,
-			columns::JUSTIFICATIONS,
+			columns::HEADER,
 			BlockId::<Block>::Hash(hash),
-		)? {
-			Some(justifications) => match Decode::decode(&mut &justifications[..]) {
-				Ok(justifications) => Ok(Some(justifications)),
-				Err(err) =>
-					return Err(sp_blockchain::Error::Backend(format!(
-						"Error decoding justifications: {}",
-						err
-					))),
-			},
-			None => Ok(None),
+		)?;
+		cache_header(&mut cache, hash, header.clone());
+		Ok(header)
+	}
+
+	fn info(&self) -> sc_client_api::blockchain::Info<Block> {
+		let meta = self.meta.read();
+		sc_client_api::blockchain::Info {
+			best_hash: meta.best_hash,
+			best_number: meta.best_number,
+			genesis_hash: meta.genesis_hash,
+			finalized_hash: meta.finalized_hash,
+			finalized_number: meta.finalized_number,
+			finalized_state: meta.finalized_state,
+			number_leaves: self.leaves.read().count(),
+			block_gap: meta.block_gap,
+		}
+	}
+
+	fn status(&self, hash: Block::Hash) -> ClientResult<sc_client_api::blockchain::BlockStatus> {
+		match self.header(hash)?.is_some() {
+			true => Ok(sc_client_api::blockchain::BlockStatus::InChain),
+			false => Ok(sc_client_api::blockchain::BlockStatus::Unknown),
+		}
+	}
+
+	fn number(&self, hash: Block::Hash) -> ClientResult<Option<NumberFor<Block>>> {
+		Ok(self.header_metadata(hash).ok().map(|header_metadata| header_metadata.number))
+	}
+
+	fn hash(&self, number: NumberFor<Block>) -> ClientResult<Option<Block::Hash>> {
+		Ok(utils::read_header::<Block>(
+			&*self.db,
+			columns::KEY_LOOKUP,
+			columns::HEADER,
+			BlockId::Number(number),
+		)?
+		.map(|header| header.hash()))
+	}
+}
+
+impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<Block> {
+	fn body(&self, hash: Block::Hash) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
+		let cache = self.pinned_blocks_cache.read();
+		if let Some(result) = cache.body(&hash) {
+			return Ok(result.clone())
+		}
+
+		self.body_uncached(hash)
+	}
+
+	fn justifications(&self, hash: Block::Hash) -> ClientResult<Option<Justifications>> {
+		let cache = self.pinned_blocks_cache.read();
+		if let Some(result) = cache.justifications(&hash) {
+			return Ok(result.clone())
 		}
+
+		self.justifications_uncached(hash)
 	}
 
 	fn last_finalized(&self) -> ClientResult<Block::Hash> {
@@ -1291,20 +1369,28 @@ impl<Block: BlockT> Backend<Block> {
 		header: &Block::Header,
 		last_finalized: Option<Block::Hash>,
 		justification: Option<Justification>,
+		current_transaction_justifications: &mut HashMap<Block::Hash, Justification>,
 	) -> ClientResult<MetaUpdate<Block>> {
 		// TODO: ensure best chain contains this block.
 		let number = *header.number();
 		self.ensure_sequential_finalization(header, last_finalized)?;
 		let with_state = sc_client_api::Backend::have_state_at(self, hash, number);
 
-		self.note_finalized(transaction, header, hash, with_state)?;
+		self.note_finalized(
+			transaction,
+			header,
+			hash,
+			with_state,
+			current_transaction_justifications,
+		)?;
 
 		if let Some(justification) = justification {
 			transaction.set_from_vec(
 				columns::JUSTIFICATIONS,
 				&utils::number_and_hash_to_lookup_key(number, hash)?,
-				Justifications::from(justification).encode(),
+				Justifications::from(justification.clone()).encode(),
 			);
+			current_transaction_justifications.insert(hash, justification);
 		}
 		Ok(MetaUpdate { hash, number, is_best: false, is_finalized: true, with_state })
 	}
@@ -1371,6 +1457,8 @@ impl<Block: BlockT> Backend<Block> {
 			(meta.best_number, meta.finalized_hash, meta.finalized_number, meta.block_gap)
 		};
 
+		let mut current_transaction_justifications: HashMap<Block::Hash, Justification> =
+			HashMap::new();
 		for (block_hash, justification) in operation.finalized_blocks {
 			let block_header = self.blockchain.expect_header(block_hash)?;
 			meta_updates.push(self.finalize_block_with_transaction(
@@ -1379,6 +1467,7 @@ impl<Block: BlockT> Backend<Block> {
 				&block_header,
 				Some(last_finalized_hash),
 				justification,
+				&mut current_transaction_justifications,
 			)?);
 			last_finalized_hash = block_hash;
 			last_finalized_num = *block_header.number();
@@ -1551,7 +1640,14 @@ impl<Block: BlockT> Backend<Block> {
 			if finalized {
 				// TODO: ensure best chain contains this block.
 				self.ensure_sequential_finalization(header, Some(last_finalized_hash))?;
-				self.note_finalized(&mut transaction, header, hash, operation.commit_state)?;
+				let mut current_transaction_justifications = HashMap::new();
+				self.note_finalized(
+					&mut transaction,
+					header,
+					hash,
+					operation.commit_state,
+					&mut current_transaction_justifications,
+				)?;
 			} else {
 				// canonicalize blocks which are old enough, regardless of finality.
 				self.force_delayed_canonicalize(&mut transaction)?
@@ -1684,6 +1780,7 @@ impl<Block: BlockT> Backend<Block> {
 		f_header: &Block::Header,
 		f_hash: Block::Hash,
 		with_state: bool,
+		current_transaction_justifications: &mut HashMap<Block::Hash, Justification>,
 	) -> ClientResult<()> {
 		let f_num = *f_header.number();
 
@@ -1709,7 +1806,7 @@ impl<Block: BlockT> Backend<Block> {
 		}
 
 		let new_displaced = self.blockchain.leaves.write().finalize_height(f_num);
-		self.prune_blocks(transaction, f_num, &new_displaced)?;
+		self.prune_blocks(transaction, f_num, &new_displaced, current_transaction_justifications)?;
 
 		Ok(())
 	}
@@ -1717,22 +1814,39 @@ impl<Block: BlockT> Backend<Block> {
 	fn prune_blocks(
 		&self,
 		transaction: &mut Transaction<DbHash>,
-		finalized: NumberFor<Block>,
+		finalized_number: NumberFor<Block>,
 		displaced: &FinalizationOutcome<Block::Hash, NumberFor<Block>>,
+		current_transaction_justifications: &mut HashMap<Block::Hash, Justification>,
 	) -> ClientResult<()> {
 		match self.blocks_pruning {
 			BlocksPruning::KeepAll => {},
 			BlocksPruning::Some(blocks_pruning) => {
 				// Always keep the last finalized block
 				let keep = std::cmp::max(blocks_pruning, 1);
-				if finalized >= keep.into() {
-					let number = finalized.saturating_sub(keep.into());
+				if finalized_number >= keep.into() {
+					let number = finalized_number.saturating_sub(keep.into());
+
+					// Before we prune a block, check if it is pinned
+					if let Some(hash) = self.blockchain.hash(number)? {
+						self.blockchain.insert_persisted_body_if_pinned(hash)?;
+
+						// If the block was finalized in this transaction, it will not be in the db
+						// yet.
+						if let Some(justification) =
+							current_transaction_justifications.remove(&hash)
+						{
+							self.blockchain.insert_justifications_if_pinned(hash, justification);
+						} else {
+							self.blockchain.insert_persisted_justifications_if_pinned(hash)?;
+						}
+					};
+
 					self.prune_block(transaction, BlockId::<Block>::number(number))?;
 				}
-				self.prune_displaced_branches(transaction, finalized, displaced)?;
+				self.prune_displaced_branches(transaction, finalized_number, displaced)?;
 			},
 			BlocksPruning::KeepFinalized => {
-				self.prune_displaced_branches(transaction, finalized, displaced)?;
+				self.prune_displaced_branches(transaction, finalized_number, displaced)?;
 			},
 		}
 		Ok(())
@@ -1755,6 +1869,8 @@ impl<Block: BlockT> Backend<Block> {
 			while self.blockchain.hash(number)? != Some(hash) {
 				match self.blockchain.header(hash)? {
 					Some(header) => {
+						self.blockchain.insert_persisted_body_if_pinned(hash)?;
+
 						self.prune_block(transaction, BlockId::<Block>::hash(hash))?;
 						number = header.number().saturating_sub(One::one());
 						hash = *header.parent_hash();
@@ -1985,6 +2101,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 				.state_db
 				.reset(state_meta_db)
 				.map_err(sp_blockchain::Error::from_state_db)?;
+			self.blockchain.clear_pinning_cache();
 			Err(e)
 		} else {
 			self.storage.state_db.sync();
@@ -2000,12 +2117,14 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 		let mut transaction = Transaction::new();
 		let header = self.blockchain.expect_header(hash)?;
 
+		let mut current_transaction_justifications = HashMap::new();
 		let m = self.finalize_block_with_transaction(
 			&mut transaction,
 			hash,
 			&header,
 			None,
 			justification,
+			&mut current_transaction_justifications,
 		)?;
 
 		self.storage.db.commit(transaction)?;
@@ -2382,6 +2501,49 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 			PruningMode::ArchiveAll | PruningMode::ArchiveCanonical
 		)
 	}
+
+	fn pin_block(&self, hash: <Block as BlockT>::Hash) -> sp_blockchain::Result<()> {
+		let hint = || {
+			let header_metadata = self.blockchain.header_metadata(hash);
+			header_metadata
+				.map(|hdr| {
+					sc_state_db::NodeDb::get(self.storage.as_ref(), hdr.state_root.as_ref())
+						.unwrap_or(None)
+						.is_some()
+				})
+				.unwrap_or(false)
+		};
+
+		if let Some(number) = self.blockchain.number(hash)? {
+			self.storage.state_db.pin(&hash, number.saturated_into::<u64>(), hint).map_err(
+				|_| {
+					sp_blockchain::Error::UnknownBlock(format!(
+						"State already discarded for `{:?}`",
+						hash
+					))
+				},
+			)?;
+		} else {
+			return Err(ClientError::UnknownBlock(format!(
+				"Can not pin block with hash `{:?}`. Block not found.",
+				hash
+			)))
+		}
+
+		if self.blocks_pruning != BlocksPruning::KeepAll {
+			// Only increase reference count for this hash. Value is loaded once we prune.
+			self.blockchain.bump_ref(hash);
+		}
+		Ok(())
+	}
+
+	fn unpin_block(&self, hash: <Block as BlockT>::Hash) {
+		self.storage.state_db.unpin(&hash);
+
+		if self.blocks_pruning != BlocksPruning::KeepAll {
+			self.blockchain.unpin(hash);
+		}
+	}
 }
 
 impl<Block: BlockT> sc_client_api::backend::LocalBackend<Block> for Backend<Block> {}
@@ -4009,4 +4171,249 @@ pub(crate) mod tests {
 			assert_eq!(block4, backend.blockchain().hash(4).unwrap().unwrap());
 		}
 	}
+
+	#[test]
+	fn test_pinned_blocks_on_finalize() {
+		let backend = Backend::<Block>::new_test_with_tx_storage(BlocksPruning::Some(1), 10);
+		let mut blocks = Vec::new();
+		let mut prev_hash = Default::default();
+
+		let build_justification = |i: u64| ([0, 0, 0, 0], vec![i.try_into().unwrap()]);
+		// Block tree:
+		//   0 -> 1 -> 2 -> 3 -> 4
+		for i in 0..5 {
+			let hash = insert_block(
+				&backend,
+				i,
+				prev_hash,
+				None,
+				Default::default(),
+				vec![i.into()],
+				None,
+			)
+			.unwrap();
+			blocks.push(hash);
+			// Avoid block pruning.
+			backend.pin_block(blocks[i as usize]).unwrap();
+
+			prev_hash = hash;
+		}
+
+		let bc = backend.blockchain();
+
+		// Check that we can properly access values when there is a reference count
+		// but no cached value.
+		assert_eq!(Some(vec![1.into()]), bc.body(blocks[1]).unwrap());
+
+		// Pin block 1 two more times, so it is pinned three times in total
+		backend.pin_block(blocks[1]).unwrap();
+		backend.pin_block(blocks[1]).unwrap();
+
+		// Finalize all blocks. This will trigger pruning.
+		let mut op = backend.begin_operation().unwrap();
+		backend.begin_state_operation(&mut op, blocks[4]).unwrap();
+		for i in 1..5 {
+			op.mark_finalized(blocks[i], Some(build_justification(i.try_into().unwrap())))
+				.unwrap();
+		}
+		backend.commit_operation(op).unwrap();
+
+		// Blocks 0, 1, 2 and 3 are pinned, so all their values should be cached.
+		// Block 4 is inside the pruning window, so its value is still in the db.
+		assert_eq!(Some(vec![0.into()]), bc.body(blocks[0]).unwrap());
+
+		assert_eq!(Some(vec![1.into()]), bc.body(blocks[1]).unwrap());
+		assert_eq!(
+			Some(Justifications::from(build_justification(1))),
+			bc.justifications(blocks[1]).unwrap()
+		);
+
+		assert_eq!(Some(vec![2.into()]), bc.body(blocks[2]).unwrap());
+		assert_eq!(
+			Some(Justifications::from(build_justification(2))),
+			bc.justifications(blocks[2]).unwrap()
+		);
+
+		assert_eq!(Some(vec![3.into()]), bc.body(blocks[3]).unwrap());
+		assert_eq!(
+			Some(Justifications::from(build_justification(3))),
+			bc.justifications(blocks[3]).unwrap()
+		);
+
+		assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap());
+		assert_eq!(
+			Some(Justifications::from(build_justification(4))),
+			bc.justifications(blocks[4]).unwrap()
+		);
+
+		// Unpin all blocks. Values should be removed from cache.
+		for block in &blocks {
+			backend.unpin_block(*block);
+		}
+
+		assert!(bc.body(blocks[0]).unwrap().is_none());
+		// Block 1 was pinned two extra times, so we expect it to still be cached
+		assert!(bc.body(blocks[1]).unwrap().is_some());
+		assert!(bc.justifications(blocks[1]).unwrap().is_some());
+		// Headers should also be available while pinned
+		assert!(bc.header(blocks[1]).ok().flatten().is_some());
+		assert!(bc.body(blocks[2]).unwrap().is_none());
+		assert!(bc.justifications(blocks[2]).unwrap().is_none());
+		assert!(bc.body(blocks[3]).unwrap().is_none());
+		assert!(bc.justifications(blocks[3]).unwrap().is_none());
+
+		// After these unpins, block 1 should also be removed
+		backend.unpin_block(blocks[1]);
+		assert!(bc.body(blocks[1]).unwrap().is_some());
+		assert!(bc.justifications(blocks[1]).unwrap().is_some());
+		backend.unpin_block(blocks[1]);
+		assert!(bc.body(blocks[1]).unwrap().is_none());
+		assert!(bc.justifications(blocks[1]).unwrap().is_none());
+
+		// Block 4 is inside the pruning window and still kept
+		assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap());
+		assert_eq!(
+			Some(Justifications::from(build_justification(4))),
+			bc.justifications(blocks[4]).unwrap()
+		);
+
+		// Block tree:
+		//   0 -> 1 -> 2 -> 3 -> 4 -> 5
+		let hash =
+			insert_block(&backend, 5, prev_hash, None, Default::default(), vec![5.into()], None)
+				.unwrap();
+		blocks.push(hash);
+
+		backend.pin_block(blocks[4]).unwrap();
+		// Mark block 5 as finalized.
+		let mut op = backend.begin_operation().unwrap();
+		backend.begin_state_operation(&mut op, blocks[5]).unwrap();
+		op.mark_finalized(blocks[5], Some(build_justification(5))).unwrap();
+		backend.commit_operation(op).unwrap();
+
+		assert!(bc.body(blocks[0]).unwrap().is_none());
+		assert!(bc.body(blocks[1]).unwrap().is_none());
+		assert!(bc.body(blocks[2]).unwrap().is_none());
+		assert!(bc.body(blocks[3]).unwrap().is_none());
+
+		assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap());
+		assert_eq!(
+			Some(Justifications::from(build_justification(4))),
+			bc.justifications(blocks[4]).unwrap()
+		);
+		assert_eq!(Some(vec![5.into()]), bc.body(blocks[5]).unwrap());
+		assert!(bc.header(blocks[5]).ok().flatten().is_some());
+
+		backend.unpin_block(blocks[4]);
+		assert!(bc.body(blocks[4]).unwrap().is_none());
+		assert!(bc.justifications(blocks[4]).unwrap().is_none());
+
+		// Append a justification to block 5.
+		backend.append_justification(blocks[5], ([0, 0, 0, 1], vec![42])).unwrap();
+
+		let hash =
+			insert_block(&backend, 6, blocks[5], None, Default::default(), vec![6.into()], None)
+				.unwrap();
+		blocks.push(hash);
+
+		// Pin block 5 so it gets loaded into the cache on prune
+		backend.pin_block(blocks[5]).unwrap();
+
+		// Finalize block 6 so block 5 gets pruned. Since it is pinned, both justifications should
+		// be in memory.
+		let mut op = backend.begin_operation().unwrap();
+		backend.begin_state_operation(&mut op, blocks[6]).unwrap();
+		op.mark_finalized(blocks[6], None).unwrap();
+		backend.commit_operation(op).unwrap();
+
+		assert_eq!(Some(vec![5.into()]), bc.body(blocks[5]).unwrap());
+		assert!(bc.header(blocks[5]).ok().flatten().is_some());
+		let mut expected = Justifications::from(build_justification(5));
+		expected.append(([0, 0, 0, 1], vec![42]));
+		assert_eq!(Some(expected), bc.justifications(blocks[5]).unwrap());
+	}
+
+	#[test]
+	fn test_pinned_blocks_on_finalize_with_fork() {
+		let backend = Backend::<Block>::new_test_with_tx_storage(BlocksPruning::Some(1), 10);
+		let mut blocks = Vec::new();
+		let mut prev_hash = Default::default();
+
+		// Block tree:
+		//   0 -> 1 -> 2 -> 3 -> 4
+		for i in 0..5 {
+			let hash = insert_block(
+				&backend,
+				i,
+				prev_hash,
+				None,
+				Default::default(),
+				vec![i.into()],
+				None,
+			)
+			.unwrap();
+			blocks.push(hash);
+
+			// Avoid block pruning.
+			backend.pin_block(blocks[i as usize]).unwrap();
+
+			prev_hash = hash;
+		}
+
+		// Insert a fork at the second block.
+		// Block tree:
+		//   0 -> 1 -> 2 -> 3 -> 4
+		//        \ -> 2 -> 3
+		let fork_hash_root =
+			insert_block(&backend, 2, blocks[1], None, H256::random(), vec![2.into()], None)
+				.unwrap();
+		let fork_hash_3 = insert_block(
+			&backend,
+			3,
+			fork_hash_root,
+			None,
+			H256::random(),
+			vec![3.into(), 11.into()],
+			None,
+		)
+		.unwrap();
+
+		// Do not prune the fork hash.
+		backend.pin_block(fork_hash_3).unwrap();
+
+		let mut op = backend.begin_operation().unwrap();
+		backend.begin_state_operation(&mut op, blocks[4]).unwrap();
+		op.mark_head(blocks[4]).unwrap();
+		backend.commit_operation(op).unwrap();
+
+		for i in 1..5 {
+			let mut op = backend.begin_operation().unwrap();
+			backend.begin_state_operation(&mut op, blocks[4]).unwrap();
+			op.mark_finalized(blocks[i], None).unwrap();
+			backend.commit_operation(op).unwrap();
+		}
+
+		let bc = backend.blockchain();
+		assert_eq!(Some(vec![0.into()]), bc.body(blocks[0]).unwrap());
+		assert_eq!(Some(vec![1.into()]), bc.body(blocks[1]).unwrap());
+		assert_eq!(Some(vec![2.into()]), bc.body(blocks[2]).unwrap());
+		assert_eq!(Some(vec![3.into()]), bc.body(blocks[3]).unwrap());
+		assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap());
+		// Check the fork hashes.
+		assert_eq!(None, bc.body(fork_hash_root).unwrap());
+		assert_eq!(Some(vec![3.into(), 11.into()]), bc.body(fork_hash_3).unwrap());
+
+		// Unpin all blocks, except the forked one.
+		for block in &blocks {
+			backend.unpin_block(*block);
+		}
+		assert!(bc.body(blocks[0]).unwrap().is_none());
+		assert!(bc.body(blocks[1]).unwrap().is_none());
+		assert!(bc.body(blocks[2]).unwrap().is_none());
+		assert!(bc.body(blocks[3]).unwrap().is_none());
+
+		assert!(bc.body(fork_hash_3).unwrap().is_some());
+		backend.unpin_block(fork_hash_3);
+		assert!(bc.body(fork_hash_3).unwrap().is_none());
+	}
 }
diff --git a/substrate/client/db/src/pinned_blocks_cache.rs b/substrate/client/db/src/pinned_blocks_cache.rs
new file mode 100644
index 00000000000..39ff1c52778
--- /dev/null
+++ b/substrate/client/db/src/pinned_blocks_cache.rs
@@ -0,0 +1,231 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2017-2023 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use schnellru::{Limiter, LruMap};
+use sp_runtime::{traits::Block as BlockT, Justifications};
+
+const LOG_TARGET: &str = "db::pin";
+const PINNING_CACHE_SIZE: usize = 1024;
+
+/// Entry for pinned blocks cache.
+struct PinnedBlockCacheEntry<Block: BlockT> {
+	/// How many times this item has been pinned
+	ref_count: u32,
+
+	/// Cached justifications for this block
+	pub justifications: Option<Option<Justifications>>,
+
+	/// Cached body for this block
+	pub body: Option<Option<Vec<Block::Extrinsic>>>,
+}
+
+impl<Block: BlockT> Default for PinnedBlockCacheEntry<Block> {
+	fn default() -> Self {
+		Self { ref_count: 0, justifications: None, body: None }
+	}
+}
+
+impl<Block: BlockT> PinnedBlockCacheEntry<Block> {
+	pub fn decrease_ref(&mut self) {
+		self.ref_count = self.ref_count.saturating_sub(1);
+	}
+
+	pub fn increase_ref(&mut self) {
+		self.ref_count = self.ref_count.saturating_add(1);
+	}
+
+	pub fn has_no_references(&self) -> bool {
+		self.ref_count == 0
+	}
+}
+
+/// A limiter for a map which is limited by the number of elements.
+#[derive(Copy, Clone, Debug)]
+struct LoggingByLengthLimiter {
+	max_length: usize,
+}
+
+impl LoggingByLengthLimiter {
+	/// Creates a new length limiter with a given `max_length`.
+	pub const fn new(max_length: usize) -> LoggingByLengthLimiter {
+		LoggingByLengthLimiter { max_length }
+	}
+}
+
+impl<Block: BlockT> Limiter<Block::Hash, PinnedBlockCacheEntry<Block>> for LoggingByLengthLimiter {
+	type KeyToInsert<'a> = Block::Hash;
+	type LinkType = usize;
+
+	fn is_over_the_limit(&self, length: usize) -> bool {
+		length > self.max_length
+	}
+
+	fn on_insert(
+		&mut self,
+		_length: usize,
+		key: Self::KeyToInsert<'_>,
+		value: PinnedBlockCacheEntry<Block>,
+	) -> Option<(Block::Hash, PinnedBlockCacheEntry<Block>)> {
+		if self.max_length > 0 {
+			Some((key, value))
+		} else {
+			None
+		}
+	}
+
+	fn on_replace(
+		&mut self,
+		_length: usize,
+		_old_key: &mut Block::Hash,
+		_new_key: Block::Hash,
+		_old_value: &mut PinnedBlockCacheEntry<Block>,
+		_new_value: &mut PinnedBlockCacheEntry<Block>,
+	) -> bool {
+		true
+	}
+
+	fn on_removed(&mut self, key: &mut Block::Hash, value: &mut PinnedBlockCacheEntry<Block>) {
+		// If reference count was larger than 0 on removal,
+		// the item was removed due to capacity limitations.
+		// Since the cache should be large enough for pinned items,
+		// we want to know about these evictions.
+		if value.ref_count > 0 {
+			log::warn!(
+				target: LOG_TARGET,
+				"Pinned block cache limit reached. Evicting value. hash = {}",
+				key
+			);
+		} else {
+			log::trace!(
+				target: LOG_TARGET,
+				"Evicting value from pinned block cache. hash = {}",
+				key
+			)
+		}
+	}
+
+	fn on_cleared(&mut self) {}
+
+	fn on_grow(&mut self, _new_memory_usage: usize) -> bool {
+		true
+	}
+}
+
+/// Reference counted cache for pinned block bodies and justifications.
+pub struct PinnedBlocksCache<Block: BlockT> {
+	cache: LruMap<Block::Hash, PinnedBlockCacheEntry<Block>, LoggingByLengthLimiter>,
+}
+
+impl<Block: BlockT> PinnedBlocksCache<Block> {
+	pub fn new() -> Self {
+		Self { cache: LruMap::new(LoggingByLengthLimiter::new(PINNING_CACHE_SIZE)) }
+	}
+
+	/// Increase reference count of an item.
+	/// Create an entry with empty value in the cache if necessary.
+	pub fn pin(&mut self, hash: Block::Hash) {
+		match self.cache.get_or_insert(hash, Default::default) {
+			Some(entry) => {
+				entry.increase_ref();
+				log::trace!(
+					target: LOG_TARGET,
+					"Bumped cache refcount. hash = {}, num_entries = {}",
+					hash,
+					self.cache.len()
+				);
+			},
+			None =>
+				log::warn!(target: LOG_TARGET, "Unable to bump reference count. hash = {}", hash),
+		};
+	}
+
+	/// Clear the cache
+	pub fn clear(&mut self) {
+		self.cache.clear();
+	}
+
+	/// Check if item is contained in the cache
+	pub fn contains(&self, hash: Block::Hash) -> bool {
+		self.cache.peek(&hash).is_some()
+	}
+
+	/// Attach body to an existing cache item
+	pub fn insert_body(&mut self, hash: Block::Hash, extrinsics: Option<Vec<Block::Extrinsic>>) {
+		match self.cache.peek_mut(&hash) {
+			Some(mut entry) => {
+				entry.body = Some(extrinsics);
+				log::trace!(
+					target: LOG_TARGET,
+					"Cached body. hash = {}, num_entries = {}",
+					hash,
+					self.cache.len()
+				);
+			},
+			None => log::warn!(
+				target: LOG_TARGET,
+				"Unable to insert body for uncached item. hash = {}",
+				hash
+			),
+		}
+	}
+
+	/// Attach justification to an existing cache item
+	pub fn insert_justifications(
+		&mut self,
+		hash: Block::Hash,
+		justifications: Option<Justifications>,
+	) {
+		match self.cache.peek_mut(&hash) {
+			Some(mut entry) => {
+				entry.justifications = Some(justifications);
+				log::trace!(
+					target: LOG_TARGET,
+					"Cached justification. hash = {}, num_entries = {}",
+					hash,
+					self.cache.len()
+				);
+			},
+			None => log::warn!(
+				target: LOG_TARGET,
+				"Unable to insert justifications for uncached item. hash = {}",
+				hash
+			),
+		}
+	}
+
+	/// Decreases reference count of an item.
+	/// If the count hits 0, the item is removed.
+	pub fn unpin(&mut self, hash: Block::Hash) {
+		if let Some(entry) = self.cache.peek_mut(&hash) {
+			entry.decrease_ref();
+			if entry.has_no_references() {
+				self.cache.remove(&hash);
+			}
+		}
+	}
+
+	/// Get justifications for cached block
+	pub fn justifications(&self, hash: &Block::Hash) -> Option<&Option<Justifications>> {
+		self.cache.peek(hash).and_then(|entry| entry.justifications.as_ref())
+	}
+
+	/// Get body for cached block
+	pub fn body(&self, hash: &Block::Hash) -> Option<&Option<Vec<Block::Extrinsic>>> {
+		self.cache.peek(hash).and_then(|entry| entry.body.as_ref())
+	}
+}
diff --git a/substrate/client/finality-grandpa/src/until_imported.rs b/substrate/client/finality-grandpa/src/until_imported.rs
index 776411f8fb4..3bca77ae839 100644
--- a/substrate/client/finality-grandpa/src/until_imported.rs
+++ b/substrate/client/finality-grandpa/src/until_imported.rs
@@ -593,16 +593,17 @@ mod tests {
 		fn import_header(&self, header: Header) {
 			let hash = header.hash();
 			let number = *header.number();
-
+			let (tx, _rx) = tracing_unbounded("unpin-worker-channel", 10_000);
 			self.known_blocks.lock().insert(hash, number);
 			self.sender
-				.unbounded_send(BlockImportNotification {
+				.unbounded_send(BlockImportNotification::<Block>::new(
 					hash,
-					origin: BlockOrigin::File,
+					BlockOrigin::File,
 					header,
-					is_new_best: false,
-					tree_route: None,
-				})
+					false,
+					None,
+					tx,
+				))
 				.unwrap();
 		}
 	}
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index 1f94f96fae8..0b09f550ce3 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -329,13 +329,15 @@ where
 	let executor = crate::client::LocalCallExecutor::new(
 		backend.clone(),
 		executor,
-		spawn_handle,
+		spawn_handle.clone(),
 		config.clone(),
 		execution_extensions,
 	)?;
+
 	crate::client::Client::new(
 		backend,
 		executor,
+		spawn_handle,
 		genesis_block_builder,
 		fork_blocks,
 		bad_blocks,
diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs
index 8e10a7b2eda..d32baa671f5 100644
--- a/substrate/client/service/src/client/client.rs
+++ b/substrate/client/service/src/client/client.rs
@@ -22,7 +22,8 @@ use super::{
 	block_rules::{BlockRules, LookupResult as BlockLookupResult},
 	genesis::BuildGenesisBlock,
 };
-use log::{info, trace, warn};
+use futures::{FutureExt, StreamExt};
+use log::{error, info, trace, warn};
 use parking_lot::{Mutex, RwLock};
 use prometheus_endpoint::Registry;
 use rand::Rng;
@@ -58,9 +59,12 @@ use sp_blockchain::{
 use sp_consensus::{BlockOrigin, BlockStatus, Error as ConsensusError};
 
 use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
-use sp_core::storage::{
-	well_known_keys, ChildInfo, ChildType, PrefixedStorageKey, Storage, StorageChild, StorageData,
-	StorageKey,
+use sp_core::{
+	storage::{
+		well_known_keys, ChildInfo, ChildType, PrefixedStorageKey, Storage, StorageChild,
+		StorageData, StorageKey,
+	},
+	traits::SpawnNamed,
 };
 #[cfg(feature = "test-helpers")]
 use sp_keystore::SyncCryptoStorePtr;
@@ -88,9 +92,7 @@ use std::{
 
 #[cfg(feature = "test-helpers")]
 use {
-	super::call_executor::LocalCallExecutor,
-	sc_client_api::in_mem,
-	sp_core::traits::{CodeExecutor, SpawnNamed},
+	super::call_executor::LocalCallExecutor, sc_client_api::in_mem, sp_core::traits::CodeExecutor,
 };
 
 type NotificationSinks<T> = Mutex<Vec<TracingUnboundedSender<T>>>;
@@ -116,6 +118,7 @@ where
 	block_rules: BlockRules<Block>,
 	config: ClientConfig<Block>,
 	telemetry: Option<TelemetryHandle>,
+	unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
 	_phantom: PhantomData<RA>,
 }
 
@@ -246,7 +249,7 @@ where
 	let call_executor = LocalCallExecutor::new(
 		backend.clone(),
 		executor,
-		spawn_handle,
+		spawn_handle.clone(),
 		config.clone(),
 		extensions,
 	)?;
@@ -254,6 +257,7 @@ where
 	Client::new(
 		backend,
 		call_executor,
+		spawn_handle,
 		genesis_block_builder,
 		Default::default(),
 		Default::default(),
@@ -296,11 +300,20 @@ where
 
 			let ClientImportOperation { mut op, notify_imported, notify_finalized } = op;
 
-			let finality_notification = notify_finalized.map(|summary| summary.into());
+			let finality_notification = notify_finalized.map(|summary| {
+				FinalityNotification::from_summary(summary, self.unpin_worker_sender.clone())
+			});
+
 			let (import_notification, storage_changes) = match notify_imported {
 				Some(mut summary) => {
 					let storage_changes = summary.storage_changes.take();
-					(Some(summary.into()), storage_changes)
+					(
+						Some(BlockImportNotification::from_summary(
+							summary,
+							self.unpin_worker_sender.clone(),
+						)),
+						storage_changes,
+					)
 				},
 				None => (None, None),
 			};
@@ -318,6 +331,27 @@ where
 
 			self.backend.commit_operation(op)?;
 
+			// We need to pin the block in the backend once
+			// for each notification. Once all notifications are
+			// dropped, the block will be unpinned automatically.
+			if let Some(ref notification) = finality_notification {
+				if let Err(err) = self.backend.pin_block(notification.hash) {
+					error!(
+						"Unable to pin block for finality notification. hash: {}, Error: {}",
+						notification.hash, err
+					);
+				};
+			}
+
+			if let Some(ref notification) = import_notification {
+				if let Err(err) = self.backend.pin_block(notification.hash) {
+					error!(
+						"Unable to pin block for import notification. hash: {}, Error: {}",
+						notification.hash, err
+					);
+				};
+			}
+
 			self.notify_finalized(finality_notification)?;
 			self.notify_imported(import_notification, storage_changes)?;
 
@@ -357,6 +391,7 @@ where
 	pub fn new<G>(
 		backend: Arc<B>,
 		executor: E,
+		spawn_handle: Box<dyn SpawnNamed>,
 		genesis_block_builder: G,
 		fork_blocks: ForkBlocks<Block>,
 		bad_blocks: BadBlocks<Block>,
@@ -369,6 +404,7 @@ where
 			Block,
 			BlockImportOperation = <B as backend::Backend<Block>>::BlockImportOperation,
 		>,
+		B: 'static,
 	{
 		let info = backend.blockchain().info();
 		if info.finalized_state.is_none() {
@@ -390,6 +426,26 @@ where
 			backend.commit_operation(op)?;
 		}
 
+		let (unpin_worker_sender, mut rx) =
+			tracing_unbounded::<Block::Hash>("unpin-worker-channel", 10_000);
+		let task_backend = Arc::downgrade(&backend);
+		spawn_handle.spawn(
+			"unpin-worker",
+			None,
+			async move {
+				while let Some(message) = rx.next().await {
+					if let Some(backend) = task_backend.upgrade() {
+						backend.unpin_block(message);
+					} else {
+						log::debug!("Terminating unpin-worker, backend reference was dropped.");
+						return
+					}
+				}
+				log::debug!("Terminating unpin-worker, stream terminated.")
+			}
+			.boxed(),
+		);
+
 		Ok(Client {
 			backend,
 			executor,
@@ -402,6 +458,7 @@ where
 			block_rules: BlockRules::new(fork_blocks, bad_blocks),
 			config,
 			telemetry,
+			unpin_worker_sender,
 			_phantom: Default::default(),
 		})
 	}
diff --git a/substrate/test-utils/client/src/lib.rs b/substrate/test-utils/client/src/lib.rs
index 5dc93da13fe..ff744b80cbf 100644
--- a/substrate/test-utils/client/src/lib.rs
+++ b/substrate/test-utils/client/src/lib.rs
@@ -41,7 +41,7 @@ use futures::{future::Future, stream::StreamExt};
 use sc_client_api::BlockchainEvents;
 use sc_service::client::{ClientConfig, LocalCallExecutor};
 use serde::Deserialize;
-use sp_core::storage::ChildInfo;
+use sp_core::{storage::ChildInfo, testing::TaskExecutor};
 use sp_runtime::{codec::Encode, traits::Block as BlockT, OpaqueExtrinsic};
 use std::{
 	collections::{HashMap, HashSet},
@@ -62,7 +62,7 @@ impl GenesisInit for () {
 }
 
 /// A builder for creating a test client instance.
-pub struct TestClientBuilder<Block: BlockT, ExecutorDispatch, Backend, G: GenesisInit> {
+pub struct TestClientBuilder<Block: BlockT, ExecutorDispatch, Backend: 'static, G: GenesisInit> {
 	execution_strategies: ExecutionStrategies,
 	genesis_init: G,
 	/// The key is an unprefixed storage key, this only contains
@@ -237,9 +237,12 @@ impl<Block: BlockT, ExecutorDispatch, Backend, G: GenesisInit>
 		)
 		.expect("Creates genesis block builder");
 
+		let spawn_handle = Box::new(TaskExecutor::new());
+
 		let client = client::Client::new(
 			self.backend.clone(),
 			executor,
+			spawn_handle,
 			genesis_block_builder,
 			self.fork_blocks,
 			self.bad_blocks,
-- 
GitLab