diff --git a/substrate/client/api/src/backend.rs b/substrate/client/api/src/backend.rs
index 3108ba899467b4be50b34e93121cad54ca2b6115..14841d8d3e96ff29aa26d462c5dcb486754b4731 100644
--- a/substrate/client/api/src/backend.rs
+++ b/substrate/client/api/src/backend.rs
@@ -26,7 +26,7 @@ use sp_runtime::{generic::BlockId, Justification, Justifications, Storage};
 use sp_runtime::traits::{Block as BlockT, NumberFor, HashFor};
 use sp_state_machine::{
 	ChangesTrieState, ChangesTrieStorage as StateChangesTrieStorage, ChangesTrieTransaction,
-	StorageCollection, ChildStorageCollection, OffchainChangesCollection,
+	StorageCollection, ChildStorageCollection, OffchainChangesCollection, IndexOperation,
 };
 use sp_storage::{StorageData, StorageKey, PrefixedStorageKey, ChildInfo};
 use crate::{
@@ -201,6 +201,9 @@ pub trait BlockImportOperation<Block: BlockT> {
 	/// Mark a block as new head. If both block import and set head are specified, set head
 	/// overrides block import's best block rule.
 	fn mark_head(&mut self, id: BlockId<Block>) -> sp_blockchain::Result<()>;
+
+	/// Add a transaction index operation.
+	fn update_transaction_index(&mut self, index: Vec<IndexOperation>) -> sp_blockchain::Result<()>;
 }
 
 /// Interface for performing operations on the backend.
diff --git a/substrate/client/api/src/client.rs b/substrate/client/api/src/client.rs
index 97fe77c8d81e1ce675f5c414ee8430e3ea96a73a..4a0940b1f4bd35921a27c738485749764af37b83 100644
--- a/substrate/client/api/src/client.rs
+++ b/substrate/client/api/src/client.rs
@@ -96,15 +96,18 @@ pub trait BlockBackend<Block: BlockT> {
 	/// Get block hash by number.
 	fn block_hash(&self, number: NumberFor<Block>) -> sp_blockchain::Result<Option<Block::Hash>>;
 
-	/// Get single extrinsic by hash.
-	fn extrinsic(
+	/// Get single indexed transaction by content hash. 
+	///
+	/// Note that this will only fetch transactions
+	/// that are indexed by the runtime with `storage_index_transaction`.
+	fn indexed_transaction(
 		&self,
 		hash: &Block::Hash,
-	) -> sp_blockchain::Result<Option<<Block as BlockT>::Extrinsic>>;
+	) -> sp_blockchain::Result<Option<Vec<u8>>>;
 
-	/// Check if extrinsic exists.
-	fn have_extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result<bool> {
-		Ok(self.extrinsic(hash)?.is_some())
+	/// Check if transaction index exists.
+	fn has_indexed_transaction(&self, hash: &Block::Hash) -> sp_blockchain::Result<bool> {
+		Ok(self.indexed_transaction(hash)?.is_some())
 	}
 }
 
diff --git a/substrate/client/api/src/in_mem.rs b/substrate/client/api/src/in_mem.rs
index c3d266954278ea25b0120113bbc8b3208dd2bd9e..930ae39c4b5233dabcffb94d8d17385c682d1a9c 100644
--- a/substrate/client/api/src/in_mem.rs
+++ b/substrate/client/api/src/in_mem.rs
@@ -30,7 +30,7 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero, NumberFor, Ha
 use sp_runtime::{Justification, Justifications, Storage};
 use sp_state_machine::{
 	ChangesTrieTransaction, InMemoryBackend, Backend as StateBackend, StorageCollection,
-	ChildStorageCollection,
+	ChildStorageCollection, IndexOperation,
 };
 use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata};
 
@@ -415,10 +415,10 @@ impl<Block: BlockT> blockchain::Backend<Block> for Blockchain<Block> {
 		unimplemented!()
 	}
 
-	fn extrinsic(
+	fn indexed_transaction(
 		&self,
 		_hash: &Block::Hash,
-	) -> sp_blockchain::Result<Option<<Block as BlockT>::Extrinsic>> {
+	) -> sp_blockchain::Result<Option<Vec<u8>>> {
 		unimplemented!("Not supported by the in-mem backend.")
 	}
 }
@@ -613,6 +613,10 @@ impl<Block: BlockT> backend::BlockImportOperation<Block> for BlockImportOperatio
 		self.set_head = Some(block);
 		Ok(())
 	}
+
+	fn update_transaction_index(&mut self, _index: Vec<IndexOperation>) -> sp_blockchain::Result<()> {
+		Ok(())
+	}
 }
 
 /// In-memory backend. Keeps all states and blocks in memory.
diff --git a/substrate/client/api/src/leaves.rs b/substrate/client/api/src/leaves.rs
index 1971012c6aabc1a696c175d3480d5cf6c448f039..47cac8b186f4abaea526b68e7266b999290651d9 100644
--- a/substrate/client/api/src/leaves.rs
+++ b/substrate/client/api/src/leaves.rs
@@ -25,7 +25,7 @@ use sp_runtime::traits::AtLeast32Bit;
 use codec::{Encode, Decode};
 use sp_blockchain::{Error, Result};
 
-type DbHash = [u8; 32];
+type DbHash = sp_core::H256;
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 struct LeafSetItem<H, N> {
@@ -55,6 +55,11 @@ impl<H, N: Ord> FinalizationDisplaced<H, N> {
 		// one transaction, then there will be no overlap in the keys.
 		self.leaves.append(&mut other.leaves);
 	}
+
+	/// Iterate over all displaced leaves.
+	pub fn leaves(&self) -> impl IntoIterator<Item=&H> {
+		self.leaves.values().flatten()
+	}
 }
 
 /// list of leaf hashes ordered by number (descending).
diff --git a/substrate/client/db/Cargo.toml b/substrate/client/db/Cargo.toml
index 72c26fead1c1cc83783a7dde704fe6fb437b4bc8..e5e52494c2db64cb87052c5a46aed1dd888d3b78 100644
--- a/substrate/client/db/Cargo.toml
+++ b/substrate/client/db/Cargo.toml
@@ -35,7 +35,7 @@ sp-trie = { version = "3.0.0", path = "../../primitives/trie" }
 sp-consensus = { version = "0.9.0", path = "../../primitives/consensus/common" }
 sp-blockchain = { version = "3.0.0", path = "../../primitives/blockchain" }
 sp-database = { version = "3.0.0", path = "../../primitives/database" }
-parity-db = { version = "0.2.2", optional = true }
+parity-db = { version = "0.2.3", optional = true }
 prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.9.0", path = "../../utils/prometheus" }
 
 [dev-dependencies]
diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs
index acda057938e92c092eb39f798bd4cfd7d1adc236..0fc8e299f2a6c0104757b38f9752b3eeac80aa0c 100644
--- a/substrate/client/db/src/lib.rs
+++ b/substrate/client/db/src/lib.rs
@@ -78,7 +78,7 @@ use sp_runtime::traits::{
 use sp_state_machine::{
 	DBValue, ChangesTrieTransaction, ChangesTrieCacheAction, UsageInfo as StateUsageInfo,
 	StorageCollection, ChildStorageCollection, OffchainChangesCollection,
-	backend::Backend as StateBackend, StateMachineStats,
+	backend::Backend as StateBackend, StateMachineStats, IndexOperation,
 };
 use crate::utils::{DatabaseType, Meta, meta_keys, read_db, read_meta};
 use crate::changes_tries_storage::{DbChangesTrieStorage, DbChangesTrieStorageTransaction};
@@ -107,7 +107,16 @@ pub type DbState<B> = sp_state_machine::TrieBackend<
 
 const DB_HASH_LEN: usize = 32;
 /// Hash type that this backend uses for the database.
-pub type DbHash = [u8; DB_HASH_LEN];
+pub type DbHash = sp_core::H256;
+
+/// This is used as block body when storage-chain mode is enabled.
+#[derive(Debug, Encode, Decode)]
+struct ExtrinsicHeader {
+	/// Hash of the indexed part
+	indexed_hash: DbHash, // Zero hash if there's no indexed data
+	/// The rest of the data.
+	data: Vec<u8>,
+}
 
 /// A reference tracking state.
 ///
@@ -506,33 +515,47 @@ impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for Blockcha
 
 impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<Block> {
 	fn body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
-		match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? {
-			Some(body) => {
-				match self.transaction_storage {
-					TransactionStorageMode::BlockBody => match Decode::decode(&mut &body[..]) {
-						Ok(body) => Ok(Some(body)),
-						Err(err) => return Err(sp_blockchain::Error::Backend(
-							format!("Error decoding body: {}", err)
-						)),
-					},
-					TransactionStorageMode::StorageChain => {
-						match Vec::<Block::Hash>::decode(&mut &body[..]) {
-							Ok(hashes) => {
-								let extrinsics: ClientResult<Vec<Block::Extrinsic>> = hashes.into_iter().map(
-									|h| self.extrinsic(&h).and_then(|maybe_ex| maybe_ex.ok_or_else(
-										|| sp_blockchain::Error::Backend(
-											format!("Missing transaction: {}", h))))
-								).collect();
-								Ok(Some(extrinsics?))
+		let body = match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? {
+			Some(body) => body,
+			None => return Ok(None),
+		};
+		match self.transaction_storage {
+			TransactionStorageMode::BlockBody => match Decode::decode(&mut &body[..]) {
+				Ok(body) => Ok(Some(body)),
+				Err(err) => return Err(sp_blockchain::Error::Backend(
+					format!("Error decoding body: {}", err)
+				)),
+			},
+			TransactionStorageMode::StorageChain => {
+				match Vec::<ExtrinsicHeader>::decode(&mut &body[..]) {
+					Ok(index) => {
+						let extrinsics: ClientResult<Vec<Block::Extrinsic>> = index.into_iter().map(
+							| ExtrinsicHeader { indexed_hash, data } | {
+								let decode_result = if indexed_hash != Default::default() {
+									match self.db.get(columns::TRANSACTION, indexed_hash.as_ref()) {
+										Some(t) => {
+											let mut input = utils::join_input(data.as_ref(), t.as_ref());
+											Block::Extrinsic::decode(&mut input)
+										},
+										None => return Err(sp_blockchain::Error::Backend(
+											format!("Missing indexed transaction {:?}", indexed_hash))
+										)
+									}
+								} else {
+									Block::Extrinsic::decode(&mut data.as_ref())
+								};
+								decode_result.map_err(|err| sp_blockchain::Error::Backend(
+									format!("Error decoding extrinsic: {}", err))
+								)
 							}
-							Err(err) => return Err(sp_blockchain::Error::Backend(
-								format!("Error decoding body list: {}", err)
-							)),
-						}
+						).collect();
+						Ok(Some(extrinsics?))
 					}
+					Err(err) => return Err(sp_blockchain::Error::Backend(
+						format!("Error decoding body list: {}", err)
+					)),
 				}
 			}
-			None => Ok(None),
 		}
 	}
 
@@ -564,21 +587,11 @@ impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<B
 		children::read_children(&*self.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash)
 	}
 
-	fn extrinsic(&self, hash: &Block::Hash) -> ClientResult<Option<Block::Extrinsic>> {
-		match self.db.get(columns::TRANSACTION, hash.as_ref()) {
-			Some(ex) => {
-				match Decode::decode(&mut &ex[..]) {
-					Ok(ex) => Ok(Some(ex)),
-					Err(err) => Err(sp_blockchain::Error::Backend(
-							format!("Error decoding extrinsic {}: {}", hash, err)
-					)),
-				}
-			},
-			None => Ok(None),
-		}
+	fn indexed_transaction(&self, hash: &Block::Hash) -> ClientResult<Option<Vec<u8>>> {
+		Ok(self.db.get(columns::TRANSACTION, hash.as_ref()))
 	}
 
-	fn have_extrinsic(&self, hash: &Block::Hash) -> ClientResult<bool> {
+	fn has_indexed_transaction(&self, hash: &Block::Hash) -> ClientResult<bool> {
 		Ok(self.db.contains(columns::TRANSACTION, hash.as_ref()))
 	}
 }
@@ -681,6 +694,7 @@ pub struct BlockImportOperation<Block: BlockT> {
 	finalized_blocks: Vec<(BlockId<Block>, Option<Justification>)>,
 	set_head: Option<BlockId<Block>>,
 	commit_state: bool,
+	index_ops: Vec<IndexOperation>,
 }
 
 impl<Block: BlockT> BlockImportOperation<Block> {
@@ -823,6 +837,11 @@ impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block> for Bloc
 		self.set_head = Some(block);
 		Ok(())
 	}
+
+	fn update_transaction_index(&mut self, index_ops: Vec<IndexOperation>) -> ClientResult<()> {
+		self.index_ops = index_ops;
+		Ok(())
+	}
 }
 
 struct StorageDb<Block: BlockT> {
@@ -1155,21 +1174,21 @@ impl<Block: BlockT> Backend<Block> {
 			if new_canonical <= self.storage.state_db.best_canonical().unwrap_or(0) {
 				return Ok(())
 			}
-
 			let hash = if new_canonical == number_u64 {
 				hash
 			} else {
-				::sc_client_api::blockchain::HeaderBackend::hash(&self.blockchain, new_canonical.saturated_into())?
-					.expect("existence of block with number `new_canonical` \
-						implies existence of blocks with all numbers before it; qed")
+				sc_client_api::blockchain::HeaderBackend::hash(
+					&self.blockchain,
+					new_canonical.saturated_into(),
+				)?.expect("existence of block with number `new_canonical` \
+					implies existence of blocks with all numbers before it; qed")
 			};
 
 			trace!(target: "db", "Canonicalize block #{} ({:?})", new_canonical, hash);
 			let commit = self.storage.state_db.canonicalize_block(&hash)
 				.map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from_state_db(e))?;
 			apply_state_commit(transaction, commit);
-		};
-
+		}
 		Ok(())
 	}
 
@@ -1225,20 +1244,14 @@ impl<Block: BlockT> Backend<Block> {
 			)?;
 
 			transaction.set_from_vec(columns::HEADER, &lookup_key, pending_block.header.encode());
-			if let Some(body) = &pending_block.body {
+			if let Some(body) = pending_block.body {
 				match self.transaction_storage {
 					TransactionStorageMode::BlockBody => {
 						transaction.set_from_vec(columns::BODY, &lookup_key, body.encode());
 					},
 					TransactionStorageMode::StorageChain => {
-						let mut hashes = Vec::with_capacity(body.len());
-						for extrinsic in body {
-							let extrinsic = extrinsic.encode();
-							let hash = HashFor::<Block>::hash(&extrinsic);
-							transaction.set(columns::TRANSACTION, &hash.as_ref(), &extrinsic);
-							hashes.push(hash);
-						}
-						transaction.set_from_vec(columns::BODY, &lookup_key, hashes.encode());
+						let body = apply_index_ops::<Block>(&mut transaction, body, operation.index_ops);
+						transaction.set_from_vec(columns::BODY, &lookup_key, body);
 					},
 				}
 			}
@@ -1491,8 +1504,8 @@ impl<Block: BlockT> Backend<Block> {
 			}
 		}
 
-		self.prune_blocks(transaction, f_num)?;
 		let new_displaced = self.blockchain.leaves.write().finalize_height(f_num);
+		self.prune_blocks(transaction, f_num, &new_displaced)?;
 		match displaced {
 			x @ &mut None => *x = Some(new_displaced),
 			&mut Some(ref mut displaced) => displaced.merge(new_displaced),
@@ -1505,47 +1518,83 @@ impl<Block: BlockT> Backend<Block> {
 		&self,
 		transaction: &mut Transaction<DbHash>,
 		finalized: NumberFor<Block>,
+		displaced: &FinalizationDisplaced<Block::Hash, NumberFor<Block>>,
 	) -> ClientResult<()> {
 		if let KeepBlocks::Some(keep_blocks) = self.keep_blocks {
 			// Always keep the last finalized block
 			let keep = std::cmp::max(keep_blocks, 1);
-			if finalized < keep.into() {
-				return Ok(())
+			if finalized >= keep.into() {
+				let number = finalized.saturating_sub(keep.into());
+				self.prune_block(transaction, BlockId::<Block>::number(number))?;
 			}
-			let number = finalized.saturating_sub(keep.into());
-			match read_db(&*self.storage.db, columns::KEY_LOOKUP, columns::BODY, BlockId::<Block>::number(number))? {
-				Some(body) => {
-					debug!(target: "db", "Removing block #{}", number);
-					utils::remove_from_db(
-						transaction,
-						&*self.storage.db,
-						columns::KEY_LOOKUP,
-						columns::BODY,
-						BlockId::<Block>::number(number),
-					)?;
-					match self.transaction_storage {
-						TransactionStorageMode::BlockBody => {},
-						TransactionStorageMode::StorageChain => {
-							match Vec::<Block::Hash>::decode(&mut &body[..]) {
-								Ok(hashes) => {
-									for h in hashes {
-										transaction.remove(columns::TRANSACTION, h.as_ref());
+
+			// Also discard all blocks from displaced branches
+			for h in displaced.leaves() {
+				let mut number = finalized;
+				let mut hash = h.clone();
+				// Follow displaced chains back until we reach a finalized block.
+				// Since leaves are discarded due to finality, they can't have parents
+				// that are canonical, but not yet finalized. So we stop deletig as soon as
+				// we reach canonical chain.
+				while self.blockchain.hash(number)? != Some(hash.clone()) {
+					let id = BlockId::<Block>::hash(hash.clone());
+					match self.blockchain.header(id)? {
+						Some(header) => {
+							self.prune_block(transaction, id)?;
+							number = header.number().saturating_sub(One::one());
+							hash = header.parent_hash().clone();
+						},
+						None => break,
+					}
+				}
+			}
+		}
+		Ok(())
+	}
+
+	fn prune_block(
+		&self,
+		transaction: &mut Transaction<DbHash>,
+		id: BlockId<Block>,
+	) -> ClientResult<()> {
+		match read_db(&*self.storage.db, columns::KEY_LOOKUP, columns::BODY, id)? {
+			Some(body) => {
+				debug!(target: "db", "Removing block #{}", id);
+				utils::remove_from_db(
+					transaction,
+					&*self.storage.db,
+					columns::KEY_LOOKUP,
+					columns::BODY,
+					id,
+				)?;
+				match self.transaction_storage {
+					TransactionStorageMode::BlockBody => {},
+					TransactionStorageMode::StorageChain => {
+						match Vec::<ExtrinsicHeader>::decode(&mut &body[..]) {
+							Ok(body) => {
+								for ExtrinsicHeader { indexed_hash, .. } in body {
+									if indexed_hash != Default::default() {
+										transaction.release(
+											columns::TRANSACTION,
+											indexed_hash,
+										);
 									}
 								}
-								Err(err) => return Err(sp_blockchain::Error::Backend(
-									format!("Error decoding body list: {}", err)
-								)),
 							}
+							Err(err) => return Err(sp_blockchain::Error::Backend(
+									format!("Error decoding body list: {}", err)
+							)),
 						}
 					}
 				}
-				None => return Ok(()),
 			}
+			None => return Ok(()),
 		}
 		Ok(())
 	}
 }
 
+
 fn apply_state_commit(transaction: &mut Transaction<DbHash>, commit: sc_state_db::CommitSet<Vec<u8>>) {
 	for (key, val) in commit.data.inserted.into_iter() {
 		transaction.set_from_vec(columns::STATE, &key[..], val);
@@ -1561,6 +1610,67 @@ fn apply_state_commit(transaction: &mut Transaction<DbHash>, commit: sc_state_db
 	}
 }
 
+fn apply_index_ops<Block: BlockT>(
+	transaction: &mut Transaction<DbHash>,
+	body: Vec<Block::Extrinsic>,
+	ops: Vec<IndexOperation>,
+) -> Vec<u8> {
+	let mut extrinsic_headers: Vec<ExtrinsicHeader> = Vec::with_capacity(body.len());
+	let mut index_map = HashMap::new();
+	let mut renewed_map = HashMap::new();
+	for op in ops {
+		match op {
+			IndexOperation::Insert { extrinsic, offset } => {
+				index_map.insert(extrinsic, offset);
+			}
+			IndexOperation::Renew { extrinsic, hash, .. } => {
+				renewed_map.insert(extrinsic, DbHash::from_slice(hash.as_ref()));
+			}
+		}
+	}
+	for (index, extrinsic) in body.into_iter().enumerate() {
+		let extrinsic = extrinsic.encode();
+		let extrinsic_header = if let Some(hash) = renewed_map.get(&(index as u32)) {
+			// Bump ref counter
+			transaction.reference(columns::TRANSACTION, DbHash::from_slice(hash.as_ref()));
+			ExtrinsicHeader {
+				indexed_hash: hash.clone(),
+				data: extrinsic,
+			}
+		} else {
+			match index_map.get(&(index as u32)) {
+				Some(offset) if *offset as usize <= extrinsic.len() => {
+					let offset = *offset as usize;
+					let hash = HashFor::<Block>::hash(&extrinsic[offset..]);
+					transaction.store(
+						columns::TRANSACTION,
+						DbHash::from_slice(hash.as_ref()),
+						extrinsic[offset..].to_vec(),
+					);
+					ExtrinsicHeader {
+						indexed_hash: DbHash::from_slice(hash.as_ref()),
+						data: extrinsic[..offset].to_vec(),
+					}
+				},
+				_ => {
+					ExtrinsicHeader {
+						indexed_hash: Default::default(),
+						data: extrinsic,
+					}
+				}
+			}
+		};
+		extrinsic_headers.push(extrinsic_header);
+	}
+	debug!(
+		target: "db",
+		"DB transaction index: {} inserted, {} renewed",
+		index_map.len(),
+		renewed_map.len()
+	);
+	extrinsic_headers.encode()
+}
+
 impl<Block> sc_client_api::backend::AuxStore for Backend<Block> where Block: BlockT {
 	fn insert_aux<
 		'a,
@@ -1609,6 +1719,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 			finalized_blocks: Vec::new(),
 			set_head: None,
 			commit_state: false,
+			index_ops: Default::default(),
 		})
 	}
 
@@ -1998,7 +2109,7 @@ pub(crate) mod tests {
 		changes: Option<Vec<(Vec<u8>, Vec<u8>)>>,
 		extrinsics_root: H256,
 	) -> H256 {
-		insert_block(backend, number, parent_hash, changes, extrinsics_root, Vec::new())
+		insert_block(backend, number, parent_hash, changes, extrinsics_root, Vec::new(), None)
 	}
 
 	pub fn insert_block(
@@ -2008,6 +2119,7 @@ pub(crate) mod tests {
 		changes: Option<Vec<(Vec<u8>, Vec<u8>)>>,
 		extrinsics_root: H256,
 		body: Vec<ExtrinsicWrapper<u64>>,
+		transaction_index: Option<Vec<IndexOperation>>,
 	) -> H256 {
 		use sp_runtime::testing::Digest;
 
@@ -2035,6 +2147,9 @@ pub(crate) mod tests {
 		let mut op = backend.begin_operation().unwrap();
 		backend.begin_state_operation(&mut op, block_id).unwrap();
 		op.set_block_data(header, Some(body), None, NewBlockState::Best).unwrap();
+		if let Some(index) = transaction_index {
+			op.update_transaction_index(index).unwrap();
+		}
 		op.update_changes_trie((changes_trie_update, ChangesTrieCacheAction::Clear)).unwrap();
 		backend.commit_operation(op).unwrap();
 
@@ -2676,7 +2791,7 @@ pub(crate) mod tests {
 			let mut blocks = Vec::new();
 			let mut prev_hash = Default::default();
 			for i in 0 .. 5 {
-				let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()]);
+				let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()], None);
 				blocks.push(hash);
 				prev_hash = hash;
 			}
@@ -2697,4 +2812,100 @@ pub(crate) mod tests {
 			assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap());
 		}
 	}
+
+	#[test]
+	fn prune_blocks_on_finalize_with_fork() {
+		let backend = Backend::<Block>::new_test_with_tx_storage(
+			2,
+			10,
+			TransactionStorageMode::StorageChain
+		);
+		let mut blocks = Vec::new();
+		let mut prev_hash = Default::default();
+		for i in 0 .. 5 {
+			let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()], None);
+			blocks.push(hash);
+			prev_hash = hash;
+		}
+
+		// insert a fork at block 2
+		let fork_hash_root = insert_block(
+			&backend,
+			2,
+			blocks[1],
+			None,
+			sp_core::H256::random(),
+			vec![2.into()],
+			None
+		);
+		insert_block(&backend, 3, fork_hash_root, None, H256::random(), vec![3.into(), 11.into()], None);
+		let mut op = backend.begin_operation().unwrap();
+		backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap();
+		op.mark_head(BlockId::Hash(blocks[4])).unwrap();
+		backend.commit_operation(op).unwrap();
+
+		for i in 1 .. 5 {
+			let mut op = backend.begin_operation().unwrap();
+			backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap();
+			op.mark_finalized(BlockId::Hash(blocks[i]), None).unwrap();
+			backend.commit_operation(op).unwrap();
+		}
+
+		let bc = backend.blockchain();
+		assert_eq!(None, bc.body(BlockId::hash(blocks[0])).unwrap());
+		assert_eq!(None, bc.body(BlockId::hash(blocks[1])).unwrap());
+		assert_eq!(None, bc.body(BlockId::hash(blocks[2])).unwrap());
+		assert_eq!(Some(vec![3.into()]), bc.body(BlockId::hash(blocks[3])).unwrap());
+		assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap());
+	}
+
+	#[test]
+	fn renew_transaction_storage() {
+		let backend = Backend::<Block>::new_test_with_tx_storage(
+			2,
+			10,
+			TransactionStorageMode::StorageChain
+		);
+		let mut blocks = Vec::new();
+		let mut prev_hash = Default::default();
+		let x1 = ExtrinsicWrapper::from(0u64).encode();
+		let x1_hash = <HashFor::<Block> as sp_core::Hasher>::hash(&x1[1..]);
+		for i in 0 .. 10 {
+			let mut index = Vec::new();
+			if i == 0 {
+				index.push(IndexOperation::Insert { extrinsic: 0, offset: 1 });
+			} else if i < 5 {
+				// keep renewing 1st
+				index.push(IndexOperation::Renew {
+					extrinsic: 0,
+					hash: x1_hash.as_ref().to_vec(),
+					size: (x1.len() - 1) as u32,
+				});
+			} // else stop renewing
+			let hash = insert_block(
+				&backend,
+				i,
+				prev_hash,
+				None,
+				Default::default(),
+				vec![i.into()],
+				Some(index)
+			);
+			blocks.push(hash);
+			prev_hash = hash;
+		}
+
+		for i in 1 .. 10 {
+			let mut op = backend.begin_operation().unwrap();
+			backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap();
+			op.mark_finalized(BlockId::Hash(blocks[i]), None).unwrap();
+			backend.commit_operation(op).unwrap();
+			let bc = backend.blockchain();
+			if i < 6 {
+				assert!(bc.indexed_transaction(&x1_hash).unwrap().is_some());
+			} else {
+				assert!(bc.indexed_transaction(&x1_hash).unwrap().is_none());
+			}
+		}
+	}
 }
diff --git a/substrate/client/db/src/light.rs b/substrate/client/db/src/light.rs
index 91f37dd374d9f8912f40529cf1bbe89acc58288c..bf24197c5b5d9eaf68db7567d21b9f928ec09296 100644
--- a/substrate/client/db/src/light.rs
+++ b/substrate/client/db/src/light.rs
@@ -756,7 +756,7 @@ pub(crate) mod tests {
 	#[test]
 	fn finalized_ancient_headers_are_replaced_with_cht() {
 		fn insert_headers<F: Fn(&Hash, u64) -> Header>(header_producer: F) ->
-			(Arc<sp_database::MemDb<DbHash>>, LightStorage<Block>)
+			(Arc<sp_database::MemDb>, LightStorage<Block>)
 		{
 			let raw_db = Arc::new(sp_database::MemDb::default());
 			let db = LightStorage::from_kvdb(raw_db.clone()).unwrap();
diff --git a/substrate/client/db/src/parity_db.rs b/substrate/client/db/src/parity_db.rs
index 71cc5117f19ee47254db2f092443fe642e6c34cf..ed39c1e9f669f365b0b1abdb674c12ee8a13a2a4 100644
--- a/substrate/client/db/src/parity_db.rs
+++ b/substrate/client/db/src/parity_db.rs
@@ -33,7 +33,7 @@ fn handle_err<T>(result: parity_db::Result<T>) -> T {
 }
 
 /// Wrap parity-db database into a trait object that implements `sp_database::Database`
-pub fn open<H: Clone>(path: &std::path::Path, db_type: DatabaseType)
+pub fn open<H: Clone + AsRef<[u8]>>(path: &std::path::Path, db_type: DatabaseType)
 	-> parity_db::Result<std::sync::Arc<dyn Database<H>>>
 {
 	let mut config = parity_db::Options::with_columns(path, NUM_COLUMNS as u8);
@@ -48,7 +48,7 @@ pub fn open<H: Clone>(path: &std::path::Path, db_type: DatabaseType)
 	Ok(std::sync::Arc::new(DbAdapter(db)))
 }
 
-impl<H: Clone> Database<H> for DbAdapter {
+impl<H: Clone + AsRef<[u8]>> Database<H> for DbAdapter {
 	fn commit(&self, transaction: Transaction<H>) -> Result<(), DatabaseError> {
 		handle_err(self.0.commit(transaction.0.into_iter().map(|change|
 			match change {
@@ -65,7 +65,11 @@ impl<H: Clone> Database<H> for DbAdapter {
 		handle_err(self.0.get(col as u8, key))
 	}
 
-	fn lookup(&self, _hash: &H) -> Option<Vec<u8>> {
-		unimplemented!();
+	fn contains(&self, col: ColumnId, key: &[u8]) -> bool {
+		handle_err(self.0.get_size(col as u8, key)).is_some()
+	}
+
+	fn value_size(&self, col: ColumnId, key: &[u8]) -> Option<usize> {
+		handle_err(self.0.get_size(col as u8, key)).map(|s| s as usize)
 	}
 }
diff --git a/substrate/client/db/src/utils.rs b/substrate/client/db/src/utils.rs
index cd9b2a6f56d415e5b193b9e94b7d0241ca24c4bd..590b994d50e8775b3edea641abea932b841e2d6f 100644
--- a/substrate/client/db/src/utils.rs
+++ b/substrate/client/db/src/utils.rs
@@ -278,7 +278,7 @@ pub fn open_database<Block: BlockT>(
 		#[cfg(feature = "with-parity-db")]
 		DatabaseSettingsSrc::ParityDb { path } => {
 			crate::parity_db::open(&path, db_type)
-				.map_err(|e| sp_blockchain::Error::Backend(format!("{:?}", e)))?
+				.map_err(|e| sp_blockchain::Error::Backend(format!("{}", e)))?
 		},
 		#[cfg(not(feature = "with-parity-db"))]
 		DatabaseSettingsSrc::ParityDb { .. } => {
@@ -449,10 +449,35 @@ impl DatabaseType {
 	}
 }
 
+pub(crate) struct JoinInput<'a, 'b>(&'a [u8], &'b [u8]);
+
+pub(crate) fn join_input<'a, 'b>(i1: &'a[u8], i2: &'b [u8]) -> JoinInput<'a, 'b> {
+	JoinInput(i1, i2)
+}
+
+impl<'a, 'b> codec::Input for JoinInput<'a, 'b> {
+	fn remaining_len(&mut self) -> Result<Option<usize>, codec::Error> {
+		Ok(Some(self.0.len() + self.1.len()))
+	}
+
+	fn read(&mut self, into: &mut [u8]) -> Result<(), codec::Error> {
+		let mut read = 0;
+		if self.0.len() > 0 {
+			read = std::cmp::min(self.0.len(), into.len());
+			self.0.read(&mut into[..read])?;
+		}
+		if read < into.len() {
+			self.1.read(&mut into[read..])?;
+		}
+		Ok(())
+	}
+}
+
 #[cfg(test)]
 mod tests {
 	use super::*;
 	use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper};
+	use codec::Input;
 	type Block = RawBlock<ExtrinsicWrapper<u32>>;
 
 	#[test]
@@ -469,4 +494,25 @@ mod tests {
 		assert_eq!(DatabaseType::Full.as_str(), "full");
 		assert_eq!(DatabaseType::Light.as_str(), "light");
 	}
+
+	#[test]
+	fn join_input_works() {
+		let buf1 = [1, 2, 3, 4];
+		let buf2 = [5, 6, 7, 8];
+		let mut test = [0, 0, 0];
+		let mut joined = join_input(buf1.as_ref(), buf2.as_ref());
+		assert_eq!(joined.remaining_len().unwrap(), Some(8));
+
+		joined.read(&mut test).unwrap();
+		assert_eq!(test, [1, 2, 3]);
+		assert_eq!(joined.remaining_len().unwrap(), Some(5));
+
+		joined.read(&mut test).unwrap();
+		assert_eq!(test, [4, 5, 6]);
+		assert_eq!(joined.remaining_len().unwrap(), Some(2));
+
+		joined.read(&mut test[0..2]).unwrap();
+		assert_eq!(test, [7, 8, 6]);
+		assert_eq!(joined.remaining_len().unwrap(), Some(0));
+	}
 }
diff --git a/substrate/client/light/src/backend.rs b/substrate/client/light/src/backend.rs
index 52ace4fd947537e633d2de6baa12c8aa04335757..621ada13ff61d041a8ccf045a626a7c3ca91c24f 100644
--- a/substrate/client/light/src/backend.rs
+++ b/substrate/client/light/src/backend.rs
@@ -30,7 +30,7 @@ use sp_core::storage::{well_known_keys, ChildInfo};
 use sp_core::offchain::storage::InMemOffchainStorage;
 use sp_state_machine::{
 	Backend as StateBackend, TrieBackend, InMemoryBackend, ChangesTrieTransaction,
-	StorageCollection, ChildStorageCollection,
+	StorageCollection, ChildStorageCollection, IndexOperation,
 };
 use sp_runtime::{generic::BlockId, Justification, Justifications, Storage};
 use sp_runtime::traits::{Block as BlockT, NumberFor, Zero, Header, HashFor};
@@ -374,6 +374,11 @@ impl<S, Block> BlockImportOperation<Block> for ImportOperation<Block, S>
 		self.set_head = Some(block);
 		Ok(())
 	}
+
+	fn update_transaction_index(&mut self, _index: Vec<IndexOperation>) -> sp_blockchain::Result<()> {
+		// noop for the light client
+		Ok(())
+	}
 }
 
 impl<H: Hasher> std::fmt::Debug for GenesisOrUnavailableState<H> {
diff --git a/substrate/client/light/src/blockchain.rs b/substrate/client/light/src/blockchain.rs
index 062b3a9866d086bbcbaf08ad2ca8e795b1985e27..3349adf7ac6934daa6fffe91047b306a6a1e45e3 100644
--- a/substrate/client/light/src/blockchain.rs
+++ b/substrate/client/light/src/blockchain.rs
@@ -129,10 +129,10 @@ impl<S, Block> BlockchainBackend<Block> for Blockchain<S> where Block: BlockT, S
 		Err(ClientError::NotAvailableOnLightClient)
 	}
 
-	fn extrinsic(
+	fn indexed_transaction(
 		&self,
 		_hash: &Block::Hash,
-	) -> ClientResult<Option<<Block as BlockT>::Extrinsic>> {
+	) -> ClientResult<Option<Vec<u8>>> {
 		Err(ClientError::NotAvailableOnLightClient)
 	}
 }
diff --git a/substrate/client/network/src/bitswap.rs b/substrate/client/network/src/bitswap.rs
index 7129f3dbe07b1a879a6df47471dd53a007af48c1..aea2b8420cb2ca1be2b772e00fb641de0cc8256b 100644
--- a/substrate/client/network/src/bitswap.rs
+++ b/substrate/client/network/src/bitswap.rs
@@ -25,7 +25,6 @@ use std::io;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use cid::Version;
-use codec::Encode;
 use core::pin::Pin;
 use futures::Future;
 use futures::io::{AsyncRead, AsyncWrite};
@@ -257,15 +256,15 @@ impl<B: BlockT> NetworkBehaviour for Bitswap<B> {
 			}
 			let mut hash = B::Hash::default();
 			hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]);
-			let extrinsic = match self.client.extrinsic(&hash) {
+			let transaction = match self.client.indexed_transaction(&hash) {
 				Ok(ex) => ex,
 				Err(e) => {
-					error!(target: LOG_TARGET, "Error retrieving extrinsic {}: {}", hash, e);
+					error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e);
 					None
 				}
 			};
-			match extrinsic {
-				Some(extrinsic) => {
+			match transaction {
+				Some(transaction) => {
 					trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash);
 					if entry.want_type == WantType::Block as i32 {
 						let prefix = Prefix {
@@ -276,7 +275,7 @@ impl<B: BlockT> NetworkBehaviour for Bitswap<B> {
 						};
 						response.payload.push(MessageBlock {
 							prefix: prefix.to_bytes(),
-							data: extrinsic.encode(),
+							data: transaction,
 						});
 					} else {
 						response.block_presences.push(BlockPresence {
diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs
index 81c98b8b1e2b9c12db834942cdcb11c75ae619c6..a39c45664192055736ea7a0f6f886bf1ac8e8821 100644
--- a/substrate/client/service/src/client/client.rs
+++ b/substrate/client/service/src/client/client.rs
@@ -767,6 +767,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
 					offchain_sc,
 					tx, _,
 					changes_trie_tx,
+					tx_index,
 				) = storage_changes.into_inner();
 
 				if self.config.offchain_indexing_api {
@@ -775,6 +776,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
 
 				operation.op.update_db_storage(tx)?;
 				operation.op.update_storage(main_sc.clone(), child_sc.clone())?;
+				operation.op.update_transaction_index(tx_index)?;
 
 				if let Some(changes_trie_transaction) = changes_trie_tx {
 					operation.op.update_changes_trie(changes_trie_transaction)?;
@@ -1945,12 +1947,12 @@ impl<B, E, Block, RA> BlockBackend<Block> for Client<B, E, Block, RA>
 		self.backend.blockchain().hash(number)
 	}
 
-	fn extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result<Option<Block::Extrinsic>> {
-		self.backend.blockchain().extrinsic(hash)
+	fn indexed_transaction(&self, hash: &Block::Hash) -> sp_blockchain::Result<Option<Vec<u8>>> {
+		self.backend.blockchain().indexed_transaction(hash)
 	}
 
-	fn have_extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result<bool> {
-		self.backend.blockchain().have_extrinsic(hash)
+	fn has_indexed_transaction(&self, hash: &Block::Hash) -> sp_blockchain::Result<bool> {
+		self.backend.blockchain().has_indexed_transaction(hash)
 	}
 }
 
diff --git a/substrate/primitives/blockchain/src/backend.rs b/substrate/primitives/blockchain/src/backend.rs
index 6ee836acb64413e01974378f8c5dc76fb395097b..b00cbada9f4766b174734d60d5480e9c164e2784 100644
--- a/substrate/primitives/blockchain/src/backend.rs
+++ b/substrate/primitives/blockchain/src/backend.rs
@@ -216,15 +216,16 @@ pub trait Backend<Block: BlockT>: HeaderBackend<Block> + HeaderMetadata<Block, E
 		Ok(None)
 	}
 
-	/// Get single extrinsic by hash.
-	fn extrinsic(
+	/// Get single indexed transaction by content hash. Note that this will only fetch transactions
+	/// that are indexed by the runtime with `storage_index_transaction`.
+	fn indexed_transaction(
 		&self,
 		hash: &Block::Hash,
-	) -> Result<Option<<Block as BlockT>::Extrinsic>>;
+	) -> Result<Option<Vec<u8>>>;
 
-	/// Check if extrinsic exists.
-	fn have_extrinsic(&self, hash: &Block::Hash) -> Result<bool> {
-		Ok(self.extrinsic(hash)?.is_some())
+	/// Check if indexed transaction exists.
+	fn has_indexed_transaction(&self, hash: &Block::Hash) -> Result<bool> {
+		Ok(self.indexed_transaction(hash)?.is_some())
 	}
 }
 
diff --git a/substrate/primitives/database/Cargo.toml b/substrate/primitives/database/Cargo.toml
index 4062ba292352fd84f4b814977b5f14c11bafd5ee..aae7668b5ec807145a0a2c148267bf6e85ce3b77 100644
--- a/substrate/primitives/database/Cargo.toml
+++ b/substrate/primitives/database/Cargo.toml
@@ -13,3 +13,4 @@ readme = "README.md"
 [dependencies]
 parking_lot = "0.11.1"
 kvdb = "0.9.0"
+
diff --git a/substrate/primitives/database/src/kvdb.rs b/substrate/primitives/database/src/kvdb.rs
index b50ca53786f9f183927b4be874ce5322abbbf77e..d99fe6360ef7b7585c7c37d999be3865d6501b93 100644
--- a/substrate/primitives/database/src/kvdb.rs
+++ b/substrate/primitives/database/src/kvdb.rs
@@ -33,18 +33,73 @@ fn handle_err<T>(result: std::io::Result<T>) -> T {
 }
 
 /// Wrap RocksDb database into a trait object that implements `sp_database::Database`
-pub fn as_database<D: KeyValueDB + 'static, H: Clone>(db: D) -> std::sync::Arc<dyn Database<H>> {
+pub fn as_database<D, H>(db: D) -> std::sync::Arc<dyn Database<H>>
+	where D: KeyValueDB + 'static, H: Clone + AsRef<[u8]>
+{
 	std::sync::Arc::new(DbAdapter(db))
 }
 
-impl<D: KeyValueDB, H: Clone> Database<H> for DbAdapter<D> {
+impl <D: KeyValueDB> DbAdapter<D> {
+	// Returns counter key and counter value if it exists.
+	fn read_counter(&self, col: ColumnId, key: &[u8]) -> error::Result<(Vec<u8>, Option<u32>)> {
+		// Add a key suffix for the counter
+		let mut counter_key = key.to_vec();
+		counter_key.push(0);
+		Ok(match self.0.get(col, &counter_key).map_err(|e| error::DatabaseError(Box::new(e)))? {
+			Some(data) => {
+				let mut counter_data = [0; 4];
+				if data.len() != 4 {
+					return Err(error::DatabaseError(Box::new(
+								std::io::Error::new(std::io::ErrorKind::Other,
+									format!("Unexpected counter len {}", data.len())))
+					))
+				}
+				counter_data.copy_from_slice(&data);
+				let counter = u32::from_le_bytes(counter_data);
+				(counter_key, Some(counter))
+			},
+			None => (counter_key, None)
+		})
+	}
+}
+
+impl<D: KeyValueDB, H: Clone + AsRef<[u8]>> Database<H> for DbAdapter<D> {
 	fn commit(&self, transaction: Transaction<H>) -> error::Result<()> {
 		let mut tx = DBTransaction::new();
 		for change in transaction.0.into_iter() {
 			match change {
 				Change::Set(col, key, value) => tx.put_vec(col, &key, value),
 				Change::Remove(col, key) => tx.delete(col, &key),
-				_ => unimplemented!(),
+				Change::Store(col, key, value) => {
+					match self.read_counter(col, key.as_ref())? {
+						(counter_key, Some(mut counter)) => {
+							counter += 1;
+							tx.put(col, &counter_key, &counter.to_le_bytes());
+						},
+						(counter_key, None) => {
+							let d = 1u32.to_le_bytes();
+							tx.put(col, &counter_key, &d);
+							tx.put_vec(col, key.as_ref(), value);
+						},
+					}
+				}
+				Change::Reference(col, key) => {
+					if let (counter_key, Some(mut counter)) = self.read_counter(col, key.as_ref())? {
+						counter += 1;
+						tx.put(col, &counter_key, &counter.to_le_bytes());
+					}
+				}
+				Change::Release(col, key) => {
+					if let (counter_key, Some(mut counter)) = self.read_counter(col, key.as_ref())? {
+						counter -= 1;
+						if counter == 0 {
+							tx.delete(col, &counter_key);
+							tx.delete(col, key.as_ref());
+						} else {
+							tx.put(col, &counter_key, &counter.to_le_bytes());
+						}
+					}
+				}
 			}
 		}
 		self.0.write(tx).map_err(|e| error::DatabaseError(Box::new(e)))
@@ -54,7 +109,7 @@ impl<D: KeyValueDB, H: Clone> Database<H> for DbAdapter<D> {
 		handle_err(self.0.get(col, key))
 	}
 
-	fn lookup(&self, _hash: &H) -> Option<Vec<u8>> {
-		unimplemented!();
+	fn contains(&self, col: ColumnId, key: &[u8]) -> bool {
+		handle_err(self.0.has_key(col, key))
 	}
 }
diff --git a/substrate/primitives/database/src/lib.rs b/substrate/primitives/database/src/lib.rs
index 7107ea25c02c00a249f519c39df617ac2c2de9df..1fa0c8e49b01510c203d256d4b6e35bb79baa9f5 100644
--- a/substrate/primitives/database/src/lib.rs
+++ b/substrate/primitives/database/src/lib.rs
@@ -32,16 +32,9 @@ pub type ColumnId = u32;
 pub enum Change<H> {
 	Set(ColumnId, Vec<u8>, Vec<u8>),
 	Remove(ColumnId, Vec<u8>),
-	Store(H, Vec<u8>),
-	Release(H),
-}
-
-/// An alteration to the database that references the data.
-pub enum ChangeRef<'a, H> {
-	Set(ColumnId, &'a [u8], &'a [u8]),
-	Remove(ColumnId, &'a [u8]),
-	Store(H, &'a [u8]),
-	Release(H),
+	Store(ColumnId, H, Vec<u8>),
+	Reference(ColumnId, H),
+	Release(ColumnId, H),
 }
 
 /// A series of changes to the database that can be committed atomically. They do not take effect
@@ -67,49 +60,27 @@ impl<H> Transaction<H> {
 		self.0.push(Change::Remove(col, key.to_vec()))
 	}
 	/// Store the `preimage` of `hash` into the database, so that it may be looked up later with
-	/// `Database::lookup`. This may be called multiple times, but `Database::lookup` but subsequent
+	/// `Database::get`. This may be called multiple times, but subsequent
 	/// calls will ignore `preimage` and simply increase the number of references on `hash`.
-	pub fn store(&mut self, hash: H, preimage: &[u8]) {
-		self.0.push(Change::Store(hash, preimage.to_vec()))
+	pub fn store(&mut self, col: ColumnId, hash: H, preimage: Vec<u8>) {
+		self.0.push(Change::Store(col, hash, preimage))
+	}
+	/// Increase the number of references for `hash` in the database.
+	pub fn reference(&mut self, col: ColumnId, hash: H) {
+		self.0.push(Change::Reference(col, hash))
 	}
 	/// Release the preimage of `hash` from the database. An equal number of these to the number of
-	/// corresponding `store`s must have been given before it is legal for `Database::lookup` to
+	/// corresponding `store`s must have been given before it is legal for `Database::get` to
 	/// be unable to provide the preimage.
-	pub fn release(&mut self, hash: H) {
-		self.0.push(Change::Release(hash))
+	pub fn release(&mut self, col: ColumnId, hash: H) {
+		self.0.push(Change::Release(col, hash))
 	}
 }
 
-pub trait Database<H: Clone>: Send + Sync {
+pub trait Database<H: Clone + AsRef<[u8]>>: Send + Sync {
 	/// Commit the `transaction` to the database atomically. Any further calls to `get` or `lookup`
 	/// will reflect the new state.
-	fn commit(&self, transaction: Transaction<H>) -> error::Result<()> {
-		for change in transaction.0.into_iter() {
-			match change {
-				Change::Set(col, key, value) => self.set(col, &key, &value),
-				Change::Remove(col, key) => self.remove(col, &key),
-				Change::Store(hash, preimage) => self.store(&hash, &preimage),
-				Change::Release(hash) => self.release(&hash),
-			}?;
-		}
-
-		Ok(())
-	}
-
-	/// Commit the `transaction` to the database atomically. Any further calls to `get` or `lookup`
-	/// will reflect the new state.
-	fn commit_ref<'a>(&self, transaction: &mut dyn Iterator<Item=ChangeRef<'a, H>>) -> error::Result<()> {
-		let mut tx = Transaction::new();
-		for change in transaction {
-			match change {
-				ChangeRef::Set(col, key, value) => tx.set(col, key, value),
-				ChangeRef::Remove(col, key) => tx.remove(col, key),
-				ChangeRef::Store(hash, preimage) => tx.store(hash, preimage),
-				ChangeRef::Release(hash) => tx.release(hash),
-			}
-		}
-		self.commit(tx)
-	}
+	fn commit(&self, transaction: Transaction<H>) -> error::Result<()>;
 
 	/// Retrieve the value previously stored against `key` or `None` if
 	/// `key` is not currently in the database.
@@ -120,6 +91,11 @@ pub trait Database<H: Clone>: Send + Sync {
 		self.get(col, key).is_some()
 	}
 
+	/// Check value size in the database possibly without retrieving it.
+	fn value_size(&self, col: ColumnId, key: &[u8]) -> Option<usize> {
+		self.get(col, key).map(|v| v.len())
+	}
+
 	/// Call `f` with the value previously stored against `key`.
 	///
 	/// This may be faster than `get` since it doesn't allocate.
@@ -127,50 +103,6 @@ pub trait Database<H: Clone>: Send + Sync {
 	fn with_get(&self, col: ColumnId, key: &[u8], f: &mut dyn FnMut(&[u8])) {
 		self.get(col, key).map(|v| f(&v));
 	}
-
-	/// Set the value of `key` in `col` to `value`, replacing anything that is there currently.
-	fn set(&self, col: ColumnId, key: &[u8], value: &[u8]) -> error::Result<()> {
-		let mut t = Transaction::new();
-		t.set(col, key, value);
-		self.commit(t)
-	}
-	/// Remove the value of `key` in `col`.
-	fn remove(&self, col: ColumnId, key: &[u8]) -> error::Result<()> {
-		let mut t = Transaction::new();
-		t.remove(col, key);
-		self.commit(t)
-	}
-
-	/// Retrieve the first preimage previously `store`d for `hash` or `None` if no preimage is
-	/// currently stored.
-	fn lookup(&self, hash: &H) -> Option<Vec<u8>>;
-
-	/// Call `f` with the preimage stored for `hash` and return the result, or `None` if no preimage
-	/// is currently stored.
-	///
-	/// This may be faster than `lookup` since it doesn't allocate.
-	/// Use `with_lookup` helper function if you need `f` to return a value from `f`
-	fn with_lookup(&self, hash: &H, f: &mut dyn FnMut(&[u8])) {
-		self.lookup(hash).map(|v| f(&v));
-	}
-
-	/// Store the `preimage` of `hash` into the database, so that it may be looked up later with
-	/// `Database::lookup`. This may be called multiple times, but `Database::lookup` but subsequent
-	/// calls will ignore `preimage` and simply increase the number of references on `hash`.
-	fn store(&self, hash: &H, preimage: &[u8]) -> error::Result<()> {
-		let mut t = Transaction::new();
-		t.store(hash.clone(), preimage);
-		self.commit(t)
-	}
-
-	/// Release the preimage of `hash` from the database. An equal number of these to the number of
-	/// corresponding `store`s must have been given before it is legal for `Database::lookup` to
-	/// be unable to provide the preimage.
-	fn release(&self, hash: &H) -> error::Result<()> {
-		let mut t = Transaction::new();
-		t.release(hash.clone());
-		self.commit(t)
-	}
 }
 
 impl<H> std::fmt::Debug for dyn Database<H> {
@@ -183,20 +115,13 @@ impl<H> std::fmt::Debug for dyn Database<H> {
 /// `key` is not currently in the database.
 ///
 /// This may be faster than `get` since it doesn't allocate.
-pub fn with_get<R, H: Clone>(db: &dyn Database<H>, col: ColumnId, key: &[u8], mut f: impl FnMut(&[u8]) -> R) -> Option<R> {
+pub fn with_get<R, H: Clone + AsRef<[u8]>>(
+	db: &dyn Database<H>,
+	col: ColumnId,
+	key: &[u8], mut f: impl FnMut(&[u8]) -> R
+) -> Option<R> {
 	let mut result: Option<R> = None;
 	let mut adapter = |k: &_| { result = Some(f(k)); };
 	db.with_get(col, key, &mut adapter);
 	result
 }
-
-/// Call `f` with the preimage stored for `hash` and return the result, or `None` if no preimage
-/// is currently stored.
-///
-/// This may be faster than `lookup` since it doesn't allocate.
-pub fn with_lookup<R, H: Clone>(db: &dyn Database<H>, hash: &H, mut f: impl FnMut(&[u8]) -> R) -> Option<R> {
-	let mut result: Option<R> = None;
-	let mut adapter = |k: &_| { result = Some(f(k)); };
-	db.with_lookup(hash, &mut adapter);
-	result
-}
diff --git a/substrate/primitives/database/src/mem.rs b/substrate/primitives/database/src/mem.rs
index 41af2e2f235c05354181b5ce699a1586180226a0..24ddf03319711e84b626a276fc3beb0cd1574548 100644
--- a/substrate/primitives/database/src/mem.rs
+++ b/substrate/primitives/database/src/mem.rs
@@ -17,26 +17,41 @@
 
 //! In-memory implementation of `Database`
 
-use std::collections::HashMap;
+use std::collections::{HashMap, hash_map::Entry};
 use crate::{Database, Change, ColumnId, Transaction, error};
 use parking_lot::RwLock;
 
 #[derive(Default)]
 /// This implements `Database` as an in-memory hash map. `commit` is not atomic.
-pub struct MemDb<H: Clone + Send + Sync + Eq + PartialEq + Default + std::hash::Hash>
-	(RwLock<(HashMap<ColumnId, HashMap<Vec<u8>, Vec<u8>>>, HashMap<H, Vec<u8>>)>);
+pub struct MemDb(RwLock<HashMap<ColumnId, HashMap<Vec<u8>, (u32, Vec<u8>)>>>);
 
-impl<H> Database<H> for MemDb<H>
-	where H: Clone + Send + Sync + Eq + PartialEq + Default + std::hash::Hash
+impl<H> Database<H> for MemDb
+	where H: Clone + AsRef<[u8]>
 {
 	fn commit(&self, transaction: Transaction<H>) -> error::Result<()> {
 		let mut s = self.0.write();
 		for change in transaction.0.into_iter() {
 			match change {
-				Change::Set(col, key, value) => { s.0.entry(col).or_default().insert(key, value); },
-				Change::Remove(col, key) => { s.0.entry(col).or_default().remove(&key); },
-				Change::Store(hash, preimage) => { s.1.insert(hash, preimage); },
-				Change::Release(hash) => { s.1.remove(&hash); },
+				Change::Set(col, key, value) => { s.entry(col).or_default().insert(key, (1, value)); },
+				Change::Remove(col, key) => { s.entry(col).or_default().remove(&key); },
+				Change::Store(col, hash, value) => {
+					s.entry(col).or_default().entry(hash.as_ref().to_vec())
+						.and_modify(|(c, _)| *c += 1)
+						.or_insert_with(|| (1, value));
+				},
+				Change::Reference(col, hash) => {
+					if let Entry::Occupied(mut entry) = s.entry(col).or_default().entry(hash.as_ref().to_vec()) {
+						entry.get_mut().0 += 1;
+					}
+				}
+				Change::Release(col, hash) => {
+					if let Entry::Occupied(mut entry) = s.entry(col).or_default().entry(hash.as_ref().to_vec()) {
+						entry.get_mut().0 -= 1;
+						if entry.get().0 == 0 {
+							entry.remove();
+						}
+					}
+				}
 			}
 		}
 
@@ -45,18 +60,11 @@ impl<H> Database<H> for MemDb<H>
 
 	fn get(&self, col: ColumnId, key: &[u8]) -> Option<Vec<u8>> {
 		let s = self.0.read();
-		s.0.get(&col).and_then(|c| c.get(key).cloned())
-	}
-
-	fn lookup(&self, hash: &H) -> Option<Vec<u8>> {
-		let s = self.0.read();
-		s.1.get(hash).cloned()
+		s.get(&col).and_then(|c| c.get(key).map(|(_, v)| v.clone()))
 	}
 }
 
-impl<H> MemDb<H>
-	where H: Clone + Send + Sync + Eq + PartialEq + Default + std::hash::Hash
-{
+impl MemDb {
 	/// Create a new instance
 	pub fn new() -> Self {
 		MemDb::default()
@@ -65,7 +73,7 @@ impl<H> MemDb<H>
 	/// Count number of values in a column
 	pub fn count(&self, col: ColumnId) -> usize {
 		let s = self.0.read();
-		s.0.get(&col).map(|c| c.len()).unwrap_or(0)
+		s.get(&col).map(|c| c.len()).unwrap_or(0)
 	}
 }
 
diff --git a/substrate/primitives/externalities/src/lib.rs b/substrate/primitives/externalities/src/lib.rs
index 3ee37f5e31b93bb52ea11950b98744ab60343071..1077f41048d59cedde808c267c092d1f1054bbe1 100644
--- a/substrate/primitives/externalities/src/lib.rs
+++ b/substrate/primitives/externalities/src/lib.rs
@@ -228,6 +228,16 @@ pub trait Externalities: ExtensionStore {
 	/// no transaction is open that can be closed.
 	fn storage_commit_transaction(&mut self) -> Result<(), ()>;
 
+	/// Index specified transaction slice and store it.
+	fn storage_index_transaction(&mut self, _index: u32, _offset: u32) {
+		unimplemented!("storage_index_transaction");
+	}
+
+	/// Renew existing piece of transaction storage.
+	fn storage_renew_transaction_index(&mut self, _index: u32, _hash: &[u8], _size: u32) {
+		unimplemented!("storage_renew_transaction_index");
+	}
+
 	/// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 	/// Benchmarking related functionality and shouldn't be used anywhere else!
 	/// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
diff --git a/substrate/primitives/state-machine/src/ext.rs b/substrate/primitives/state-machine/src/ext.rs
index 7907cda6fb4e75205c47905c4547328c033198f5..65b7b638a9a2e47329b7b208b8fe7cc26361a417 100644
--- a/substrate/primitives/state-machine/src/ext.rs
+++ b/substrate/primitives/state-machine/src/ext.rs
@@ -18,7 +18,7 @@
 //! Concrete externalities implementation.
 
 use crate::{
-	StorageKey, StorageValue, OverlayedChanges,
+	StorageKey, StorageValue, OverlayedChanges, IndexOperation,
 	backend::Backend, overlayed_changes::OverlayedExtensions,
 };
 use hash_db::Hasher;
@@ -568,6 +568,36 @@ where
 		}
 	}
 
+	fn storage_index_transaction(&mut self, index: u32, offset: u32) {
+		trace!(
+			target: "state",
+			"{:04x}: IndexTransaction ({}): [{}..]",
+			self.id,
+			index,
+			offset,
+		);
+		self.overlay.add_transaction_index(IndexOperation::Insert {
+			extrinsic: index,
+			offset,
+		});
+	}
+
+	/// Renew existing piece of data storage.
+	fn storage_renew_transaction_index(&mut self, index: u32, hash: &[u8], size: u32) {
+		trace!(
+			target: "state",
+			"{:04x}: RenewTransactionIndex ({}) {} bytes",
+			self.id,
+			HexDisplay::from(&hash),
+			size,
+		);
+		self.overlay.add_transaction_index(IndexOperation::Renew {
+			extrinsic: index,
+			hash: hash.to_vec(),
+			size
+		});
+	}
+
 	#[cfg(not(feature = "std"))]
 	fn storage_changes_root(&mut self, _parent_hash: &[u8]) -> Result<Option<Vec<u8>>, ()> {
 		Ok(None)
diff --git a/substrate/primitives/state-machine/src/lib.rs b/substrate/primitives/state-machine/src/lib.rs
index 0167633d480702eaff58a0177c3b92db10d9b3a5..0a664840df850cb2250e57375041d4e7a2ce4704 100644
--- a/substrate/primitives/state-machine/src/lib.rs
+++ b/substrate/primitives/state-machine/src/lib.rs
@@ -121,6 +121,7 @@ pub use crate::overlayed_changes::{
 	StorageChanges, StorageTransactionCache,
 	OffchainChangesCollection,
 	OffchainOverlayedChanges,
+	IndexOperation,
 };
 pub use crate::backend::Backend;
 pub use crate::trie_backend_essence::{TrieBackendStorage, Storage};
diff --git a/substrate/primitives/state-machine/src/overlayed_changes/mod.rs b/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
index 285bf2a73a1489647127cfc6578e217b0f341efa..1d3cbb59ba0c1ec62ee1993af179c4c93369dbec 100644
--- a/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
+++ b/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
@@ -103,12 +103,35 @@ pub struct OverlayedChanges {
 	children: Map<StorageKey, (OverlayedChangeSet, ChildInfo)>,
 	/// Offchain related changes.
 	offchain: OffchainOverlayedChanges,
+	/// Transaction index changes,
+	transaction_index_ops: Vec<IndexOperation>,
 	/// True if extrinsics stats must be collected.
 	collect_extrinsics: bool,
 	/// Collect statistic on this execution.
 	stats: StateMachineStats,
 }
 
+/// Transcation index operation.
+#[derive(Debug, Clone)]
+pub enum IndexOperation {
+	/// Insert transaction into index.
+	Insert {
+		/// Extrinsic index in the current block.
+		extrinsic: u32,
+		/// Data offset in the extrinsic.
+		offset: u32,
+	},
+	/// Renew existing transaction storage.
+	Renew {
+		/// Extrinsic index in the current block.
+		extrinsic: u32,
+		/// Referenced index hash.
+		hash: Vec<u8>,
+		/// Expected data size.
+		size: u32,
+	}
+}
+
 /// A storage changes structure that can be generated by the data collected in [`OverlayedChanges`].
 ///
 /// This contains all the changes to the storage and transactions to apply theses changes to the
@@ -137,6 +160,10 @@ pub struct StorageChanges<Transaction, H: Hasher, N: BlockNumber> {
 	/// Phantom data for block number until change trie support no_std.
 	#[cfg(not(feature = "std"))]
 	pub _ph: sp_std::marker::PhantomData<N>,
+
+	/// Changes to the transaction index,
+	#[cfg(feature = "std")]
+	pub transaction_index_changes: Vec<IndexOperation>,
 }
 
 #[cfg(feature = "std")]
@@ -149,6 +176,7 @@ impl<Transaction, H: Hasher, N: BlockNumber> StorageChanges<Transaction, H, N> {
 		Transaction,
 		H::Out,
 		Option<ChangesTrieTransaction<H, N>>,
+		Vec<IndexOperation>,
 	) {
 		(
 			self.main_storage_changes,
@@ -157,6 +185,7 @@ impl<Transaction, H: Hasher, N: BlockNumber> StorageChanges<Transaction, H, N> {
 			self.transaction,
 			self.transaction_storage_root,
 			self.changes_trie_transaction,
+			self.transaction_index_changes,
 		)
 	}
 }
@@ -214,6 +243,8 @@ impl<Transaction: Default, H: Hasher, N: BlockNumber> Default for StorageChanges
 			changes_trie_transaction: None,
 			#[cfg(not(feature = "std"))]
 			_ph: Default::default(),
+			#[cfg(feature = "std")]
+			transaction_index_changes: Default::default(),
 		}
 	}
 }
@@ -543,6 +574,9 @@ impl OverlayedChanges {
 		let (main_storage_changes, child_storage_changes) = self.drain_committed();
 		let offchain_storage_changes = self.offchain_drain_committed().collect();
 
+		#[cfg(feature = "std")]
+		let transaction_index_changes = std::mem::take(&mut self.transaction_index_ops);
+
 		Ok(StorageChanges {
 			main_storage_changes: main_storage_changes.collect(),
 			child_storage_changes: child_storage_changes.map(|(sk, it)| (sk, it.0.collect())).collect(),
@@ -551,6 +585,8 @@ impl OverlayedChanges {
 			transaction_storage_root,
 			#[cfg(feature = "std")]
 			changes_trie_transaction,
+			#[cfg(feature = "std")]
+			transaction_index_changes,
 			#[cfg(not(feature = "std"))]
 			_ph: Default::default(),
 		})
@@ -666,6 +702,11 @@ impl OverlayedChanges {
 			None => self.offchain.remove(STORAGE_PREFIX, key),
 		}
 	}
+
+	/// Add transaction index operation.
+	pub fn add_transaction_index(&mut self, op: IndexOperation) {
+		self.transaction_index_ops.push(op)
+	}
 }
 
 #[cfg(feature = "std")]