diff --git a/substrate/core/client/db/src/lib.rs b/substrate/core/client/db/src/lib.rs
index f663cc0834d33e8b893e3c72fdcf1efaeb6e9160..944540b98b0931635c79322de01da9220673d290 100644
--- a/substrate/core/client/db/src/lib.rs
+++ b/substrate/core/client/db/src/lib.rs
@@ -430,7 +430,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
 	}
 
 	/// Prune obsolete changes tries.
-	pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block_hash: Block::Hash, block_num: NumberFor<Block>) {
+	pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block_hash: Block::Hash, block_num: NumberFor<Block>) {	
 		// never prune on archive nodes
 		let min_blocks_to_keep = match self.min_blocks_to_keep {
 			Some(min_blocks_to_keep) => min_blocks_to_keep,
@@ -456,6 +456,23 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
 	}
 }
 
+impl<Block: BlockT> client::backend::PrunableStateChangesTrieStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
+	fn oldest_changes_trie_block(
+		&self,
+		config: &ChangesTrieConfiguration,
+		best_finalized_block: u64
+	) -> u64 {
+		match self.min_blocks_to_keep {
+			Some(min_blocks_to_keep) => state_machine::oldest_non_pruned_changes_trie(
+				config,
+				min_blocks_to_keep,
+				best_finalized_block,
+			),
+			None => 1,
+		}
+	}
+}
+
 impl<Block: BlockT> state_machine::ChangesTrieRootsStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
 	fn root(&self, anchor: &state_machine::ChangesTrieAnchorBlockId<H256>, block: u64) -> Result<Option<H256>, String> {
 		// check API requirement
diff --git a/substrate/core/client/src/backend.rs b/substrate/core/client/src/backend.rs
index 6e65f289ad8cace2a5a9d58d3b10bc1f434c2d55..b4040f59a9fa497515696623cfac1e4944b0bbd4 100644
--- a/substrate/core/client/src/backend.rs
+++ b/substrate/core/client/src/backend.rs
@@ -17,6 +17,7 @@
 //! Substrate Client data backend
 
 use crate::error;
+use primitives::ChangesTrieConfiguration;
 use runtime_primitives::{generic::BlockId, Justification, StorageMap, ChildrenStorageMap};
 use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, NumberFor};
 use state_machine::backend::Backend as StateBackend;
@@ -113,7 +114,7 @@ pub trait Backend<Block, H>: AuxStore + Send + Sync where
 	/// Associated state backend type.
 	type State: StateBackend<H>;
 	/// Changes trie storage.
-	type ChangesTrieStorage: StateChangesTrieStorage<H>;
+	type ChangesTrieStorage: PrunableStateChangesTrieStorage<H>;
 
 	/// Begin a new block insertion transaction with given parent block id.
 	/// When constructing the genesis, this is called with all-zero hash.
@@ -154,6 +155,12 @@ pub trait Backend<Block, H>: AuxStore + Send + Sync where
 	}
 }
 
+/// Changes trie storage that supports pruning.
+pub trait PrunableStateChangesTrieStorage<H: Hasher>: StateChangesTrieStorage<H> {
+	/// Get number block of oldest, non-pruned changes trie.
+	fn oldest_changes_trie_block(&self, config: &ChangesTrieConfiguration, best_finalized: u64) -> u64;
+}
+
 /// Mark for all Backend implementations, that are making use of state data, stored locally.
 pub trait LocalBackend<Block, H>: Backend<Block, H>
 where
diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs
index f365c61c1fa28a36702dc8f540d62009c32aa945..010912e75f9276fdcac51d003105c32bb36be962 100644
--- a/substrate/core/client/src/client.rs
+++ b/substrate/core/client/src/client.rs
@@ -42,7 +42,7 @@ use state_machine::{
 	key_changes, key_changes_proof, OverlayedChanges
 };
 
-use crate::backend::{self, BlockImportOperation};
+use crate::backend::{self, BlockImportOperation, PrunableStateChangesTrieStorage};
 use crate::blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend};
 use crate::call_executor::{CallExecutor, LocalCallExecutor};
 use executor::{RuntimeVersion, RuntimeInfo};
@@ -355,35 +355,54 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
 		Ok((header, proof))
 	}
 
+	/// Get longest range within [first; last] that is possible to use in `key_changes`
+	/// and `key_changes_proof` calls.
+	/// Range could be shortened from the beginning if some changes tries have been pruned.
+	/// Returns Ok(None) if changes trues are not supported.
+	pub fn max_key_changes_range(
+		&self,
+		first: NumberFor<Block>,
+		last: BlockId<Block>,
+	) -> error::Result<Option<(NumberFor<Block>, BlockId<Block>)>> {
+		let (config, storage) = match self.require_changes_trie().ok() {
+			Some((config, storage)) => (config, storage),
+			None => return Ok(None),
+		};
+ 		let first = first.as_();
+		let last_num = self.backend.blockchain().expect_block_number_from_id(&last)?.as_();
+		if first > last_num {
+			return Err(error::ErrorKind::ChangesTrieAccessFailed("Invalid changes trie range".into()).into());
+		}
+ 		let finalized_number = self.backend.blockchain().info()?.finalized_number;
+		let oldest = storage.oldest_changes_trie_block(&config, finalized_number.as_());
+		let first = As::sa(::std::cmp::max(first, oldest));
+		Ok(Some((first, last)))
+	}
+
 	/// Get pairs of (block, extrinsic) where key has been changed at given blocks range.
 	/// Works only for runtimes that are supporting changes tries.
 	pub fn key_changes(
 		&self,
-		first: Block::Hash,
-		last: Block::Hash,
-		key: &[u8]
+		first: NumberFor<Block>,
+		last: BlockId<Block>,
+		key: &StorageKey
 	) -> error::Result<Vec<(NumberFor<Block>, u32)>> {
-		let config = self.changes_trie_config()?;
-		let storage = self.backend.changes_trie_storage();
-		let (config, storage) = match (config, storage) {
-			(Some(config), Some(storage)) => (config, storage),
-			_ => return Err(error::ErrorKind::ChangesTriesNotSupported.into()),
-		};
+		let (config, storage) = self.require_changes_trie()?;
+		let last_number = self.backend.blockchain().expect_block_number_from_id(&last)?.as_();
+		let last_hash = self.backend.blockchain().expect_block_hash_from_id(&last)?;
 
-		let first_number = self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(first))?.as_();
-		let last_number = self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(last))?.as_();
 		key_changes::<_, Blake2Hasher>(
 			&config,
-			storage,
-			first_number,
+			&*storage,
+			first.as_(),
 			&ChangesTrieAnchorBlockId {
-				hash: convert_hash(&last),
+				hash: convert_hash(&last_hash),
 				number: last_number,
 			},
 			self.backend.blockchain().info()?.best_number.as_(),
-			key)
+			&key.0)
+		.and_then(|r| r.map(|r| r.map(|(block, tx)| (As::sa(block), tx))).collect::<Result<_, _>>())
 		.map_err(|err| error::ErrorKind::ChangesTrieAccessFailed(err).into())
-		.map(|r| r.into_iter().map(|(b, e)| (As::sa(b), e)).collect())
 	}
 
 	/// Get proof for computation of (block, extrinsic) pairs where key has been changed at given blocks range.
@@ -398,7 +417,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
 		last: Block::Hash,
 		min: Block::Hash,
 		max: Block::Hash,
-		key: &[u8]
+		key: &StorageKey
 	) -> error::Result<ChangesProof<Block::Header>> {
 		self.key_changes_proof_with_cht_size(
 			first,
@@ -417,7 +436,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
 		last: Block::Hash,
 		min: Block::Hash,
 		max: Block::Hash,
-		key: &[u8],
+		key: &StorageKey,
 		cht_size: u64,
 	) -> error::Result<ChangesProof<Block::Header>> {
 		struct AccessedRootsRecorder<'a, Block: BlockT> {
@@ -447,14 +466,9 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
 			}
 		}
 
-		let config = self.changes_trie_config()?;
-		let storage = self.backend.changes_trie_storage();
-		let (config, storage) = match (config, storage) {
-			(Some(config), Some(storage)) => (config, storage),
-			_ => return Err(error::ErrorKind::ChangesTriesNotSupported.into()),
-		};
-
+		let (config, storage) = self.require_changes_trie()?;
 		let min_number = self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(min))?;
+
 		let recording_storage = AccessedRootsRecorder::<Block> {
 			storage,
 			min: min_number.as_(),
@@ -478,7 +492,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
 				number: last_number,
 			},
 			max_number.as_(),
-			key
+			&key.0
 		)
 		.map_err(|err| error::Error::from(error::ErrorKind::ChangesTrieAccessFailed(err)))?;
 
@@ -528,6 +542,16 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
 		Ok(proof)
 	}
 
+	/// Returns changes trie configuration and storage or an error if it is not supported.
+	fn require_changes_trie(&self) -> error::Result<(ChangesTrieConfiguration, &B::ChangesTrieStorage)> {
+		let config = self.changes_trie_config()?;
+		let storage = self.backend.changes_trie_storage();
+		match (config, storage) {
+			(Some(config), Some(storage)) => Ok((config, storage)),
+			_ => Err(error::ErrorKind::ChangesTriesNotSupported.into()),
+		}
+	}
+
 	/// Create a new block, built on the head of the chain.
 	pub fn new_block<InherentData>(
 		&self
@@ -1713,9 +1737,8 @@ pub(crate) mod tests {
 		let (client, _, test_cases) = prepare_client_with_key_changes();
 
 		for (index, (begin, end, key, expected_result)) in test_cases.into_iter().enumerate() {
-			let begin = client.block_hash(begin).unwrap().unwrap();
 			let end = client.block_hash(end).unwrap().unwrap();
-			let actual_result = client.key_changes(begin, end, &key).unwrap();
+			let actual_result = client.key_changes(begin, BlockId::Hash(end), &StorageKey(key)).unwrap();
 			match actual_result == expected_result {
 				true => (),
 				false => panic!(format!("Failed test {}: actual = {:?}, expected = {:?}",
diff --git a/substrate/core/client/src/in_mem.rs b/substrate/core/client/src/in_mem.rs
index 114849ca6c1d77ded479674acd3708b0a4252638..d5065d764139b436827d7a8b942caeab7b515708 100644
--- a/substrate/core/client/src/in_mem.rs
+++ b/substrate/core/client/src/in_mem.rs
@@ -22,14 +22,14 @@ use parking_lot::RwLock;
 use crate::error;
 use crate::backend::{self, NewBlockState};
 use crate::light;
-use primitives::storage::well_known_keys;
+use primitives::{ChangesTrieConfiguration, storage::well_known_keys};
 use runtime_primitives::generic::BlockId;
 use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero,
 	NumberFor, As, Digest, DigestItem, AuthorityIdFor};
 use runtime_primitives::{Justification, StorageMap, ChildrenStorageMap};
 use crate::blockchain::{self, BlockStatus, HeaderBackend};
 use state_machine::backend::{Backend as StateBackend, InMemory, Consolidate};
-use state_machine::InMemoryChangesTrieStorage;
+use state_machine::{self, InMemoryChangesTrieStorage, ChangesTrieAnchorBlockId};
 use hash_db::Hasher;
 use heapsize::HeapSizeOf;
 use crate::leaves::LeafSet;
@@ -505,7 +505,7 @@ where
 	H::Out: HeapSizeOf + Ord,
 {
 	states: RwLock<HashMap<Block::Hash, InMemory<H>>>,
-	changes_trie_storage: InMemoryChangesTrieStorage<H>,
+	changes_trie_storage: ChangesTrieStorage<H>,
 	blockchain: Blockchain<Block>,
 }
 
@@ -519,7 +519,7 @@ where
 	pub fn new() -> Backend<Block, H> {
 		Backend {
 			states: RwLock::new(HashMap::new()),
-			changes_trie_storage: InMemoryChangesTrieStorage::new(),
+			changes_trie_storage: ChangesTrieStorage(InMemoryChangesTrieStorage::new()),
 			blockchain: Blockchain::new(),
 		}
 	}
@@ -555,7 +555,7 @@ where
 	type BlockImportOperation = BlockImportOperation<Block, H>;
 	type Blockchain = Blockchain<Block>;
 	type State = InMemory<H>;
-	type ChangesTrieStorage = InMemoryChangesTrieStorage<H>;
+	type ChangesTrieStorage = ChangesTrieStorage<H>;
 
 	fn begin_operation(&self, block: BlockId<Block>) -> error::Result<Self::BlockImportOperation> {
 		let state = match block {
@@ -587,7 +587,7 @@ where
 			if let Some(changes_trie_root) = changes_trie_root {
 				if let Some(changes_trie_update) = operation.changes_trie_update {
 					let changes_trie_root: H::Out = changes_trie_root.into();
-					self.changes_trie_storage.insert(header.number().as_(), changes_trie_root, changes_trie_update);
+					self.changes_trie_storage.0.insert(header.number().as_(), changes_trie_root, changes_trie_update);
 				}
 			}
 
@@ -652,6 +652,26 @@ impl<Block: BlockT> blockchain::Cache<Block> for Cache<Block> {
 	}
 }
 
+/// Prunable in-memory changes trie storage.
+pub struct ChangesTrieStorage<H: Hasher>(InMemoryChangesTrieStorage<H>) where H::Out: HeapSizeOf;
+impl<H: Hasher> backend::PrunableStateChangesTrieStorage<H> for ChangesTrieStorage<H> where H::Out: HeapSizeOf {
+	fn oldest_changes_trie_block(&self, _config: &ChangesTrieConfiguration, _best_finalized: u64) -> u64 {
+		0
+	}
+}
+
+impl<H: Hasher> state_machine::ChangesTrieRootsStorage<H> for ChangesTrieStorage<H> where H::Out: HeapSizeOf {
+	fn root(&self, anchor: &ChangesTrieAnchorBlockId<H::Out>, block: u64) -> Result<Option<H::Out>, String> {
+		self.0.root(anchor, block)
+	}
+}
+
+impl<H: Hasher> state_machine::ChangesTrieStorage<H> for ChangesTrieStorage<H> where H::Out: HeapSizeOf {
+	fn get(&self, key: &H::Out) -> Result<Option<state_machine::DBValue>, String> {
+		self.0.get(key)
+	}
+}
+
 /// Insert authorities entry into in-memory blockchain cache. Extracted as a separate function to use it in tests.
 pub fn cache_authorities_at<Block: BlockT>(
 	blockchain: &Blockchain<Block>,
diff --git a/substrate/core/client/src/light/backend.rs b/substrate/core/client/src/light/backend.rs
index 7485b8cdf35920a7b25ce8cdc859761a0f4339f5..c144ffa52036fca58156eac7648b5554055273d8 100644
--- a/substrate/core/client/src/light/backend.rs
+++ b/substrate/core/client/src/light/backend.rs
@@ -22,9 +22,8 @@ use futures::{Future, IntoFuture};
 use parking_lot::RwLock;
 
 use runtime_primitives::{generic::BlockId, Justification, StorageMap, ChildrenStorageMap};
-use state_machine::{Backend as StateBackend, InMemoryChangesTrieStorage, TrieBackend};
+use state_machine::{Backend as StateBackend, TrieBackend};
 use runtime_primitives::traits::{Block as BlockT, NumberFor, AuthorityIdFor};
-
 use crate::in_mem;
 use crate::backend::{AuxStore, Backend as ClientBackend, BlockImportOperation, RemoteBackend, NewBlockState};
 use crate::blockchain::HeaderBackend as BlockchainHeaderBackend;
@@ -95,7 +94,7 @@ impl<S, F, Block, H> ClientBackend<Block, H> for Backend<S, F> where
 	type BlockImportOperation = ImportOperation<Block, S, F>;
 	type Blockchain = Blockchain<S, F>;
 	type State = OnDemandState<Block, S, F>;
-	type ChangesTrieStorage = InMemoryChangesTrieStorage<H>;
+	type ChangesTrieStorage = in_mem::ChangesTrieStorage<H>;
 
 	fn begin_operation(&self, _block: BlockId<Block>) -> ClientResult<Self::BlockImportOperation> {
 		Ok(ImportOperation {
diff --git a/substrate/core/client/src/light/fetcher.rs b/substrate/core/client/src/light/fetcher.rs
index 3e98a063ac310854bfe793507fbde4aa8cf726cd..49c7a5a5f20e20e7c8db1dd3eb31cfe7417996fc 100644
--- a/substrate/core/client/src/light/fetcher.rs
+++ b/substrate/core/client/src/light/fetcher.rs
@@ -403,7 +403,7 @@ pub mod tests {
 		RemoteCallRequest, RemoteHeaderRequest};
 	use crate::light::blockchain::tests::{DummyStorage, DummyBlockchain};
 	use primitives::{twox_128, Blake2Hasher};
-	use primitives::storage::well_known_keys;
+	use primitives::storage::{StorageKey, well_known_keys};
 	use runtime_primitives::generic::BlockId;
 	use state_machine::Backend;
 	use super::*;
@@ -546,6 +546,7 @@ pub mod tests {
 			let end_hash = remote_client.block_hash(end).unwrap().unwrap();
 
 			// 'fetch' changes proof from remote node
+			let key = StorageKey(key);
 			let remote_proof = remote_client.key_changes_proof(
 				begin_hash, end_hash, begin_hash, max_hash, &key
 			).unwrap();
@@ -558,7 +559,7 @@ pub mod tests {
 				last_block: (end, end_hash),
 				max_block: (max, max_hash),
 				tries_roots: (begin, begin_hash, local_roots_range),
-				key: key,
+				key: key.0,
 				retry_count: None,
 			};
 			let local_result = local_checker.check_changes_proof(&request, ChangesProof {
@@ -583,6 +584,7 @@ pub mod tests {
 		// (1, 4, dave.clone(), vec![(4, 0), (1, 1), (1, 0)]),
 		let (remote_client, remote_roots, _) = prepare_client_with_key_changes();
 		let dave = twox_128(&runtime::system::balance_of_key(Keyring::Dave.to_raw_public().into())).to_vec();
+		let dave = StorageKey(dave);
 
 		// 'fetch' changes proof from remote node:
 		// we're fetching changes for range b1..b4
@@ -611,7 +613,7 @@ pub mod tests {
 			last_block: (4, b4),
 			max_block: (4, b4),
 			tries_roots: (3, b3, vec![remote_roots[2].clone(), remote_roots[3].clone()]),
-			key: dave,
+			key: dave.0,
 			retry_count: None,
 		};
 		let local_result = local_checker.check_changes_proof_with_cht_size(&request, ChangesProof {
@@ -640,6 +642,7 @@ pub mod tests {
 		let end_hash = remote_client.block_hash(end).unwrap().unwrap();
 
 		// 'fetch' changes proof from remote node
+		let key = StorageKey(key);
 		let remote_proof = remote_client.key_changes_proof(
 			begin_hash, end_hash, begin_hash, max_hash, &key).unwrap();
 
@@ -650,7 +653,7 @@ pub mod tests {
 			last_block: (end, end_hash),
 			max_block: (max, max_hash),
 			tries_roots: (begin, begin_hash, local_roots_range.clone()),
-			key: key,
+			key: key.0,
 			retry_count: None,
 		};
 
@@ -693,6 +696,7 @@ pub mod tests {
 		let local_cht_root = cht::compute_root::<Header, Blake2Hasher, _>(
 			4, 0, remote_roots.iter().cloned().map(|ct| Ok(Some(ct)))).unwrap();
 		let dave = twox_128(&runtime::system::balance_of_key(Keyring::Dave.to_raw_public().into())).to_vec();
+		let dave = StorageKey(dave);
 
 		// 'fetch' changes proof from remote node:
 		// we're fetching changes for range b1..b4
diff --git a/substrate/core/network/src/chain.rs b/substrate/core/network/src/chain.rs
index 457f8852b2cb66f8a6d83d5e6e9327a4298538a1..40edbfbadb9fe256e0c149deb23d97164842791a 100644
--- a/substrate/core/network/src/chain.rs
+++ b/substrate/core/network/src/chain.rs
@@ -24,7 +24,7 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, AuthorityId
 use runtime_primitives::generic::{BlockId};
 use consensus::{ImportBlock, ImportResult};
 use runtime_primitives::Justification;
-use primitives::{H256, Blake2Hasher};
+use primitives::{H256, Blake2Hasher, storage::StorageKey};
 
 /// Local client abstraction for the network.
 pub trait Client<Block: BlockT>: Send + Sync {
@@ -66,7 +66,7 @@ pub trait Client<Block: BlockT>: Send + Sync {
 		last: Block::Hash,
 		min: Block::Hash,
 		max: Block::Hash,
-		key: &[u8]
+		key: &StorageKey
 	) -> Result<ChangesProof<Block::Header>, Error>;
 }
 
@@ -125,7 +125,7 @@ impl<B, E, Block, RA> Client<Block> for SubstrateClient<B, E, Block, RA> where
 		last: Block::Hash,
 		min: Block::Hash,
 		max: Block::Hash,
-		key: &[u8]
+		key: &StorageKey
 	) -> Result<ChangesProof<Block::Header>, Error> {
 		(self as &SubstrateClient<B, E, Block, RA>).key_changes_proof(first, last, min, max, key)
 	}
diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs
index 41612c788d7c4795b571a7077ca27cadd5584348..31891450e1e8a23b8a809e2accb19dbf5f5f1559 100644
--- a/substrate/core/network/src/protocol.rs
+++ b/substrate/core/network/src/protocol.rs
@@ -22,6 +22,7 @@ use parking_lot::RwLock;
 use rustc_hex::ToHex;
 use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, As, Zero};
 use runtime_primitives::generic::BlockId;
+use primitives::storage::StorageKey;
 use network_libp2p::{NodeIndex, Severity};
 use codec::{Encode, Decode};
 use consensus::import_queue::ImportQueue;
@@ -712,11 +713,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 	fn on_remote_changes_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteChangesRequest<B::Hash>) {
 		trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{})",
 			request.id, who, request.key.to_hex(), request.first, request.last);
-		let proof = match self.context_data.chain.key_changes_proof(request.first, request.last, request.min, request.max, &request.key) {
+		let key = StorageKey(request.key);
+		let proof = match self.context_data.chain.key_changes_proof(request.first, request.last, request.min, request.max, &key) {
 			Ok(proof) => proof,
 			Err(error) => {
 				trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{}) failed with: {}",
-					request.id, who, request.key.to_hex(), request.first, request.last, error);
+					request.id, who, key.0.to_hex(), request.first, request.last, error);
 				ChangesProof::<B::Header> {
 					max_block: Zero::zero(),
 					proof: vec![],
diff --git a/substrate/core/rpc/src/state/mod.rs b/substrate/core/rpc/src/state/mod.rs
index 72175ca5df9acac6304e7df23a50dcc5f318d1ea..49dc28137a88cb0d8dfbf8fcfb2242be7dd55e21 100644
--- a/substrate/core/rpc/src/state/mod.rs
+++ b/substrate/core/rpc/src/state/mod.rs
@@ -17,7 +17,8 @@
 //! Substrate state API.
 
 use std::{
-	collections::HashMap,
+	collections::{BTreeMap, HashMap},
+	ops::Range,
 	sync::Arc,
 };
 
@@ -31,7 +32,7 @@ use primitives::storage::{self, StorageKey, StorageData, StorageChangeSet};
 use rpc::Result as RpcResult;
 use rpc::futures::{stream, Future, Sink, Stream};
 use runtime_primitives::generic::BlockId;
-use runtime_primitives::traits::{Block as BlockT, Header, ProvideRuntimeApi};
+use runtime_primitives::traits::{Block as BlockT, Header, ProvideRuntimeApi, As, NumberFor};
 use runtime_version::RuntimeVersion;
 
 use subscriptions::Subscriptions;
@@ -112,7 +113,25 @@ pub struct State<B, E, Block: BlockT, RA> {
 	subscriptions: Subscriptions,
 }
 
-impl<B, E, Block: BlockT, RA> State<B, E, Block, RA> {
+/// Ranges to query in state_queryStorage.
+struct QueryStorageRange<Block: BlockT> {
+	/// Hashes of all the blocks in the range.
+	pub hashes: Vec<Block::Hash>,
+	/// Number of the first block in the range.
+	pub first_number: NumberFor<Block>,
+	/// Blocks subrange ([begin; end) indices within `hashes`) where we should read keys at
+	/// each state to get changes.
+	pub unfiltered_range: Range<usize>,
+	/// Blocks subrange ([begin; end) indices within `hashes`) where we could pre-filter
+	/// blocks-with-changes by using changes tries.
+	pub filtered_range: Option<Range<usize>>,
+}
+
+impl<B, E, Block: BlockT, RA> State<B, E, Block, RA> where
+	Block: BlockT<Hash=H256>,
+	B: client::backend::Backend<Block, Blake2Hasher>,
+	E: CallExecutor<Block, Blake2Hasher>,
+{
 	/// Create new State API RPC handler.
 	pub fn new(client: Arc<Client<B, E, Block, RA>>, subscriptions: Subscriptions) -> Self {
 		Self {
@@ -120,6 +139,128 @@ impl<B, E, Block: BlockT, RA> State<B, E, Block, RA> {
 			subscriptions,
 		}
 	}
+
+	/// Splits the `query_storage` block range into 'filtered' and 'unfiltered' subranges.
+	/// Blocks that contain changes within filtered subrange could be filtered using changes tries.
+	/// Blocks that contain changes within unfiltered subrange must be filtered manually.
+	fn split_query_storage_range(
+		&self,
+		from: Block::Hash,
+		to: Trailing<Block::Hash>
+	) -> Result<QueryStorageRange<Block>> {
+		let to = self.unwrap_or_best(to)?;
+		let from_hdr = self.client.header(&BlockId::hash(from))?;
+		let to_hdr = self.client.header(&BlockId::hash(to))?;
+		match (from_hdr, to_hdr) {
+			(Some(ref from), Some(ref to)) if from.number() <= to.number() => {
+				// check if we can get from `to` to `from` by going through parent_hashes.
+				let from_number = *from.number();
+				let blocks = {
+					let mut blocks = vec![to.hash()];
+					let mut last = to.clone();
+					while *last.number() > from_number {
+						if let Some(hdr) = self.client.header(&BlockId::hash(*last.parent_hash()))? {
+							blocks.push(hdr.hash());
+							last = hdr;
+						} else {
+							bail!(invalid_block_range(
+								Some(from),
+								Some(to),
+								format!("Parent of {} ({}) not found", last.number(), last.hash()),
+							))
+						}
+					}
+					if last.hash() != from.hash() {
+						bail!(invalid_block_range(
+							Some(from),
+							Some(to),
+							format!("Expected to reach `from`, got {} ({})", last.number(), last.hash()),
+						))
+					}
+					blocks.reverse();
+					blocks
+				};
+				// check if we can filter blocks-with-changes from some (sub)range using changes tries
+				let changes_trie_range = self.client.max_key_changes_range(from_number, BlockId::Hash(to.hash()))?;
+				let filtered_range_begin = changes_trie_range.map(|(begin, _)| (begin - from_number).as_() as usize);
+				let (unfiltered_range, filtered_range) = split_range(blocks.len(), filtered_range_begin);
+				Ok(QueryStorageRange {
+					hashes: blocks,
+					first_number: from_number,
+					unfiltered_range,
+					filtered_range,
+				})
+			},
+			(from, to) => bail!(
+				invalid_block_range(from.as_ref(), to.as_ref(), "Invalid range or unknown block".into())
+			),
+		}
+	}
+
+	/// Iterates through range.unfiltered_range and check each block for changes of keys' values.
+	fn query_storage_unfiltered(
+		&self,
+		range: &QueryStorageRange<Block>,
+		keys: &[StorageKey],
+		changes: &mut Vec<StorageChangeSet<Block::Hash>>,
+	) -> Result<()> {
+		let mut last_state: HashMap<_, Option<_>> = Default::default();
+		for block in range.unfiltered_range.start..range.unfiltered_range.end {
+			let block_hash = range.hashes[block].clone();
+			let mut block_changes = StorageChangeSet { block: block_hash.clone(), changes: Vec::new() };
+			let id = BlockId::hash(block_hash);
+			for key in keys {
+				let (has_changed, data) = {
+					let curr_data = self.client.storage(&id, key)?;
+					let prev_data = last_state.get(key).and_then(|x| x.as_ref());
+					(curr_data.as_ref() != prev_data, curr_data)
+				};
+				if has_changed {
+					block_changes.changes.push((key.clone(), data.clone()));
+				}
+				last_state.insert(key.clone(), data);
+			}
+			changes.push(block_changes);
+		}
+		Ok(())
+	}
+
+	/// Iterates through all blocks that are changing keys within range.filtered_range and collects these changes.
+	fn query_storage_filtered(
+		&self,
+		range: &QueryStorageRange<Block>,
+		keys: &[StorageKey],
+		changes: &mut Vec<StorageChangeSet<Block::Hash>>,
+	) -> Result<()> {
+		let (begin, end) = match range.filtered_range {
+			Some(ref filtered_range) => (
+				range.first_number + As::sa(filtered_range.start as u64),
+				BlockId::Hash(range.hashes[filtered_range.end - 1].clone())
+			),
+			None => return Ok(()),
+		};
+		let mut changes_map: BTreeMap<NumberFor<Block>, StorageChangeSet<Block::Hash>> = BTreeMap::new();
+		for key in keys {
+			let mut last_block = None;
+			for (block, _) in self.client.key_changes(begin, end, key)? {
+				if last_block == Some(block) {
+					continue;
+				}
+				let block_hash = range.hashes[(block - range.first_number).as_() as usize].clone();
+				let id = BlockId::Hash(block_hash);
+				let value_at_block = self.client.storage(&id, key)?;
+				changes_map.entry(block)
+					.or_insert_with(|| StorageChangeSet { block: block_hash, changes: Vec::new() })
+					.changes.push((key.clone(), value_at_block));
+				last_block = Some(block);
+			}
+		}
+		if let Some(additional_capacity) = changes_map.len().checked_sub(changes.len()) {
+			changes.reserve(additional_capacity);
+		}
+		changes.extend(changes_map.into_iter().map(|(_, cs)| cs));
+		Ok(())
+	}
 }
 
 impl<B, E, Block, RA> State<B, E, Block, RA> where
@@ -178,72 +319,17 @@ impl<B, E, Block, RA> StateApi<Block::Hash> for State<B, E, Block, RA> where
 		self.client.runtime_api().metadata(&BlockId::Hash(block)).map(Into::into).map_err(Into::into)
 	}
 
-	fn query_storage(&self, keys: Vec<StorageKey>, from: Block::Hash, to: Trailing<Block::Hash>) -> Result<Vec<StorageChangeSet<Block::Hash>>> {
-		let to = self.unwrap_or_best(to)?;
-
-		let from_hdr = self.client.header(&BlockId::hash(from))?;
-		let to_hdr = self.client.header(&BlockId::hash(to))?;
-
-		match (from_hdr, to_hdr) {
-			(Some(ref from), Some(ref to)) if from.number() <= to.number() => {
-				let from = from.clone();
-				let to = to.clone();
-				// check if we can get from `to` to `from` by going through parent_hashes.
-				let blocks = {
-					let mut blocks = vec![to.hash()];
-					let mut last = to.clone();
-					while last.number() > from.number() {
-						if let Some(hdr) = self.client.header(&BlockId::hash(*last.parent_hash()))? {
-							blocks.push(hdr.hash());
-							last = hdr;
-						} else {
-							bail!(invalid_block_range(
-								Some(from),
-								Some(to),
-								format!("Parent of {} ({}) not found", last.number(), last.hash()),
-							))
-						}
-					}
-					if last.hash() != from.hash() {
-						bail!(invalid_block_range(
-							Some(from),
-							Some(to),
-							format!("Expected to reach `from`, got {} ({})", last.number(), last.hash()),
-						))
-					}
-					blocks.reverse();
-					blocks
-				};
-				let mut result = Vec::new();
-				let mut last_state: HashMap<_, Option<_>> = Default::default();
-				for block in blocks {
-					let mut changes = vec![];
-					let id = BlockId::hash(block.clone());
-
-					for key in &keys {
-						let (has_changed, data) = {
-							let curr_data = self.client.storage(&id, key)?;
-							let prev_data = last_state.get(key).and_then(|x| x.as_ref());
-
-							(curr_data.as_ref() != prev_data, curr_data)
-						};
-
-						if has_changed {
-							changes.push((key.clone(), data.clone()));
-						}
-
-						last_state.insert(key.clone(), data);
-					}
-
-					result.push(StorageChangeSet {
-						block,
-						changes,
-					});
-				}
-				Ok(result)
-			},
-			(from, to) => bail!(invalid_block_range(from, to, "Invalid range or unknown block".into())),
-		}
+	fn query_storage(
+		&self,
+		keys: Vec<StorageKey>,
+		from: Block::Hash,
+		to: Trailing<Block::Hash>
+	) -> Result<Vec<StorageChangeSet<Block::Hash>>> {
+		let range = self.split_query_storage_range(from, to)?;
+		let mut changes = Vec::new();
+		self.query_storage_unfiltered(&range, &keys, &mut changes)?;
+		self.query_storage_filtered(&range, &keys, &mut changes)?;
+		Ok(changes)
 	}
 
 	fn subscribe_storage(
@@ -348,8 +434,29 @@ impl<B, E, Block, RA> StateApi<Block::Hash> for State<B, E, Block, RA> where
 	}
 }
 
-fn invalid_block_range<H: Header>(from: Option<H>, to: Option<H>, reason: String) -> error::ErrorKind {
-	let to_string = |x: Option<H>| match x {
+/// Splits passed range into two subranges where:
+/// - first range has at least one element in it;
+/// - second range (optionally) starts at given `middle` element.
+pub(crate) fn split_range(size: usize, middle: Option<usize>) -> (Range<usize>, Option<Range<usize>>) {
+	// check if we can filter blocks-with-changes from some (sub)range using changes tries
+	let range2_begin = match middle {
+		// some of required changes tries are pruned => use available tries
+		Some(middle) if middle != 0 => Some(middle),
+		// all required changes tries are available, but we still want values at first block
+		// => do 'unfiltered' read for the first block and 'filtered' for the rest
+		Some(_) if size > 1 => Some(1),
+		// range contains single element => do not use changes tries
+		Some(_) => None,
+		// changes tries are not available => do 'unfiltered' read for the whole range
+		None => None,
+	};
+	let range1 = 0..range2_begin.unwrap_or(size);
+	let range2 = range2_begin.map(|begin| begin..size);
+	(range1, range2)
+}
+
+fn invalid_block_range<H: Header>(from: Option<&H>, to: Option<&H>, reason: String) -> error::ErrorKind {
+	let to_string = |x: Option<&H>| match x {
 		None => "unknown hash".into(),
 		Some(h) => format!("{} ({})", h.number(), h.hash()),
 	};
diff --git a/substrate/core/rpc/src/state/tests.rs b/substrate/core/rpc/src/state/tests.rs
index 3fe306d0bce09415c0ef565027c5589ca08050a4..8cea4d8a6c1a44167968f450af7202c52bc26dd1 100644
--- a/substrate/core/rpc/src/state/tests.rs
+++ b/substrate/core/rpc/src/state/tests.rs
@@ -117,66 +117,95 @@ fn should_send_initial_storage_changes_and_notifications() {
 
 #[test]
 fn should_query_storage() {
-	let core = ::tokio::runtime::Runtime::new().unwrap();
-	let client = Arc::new(test_client::new());
-	let api = State::new(client.clone(), Subscriptions::new(core.executor()));
-
-	let add_block = |nonce| {
-		let mut builder = client.new_block().unwrap();
-		builder.push_transfer(runtime::Transfer {
-			from: Keyring::Alice.to_raw_public().into(),
-			to: Keyring::Ferdie.to_raw_public().into(),
-			amount: 42,
-			nonce,
-		}).unwrap();
-		let block = builder.bake().unwrap();
-		let hash = block.header.hash();
-		client.import(BlockOrigin::Own, block).unwrap();
-		hash
-	};
-	let block1_hash = add_block(0);
-	let block2_hash = add_block(1);
-	let genesis_hash = client.genesis_hash();
-
+	type TestClient = test_client::client::Client<
+		test_client::Backend,
+		test_client::Executor,
+		runtime::Block,
+		runtime::RuntimeApi
+	>;
+
+	fn run_tests(client: Arc<TestClient>) {
+		let core = ::tokio::runtime::Runtime::new().unwrap();
+		let api = State::new(client.clone(), Subscriptions::new(core.executor()));
 
-	let mut expected = vec![
-		StorageChangeSet {
-			block: genesis_hash,
+		let add_block = |nonce| {
+			let mut builder = client.new_block().unwrap();
+			builder.push_transfer(runtime::Transfer {
+				from: Keyring::Alice.to_raw_public().into(),
+				to: Keyring::Ferdie.to_raw_public().into(),
+				amount: 42,
+				nonce,
+			}).unwrap();
+			let block = builder.bake().unwrap();
+			let hash = block.header.hash();
+			client.import(BlockOrigin::Own, block).unwrap();
+			hash
+		};
+		let block1_hash = add_block(0);
+		let block2_hash = add_block(1);
+		let genesis_hash = client.genesis_hash();
+
+
+		let mut expected = vec![
+			StorageChangeSet {
+				block: genesis_hash,
+				changes: vec![
+					(
+						StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()),
+						Some(StorageData(vec![232, 3, 0, 0, 0, 0, 0, 0]))
+					),
+				],
+			},
+			StorageChangeSet {
+				block: block1_hash,
+				changes: vec![
+					(
+						StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()),
+						Some(StorageData(vec![190, 3, 0, 0, 0, 0, 0, 0]))
+					),
+				],
+			},
+		];
+
+		// Query changes only up to block1
+		let result = api.query_storage(
+			vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())],
+			genesis_hash,
+			Some(block1_hash).into(),
+		);
+
+		assert_eq!(result.unwrap(), expected);
+
+		// Query all changes
+		let result = api.query_storage(
+			vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())],
+			genesis_hash,
+			None.into(),
+		);
+
+		expected.push(StorageChangeSet {
+			block: block2_hash,
 			changes: vec![
-				(StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), Some(StorageData(vec![232, 3, 0, 0, 0, 0, 0, 0]))),
+				(
+					StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()),
+					Some(StorageData(vec![148, 3, 0, 0, 0, 0, 0, 0]))
+				),
 			],
-		},
-		StorageChangeSet {
-			block: block1_hash,
-			changes: vec![
-				(StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), Some(StorageData(vec![190, 3, 0, 0, 0, 0, 0, 0]))),
-			],
-		},
-	];
-
-	// Query changes only up to block1
-	let result = api.query_storage(
-		vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())],
-		genesis_hash,
-		Some(block1_hash).into(),
-	);
-
-	assert_eq!(result.unwrap(), expected);
+		});
+		assert_eq!(result.unwrap(), expected);
+	}
 
-	// Query all changes
-	let result = api.query_storage(
-		vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())],
-		genesis_hash,
-		None.into(),
-	);
+	run_tests(Arc::new(test_client::new()));
+	run_tests(Arc::new(test_client::new_with_changes_trie()));
+}
 
-	expected.push(StorageChangeSet {
-		block: block2_hash,
-		changes: vec![
-			(StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), Some(StorageData(vec![148, 3, 0, 0, 0, 0, 0, 0]))),
-		],
-	});
-	assert_eq!(result.unwrap(), expected);
+#[test]
+fn should_split_ranges() {
+	assert_eq!(split_range(1, None), (0..1, None));
+	assert_eq!(split_range(100, None), (0..100, None));
+	assert_eq!(split_range(1, Some(0)), (0..1, None));
+	assert_eq!(split_range(100, Some(50)), (0..50, Some(50..100)));
+	assert_eq!(split_range(100, Some(99)), (0..99, Some(99..100)));
 }
 
 
diff --git a/substrate/core/state-machine/src/changes_trie/changes_iterator.rs b/substrate/core/state-machine/src/changes_trie/changes_iterator.rs
index e8bd1e7db6b1a8dab7d21dcaea1520924d89dc3d..af844db2c86a85b2a5b08eedc7de9e41b78a7372 100644
--- a/substrate/core/state-machine/src/changes_trie/changes_iterator.rs
+++ b/substrate/core/state-machine/src/changes_trie/changes_iterator.rs
@@ -31,15 +31,16 @@ use trie_backend_essence::{TrieBackendEssence};
 
 /// Return changes of given key at given blocks range.
 /// `max` is the number of best known block.
-pub fn key_changes<S: Storage<H>, H: Hasher>(
-	config: &Configuration,
-	storage: &S,
+/// Changes are returned in descending order (i.e. last block comes first).
+pub fn key_changes<'a, S: Storage<H>, H: Hasher>(
+	config: &'a Configuration,
+	storage: &'a S,
 	begin: u64,
-	end: &AnchorBlockId<H::Out>,
+	end: &'a AnchorBlockId<H::Out>,
 	max: u64,
-	key: &[u8],
-) -> Result<Vec<(u64, u32)>, String> where H::Out: HeapSizeOf {
-	DrilldownIterator {
+	key: &'a [u8],
+) -> Result<DrilldownIterator<'a, S, S, H>, String> where H::Out: HeapSizeOf {
+	Ok(DrilldownIterator {
 		essence: DrilldownIteratorEssence {
 			key,
 			roots_storage: storage,
@@ -53,7 +54,7 @@ pub fn key_changes<S: Storage<H>, H: Hasher>(
 
 			_hasher: ::std::marker::PhantomData::<H>::default(),
 		},
-	}.collect()
+	})
 }
 
 /// Returns proof of changes of given key at given blocks range.
@@ -93,6 +94,7 @@ pub fn key_changes_proof<S: Storage<H>, H: Hasher>(
 
 /// Check key changes proog and return changes of the key at given blocks range.
 /// `max` is the number of best known block.
+/// Changes are returned in descending order (i.e. last block comes first).
 pub fn key_changes_proof_check<S: RootsStorage<H>, H: Hasher>(
 	config: &Configuration,
 	roots_storage: &S,
@@ -261,7 +263,7 @@ impl<'a, RS: 'a + RootsStorage<H>, S: Storage<H>, H: Hasher> DrilldownIteratorEs
 }
 
 /// Exploring drilldown operator.
-struct DrilldownIterator<'a, RS: 'a + RootsStorage<H>, S: 'a + Storage<H>, H: Hasher> where H::Out: 'a {
+pub struct DrilldownIterator<'a, RS: 'a + RootsStorage<H>, S: 'a + Storage<H>, H: Hasher> where H::Out: 'a {
 	essence: DrilldownIteratorEssence<'a, RS, S, H>,
 }
 
@@ -379,6 +381,7 @@ fn lower_bound_max_digest(
 
 #[cfg(test)]
 mod tests {
+	use std::iter::FromIterator;
 	use primitives::Blake2Hasher;
 	use changes_trie::input::InputPair;
 	use changes_trie::storage::InMemoryStorage;
@@ -427,23 +430,28 @@ mod tests {
 	fn drilldown_iterator_works() {
 		let (config, storage) = prepare_for_drilldown();
 		let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
-			&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 16 }, 16, &[42]);
+			&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 16 }, 16, &[42])
+			.and_then(Result::from_iter);
 		assert_eq!(drilldown_result, Ok(vec![(8, 2), (8, 1), (6, 3), (3, 0)]));
 
 		let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
-			&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 2 }, 4, &[42]);
+			&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 2 }, 4, &[42])
+			.and_then(Result::from_iter);
 		assert_eq!(drilldown_result, Ok(vec![]));
 
 		let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
-			&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 3 }, 4, &[42]);
+			&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 3 }, 4, &[42])
+			.and_then(Result::from_iter);
 		assert_eq!(drilldown_result, Ok(vec![(3, 0)]));
 
 		let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
-			&config, &storage, 7, &AnchorBlockId { hash: Default::default(), number: 8 }, 8, &[42]);
+			&config, &storage, 7, &AnchorBlockId { hash: Default::default(), number: 8 }, 8, &[42])
+			.and_then(Result::from_iter);
 		assert_eq!(drilldown_result, Ok(vec![(8, 2), (8, 1)]));
 
 		let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
-			&config, &storage, 5, &AnchorBlockId { hash: Default::default(), number: 7 }, 8, &[42]);
+			&config, &storage, 5, &AnchorBlockId { hash: Default::default(), number: 7 }, 8, &[42])
+			.and_then(Result::from_iter);
 		assert_eq!(drilldown_result, Ok(vec![(6, 3)]));
 	}
 
@@ -453,7 +461,8 @@ mod tests {
 		storage.clear_storage();
 
 		assert!(key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
-			&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 100 }, 1000, &[42]).is_err());
+			&config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 100 }, 1000, &[42])
+			.and_then(|i| i.collect::<Result<Vec<_>, _>>()).is_err());
 	}
 
 	#[test]
diff --git a/substrate/core/state-machine/src/changes_trie/mod.rs b/substrate/core/state-machine/src/changes_trie/mod.rs
index 1d523854ee3e3a78e35a831d4a042065ad4adb4e..ffc43fb884fb8b4c515ff42a1e458bb1f8c3a1d8 100644
--- a/substrate/core/state-machine/src/changes_trie/mod.rs
+++ b/substrate/core/state-machine/src/changes_trie/mod.rs
@@ -44,7 +44,7 @@ mod storage;
 
 pub use self::storage::InMemoryStorage;
 pub use self::changes_iterator::{key_changes, key_changes_proof, key_changes_proof_check};
-pub use self::prune::prune;
+pub use self::prune::{prune, oldest_non_pruned_trie};
 
 use hash_db::Hasher;
 use heapsize::HeapSizeOf;
diff --git a/substrate/core/state-machine/src/changes_trie/prune.rs b/substrate/core/state-machine/src/changes_trie/prune.rs
index 9886105115059c7bed68ccfe7253fb726378af62..76c746cb72e60414a42cdb3e1a5cdf0269a663f4 100644
--- a/substrate/core/state-machine/src/changes_trie/prune.rs
+++ b/substrate/core/state-machine/src/changes_trie/prune.rs
@@ -24,6 +24,22 @@ use trie_backend_essence::TrieBackendEssence;
 use changes_trie::{AnchorBlockId, Configuration, Storage};
 use changes_trie::storage::TrieBackendAdapter;
 
+/// Get number of oldest block for which changes trie is not pruned
+/// given changes trie configuration, pruning parameter and number of
+/// best finalized block.
+pub fn oldest_non_pruned_trie(
+	config: &Configuration,
+	min_blocks_to_keep: u64,
+	best_finalized_block: u64,
+) -> u64 {
+	let max_digest_interval = config.max_digest_interval();
+	let max_digest_block = best_finalized_block - best_finalized_block % max_digest_interval;
+	match pruning_range(config, min_blocks_to_keep, max_digest_block) {
+		Some((_, last_pruned_block)) => last_pruned_block + 1,
+		None => 1,
+	}
+}
+
 /// Prune obslete changes tries. Puning happens at the same block, where highest
 /// level digest is created. Pruning guarantees to save changes tries for last
 /// `min_blocks_to_keep` blocks. We only prune changes tries at `max_digest_iterval`
@@ -268,4 +284,23 @@ mod tests {
 		assert_eq!(max_digest_intervals_to_keep(1024, 511), 2);
 		assert_eq!(max_digest_intervals_to_keep(1024, 100), 10);
 	}
+
+	#[test]
+	fn oldest_non_pruned_trie_works() {
+		// when digests are not created at all
+		assert_eq!(oldest_non_pruned_trie(&config(0, 0), 100, 10), 1);
+		assert_eq!(oldest_non_pruned_trie(&config(0, 0), 100, 110), 11);
+
+		// when only l1 digests are created
+		assert_eq!(oldest_non_pruned_trie(&config(100, 1), 100, 50), 1);
+		assert_eq!(oldest_non_pruned_trie(&config(100, 1), 100, 110), 1);
+		assert_eq!(oldest_non_pruned_trie(&config(100, 1), 100, 210), 101);
+
+		// when l2 digests are created
+		assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 50), 1);
+		assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 110), 1);
+		assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 210), 1);
+		assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 10110), 1);
+		assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 20110), 10001);
+	}
 }
diff --git a/substrate/core/state-machine/src/lib.rs b/substrate/core/state-machine/src/lib.rs
index bc0189d398716ec1f25f1e085b63abce575a441f..e4fe89e2d2ff3f6fdb5e60b3cec73487b3bddf19 100644
--- a/substrate/core/state-machine/src/lib.rs
+++ b/substrate/core/state-machine/src/lib.rs
@@ -59,7 +59,8 @@ pub use changes_trie::{
 	RootsStorage as ChangesTrieRootsStorage,
 	InMemoryStorage as InMemoryChangesTrieStorage,
 	key_changes, key_changes_proof, key_changes_proof_check,
-	prune as prune_changes_tries};
+	prune as prune_changes_tries,
+	oldest_non_pruned_trie as oldest_non_pruned_changes_trie};
 pub use overlayed_changes::OverlayedChanges;
 pub use proving_backend::{create_proof_check_backend, create_proof_check_backend_storage};
 pub use trie_backend_essence::{TrieBackendStorage, Storage};